diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index b7629c1d2ac..3ac3d9b0bcb 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2016 the original author or authors. + * Copyright 2012-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,7 +68,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { if (container.getConcurrency() != null) { listenerContainerFactory.setConcurrency(container.getConcurrency()); } - listenerContainerFactory.setBatchListener(container.getBatchListener()); + if (container.getType() == Listener.Type.BATCH) { + listenerContainerFactory.setBatchListener(true); + } } } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index c191b06a208..51a8fffe6e2 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -645,6 +645,25 @@ public class KafkaProperties { public static class Listener { + public enum Type { + + /** + * Invokes the endpoint with one ConsumerRecord at a time. + */ + SINGLE, + + /** + * Invokes the endpoint with a batch of ConsumerRecord. + */ + BATCH; + + } + + /** + * Listener type. + */ + private Type type = Type.SINGLE; + /** * Listener AckMode; see the spring-kafka documentation. */ @@ -672,10 +691,13 @@ public class KafkaProperties { */ private Long ackTime; - /** - * If true listener container factory will be configured to create batch listener. - */ - private boolean batchListener; + public Type getType() { + return this.type; + } + + public void setType(Type type) { + this.type = type; + } public AckMode getAckMode() { return this.ackMode; @@ -717,13 +739,6 @@ public class KafkaProperties { this.ackTime = ackTime; } - public boolean getBatchListener() { - return this.batchListener; - } - - public void setBatchListener(boolean batchListener) { - this.batchListener = batchListener; - } } public static class Ssl { diff --git a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 356299c4b0d..04bf53776c3 100644 --- a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -329,6 +329,10 @@ "name": "spring.kafka.jaas.control-flag", "defaultValue": "required" }, + { + "name": "spring.kafka.listener.type", + "defaultValue": "single" + }, { "name": "spring.mobile.devicedelegatingviewresolver.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 6e7f8ef1fcc..b79f447ee18 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -176,7 +176,7 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", - "spring.kafka.listener.batch-listener=true", + "spring.kafka.listener.type=batch", "spring.kafka.jaas.enabled=true", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true"); diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index 35b8931e8d5..88f45dadb3f 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -986,7 +986,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME". spring.kafka.listener.concurrency= # Number of threads to run in the listener containers. spring.kafka.listener.poll-timeout= # Timeout in milliseconds to use when polling the consumer. - spring.kafka.listener.batch-listener= # If true listener container factory will be configured to create batch listener. + spring.kafka.listener.type=single # Listener type. spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete. spring.kafka.producer.batch-size= # Number of records to batch before sending. spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.