diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index b7d40d0195b..b7629c1d2ac 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -68,6 +68,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { if (container.getConcurrency() != null) { listenerContainerFactory.setConcurrency(container.getConcurrency()); } + listenerContainerFactory.setBatchListener(container.getBatchListener()); } } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index cd96bd5463d..c191b06a208 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -672,6 +672,11 @@ public class KafkaProperties { */ private Long ackTime; + /** + * If true listener container factory will be configured to create batch listener. + */ + private boolean batchListener; + public AckMode getAckMode() { return this.ackMode; } @@ -712,6 +717,13 @@ public class KafkaProperties { this.ackTime = ackTime; } + public boolean getBatchListener() { + return this.batchListener; + } + + public void setBatchListener(boolean batchListener) { + this.batchListener = batchListener; + } } public static class Ssl { diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 34328bbf740..6e7f8ef1fcc 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -176,6 +176,7 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", + "spring.kafka.listener.batch-listener=true", "spring.kafka.jaas.enabled=true", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true"); @@ -198,6 +199,8 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) .isEqualTo(2000L); + assertThat(dfa.getPropertyValue("batchListener")) + .isEqualTo(true); assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) .hasSize(1); KafkaJaasLoginModuleInitializer jaas = this.context diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index c39e8715234..35b8931e8d5 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -986,6 +986,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME". spring.kafka.listener.concurrency= # Number of threads to run in the listener containers. spring.kafka.listener.poll-timeout= # Timeout in milliseconds to use when polling the consumer. + spring.kafka.listener.batch-listener= # If true listener container factory will be configured to create batch listener. spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete. spring.kafka.producer.batch-size= # Number of records to batch before sending. spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.