Merge pull request #10309 from garyrussell:kafkaAdmin
* pr/10309: Polish "Add KafkaAdmin Auto Configuration" Add KafkaAdmin Auto Configuration
This commit is contained in:
		
						commit
						1f73e6c2fc
					
				| 
						 | 
				
			
			@ -30,6 +30,7 @@ import org.springframework.context.annotation.Import;
 | 
			
		|||
import org.springframework.kafka.core.ConsumerFactory;
 | 
			
		||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 | 
			
		||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 | 
			
		||||
import org.springframework.kafka.core.KafkaAdmin;
 | 
			
		||||
import org.springframework.kafka.core.KafkaTemplate;
 | 
			
		||||
import org.springframework.kafka.core.ProducerFactory;
 | 
			
		||||
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
 | 
			
		||||
| 
						 | 
				
			
			@ -103,4 +104,12 @@ public class KafkaAutoConfiguration {
 | 
			
		|||
		return jaas;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	@Bean
 | 
			
		||||
	@ConditionalOnMissingBean(KafkaAdmin.class)
 | 
			
		||||
	public KafkaAdmin kafkaAdmin() {
 | 
			
		||||
		KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
 | 
			
		||||
		kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
 | 
			
		||||
		return kafkaAdmin;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -72,6 +72,8 @@ public class KafkaProperties {
 | 
			
		|||
 | 
			
		||||
	private final Producer producer = new Producer();
 | 
			
		||||
 | 
			
		||||
	private final Admin admin = new Admin();
 | 
			
		||||
 | 
			
		||||
	private final Listener listener = new Listener();
 | 
			
		||||
 | 
			
		||||
	private final Ssl ssl = new Ssl();
 | 
			
		||||
| 
						 | 
				
			
			@ -112,6 +114,10 @@ public class KafkaProperties {
 | 
			
		|||
		return this.listener;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	public Admin getAdmin() {
 | 
			
		||||
		return this.admin;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	public Ssl getSsl() {
 | 
			
		||||
		return this.ssl;
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -186,6 +192,20 @@ public class KafkaProperties {
 | 
			
		|||
		return properties;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/**
 | 
			
		||||
	 * Create an initial map of admin properties from the state of this instance.
 | 
			
		||||
	 * <p>
 | 
			
		||||
	 * This allows you to add additional properties, if necessary, and override the
 | 
			
		||||
	 * default kafkaAdmin bean.
 | 
			
		||||
	 * @return the admin properties initialized with the customizations defined on this
 | 
			
		||||
	 * instance
 | 
			
		||||
	 */
 | 
			
		||||
	public Map<String, Object> buildAdminProperties() {
 | 
			
		||||
		Map<String, Object> properties = buildCommonProperties();
 | 
			
		||||
		properties.putAll(this.admin.buildProperties());
 | 
			
		||||
		return properties;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	private static String resourceToPath(Resource resource) {
 | 
			
		||||
		try {
 | 
			
		||||
			return resource.getFile().getAbsolutePath();
 | 
			
		||||
| 
						 | 
				
			
			@ -643,6 +663,80 @@ public class KafkaProperties {
 | 
			
		|||
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	public static class Admin {
 | 
			
		||||
 | 
			
		||||
		private final Ssl ssl = new Ssl();
 | 
			
		||||
 | 
			
		||||
		/**
 | 
			
		||||
		 * Id to pass to the server when making requests; used for server-side logging.
 | 
			
		||||
		 */
 | 
			
		||||
		private String clientId;
 | 
			
		||||
 | 
			
		||||
		/**
 | 
			
		||||
		 * Additional admin-specific properties used to configure the client.
 | 
			
		||||
		 */
 | 
			
		||||
		private final Map<String, String> properties = new HashMap<>();
 | 
			
		||||
 | 
			
		||||
		/**
 | 
			
		||||
		 * Fail fast if the broker is not available on startup.
 | 
			
		||||
		 */
 | 
			
		||||
		private boolean failFast;
 | 
			
		||||
 | 
			
		||||
		public Ssl getSsl() {
 | 
			
		||||
			return this.ssl;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		public String getClientId() {
 | 
			
		||||
			return this.clientId;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		public void setClientId(String clientId) {
 | 
			
		||||
			this.clientId = clientId;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		public boolean isFailFast() {
 | 
			
		||||
			return this.failFast;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		public void setFailFast(boolean failFast) {
 | 
			
		||||
			this.failFast = failFast;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		public Map<String, String> getProperties() {
 | 
			
		||||
			return this.properties;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		public Map<String, Object> buildProperties() {
 | 
			
		||||
			Map<String, Object> properties = new HashMap<>();
 | 
			
		||||
			if (this.clientId != null) {
 | 
			
		||||
				properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
 | 
			
		||||
			}
 | 
			
		||||
			if (this.ssl.getKeyPassword() != null) {
 | 
			
		||||
				properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
 | 
			
		||||
						this.ssl.getKeyPassword());
 | 
			
		||||
			}
 | 
			
		||||
			if (this.ssl.getKeystoreLocation() != null) {
 | 
			
		||||
				properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
 | 
			
		||||
						resourceToPath(this.ssl.getKeystoreLocation()));
 | 
			
		||||
			}
 | 
			
		||||
			if (this.ssl.getKeystorePassword() != null) {
 | 
			
		||||
				properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
 | 
			
		||||
						this.ssl.getKeystorePassword());
 | 
			
		||||
			}
 | 
			
		||||
			if (this.ssl.getTruststoreLocation() != null) {
 | 
			
		||||
				properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
 | 
			
		||||
						resourceToPath(this.ssl.getTruststoreLocation()));
 | 
			
		||||
			}
 | 
			
		||||
			if (this.ssl.getTruststorePassword() != null) {
 | 
			
		||||
				properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
 | 
			
		||||
						this.ssl.getTruststorePassword());
 | 
			
		||||
			}
 | 
			
		||||
			properties.putAll(this.properties);
 | 
			
		||||
			return properties;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	public static class Template {
 | 
			
		||||
 | 
			
		||||
		/**
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,6 +19,8 @@ package org.springframework.boot.autoconfigure.kafka;
 | 
			
		|||
import java.util.concurrent.CountDownLatch;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
import org.apache.kafka.clients.admin.NewTopic;
 | 
			
		||||
import org.apache.kafka.clients.producer.Producer;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.ClassRule;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
| 
						 | 
				
			
			@ -27,6 +29,7 @@ import org.springframework.boot.test.util.TestPropertyValues;
 | 
			
		|||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
 | 
			
		||||
import org.springframework.context.annotation.Bean;
 | 
			
		||||
import org.springframework.kafka.annotation.KafkaListener;
 | 
			
		||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 | 
			
		||||
import org.springframework.kafka.core.KafkaTemplate;
 | 
			
		||||
import org.springframework.kafka.support.KafkaHeaders;
 | 
			
		||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
 | 
			
		||||
| 
						 | 
				
			
			@ -43,6 +46,8 @@ public class KafkaAutoConfigurationIntegrationTests {
 | 
			
		|||
 | 
			
		||||
	private static final String TEST_TOPIC = "testTopic";
 | 
			
		||||
 | 
			
		||||
	private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic";
 | 
			
		||||
 | 
			
		||||
	@ClassRule
 | 
			
		||||
	public static final KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true,
 | 
			
		||||
			TEST_TOPIC);
 | 
			
		||||
| 
						 | 
				
			
			@ -56,13 +61,13 @@ public class KafkaAutoConfigurationIntegrationTests {
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	@SuppressWarnings({ "unchecked", "rawtypes" })
 | 
			
		||||
	@Test
 | 
			
		||||
	public void testEndToEnd() throws Exception {
 | 
			
		||||
		load(KafkaConfig.class,
 | 
			
		||||
				"spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(),
 | 
			
		||||
				"spring.kafka.consumer.group-id=testGroup",
 | 
			
		||||
				"spring.kafka.consumer.auto-offset-reset=earliest");
 | 
			
		||||
		@SuppressWarnings("unchecked")
 | 
			
		||||
		KafkaTemplate<String, String> template = this.context
 | 
			
		||||
				.getBean(KafkaTemplate.class);
 | 
			
		||||
		template.send(TEST_TOPIC, "foo", "bar");
 | 
			
		||||
| 
						 | 
				
			
			@ -70,6 +75,11 @@ public class KafkaAutoConfigurationIntegrationTests {
 | 
			
		|||
		assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
 | 
			
		||||
		assertThat(listener.key).isEqualTo("foo");
 | 
			
		||||
		assertThat(listener.received).isEqualTo("bar");
 | 
			
		||||
 | 
			
		||||
		DefaultKafkaProducerFactory producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
 | 
			
		||||
		Producer producer = producerFactory.createProducer();
 | 
			
		||||
		assertThat(producer.partitionsFor(ADMIN_CREATED_TOPIC).size()).isEqualTo(10);
 | 
			
		||||
		producer.close();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	private void load(Class<?> config, String... environment) {
 | 
			
		||||
| 
						 | 
				
			
			@ -93,6 +103,11 @@ public class KafkaAutoConfigurationIntegrationTests {
 | 
			
		|||
			return new Listener();
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		@Bean
 | 
			
		||||
		public NewTopic adminCreated() {
 | 
			
		||||
			return new NewTopic(ADMIN_CREATED_TOPIC, 10, (short) 1);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	public static class Listener {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -22,6 +22,7 @@ import java.util.Map;
 | 
			
		|||
 | 
			
		||||
import javax.security.auth.login.AppConfigurationEntry;
 | 
			
		||||
 | 
			
		||||
import org.apache.kafka.clients.admin.AdminClientConfig;
 | 
			
		||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
 | 
			
		||||
import org.apache.kafka.clients.producer.ProducerConfig;
 | 
			
		||||
import org.apache.kafka.common.config.SslConfigs;
 | 
			
		||||
| 
						 | 
				
			
			@ -38,9 +39,11 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
 | 
			
		|||
import org.springframework.kafka.config.KafkaListenerContainerFactory;
 | 
			
		||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 | 
			
		||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 | 
			
		||||
import org.springframework.kafka.core.KafkaAdmin;
 | 
			
		||||
import org.springframework.kafka.core.KafkaTemplate;
 | 
			
		||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
 | 
			
		||||
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
 | 
			
		||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.assertj.core.api.Assertions.entry;
 | 
			
		||||
| 
						 | 
				
			
			@ -170,6 +173,38 @@ public class KafkaAutoConfigurationTests {
 | 
			
		|||
		assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	@Test
 | 
			
		||||
	public void adminProperties() {
 | 
			
		||||
		load("spring.kafka.clientId=cid",
 | 
			
		||||
				"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
 | 
			
		||||
				"spring.kafka.admin.fail-fast=true",
 | 
			
		||||
				"spring.kafka.admin.properties.fiz.buz=fix.fox",
 | 
			
		||||
				"spring.kafka.admin.ssl.key-password=p4",
 | 
			
		||||
				"spring.kafka.admin.ssl.keystore-location=classpath:ksLocP",
 | 
			
		||||
				"spring.kafka.admin.ssl.keystore-password=p5",
 | 
			
		||||
				"spring.kafka.admin.ssl.truststore-location=classpath:tsLocP",
 | 
			
		||||
				"spring.kafka.admin.ssl.truststore-password=p6");
 | 
			
		||||
		KafkaAdmin admin = this.context
 | 
			
		||||
				.getBean(KafkaAdmin.class);
 | 
			
		||||
		Map<String, Object> configs = admin.getConfig();
 | 
			
		||||
		// common
 | 
			
		||||
		assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
 | 
			
		||||
		// admin
 | 
			
		||||
		assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
 | 
			
		||||
		assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
 | 
			
		||||
				.endsWith(File.separator + "ksLocP");
 | 
			
		||||
		assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
 | 
			
		||||
		assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
 | 
			
		||||
				.endsWith(File.separator + "tsLocP");
 | 
			
		||||
		assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
 | 
			
		||||
				.isEqualTo("p6");
 | 
			
		||||
		assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
 | 
			
		||||
				.isEmpty();
 | 
			
		||||
		assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
 | 
			
		||||
		assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
 | 
			
		||||
		assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable", Boolean.class)).isTrue();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	@SuppressWarnings("unchecked")
 | 
			
		||||
	@Test
 | 
			
		||||
	public void listenerProperties() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -939,6 +939,14 @@ content into your application; rather pick only the properties that you need.
 | 
			
		|||
	spring.jms.template.time-to-live= # Time-to-live of a message when sending in milliseconds. Enable QoS when set.
 | 
			
		||||
 | 
			
		||||
	# APACHE KAFKA ({sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[KafkaProperties])
 | 
			
		||||
	spring.kafka.admin.client-id= # Id to pass to the server when making requests; used for server-side logging.
 | 
			
		||||
	spring.kafka.admin.fail-fast=false # Fail fast if the broker is not available on startup.
 | 
			
		||||
	spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
 | 
			
		||||
	spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
 | 
			
		||||
	spring.kafka.admin.ssl.keystore-location= # Location of the key store file.
 | 
			
		||||
	spring.kafka.admin.ssl.keystore-password= # Store password for the key store file.
 | 
			
		||||
	spring.kafka.admin.ssl.truststore-location= # Location of the trust store file.
 | 
			
		||||
	spring.kafka.admin.ssl.truststore-password= # Store password for the trust store file.
 | 
			
		||||
	spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
 | 
			
		||||
	spring.kafka.client-id= # Id to pass to the server when making requests; used for server-side logging.
 | 
			
		||||
	spring.kafka.consumer.auto-commit-interval= # Frequency in milliseconds that the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' true.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4816,6 +4816,9 @@ Kafka configuration is controlled by external configuration properties in
 | 
			
		|||
	spring.kafka.consumer.group-id=myGroup
 | 
			
		||||
----
 | 
			
		||||
 | 
			
		||||
TIP: To create a topic on startup, simply add a bean of type `NewTopic`. If the topic
 | 
			
		||||
already exists, the related bean is ignored.
 | 
			
		||||
 | 
			
		||||
See {sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[`KafkaProperties`]
 | 
			
		||||
for more of the supported options.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue