();
+ if (this.bootstrapServers != null) {
+ properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
+ }
+ if (this.clientId != null) {
+ properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
+ }
+ if (this.ssl.getKeyPassword() != null) {
+ properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
+ }
+ if (this.ssl.getKeystoreLocation() != null) {
+ properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+ resourceToPath(this.ssl.getKeystoreLocation()));
+ }
+ if (this.ssl.getKeystorePassword() != null) {
+ properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+ this.ssl.getKeystorePassword());
+ }
+ if (this.ssl.getTruststoreLocation() != null) {
+ properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+ resourceToPath(this.ssl.getTruststoreLocation()));
+ }
+ if (this.ssl.getTruststorePassword() != null) {
+ properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
+ this.ssl.getTruststorePassword());
+ }
+ return properties;
+ }
+
+ /**
+ * Create an initial map of consumer properties from the state of this instance.
+ * This allows you to add additional properties, if necessary, and override the
+ * default kafkaConsumerFactory bean.
+ * @return the consumer properties initialized with the customizations defined on
+ * this instance
+ */
+ public Map buildConsumerProperties() {
+ Map props = buildCommonProperties();
+ props.putAll(this.consumer.buildProperties());
+ return props;
+ }
+
+ /**
+ * Create an initial map of producer properties from the state of this instance.
+ * This allows you to add additional properties, if necessary, and override the
+ * default kafkaProducerFactory bean.
+ * @return the producer properties initialized with the customizations defined on
+ * this instance
+ */
+ public Map buildProducerProperties() {
+ Map props = buildCommonProperties();
+ props.putAll(this.producer.buildProperties());
+ return props;
+ }
+
+ private static String resourceToPath(Resource resource) {
+ try {
+ return resource.getFile().getAbsolutePath();
+ }
+ catch (IOException ex) {
+ throw new IllegalStateException(String.format(
+ "Resource '%s' must be on a file system", resource), ex);
+ }
+ }
+
+ public static class Consumer {
+
+ private final Ssl ssl = new Ssl();
+
+ /**
+ * Frequency in milliseconds that the consumer offsets are auto-committed to
+ * Kafka if 'enable.auto.commit' true.
+ */
+ private Long autoCommitInterval;
+
+ /**
+ * What to do when there is no initial offset in Kafka or if the current offset
+ * does not exist any more on the server.
+ */
+ private String autoOffsetReset;
+
+ /**
+ * Comma-delimited list of host:port pairs to use for establishing the initial
+ * connection to the Kafka cluster.
+ */
+ private List bootstrapServers;
+
+ /**
+ * Id to pass to the server when making requests; used for server-side logging.
+ */
+ private String clientId;
+
+ /**
+ * If true the consumer's offset will be periodically committed in the background.
+ */
+ private Boolean enableAutoCommit;
+
+ /**
+ * Maximum amount of time in milliseconds the server will block before answering
+ * the fetch request if there isn't sufficient data to immediately satisfy the
+ * requirement given by "fetch.min.bytes".
+ */
+ private Integer fetchMaxWait;
+
+ /**
+ * Minimum amount of data the server should return for a fetch request in bytes.
+ */
+ private Integer fetchMinSize;
+
+ /**
+ * Unique string that identifies the consumer group this consumer belongs to.
+ */
+ private String groupId;
+
+ /**
+ * Expected time in milliseconds between heartbeats to the consumer coordinator.
+ */
+ private Integer heartbeatInterval;
+
+ /**
+ * Deserializer class for keys.
+ */
+ private Class> keyDeserializer = StringDeserializer.class;
+
+ /**
+ * Deserializer class for values.
+ */
+ private Class> valueDeserializer = StringDeserializer.class;
+
+ public Ssl getSsl() {
+ return this.ssl;
+ }
+
+ public Long getAutoCommitInterval() {
+ return this.autoCommitInterval;
+ }
+
+ public void setAutoCommitInterval(Long autoCommitInterval) {
+ this.autoCommitInterval = autoCommitInterval;
+ }
+
+ public String getAutoOffsetReset() {
+ return this.autoOffsetReset;
+ }
+
+ public void setAutoOffsetReset(String autoOffsetReset) {
+ this.autoOffsetReset = autoOffsetReset;
+ }
+
+ public List getBootstrapServers() {
+ return this.bootstrapServers;
+ }
+
+ public void setBootstrapServers(List bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getClientId() {
+ return this.clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public Boolean getEnableAutoCommit() {
+ return this.enableAutoCommit;
+ }
+
+ public void setEnableAutoCommit(Boolean enableAutoCommit) {
+ this.enableAutoCommit = enableAutoCommit;
+ }
+
+ public Integer getFetchMaxWait() {
+ return this.fetchMaxWait;
+ }
+
+ public void setFetchMaxWait(Integer fetchMaxWait) {
+ this.fetchMaxWait = fetchMaxWait;
+ }
+
+ public Integer getFetchMinSize() {
+ return this.fetchMinSize;
+ }
+
+ public void setFetchMinSize(Integer fetchMinSize) {
+ this.fetchMinSize = fetchMinSize;
+ }
+
+ public String getGroupId() {
+ return this.groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ public Integer getHeartbeatInterval() {
+ return this.heartbeatInterval;
+ }
+
+ public void setHeartbeatInterval(Integer heartbeatInterval) {
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ public Class> getKeyDeserializer() {
+ return this.keyDeserializer;
+ }
+
+ public void setKeyDeserializer(Class> keyDeserializer) {
+ this.keyDeserializer = keyDeserializer;
+ }
+
+ public Class> getValueDeserializer() {
+ return this.valueDeserializer;
+ }
+
+ public void setValueDeserializer(Class> valueDeserializer) {
+ this.valueDeserializer = valueDeserializer;
+ }
+
+ public Map buildProperties() {
+ Map properties = new HashMap();
+ if (this.autoCommitInterval != null) {
+ properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitInterval);
+ }
+ if (this.autoOffsetReset != null) {
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
+ }
+ if (this.bootstrapServers != null) {
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
+ }
+ if (this.clientId != null) {
+ properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId);
+ }
+ if (this.enableAutoCommit != null) {
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit);
+ }
+ if (this.fetchMaxWait != null) {
+ properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, this.fetchMaxWait);
+ }
+ if (this.fetchMinSize != null) {
+ properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize);
+ }
+ if (this.groupId != null) {
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
+ }
+ if (this.heartbeatInterval != null) {
+ properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatInterval);
+ }
+ if (this.keyDeserializer != null) {
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer);
+ }
+ if (this.ssl.getKeyPassword() != null) {
+ properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
+ }
+ if (this.ssl.getKeystoreLocation() != null) {
+ properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
+ }
+ if (this.ssl.getKeystorePassword() != null) {
+ properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
+ }
+ if (this.ssl.getTruststoreLocation() != null) {
+ properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+ resourceToPath(this.ssl.getTruststoreLocation()));
+ }
+ if (this.ssl.getTruststorePassword() != null) {
+ properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
+ }
+ if (this.valueDeserializer != null) {
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer);
+ }
+ return properties;
+ }
+
+ }
+
+ public static class Producer {
+
+ private final Ssl ssl = new Ssl();
+
+ /**
+ * Number of acknowledgments the producer requires the leader to have received
+ * before considering a request complete.
+ */
+ private String acks;
+
+ /**
+ * Number of records to batch before sending.
+ */
+ private Integer batchSize;
+
+ /**
+ * Comma-delimited list of host:port pairs to use for establishing the initial
+ * connection to the Kafka cluster.
+ */
+ private List bootstrapServers;
+
+ /**
+ * Total bytes of memory the producer can use to buffer records waiting to be
+ * sent to the server.
+ */
+ private Long bufferMemory;
+
+ /**
+ * Id to pass to the server when making requests; used for server-side logging.
+ */
+ private String clientId;
+
+ /**
+ * Compression type for all data generated by the producer.
+ */
+ private String compressionType;
+
+ /**
+ * Serializer class for keys.
+ */
+ private Class> keySerializer = StringSerializer.class;
+
+ /**
+ * Serializer class for values.
+ */
+ private Class> valueSerializer = StringSerializer.class;
+
+ /**
+ * When greater than zero, enables retrying of failed sends.
+ */
+ private Integer retries;
+
+ public Ssl getSsl() {
+ return this.ssl;
+ }
+
+ public String getAcks() {
+ return this.acks;
+ }
+
+ public void setAcks(String acks) {
+ this.acks = acks;
+ }
+
+ public Integer getBatchSize() {
+ return this.batchSize;
+ }
+
+ public void setBatchSize(Integer batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public List getBootstrapServers() {
+ return this.bootstrapServers;
+ }
+
+ public void setBootstrapServers(List bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public Long getBufferMemory() {
+ return this.bufferMemory;
+ }
+
+ public void setBufferMemory(Long bufferMemory) {
+ this.bufferMemory = bufferMemory;
+ }
+
+ public String getClientId() {
+ return this.clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getCompressionType() {
+ return this.compressionType;
+ }
+
+ public void setCompressionType(String compressionType) {
+ this.compressionType = compressionType;
+ }
+
+ public Class> getKeySerializer() {
+ return this.keySerializer;
+ }
+
+ public void setKeySerializer(Class> keySerializer) {
+ this.keySerializer = keySerializer;
+ }
+
+ public Class> getValueSerializer() {
+ return this.valueSerializer;
+ }
+
+ public void setValueSerializer(Class> valueSerializer) {
+ this.valueSerializer = valueSerializer;
+ }
+
+ public Integer getRetries() {
+ return this.retries;
+ }
+
+ public void setRetries(Integer retries) {
+ this.retries = retries;
+ }
+
+ public Map buildProperties() {
+ Map properties = new HashMap();
+ if (this.acks != null) {
+ properties.put(ProducerConfig.ACKS_CONFIG, this.acks);
+ }
+ if (this.batchSize != null) {
+ properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize);
+ }
+ if (this.bootstrapServers != null) {
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
+ }
+ if (this.bufferMemory != null) {
+ properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory);
+ }
+ if (this.clientId != null) {
+ properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
+ }
+ if (this.compressionType != null) {
+ properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.compressionType);
+ }
+ if (this.keySerializer != null) {
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer);
+ }
+ if (this.retries != null) {
+ properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
+ }
+ if (this.ssl.getKeyPassword() != null) {
+ properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
+ }
+ if (this.ssl.getKeystoreLocation() != null) {
+ properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
+ }
+ if (this.ssl.getKeystorePassword() != null) {
+ properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
+ }
+ if (this.ssl.getTruststoreLocation() != null) {
+ properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+ resourceToPath(this.ssl.getTruststoreLocation()));
+ }
+ if (this.ssl.getTruststorePassword() != null) {
+ properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
+ }
+ if (this.valueSerializer != null) {
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer);
+ }
+ return properties;
+ }
+
+ }
+
/**
 * Kafka template related properties.
 */
public static class Template {

	/**
	 * Default topic to which messages will be sent.
	 */
	private String defaultTopic;

	public String getDefaultTopic() {
		return this.defaultTopic;
	}

	public void setDefaultTopic(String defaultTopic) {
		this.defaultTopic = defaultTopic;
	}

}
+
/**
 * Listener container related properties.
 */
public static class Listener {

	/**
	 * Listener AckMode; see the spring-kafka documentation.
	 */
	private AckMode ackMode;

	/**
	 * Number of threads to run in the listener containers.
	 */
	private Integer concurrency;

	/**
	 * Timeout in milliseconds to use when polling the consumer.
	 */
	private Long pollTimeout;

	/**
	 * Number of records between offset commits when ackMode is "COUNT" or
	 * "COUNT_TIME".
	 */
	private Integer ackCount;

	/**
	 * Time in milliseconds between offset commits when ackMode is "TIME" or
	 * "COUNT_TIME".
	 */
	private Long ackTime;

	public AckMode getAckMode() {
		return this.ackMode;
	}

	public void setAckMode(AckMode ackMode) {
		this.ackMode = ackMode;
	}

	public Integer getConcurrency() {
		return this.concurrency;
	}

	public void setConcurrency(Integer concurrency) {
		this.concurrency = concurrency;
	}

	public Long getPollTimeout() {
		return this.pollTimeout;
	}

	public void setPollTimeout(Long pollTimeout) {
		this.pollTimeout = pollTimeout;
	}

	public Integer getAckCount() {
		return this.ackCount;
	}

	public void setAckCount(Integer ackCount) {
		this.ackCount = ackCount;
	}

	public Long getAckTime() {
		return this.ackTime;
	}

	public void setAckTime(Long ackTime) {
		this.ackTime = ackTime;
	}

}
+
/**
 * SSL related properties, shared by the common, consumer and producer settings.
 */
public static class Ssl {

	/**
	 * Password of the private key in the key store file.
	 */
	private String keyPassword;

	/**
	 * Location of the key store file.
	 */
	private Resource keystoreLocation;

	/**
	 * Store password for the key store file.
	 */
	private String keystorePassword;

	/**
	 * Location of the trust store file.
	 */
	private Resource truststoreLocation;

	/**
	 * Store password for the trust store file.
	 */
	private String truststorePassword;

	public String getKeyPassword() {
		return this.keyPassword;
	}

	public void setKeyPassword(String keyPassword) {
		this.keyPassword = keyPassword;
	}

	public Resource getKeystoreLocation() {
		return this.keystoreLocation;
	}

	public void setKeystoreLocation(Resource keystoreLocation) {
		this.keystoreLocation = keystoreLocation;
	}

	public String getKeystorePassword() {
		return this.keystorePassword;
	}

	public void setKeystorePassword(String keystorePassword) {
		this.keystorePassword = keystorePassword;
	}

	public Resource getTruststoreLocation() {
		return this.truststoreLocation;
	}

	public void setTruststoreLocation(Resource truststoreLocation) {
		this.truststoreLocation = truststoreLocation;
	}

	public String getTruststorePassword() {
		return this.truststorePassword;
	}

	public void setTruststorePassword(String truststorePassword) {
		this.truststorePassword = truststorePassword;
	}

}
+
+}
diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/package-info.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/package-info.java
new file mode 100644
index 00000000000..c4d1bbbd058
--- /dev/null
+++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2012-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Auto-configuration for Apache Kafka.
+ */
+package org.springframework.boot.autoconfigure.kafka;
diff --git a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json
index a9d4dadb641..b2ccbda8b44 100644
--- a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json
+++ b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json
@@ -409,6 +409,76 @@
}
]
},
+ {
+ "name": "spring.kafka.consumer.auto-offset-reset",
+ "values": [
+ {
+ "value": "earliest",
+ "description": "Automatically reset the offset to the earliest offset."
+ },
+ {
+ "value": "latest",
+ "description": "Automatically reset the offset to the latest offset."
+ },
+ {
+ "value": "none",
+ "description": "Throw exception to the consumer if no previous offset is found for the consumer's group."
+ },
+ {
+ "value": "exception",
+ "description": "Throw exception to the consumer."
+ }
+ ],
+ "providers": [
+ {
+ "name": "any"
+ }
+ ]
+ },
+ {
+ "name": "spring.kafka.consumer.key-deserializer",
+ "providers": [
+ {
+ "name": "handle-as",
+ "parameters": {
+ "target": "org.apache.kafka.common.serialization.Deserializer"
+ }
+ }
+ ]
+ },
+ {
+ "name": "spring.kafka.consumer.value-deserializer",
+ "providers": [
+ {
+ "name": "handle-as",
+ "parameters": {
+ "target": "org.apache.kafka.common.serialization.Deserializer"
+ }
+ }
+ ]
+ },
+ {
+ "name": "spring.kafka.producer.key-serializer",
+ "providers": [
+ {
+ "name": "handle-as",
+ "parameters": {
+ "target": "org.apache.kafka.common.serialization.Serializer"
+ }
+ }
+ ]
+ },
+ {
+ "name": "spring.kafka.producer.value-serializer",
+ "providers": [
+ {
+ "name": "handle-as",
+ "parameters": {
+ "target": "org.apache.kafka.common.serialization.Serializer"
+ }
+ }
+ ]
+ },
{
"name": "spring.http.converters.preferred-json-mapper",
"values": [
diff --git a/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories b/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories
index 03e96c9a9a9..10f0275b1f7 100644
--- a/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories
+++ b/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories
@@ -62,6 +62,7 @@ org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration,\
org.springframework.boot.autoconfigure.groovy.template.GroovyTemplateAutoConfiguration,\
org.springframework.boot.autoconfigure.jersey.JerseyAutoConfiguration,\
org.springframework.boot.autoconfigure.jooq.JooqAutoConfiguration,\
+org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,\
org.springframework.boot.autoconfigure.liquibase.LiquibaseAutoConfiguration,\
org.springframework.boot.autoconfigure.mail.MailSenderAutoConfiguration,\
org.springframework.boot.autoconfigure.mail.MailSenderValidatorAutoConfiguration,\
diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java
new file mode 100644
index 00000000000..42f097f78be
--- /dev/null
+++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2012-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.boot.autoconfigure.kafka;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.springframework.boot.test.util.EnvironmentTestUtils;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.messaging.handler.annotation.Header;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link KafkaAutoConfiguration}.
+ *
+ * @author Gary Russell
+ */
+public class KafkaAutoConfigurationIntegrationTests {
+
+ private static final String TEST_TOPIC = "testTopic";
+
+ @ClassRule
+ public static final KafkaEmbedded kafkaEmbedded =
+ new KafkaEmbedded(1, true, TEST_TOPIC);
+
+ private AnnotationConfigApplicationContext context;
+
+ @After
+ public void close() {
+ if (this.context != null) {
+ this.context.close();
+ }
+ }
+
+ @Test
+ public void testEndToEnd() throws Exception {
+ load(KafkaConfig.class,
+ "spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(),
+ "spring.kafka.consumer.group-id=testGroup",
+ "spring.kafka.consumer.auto-offset-reset=earliest");
+ @SuppressWarnings("unchecked")
+ KafkaTemplate template = this.context.getBean(KafkaTemplate.class);
+ template.send(TEST_TOPIC, "foo", "bar");
+ Listener listener = this.context.getBean(Listener.class);
+ assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(listener.key).isEqualTo("foo");
+ assertThat(listener.received).isEqualTo("bar");
+ }
+
+ private void load(Class> config, String... environment) {
+ this.context = doLoad(new Class>[] { config }, environment);
+ }
+
+ private AnnotationConfigApplicationContext doLoad(Class>[] configs,
+ String... environment) {
+ AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
+ applicationContext.register(configs);
+ applicationContext.register(KafkaAutoConfiguration.class);
+ EnvironmentTestUtils.addEnvironment(applicationContext, environment);
+ applicationContext.refresh();
+ return applicationContext;
+ }
+
+ public static class KafkaConfig {
+
+ @Bean
+ public Listener listener() {
+ return new Listener();
+ }
+
+ }
+
+ public static class Listener {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private volatile String received;
+
+ private volatile String key;
+
+ @KafkaListener(topics = TEST_TOPIC)
+ public void listen(String foo,
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
+ this.received = foo;
+ this.key = key;
+ this.latch.countDown();
+ }
+
+ }
+
+}
diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
new file mode 100644
index 00000000000..c86d8f01b78
--- /dev/null
+++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2012-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.boot.autoconfigure.kafka;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.junit.After;
+import org.junit.Test;
+
+import org.springframework.beans.DirectFieldAccessor;
+import org.springframework.boot.test.util.EnvironmentTestUtils;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+/**
+ * Tests for {@link KafkaAutoConfiguration}.
+ *
+ * @author Gary Russell
+ * @author Stephane Nicoll
+ * @since 1.5
+ */
+public class KafkaAutoConfigurationTests {
+
+ private AnnotationConfigApplicationContext context;
+
+ @After
+ public void closeContext() {
+ if (this.context != null) {
+ this.context.close();
+ }
+ }
+
+ @Test
+ public void consumerProperties() {
+ load("spring.kafka.bootstrap-servers=foo:1234",
+ "spring.kafka.ssl.key-password=p1",
+ "spring.kafka.ssl.keystore-location=classpath:ksLoc",
+ "spring.kafka.ssl.keystore-password=p2",
+ "spring.kafka.ssl.truststore-location=classpath:tsLoc",
+ "spring.kafka.ssl.truststore-password=p3",
+ "spring.kafka.consumer.auto-commit-interval=123",
+ "spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.consumer.client-id=ccid", // test override common
+ "spring.kafka.consumer.enable-auto-commit=false",
+ "spring.kafka.consumer.fetch-max-wait=456",
+ "spring.kafka.consumer.fetch-min-size=789",
+ "spring.kafka.consumer.group-id=bar",
+ "spring.kafka.consumer.heartbeat-interval=234",
+ "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
+ "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer");
+ DefaultKafkaConsumerFactory, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
+ @SuppressWarnings("unchecked")
+ Map consumerProps = (Map) new DirectFieldAccessor(consumerFactory)
+ .getPropertyValue("configs");
+ // common
+ assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .isEqualTo(Collections.singletonList("foo:1234"));
+ assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
+ assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ .endsWith(File.separator + "ksLoc");
+ assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
+ assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
+ .endsWith(File.separator + "tsLoc");
+ assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p3");
+ // consumer
+ assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
+ assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo(Boolean.FALSE);
+ assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)).isEqualTo(123L);
+ assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
+ assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
+ assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789);
+ assertThat(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
+ assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
+ assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).isEqualTo(LongDeserializer.class);
+ assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+ .isEqualTo(IntegerDeserializer.class);
+ }
+
+ @Test
+ public void producerProperties() {
+ load("spring.kafka.clientId=cid",
+ "spring.kafka.producer.acks=all",
+ "spring.kafka.producer.batch-size=20",
+ "spring.kafka.producer.bootstrap-servers=bar:1234", // test override common
+ "spring.kafka.producer.buffer-memory=12345",
+ "spring.kafka.producer.compression-type=gzip",
+ "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
+ "spring.kafka.producer.retries=2",
+ "spring.kafka.producer.ssl.key-password=p4",
+ "spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
+ "spring.kafka.producer.ssl.keystore-password=p5",
+ "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
+ "spring.kafka.producer.ssl.truststore-password=p6",
+ "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer");
+ DefaultKafkaProducerFactory, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
+ @SuppressWarnings("unchecked")
+ Map producerProps = (Map) new DirectFieldAccessor(producerFactory)
+ .getPropertyValue("configs");
+ // common
+ assertThat(producerProps.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
+ // producer
+ assertThat(producerProps.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
+ assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
+ assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .isEqualTo(Collections.singletonList("bar:1234")); // override
+ assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L);
+ assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
+ assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
+ assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
+ assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ .endsWith(File.separator + "ksLocP");
+ assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
+ assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
+ .endsWith(File.separator + "tsLocP");
+ assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6");
+ assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
+ assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).isEqualTo(IntegerSerializer.class);
+ }
+
+ @Test
+ public void listenerProperties() {
+ load("spring.kafka.template.default-topic=testTopic",
+
+ "spring.kafka.listener.ack-mode=MANUAL",
+ "spring.kafka.listener.ack-count=123",
+ "spring.kafka.listener.ack-time=456",
+ "spring.kafka.listener.concurrency=3",
+ "spring.kafka.listener.poll-timeout=2000");
+ DefaultKafkaProducerFactory, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
+ DefaultKafkaConsumerFactory, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
+ KafkaTemplate, ?> kafkaTemplate = this.context.getBean(KafkaTemplate.class);
+ KafkaListenerContainerFactory> kafkaListenerContainerFactory = this.context
+ .getBean(KafkaListenerContainerFactory.class);
+ assertThat(new DirectFieldAccessor(kafkaTemplate).getPropertyValue("producerFactory"))
+ .isEqualTo(producerFactory);
+ assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
+ DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory);
+ assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
+ assertThat(dfa.getPropertyValue("containerProperties.ackMode")).isEqualTo(AckMode.MANUAL);
+ assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
+ assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
+ assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
+ assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")).isEqualTo(2000L);
+ }
+
+ private void load(String... environment) {
+ AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
+ ctx.register(KafkaAutoConfiguration.class);
+ EnvironmentTestUtils.addEnvironment(ctx, environment);
+ ctx.refresh();
+ this.context = ctx;
+ }
+
+}
diff --git a/spring-boot-autoconfigure/src/test/resources/ksLoc b/spring-boot-autoconfigure/src/test/resources/ksLoc
new file mode 100644
index 00000000000..696f2109e66
--- /dev/null
+++ b/spring-boot-autoconfigure/src/test/resources/ksLoc
@@ -0,0 +1 @@
+Test file for Kafka.
diff --git a/spring-boot-autoconfigure/src/test/resources/ksLocP b/spring-boot-autoconfigure/src/test/resources/ksLocP
new file mode 100644
index 00000000000..696f2109e66
--- /dev/null
+++ b/spring-boot-autoconfigure/src/test/resources/ksLocP
@@ -0,0 +1 @@
+Test file for Kafka.
diff --git a/spring-boot-autoconfigure/src/test/resources/tsLoc b/spring-boot-autoconfigure/src/test/resources/tsLoc
new file mode 100644
index 00000000000..696f2109e66
--- /dev/null
+++ b/spring-boot-autoconfigure/src/test/resources/tsLoc
@@ -0,0 +1 @@
+Test file for Kafka.
diff --git a/spring-boot-autoconfigure/src/test/resources/tsLocP b/spring-boot-autoconfigure/src/test/resources/tsLocP
new file mode 100644
index 00000000000..696f2109e66
--- /dev/null
+++ b/spring-boot-autoconfigure/src/test/resources/tsLocP
@@ -0,0 +1 @@
+Test file for Kafka.
diff --git a/spring-boot-dependencies/pom.xml b/spring-boot-dependencies/pom.xml
index 0512aea8d85..3ef99d9c899 100644
--- a/spring-boot-dependencies/pom.xml
+++ b/spring-boot-dependencies/pom.xml
@@ -156,6 +156,7 @@
0.21.0.RELEASE
4.3.5.RELEASE
1.2.0.RELEASE
+		<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
1.2.6.RELEASE
1.1.5.RELEASE
1.2.0.RELEASE
@@ -2123,6 +2124,11 @@
spring-hateoas
${spring-hateoas.version}