From b960fa32372ca1a5f7ffb375bed73648e6dbf411 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Wed, 14 Oct 2020 14:26:50 +0200 Subject: [PATCH] Polish "Add configuration options for Kafka Stream's CleanupConfig" See gh-23636 --- .../autoconfigure/kafka/KafkaProperties.java | 76 +++++++++---------- ...aStreamsAnnotationDrivenConfiguration.java | 7 +- .../kafka/KafkaAutoConfigurationTests.java | 22 ++++++ .../kafka/KafkaPropertiesTests.java | 10 +++ 4 files changed, 71 insertions(+), 44 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 60f9dfd73b8..a8f092cd7bd 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -37,7 +37,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.convert.DurationUnit; import org.springframework.core.io.Resource; -import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.util.CollectionUtils; @@ -686,6 +685,8 @@ public class KafkaProperties { private final Security security = new Security(); + private final Cleanup cleanup = new Cleanup(); + /** * Kafka streams application.id property; default spring.application.name. */ @@ -723,11 +724,6 @@ public class KafkaProperties { */ private String stateDir; - /** - * Cleanup configuration for the state stores. - */ - private Cleanup cleanup; - /** * Additional Kafka properties used to configure the streams. */ @@ -741,6 +737,10 @@ public class KafkaProperties { return this.security; } + public Cleanup getCleanup() { + return this.cleanup; + } + public String getApplicationId() { return this.applicationId; } @@ -797,14 +797,6 @@ public class KafkaProperties { this.stateDir = stateDir; } - public Cleanup getCleanup() { - return cleanup; - } - - public void setCleanup(Cleanup cleanup) { - this.cleanup = cleanup; - } - public Map getProperties() { return this.properties; } @@ -1248,6 +1240,36 @@ public class KafkaProperties { } + public static class Cleanup { + + /** + * Cleanup the application’s local state directory on startup. + */ + private boolean onStartup = false; + + /** + * Cleanup the application’s local state directory on shutdown. + */ + private boolean onShutdown = true; + + public boolean isOnStartup() { + return this.onStartup; + } + + public void setOnStartup(boolean onStartup) { + this.onStartup = onStartup; + } + + public boolean isOnShutdown() { + return this.onShutdown; + } + + public void setOnShutdown(boolean onShutdown) { + this.onShutdown = onShutdown; + } + + } + public enum IsolationLevel { /** @@ -1273,32 +1295,6 @@ public class KafkaProperties { } - public static class Cleanup { - - /** - * Cleanup the application's state on start. - */ - private boolean onStart = false; - - /** - * Cleanup the application's state on stop. - */ - private boolean onStop = true; - - public CleanupConfig buildCleanupConfig() { - return new CleanupConfig(this.onStart, this.onStop); - } - - public boolean isOnStart() { - return onStart; - } - - public boolean isOnStop() { - return onStop; - } - - } - @SuppressWarnings("serial") private static class Properties extends HashMap { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index ef36f273534..feec421f840 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -34,6 +34,7 @@ import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.CleanupConfig; /** * Configuration for Kafka Streams annotation-driven support. @@ -91,11 +92,9 @@ class KafkaStreamsAnnotationDrivenConfiguration { @Override public void afterPropertiesSet() { this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); - KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); - if (cleanup != null) { - this.factoryBean.setCleanupConfig(cleanup.buildCleanupConfig()); - } + CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown()); + this.factoryBean.setCleanupConfig(cleanupConfig); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 3e3e93e0528..a5e592bf7b9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -50,6 +51,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -340,6 +342,26 @@ class KafkaAutoConfigurationTests { }); } + @Test + void streamsWithCleanupConfig() { + this.contextRunner + .withUserConfiguration(EnableKafkaStreamsConfiguration.class, TestKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-startup=true", + "spring.kafka.streams.cleanup.on-shutdown=false") + .run((context) -> { + StreamsBuilderFactoryBean streamsBuilderFactoryBean = context + .getBean(StreamsBuilderFactoryBean.class); + assertThat(streamsBuilderFactoryBean) + .extracting("cleanupConfig", InstanceOfAssertFactories.type(CleanupConfig.class)) + .satisfies((cleanupConfig) -> { + assertThat(cleanupConfig.cleanupOnStart()).isTrue(); + assertThat(cleanupConfig.cleanupOnStop()).isFalse(); + }); + }); + } + @Test void streamsApplicationIdIsMandatory() { this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).run((context) -> { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java index faa4d470202..87361f78599 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java @@ -18,8 +18,10 @@ package org.springframework.boot.autoconfigure.kafka; import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.listener.ContainerProperties; import static org.assertj.core.api.Assertions.assertThat; @@ -48,4 +50,12 @@ class KafkaPropertiesTests { assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal()); } + @Test + void cleanupConfigDefaultValuesAreConsistent() { + CleanupConfig cleanupConfig = new CleanupConfig(); + Cleanup cleanup = new KafkaProperties().getStreams().getCleanup(); + assertThat(cleanup.isOnStartup()).isEqualTo(cleanupConfig.cleanupOnStart()); + assertThat(cleanup.isOnShutdown()).isEqualTo(cleanupConfig.cleanupOnStop()); + } + }