Merge pull request #9151 from garyrussell:kafkaKerberos
* pr/9151: Polish "Add Kafka Kerberos Configuration Properties" Add Kafka Kerberos Configuration Properties
This commit is contained in:
		
						commit
						d20b00a0c4
					
				| 
						 | 
					@ -16,9 +16,13 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
package org.springframework.boot.autoconfigure.kafka;
 | 
					package org.springframework.boot.autoconfigure.kafka;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.io.IOException;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 | 
					import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 | 
				
			||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 | 
					import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 | 
				
			||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 | 
					import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 | 
				
			||||||
 | 
					import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
				
			||||||
 | 
					import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
 | 
				
			||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
 | 
					import org.springframework.boot.context.properties.EnableConfigurationProperties;
 | 
				
			||||||
import org.springframework.context.annotation.Bean;
 | 
					import org.springframework.context.annotation.Bean;
 | 
				
			||||||
import org.springframework.context.annotation.Configuration;
 | 
					import org.springframework.context.annotation.Configuration;
 | 
				
			||||||
| 
						 | 
					@ -28,6 +32,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 | 
				
			||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 | 
					import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 | 
				
			||||||
import org.springframework.kafka.core.KafkaTemplate;
 | 
					import org.springframework.kafka.core.KafkaTemplate;
 | 
				
			||||||
import org.springframework.kafka.core.ProducerFactory;
 | 
					import org.springframework.kafka.core.ProducerFactory;
 | 
				
			||||||
 | 
					import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
 | 
				
			||||||
import org.springframework.kafka.support.LoggingProducerListener;
 | 
					import org.springframework.kafka.support.LoggingProducerListener;
 | 
				
			||||||
import org.springframework.kafka.support.ProducerListener;
 | 
					import org.springframework.kafka.support.ProducerListener;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -35,6 +40,7 @@ import org.springframework.kafka.support.ProducerListener;
 | 
				
			||||||
 * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
 | 
					 * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * @author Gary Russell
 | 
					 * @author Gary Russell
 | 
				
			||||||
 | 
					 * @author Stephane Nicoll
 | 
				
			||||||
 * @since 1.5.0
 | 
					 * @since 1.5.0
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
@Configuration
 | 
					@Configuration
 | 
				
			||||||
| 
						 | 
					@ -81,4 +87,20 @@ public class KafkaAutoConfiguration {
 | 
				
			||||||
				this.properties.buildProducerProperties());
 | 
									this.properties.buildProducerProperties());
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Bean
 | 
				
			||||||
 | 
						@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
 | 
				
			||||||
 | 
						@ConditionalOnMissingBean
 | 
				
			||||||
 | 
						public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
 | 
				
			||||||
 | 
							KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
 | 
				
			||||||
 | 
							Jaas jaasProperties = this.properties.getJaas();
 | 
				
			||||||
 | 
							if (jaasProperties.getControlFlag() != null) {
 | 
				
			||||||
 | 
								jaas.setControlFlag(jaasProperties.getControlFlag());
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if (jaasProperties.getLoginModule() != null) {
 | 
				
			||||||
 | 
								jaas.setLoginModule(jaasProperties.getLoginModule());
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							jaas.setOptions(jaasProperties.getOptions());
 | 
				
			||||||
 | 
							return jaas;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 | 
				
			||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
 | 
					import org.springframework.boot.context.properties.ConfigurationProperties;
 | 
				
			||||||
import org.springframework.core.io.Resource;
 | 
					import org.springframework.core.io.Resource;
 | 
				
			||||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
 | 
					import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
 | 
				
			||||||
 | 
					import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
 | 
				
			||||||
import org.springframework.util.CollectionUtils;
 | 
					import org.springframework.util.CollectionUtils;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
| 
						 | 
					@ -74,6 +75,8 @@ public class KafkaProperties {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private final Ssl ssl = new Ssl();
 | 
						private final Ssl ssl = new Ssl();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						private final Jaas jaas = new Jaas();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private final Template template = new Template();
 | 
						private final Template template = new Template();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	public List<String> getBootstrapServers() {
 | 
						public List<String> getBootstrapServers() {
 | 
				
			||||||
| 
						 | 
					@ -116,6 +119,10 @@ public class KafkaProperties {
 | 
				
			||||||
		return this.ssl;
 | 
							return this.ssl;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public Jaas getJaas() {
 | 
				
			||||||
 | 
							return this.jaas;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	public Template getTemplate() {
 | 
						public Template getTemplate() {
 | 
				
			||||||
		return this.template;
 | 
							return this.template;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -776,4 +783,63 @@ public class KafkaProperties {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public static class Jaas {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							/**
 | 
				
			||||||
 | 
							 * Enable JAAS configuration.
 | 
				
			||||||
 | 
							 */
 | 
				
			||||||
 | 
							private boolean enabled;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							/**
 | 
				
			||||||
 | 
							 * Login module.
 | 
				
			||||||
 | 
							 */
 | 
				
			||||||
 | 
							private String loginModule = "com.sun.security.auth.module.Krb5LoginModule";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							/**
 | 
				
			||||||
 | 
							 * Control flag for login configuration.
 | 
				
			||||||
 | 
							 */
 | 
				
			||||||
 | 
							private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag =
 | 
				
			||||||
 | 
									KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							/**
 | 
				
			||||||
 | 
							 * Additional JAAS options.
 | 
				
			||||||
 | 
							 */
 | 
				
			||||||
 | 
							private final Map<String, String> options = new HashMap<>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public boolean isEnabled() {
 | 
				
			||||||
 | 
								return this.enabled;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public void setEnabled(boolean enabled) {
 | 
				
			||||||
 | 
								this.enabled = enabled;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public String getLoginModule() {
 | 
				
			||||||
 | 
								return this.loginModule;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public void setLoginModule(String loginModule) {
 | 
				
			||||||
 | 
								this.loginModule = loginModule;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public KafkaJaasLoginModuleInitializer.ControlFlag getControlFlag() {
 | 
				
			||||||
 | 
								return this.controlFlag;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public void setControlFlag(KafkaJaasLoginModuleInitializer.ControlFlag controlFlag) {
 | 
				
			||||||
 | 
								this.controlFlag = controlFlag;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public Map<String, String> getOptions() {
 | 
				
			||||||
 | 
								return this.options;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public void setOptions(Map<String, String> options) {
 | 
				
			||||||
 | 
								if (options != null) {
 | 
				
			||||||
 | 
									this.options.putAll(options);
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -325,6 +325,10 @@
 | 
				
			||||||
    "description": "Log a warning for transactions executed without a single enlisted resource.",
 | 
					    "description": "Log a warning for transactions executed without a single enlisted resource.",
 | 
				
			||||||
    "defaultValue": true
 | 
					    "defaultValue": true
 | 
				
			||||||
  },
 | 
					  },
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    "name": "spring.kafka.jaas.control-flag",
 | 
				
			||||||
 | 
					    "defaultValue": "required"
 | 
				
			||||||
 | 
					  },
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    "name": "spring.mobile.devicedelegatingviewresolver.enabled",
 | 
					    "name": "spring.mobile.devicedelegatingviewresolver.enabled",
 | 
				
			||||||
    "type": "java.lang.Boolean",
 | 
					    "type": "java.lang.Boolean",
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,5 +1,5 @@
 | 
				
			||||||
/*
 | 
					/*
 | 
				
			||||||
 * Copyright 2012-2016 the original author or authors.
 | 
					 * Copyright 2012-2017 the original author or authors.
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
					 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 * you may not use this file except in compliance with the License.
 | 
					 * you may not use this file except in compliance with the License.
 | 
				
			||||||
| 
						 | 
					@ -20,6 +20,8 @@ import java.io.File;
 | 
				
			||||||
import java.util.Collections;
 | 
					import java.util.Collections;
 | 
				
			||||||
import java.util.Map;
 | 
					import java.util.Map;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import javax.security.auth.login.AppConfigurationEntry;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
 | 
					import org.apache.kafka.clients.consumer.ConsumerConfig;
 | 
				
			||||||
import org.apache.kafka.clients.producer.ProducerConfig;
 | 
					import org.apache.kafka.clients.producer.ProducerConfig;
 | 
				
			||||||
import org.apache.kafka.common.config.SslConfigs;
 | 
					import org.apache.kafka.common.config.SslConfigs;
 | 
				
			||||||
| 
						 | 
					@ -38,8 +40,10 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 | 
				
			||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 | 
					import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 | 
				
			||||||
import org.springframework.kafka.core.KafkaTemplate;
 | 
					import org.springframework.kafka.core.KafkaTemplate;
 | 
				
			||||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
 | 
					import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
 | 
				
			||||||
 | 
					import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
					import static org.assertj.core.api.Assertions.assertThat;
 | 
				
			||||||
 | 
					import static org.assertj.core.api.Assertions.entry;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * Tests for {@link KafkaAutoConfiguration}.
 | 
					 * Tests for {@link KafkaAutoConfiguration}.
 | 
				
			||||||
| 
						 | 
					@ -160,6 +164,8 @@ public class KafkaAutoConfigurationTests {
 | 
				
			||||||
		assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
 | 
							assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
 | 
				
			||||||
		assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
 | 
							assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
 | 
				
			||||||
				.isEqualTo(IntegerSerializer.class);
 | 
									.isEqualTo(IntegerSerializer.class);
 | 
				
			||||||
 | 
							assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
 | 
				
			||||||
 | 
									.isEmpty();
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Test
 | 
						@Test
 | 
				
			||||||
| 
						 | 
					@ -169,7 +175,11 @@ public class KafkaAutoConfigurationTests {
 | 
				
			||||||
				"spring.kafka.listener.ack-count=123",
 | 
									"spring.kafka.listener.ack-count=123",
 | 
				
			||||||
				"spring.kafka.listener.ack-time=456",
 | 
									"spring.kafka.listener.ack-time=456",
 | 
				
			||||||
				"spring.kafka.listener.concurrency=3",
 | 
									"spring.kafka.listener.concurrency=3",
 | 
				
			||||||
				"spring.kafka.listener.poll-timeout=2000");
 | 
									"spring.kafka.listener.poll-timeout=2000",
 | 
				
			||||||
 | 
									"spring.kafka.jaas.enabled=true",
 | 
				
			||||||
 | 
									"spring.kafka.jaas.login-module=foo",
 | 
				
			||||||
 | 
									"spring.kafka.jaas.control-flag=REQUISITE",
 | 
				
			||||||
 | 
									"spring.kafka.jaas.options.useKeyTab=true");
 | 
				
			||||||
		DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
 | 
							DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
 | 
				
			||||||
				.getBean(DefaultKafkaProducerFactory.class);
 | 
									.getBean(DefaultKafkaProducerFactory.class);
 | 
				
			||||||
		DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
 | 
							DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
 | 
				
			||||||
| 
						 | 
					@ -189,6 +199,16 @@ public class KafkaAutoConfigurationTests {
 | 
				
			||||||
		assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
 | 
							assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
 | 
				
			||||||
		assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
 | 
							assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
 | 
				
			||||||
				.isEqualTo(2000L);
 | 
									.isEqualTo(2000L);
 | 
				
			||||||
 | 
							assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
 | 
				
			||||||
 | 
									.hasSize(1);
 | 
				
			||||||
 | 
							KafkaJaasLoginModuleInitializer jaas = this.context.getBean(
 | 
				
			||||||
 | 
									KafkaJaasLoginModuleInitializer.class);
 | 
				
			||||||
 | 
							dfa = new DirectFieldAccessor(jaas);
 | 
				
			||||||
 | 
							assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
 | 
				
			||||||
 | 
							assertThat(dfa.getPropertyValue("controlFlag"))
 | 
				
			||||||
 | 
									.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
 | 
				
			||||||
 | 
							assertThat(((Map<String, String>) dfa.getPropertyValue("options")))
 | 
				
			||||||
 | 
									.containsExactly(entry("useKeyTab", "true"));
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private void load(String... environment) {
 | 
						private void load(String... environment) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -942,6 +942,10 @@ content into your application; rather pick only the properties that you need.
 | 
				
			||||||
	spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
 | 
						spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
 | 
				
			||||||
	spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
 | 
						spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
 | 
				
			||||||
	spring.kafka.consumer.value-deserializer= # Deserializer class for values.
 | 
						spring.kafka.consumer.value-deserializer= # Deserializer class for values.
 | 
				
			||||||
 | 
						spring.kafka.jaas.control-flag=required # Control flag for login configuration.
 | 
				
			||||||
 | 
						spring.kafka.jaas.enabled= # Enable JAAS configuration.
 | 
				
			||||||
 | 
						spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
 | 
				
			||||||
 | 
						spring.kafka.jaas.options= # Additional JAAS options.
 | 
				
			||||||
	spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
 | 
						spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
 | 
				
			||||||
	spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
 | 
						spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
 | 
				
			||||||
	spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".
 | 
						spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue