Add support for Kafka immediateStop property
See gh-29884
This commit is contained in:
parent
c5307a8bfe
commit
d56403b64f
|
@ -207,6 +207,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
||||||
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
|
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
|
||||||
map.from(properties::isOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata);
|
map.from(properties::isOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata);
|
||||||
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
|
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
|
||||||
|
map.from(properties::isImmediateStop).to(container::setStopImmediate);
|
||||||
map.from(this.transactionManager).to(container::setTransactionManager);
|
map.from(this.transactionManager).to(container::setTransactionManager);
|
||||||
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
|
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
|
||||||
}
|
}
|
||||||
|
|
|
@ -946,6 +946,12 @@ public class KafkaProperties {
|
||||||
*/
|
*/
|
||||||
private boolean missingTopicsFatal = false;
|
private boolean missingTopicsFatal = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the container stops after the current record is processed or after all
|
||||||
|
* the records from the previous poll are processed.
|
||||||
|
*/
|
||||||
|
private boolean immediateStop = false;
|
||||||
|
|
||||||
public Type getType() {
|
public Type getType() {
|
||||||
return this.type;
|
return this.type;
|
||||||
}
|
}
|
||||||
|
@ -1066,6 +1072,14 @@ public class KafkaProperties {
|
||||||
this.missingTopicsFatal = missingTopicsFatal;
|
this.missingTopicsFatal = missingTopicsFatal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isImmediateStop() {
|
||||||
|
return this.immediateStop;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setImmediateStop(boolean immediateStop) {
|
||||||
|
this.immediateStop = immediateStop;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Ssl {
|
public static class Ssl {
|
||||||
|
|
|
@ -382,21 +382,20 @@ class KafkaAutoConfigurationTests {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
void listenerProperties() {
|
void listenerProperties() {
|
||||||
this.contextRunner
|
this.contextRunner.withPropertyValues("spring.kafka.template.default-topic=testTopic",
|
||||||
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
|
"spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL",
|
||||||
"spring.kafka.template.transaction-id-prefix=txOverride",
|
"spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123",
|
||||||
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
|
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3",
|
||||||
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
|
"spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5",
|
||||||
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
|
"spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s",
|
||||||
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
|
"spring.kafka.listener.idle-event-interval=1s",
|
||||||
"spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
|
"spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45",
|
||||||
"spring.kafka.listener.idle-partition-event-interval=1s",
|
"spring.kafka.listener.log-container-config=true",
|
||||||
"spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
|
"spring.kafka.listener.only-log-record-metadata=true",
|
||||||
"spring.kafka.listener.only-log-record-metadata=true",
|
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
|
||||||
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
|
"spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo",
|
||||||
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
|
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE",
|
||||||
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
|
"spring.kafka.jaas.options.useKeyTab=true").run((context) -> {
|
||||||
.run((context) -> {
|
|
||||||
DefaultKafkaProducerFactory<?, ?> producerFactory = context
|
DefaultKafkaProducerFactory<?, ?> producerFactory = context
|
||||||
.getBean(DefaultKafkaProducerFactory.class);
|
.getBean(DefaultKafkaProducerFactory.class);
|
||||||
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
|
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
|
||||||
|
@ -423,6 +422,7 @@ class KafkaAutoConfigurationTests {
|
||||||
assertThat(containerProperties.isLogContainerConfig()).isTrue();
|
assertThat(containerProperties.isLogContainerConfig()).isTrue();
|
||||||
assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue();
|
assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue();
|
||||||
assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
|
assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
|
||||||
|
assertThat(containerProperties.isStopImmediate()).isTrue();
|
||||||
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
|
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
|
||||||
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
|
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
|
||||||
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1);
|
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1);
|
||||||
|
|
Loading…
Reference in New Issue