diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 88f21da1ef7..99c557f4ece 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.context.annotation.Import; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; @@ -103,4 +104,12 @@ public class KafkaAutoConfiguration { return jaas; } + @Bean + @ConditionalOnMissingBean(KafkaAdmin.class) + public KafkaAdmin kafkaAdmin() { + KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); + kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast()); + return kafkaAdmin; + } + } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 09d3b38bbba..fd68596ead4 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -72,6 +72,8 @@ public class KafkaProperties { private final Producer producer = new Producer(); + private final Admin admin = new Admin(); + private final Listener listener = new Listener(); private final Ssl ssl = new Ssl(); @@ -112,6 +114,10 @@ public class KafkaProperties { return this.listener; } + public Admin getAdmin() { + return this.admin; + } + public Ssl getSsl() { return this.ssl; } @@ -186,6 +192,20 @@ public class KafkaProperties { return properties; } + /** + * Create an initial map of admin properties from the state of this instance. + *

+ * This allows you to add additional properties, if necessary, and override the + * default kafkaAdmin bean. + * @return the admin properties initialized with the customizations defined on this + * instance + */ + public Map buildAdminProperties() { + Map properties = buildCommonProperties(); + properties.putAll(this.admin.buildProperties()); + return properties; + } + private static String resourceToPath(Resource resource) { try { return resource.getFile().getAbsolutePath(); @@ -643,6 +663,80 @@ public class KafkaProperties { } + public static class Admin { + + private final Ssl ssl = new Ssl(); + + /** + * Id to pass to the server when making requests; used for server-side logging. + */ + private String clientId; + + /** + * Additional admin-specific properties used to configure the client. + */ + private final Map properties = new HashMap<>(); + + /** + * Fail fast if the broker is not available on startup. + */ + private boolean failFast; + + public Ssl getSsl() { + return this.ssl; + } + + public String getClientId() { + return this.clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public boolean isFailFast() { + return this.failFast; + } + + public void setFailFast(boolean failFast) { + this.failFast = failFast; + } + + public Map getProperties() { + return this.properties; + } + + public Map buildProperties() { + Map properties = new HashMap<>(); + if (this.clientId != null) { + properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); + } + if (this.ssl.getKeyPassword() != null) { + properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, + this.ssl.getKeyPassword()); + } + if (this.ssl.getKeystoreLocation() != null) { + properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + resourceToPath(this.ssl.getKeystoreLocation())); + } + if (this.ssl.getKeystorePassword() != null) { + properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + this.ssl.getKeystorePassword()); + } + if (this.ssl.getTruststoreLocation() != null) { + properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + resourceToPath(this.ssl.getTruststoreLocation())); + } + if (this.ssl.getTruststorePassword() != null) { + properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + this.ssl.getTruststorePassword()); + } + properties.putAll(this.properties); + return properties; + } + + } + public static class Template { /** diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java index ed056d3d859..02c417c7acd 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java @@ -19,6 +19,8 @@ package org.springframework.boot.autoconfigure.kafka; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.Producer; import org.junit.After; import org.junit.ClassRule; import org.junit.Test; @@ -27,6 +29,7 @@ import org.springframework.boot.test.util.TestPropertyValues; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.rule.KafkaEmbedded; @@ -43,6 +46,8 @@ public class KafkaAutoConfigurationIntegrationTests { private static final String TEST_TOPIC = "testTopic"; + private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic"; + @ClassRule public static final KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, TEST_TOPIC); @@ -56,13 +61,13 @@ public class KafkaAutoConfigurationIntegrationTests { } } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testEndToEnd() throws Exception { load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(), "spring.kafka.consumer.group-id=testGroup", "spring.kafka.consumer.auto-offset-reset=earliest"); - @SuppressWarnings("unchecked") KafkaTemplate template = this.context .getBean(KafkaTemplate.class); template.send(TEST_TOPIC, "foo", "bar"); @@ -70,6 +75,11 @@ public class KafkaAutoConfigurationIntegrationTests { assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(listener.key).isEqualTo("foo"); assertThat(listener.received).isEqualTo("bar"); + + DefaultKafkaProducerFactory producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class); + Producer producer = producerFactory.createProducer(); + assertThat(producer.partitionsFor(ADMIN_CREATED_TOPIC).size()).isEqualTo(10); + producer.close(); } private void load(Class config, String... environment) { @@ -93,6 +103,11 @@ public class KafkaAutoConfigurationIntegrationTests { return new Listener(); } + @Bean + public NewTopic adminCreated() { + return new NewTopic(ADMIN_CREATED_TOPIC, 10, (short) 1); + } + } public static class Listener { diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 2e1743e547b..b6933fde9d0 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -22,6 +22,7 @@ import java.util.Map; import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; @@ -38,9 +39,11 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; +import org.springframework.kafka.test.utils.KafkaTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; @@ -170,6 +173,38 @@ public class KafkaAutoConfigurationTests { assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); } + @Test + public void adminProperties() { + load("spring.kafka.clientId=cid", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.admin.fail-fast=true", + "spring.kafka.admin.properties.fiz.buz=fix.fox", + "spring.kafka.admin.ssl.key-password=p4", + "spring.kafka.admin.ssl.keystore-location=classpath:ksLocP", + "spring.kafka.admin.ssl.keystore-password=p5", + "spring.kafka.admin.ssl.truststore-location=classpath:tsLocP", + "spring.kafka.admin.ssl.truststore-password=p6"); + KafkaAdmin admin = this.context + .getBean(KafkaAdmin.class); + Map configs = admin.getConfig(); + // common + assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); + // admin + assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); + assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "ksLocP"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); + assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "tsLocP"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + .isEqualTo("p6"); + assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + .isEmpty(); + assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); + assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); + assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable", Boolean.class)).isTrue(); + } + @SuppressWarnings("unchecked") @Test public void listenerProperties() { diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index babf83d470a..443126242e6 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -939,6 +939,14 @@ content into your application; rather pick only the properties that you need. spring.jms.template.time-to-live= # Time-to-live of a message when sending in milliseconds. Enable QoS when set. # APACHE KAFKA ({sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[KafkaProperties]) + spring.kafka.admin.client-id= # Id to pass to the server when making requests; used for server-side logging. + spring.kafka.admin.fail-fast=false # Fail fast if the broker is not available on startup. + spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client. + spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file. + spring.kafka.admin.ssl.keystore-location= # Location of the key store file. + spring.kafka.admin.ssl.keystore-password= # Store password for the key store file. + spring.kafka.admin.ssl.truststore-location= # Location of the trust store file. + spring.kafka.admin.ssl.truststore-password= # Store password for the trust store file. spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. spring.kafka.client-id= # Id to pass to the server when making requests; used for server-side logging. spring.kafka.consumer.auto-commit-interval= # Frequency in milliseconds that the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' true. diff --git a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index 9ca8db743d9..77d54c67395 100644 --- a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -4816,6 +4816,9 @@ Kafka configuration is controlled by external configuration properties in spring.kafka.consumer.group-id=myGroup ---- +TIP: To create a topic on startup, simply add a bean of type `NewTopic`. If the topic +already exists, the related bean is ignored. + See {sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[`KafkaProperties`] for more of the supported options.