diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 7d466d57196..5b5f9dc4123 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -89,12 +89,13 @@ public class PulsarAutoConfiguration { @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false") DefaultPulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver, - ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { + ObjectProvider> customizersProvider, + ObjectProvider topicBuilderProvider) { List> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers( customizersProvider); DefaultPulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver); - producerFactory.setTopicBuilder(topicBuilder); + topicBuilderProvider.ifAvailable(producerFactory::setTopicBuilder); return producerFactory; } @@ -102,7 +103,8 @@ public class PulsarAutoConfiguration { @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true) CachingPulsarProducerFactory cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver, - ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { + ObjectProvider> customizersProvider, + ObjectProvider topicBuilderProvider) { PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache(); List> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers( customizersProvider); @@ -110,7 +112,7 @@ public class PulsarAutoConfiguration { this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity()); - producerFactory.setTopicBuilder(topicBuilder); + topicBuilderProvider.ifAvailable(producerFactory::setTopicBuilder); return producerFactory; } @@ -144,7 +146,8 @@ public class PulsarAutoConfiguration { @Bean @ConditionalOnMissingBean(PulsarConsumerFactory.class) DefaultPulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient, - ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { + ObjectProvider> customizersProvider, + ObjectProvider topicBuilderProvider) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeConsumerBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); @@ -152,7 +155,7 @@ public class PulsarAutoConfiguration { .of((builder) -> applyConsumerBuilderCustomizers(customizers, builder)); DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers); - consumerFactory.setTopicBuilder(topicBuilder); + topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder); return consumerFactory; } @@ -190,7 +193,8 @@ public class PulsarAutoConfiguration { @Bean @ConditionalOnMissingBean(PulsarReaderFactory.class) DefaultPulsarReaderFactory pulsarReaderFactory(PulsarClient pulsarClient, - ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { + ObjectProvider> customizersProvider, + ObjectProvider topicBuilderProvider) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeReaderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); @@ -198,7 +202,7 @@ public class PulsarAutoConfiguration { .of((builder) -> applyReaderBuilderCustomizers(customizers, builder)); DefaultPulsarReaderFactory readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers); - readerFactory.setTopicBuilder(topicBuilder); + topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder); return readerFactory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index e4f6897f0b7..45aefa0f5d1 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -257,15 +257,7 @@ public class PulsarProperties { */ private List typeMappings = new ArrayList<>(); - private Topic topic = new Topic(); - - public Topic getTopic() { - return this.topic; - } - - public void setTopic(Topic topic) { - this.topic = topic; - } + private final Topic topic = new Topic(); public List getTypeMappings() { return this.typeMappings; @@ -275,6 +267,10 @@ public class PulsarProperties { this.typeMappings = typeMappings; } + public Topic getTopic() { + return this.topic; + } + /** * A mapping from message type to topic and/or schema info to use (at least one of * {@code topicName} or {@code schemaInfo} must be specified. diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java index 6b983e93298..5ca96e70579 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 the original author or authors. + * Copyright 2012-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ import org.springframework.pulsar.reactive.config.annotation.EnableReactivePulsa import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory; import org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory; import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory; +import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory.Builder; import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer; @@ -114,18 +115,18 @@ public class PulsarReactiveAutoConfiguration { DefaultReactivePulsarSenderFactory reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, ObjectProvider reactiveMessageSenderCache, TopicResolver topicResolver, ObjectProvider> customizersProvider, - PulsarTopicBuilder topicBuilder) { + ObjectProvider topicBuilderProvider) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageSenderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyMessageSenderBuilderCustomizers(customizers, builder)); - return DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient) + Builder senderFactoryBuilder = DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient) .withDefaultConfigCustomizers(lambdaSafeCustomizers) .withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable()) - .withTopicResolver(topicResolver) - .withTopicBuilder(topicBuilder) - .build(); + .withTopicResolver(topicResolver); + topicBuilderProvider.ifAvailable(senderFactoryBuilder::withTopicBuilder); + return senderFactoryBuilder.build(); } @SuppressWarnings("unchecked") @@ -140,7 +141,7 @@ public class PulsarReactiveAutoConfiguration { DefaultReactivePulsarConsumerFactory reactivePulsarConsumerFactory( ReactivePulsarClient pulsarReactivePulsarClient, ObjectProvider> customizersProvider, - PulsarTopicBuilder topicBuilder) { + ObjectProvider topicBuilderProvider) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); @@ -148,7 +149,7 @@ public class PulsarReactiveAutoConfiguration { .of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder)); DefaultReactivePulsarConsumerFactory consumerFactory = new DefaultReactivePulsarConsumerFactory<>( pulsarReactivePulsarClient, lambdaSafeCustomizers); - consumerFactory.setTopicBuilder(topicBuilder); + topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder); return consumerFactory; } @@ -175,7 +176,7 @@ public class PulsarReactiveAutoConfiguration { @ConditionalOnMissingBean(ReactivePulsarReaderFactory.class) DefaultReactivePulsarReaderFactory reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient, ObjectProvider> customizersProvider, - PulsarTopicBuilder topicBuilder) { + ObjectProvider topicBuilderProvider) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageReaderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); @@ -183,7 +184,7 @@ public class PulsarReactiveAutoConfiguration { .of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder)); DefaultReactivePulsarReaderFactory readerFactory = new DefaultReactivePulsarReaderFactory<>( reactivePulsarClient, lambdaSafeCustomizers); - readerFactory.setTopicBuilder(topicBuilder); + topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder); return readerFactory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index b61f10fdae1..d8d30e942e3 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -67,6 +67,7 @@ import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; @@ -126,6 +127,7 @@ class PulsarAutoConfigurationTests { .hasSingleBean(PulsarConnectionDetails.class) .hasSingleBean(DefaultPulsarClientFactory.class) .hasSingleBean(PulsarClient.class) + .hasSingleBean(PulsarTopicBuilder.class) .hasSingleBean(PulsarAdministration.class) .hasSingleBean(DefaultSchemaResolver.class) .hasSingleBean(DefaultTopicResolver.class) @@ -141,6 +143,12 @@ class PulsarAutoConfigurationTests { .hasSingleBean(PulsarReaderEndpointRegistry.class)); } + @Test + void topicDefaultsCanBeDisabled() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); + } + @Nested class ProducerFactoryTests { @@ -221,7 +229,15 @@ class PulsarAutoConfigurationTests { .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) .hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)) .extracting("topicBuilder") - .isNotNull()); // prototype so only check not-null + .isNotNull()); + } + + @Test + void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class) + .extracting("topicBuilder") + .isNull()); } @ParameterizedTest @@ -379,7 +395,16 @@ class PulsarAutoConfigurationTests { this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class) .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) .extracting("topicBuilder") - .isNotNull()); // prototype so only check not-null + .isNotNull()); + } + + @Test + void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class) + .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) + .extracting("topicBuilder") + .isNull()); } @Test @@ -580,7 +605,15 @@ class PulsarAutoConfigurationTests { this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class) .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) .extracting("topicBuilder") - .isNotNull()); // prototype so only check not-null + .isNotNull()); + } + + @Test + void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class) + .extracting("topicBuilder") + .isNull()); } @Test diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index 86fc67c9a02..0ecb9e85e9f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 the original author or authors. + * Copyright 2012-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -115,6 +115,7 @@ class PulsarReactiveAutoConfigurationTests { void autoConfiguresBeans() { this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class) .hasSingleBean(PulsarClient.class) + .hasSingleBean(PulsarTopicBuilder.class) .hasSingleBean(PulsarAdministration.class) .hasSingleBean(DefaultSchemaResolver.class) .hasSingleBean(DefaultTopicResolver.class) @@ -129,6 +130,12 @@ class PulsarReactiveAutoConfigurationTests { .hasSingleBean(ReactivePulsarListenerEndpointRegistry.class)); } + @Test + void topicDefaultsCanBeDisabled() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); + } + @Test @SuppressWarnings("rawtypes") void injectsExpectedBeansIntoReactivePulsarClient() { @@ -178,14 +185,17 @@ class PulsarReactiveAutoConfigurationTests { assertThat(senderFactory) .extracting("topicResolver", InstanceOfAssertFactories.type(TopicResolver.class)) .isSameAs(context.getBean(TopicResolver.class)); - assertThat(senderFactory).extracting("topicBuilder").isNotNull(); // prototype - // so - // only - // check - // not-null + assertThat(senderFactory).extracting("topicBuilder").isNotNull(); }); } + @Test + void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat((DefaultReactivePulsarSenderFactory) context + .getBean(DefaultReactivePulsarSenderFactory.class)).extracting("topicBuilder").isNull()); + } + @Test void injectsExpectedBeansIntoReactiveMessageSenderCache() { ProducerCacheProvider provider = mock(ProducerCacheProvider.class); @@ -273,6 +283,15 @@ class PulsarReactiveAutoConfigurationTests { }); } + @Test + void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat( + (ReactivePulsarConsumerFactory) context.getBean(DefaultReactivePulsarConsumerFactory.class)) + .extracting("topicBuilder") + .isNull()); + } + @Test void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { this.contextRunner.withPropertyValues("spring.pulsar.consumer.name=fromPropsCustomizer") @@ -389,6 +408,13 @@ class PulsarReactiveAutoConfigurationTests { }); } + @Test + void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat((DefaultReactivePulsarReaderFactory) context + .getBean(DefaultReactivePulsarReaderFactory.class)).extracting("topicBuilder").isNull()); + } + @Test void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { this.contextRunner.withPropertyValues("spring.pulsar.reader.name=fromPropsCustomizer")