Merge pull request #24582 from martindacos
* pr/24582: Polish "Allow to configure Kafka Listener's onlyLogRecordMetadata" Allow to configure Kafka Listener's onlyLogRecordMetadata Closes gh-24582
This commit is contained in:
commit
72a22f80dc
|
@ -189,6 +189,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
|
|||
map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue)
|
||||
.to(container::setMonitorInterval);
|
||||
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
|
||||
map.from(properties::isOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata);
|
||||
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
|
||||
map.from(this.transactionManager).to(container::setTransactionManager);
|
||||
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
|
||||
|
|
|
@ -913,6 +913,12 @@ public class KafkaProperties {
|
|||
*/
|
||||
private Boolean logContainerConfig;
|
||||
|
||||
/**
|
||||
* Whether to suppress the entire record from being written to the log when
|
||||
* retries are being attempted.
|
||||
*/
|
||||
private boolean onlyLogRecordMetadata;
|
||||
|
||||
/**
|
||||
* Whether the container should fail to start if at least one of the configured
|
||||
* topics are not present on the broker.
|
||||
|
@ -1015,6 +1021,14 @@ public class KafkaProperties {
|
|||
this.logContainerConfig = logContainerConfig;
|
||||
}
|
||||
|
||||
public boolean isOnlyLogRecordMetadata() {
|
||||
return this.onlyLogRecordMetadata;
|
||||
}
|
||||
|
||||
public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
|
||||
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
|
||||
}
|
||||
|
||||
public boolean isMissingTopicsFatal() {
|
||||
return this.missingTopicsFatal;
|
||||
}
|
||||
|
|
|
@ -392,6 +392,7 @@ class KafkaAutoConfigurationTests {
|
|||
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
|
||||
"spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
|
||||
"spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
|
||||
"spring.kafka.listener.only-log-record-metadata=true",
|
||||
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
|
||||
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
|
||||
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
|
||||
|
@ -418,6 +419,7 @@ class KafkaAutoConfigurationTests {
|
|||
assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L);
|
||||
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
|
||||
assertThat(containerProperties.isLogContainerConfig()).isTrue();
|
||||
assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue();
|
||||
assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
|
||||
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
|
||||
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
|
||||
|
|
|
@ -47,6 +47,7 @@ class KafkaPropertiesTests {
|
|||
void listenerDefaultValuesAreConsistent() {
|
||||
ContainerProperties container = new ContainerProperties("test");
|
||||
Listener listenerProperties = new KafkaProperties().getListener();
|
||||
assertThat(listenerProperties.isOnlyLogRecordMetadata()).isEqualTo(container.isOnlyLogRecordMetadata());
|
||||
assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue