From b539e2e749e3d49b47e598f6e022721b3779814a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mart=C3=ADn=20Dacosta?= Date: Mon, 21 Dec 2020 20:03:22 +0100 Subject: [PATCH 1/2] Allow to configure Kafka Listener's onlyLogRecordMetadata See gh-24582 --- ...entKafkaListenerContainerFactoryConfigurer.java | 1 + .../boot/autoconfigure/kafka/KafkaProperties.java | 14 ++++++++++++++ .../kafka/KafkaAutoConfigurationTests.java | 2 ++ 3 files changed, 17 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 7431fcf848b..09a45bdb8fa 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -189,6 +189,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); + map.from(properties::getOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(this.transactionManager).to(container::setTransactionManager); map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index a8f092cd7bd..83fb5aa5a17 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -913,6 +913,12 @@ public class KafkaProperties { */ private Boolean logContainerConfig; + /** + * Whether to suppress the entire record from being written to the log when + * retries are being attempted. + */ + private Boolean onlyLogRecordMetadata; + /** * Whether the container should fail to start if at least one of the configured * topics are not present on the broker. @@ -1015,6 +1021,14 @@ public class KafkaProperties { this.logContainerConfig = logContainerConfig; } + public Boolean getOnlyLogRecordMetadata() { + return this.onlyLogRecordMetadata; + } + + public void setOnlyLogRecordMetadata(Boolean onlyLogRecordMetadata) { + this.onlyLogRecordMetadata = onlyLogRecordMetadata; + } + public boolean isMissingTopicsFatal() { return this.missingTopicsFatal; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index a5e592bf7b9..c00a815286c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -392,6 +392,7 @@ class KafkaAutoConfigurationTests { "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", + "spring.kafka.listener.only-log-record-metadata=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") @@ -418,6 +419,7 @@ class KafkaAutoConfigurationTests { assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.isLogContainerConfig()).isTrue(); + assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue(); assertThat(containerProperties.isMissingTopicsFatal()).isTrue(); assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3); assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue(); From ec683605d4a3a8f99712255ef8ff30f5f5418c0a Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Tue, 22 Dec 2020 09:53:53 +0100 Subject: [PATCH 2/2] Polish "Allow to configure Kafka Listener's onlyLogRecordMetadata" See gh-24582 --- .../ConcurrentKafkaListenerContainerFactoryConfigurer.java | 2 +- .../boot/autoconfigure/kafka/KafkaProperties.java | 6 +++--- .../boot/autoconfigure/kafka/KafkaPropertiesTests.java | 1 + 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 09a45bdb8fa..47b12837bd1 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -189,7 +189,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); - map.from(properties::getOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata); + map.from(properties::isOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(this.transactionManager).to(container::setTransactionManager); map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 83fb5aa5a17..e0cd916be8c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -917,7 +917,7 @@ public class KafkaProperties { * Whether to suppress the entire record from being written to the log when * retries are being attempted. */ - private Boolean onlyLogRecordMetadata; + private boolean onlyLogRecordMetadata; /** * Whether the container should fail to start if at least one of the configured @@ -1021,11 +1021,11 @@ public class KafkaProperties { this.logContainerConfig = logContainerConfig; } - public Boolean getOnlyLogRecordMetadata() { + public boolean isOnlyLogRecordMetadata() { return this.onlyLogRecordMetadata; } - public void setOnlyLogRecordMetadata(Boolean onlyLogRecordMetadata) { + public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) { this.onlyLogRecordMetadata = onlyLogRecordMetadata; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java index 87361f78599..31ce0f4003e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java @@ -47,6 +47,7 @@ class KafkaPropertiesTests { void listenerDefaultValuesAreConsistent() { ContainerProperties container = new ContainerProperties("test"); Listener listenerProperties = new KafkaProperties().getListener(); + assertThat(listenerProperties.isOnlyLogRecordMetadata()).isEqualTo(container.isOnlyLogRecordMetadata()); assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal()); }