From d56403b64fbe4d371d240d08bc6ccd34f5295762 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 16 Feb 2022 12:16:23 -0500 Subject: [PATCH] Add support for Kafka immediateStop property See gh-29884 --- ...fkaListenerContainerFactoryConfigurer.java | 1 + .../autoconfigure/kafka/KafkaProperties.java | 14 +++++++++ .../kafka/KafkaAutoConfigurationTests.java | 30 +++++++++---------- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index ef8d008877d..f8d24a790ad 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -207,6 +207,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(properties::isOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); + map.from(properties::isImmediateStop).to(container::setStopImmediate); map.from(this.transactionManager).to(container::setTransactionManager); map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 52c99804163..3d15670cef2 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -946,6 +946,12 @@ public class KafkaProperties { */ private boolean missingTopicsFatal = false; + /** + * Whether the container stops after the current record is processed or after all + * the records from the previous poll are processed. + */ + private boolean immediateStop = false; + public Type getType() { return this.type; } @@ -1066,6 +1072,14 @@ public class KafkaProperties { this.missingTopicsFatal = missingTopicsFatal; } + public boolean isImmediateStop() { + return this.immediateStop; + } + + public void setImmediateStop(boolean immediateStop) { + this.immediateStop = immediateStop; + } + } public static class Ssl { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 179f0cba884..12f98a78241 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -382,21 +382,20 @@ class KafkaAutoConfigurationTests { @SuppressWarnings("unchecked") @Test void listenerProperties() { - this.contextRunner - .withPropertyValues("spring.kafka.template.default-topic=testTopic", - "spring.kafka.template.transaction-id-prefix=txOverride", - "spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client", - "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", - "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", - "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", - "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", - "spring.kafka.listener.idle-partition-event-interval=1s", - "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", - "spring.kafka.listener.only-log-record-metadata=true", - "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", - "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", - "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") - .run((context) -> { + this.contextRunner.withPropertyValues("spring.kafka.template.default-topic=testTopic", + "spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL", + "spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123", + "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", + "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5", + "spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", + "spring.kafka.listener.idle-event-interval=1s", + "spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45", + "spring.kafka.listener.log-container-config=true", + "spring.kafka.listener.only-log-record-metadata=true", + "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", + "spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo", + "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", + "spring.kafka.jaas.options.useKeyTab=true").run((context) -> { DefaultKafkaProducerFactory producerFactory = context .getBean(DefaultKafkaProducerFactory.class); DefaultKafkaConsumerFactory consumerFactory = context @@ -423,6 +422,7 @@ class KafkaAutoConfigurationTests { assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue(); assertThat(containerProperties.isMissingTopicsFatal()).isTrue(); + assertThat(containerProperties.isStopImmediate()).isTrue(); assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3); assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue(); assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1);