Add Pulsar container factory customizer infrastructure

This commit adds the ability for users to customize the auto-configured
Spring for Apache Pulsar message container factories. Each container
factory holds a set of container properties that is a common target for
users to configure. Allowing the customization of these properties
prevents a rapid increase of configuration properties.

See gh-42182
This commit is contained in:
Chris Bono 2024-09-08 15:22:37 -05:00 committed by Stéphane Nicoll
parent 920e3cc4c2
commit 5cbe0e84f9
10 changed files with 394 additions and 12 deletions

View File

@ -178,7 +178,7 @@ public class PulsarAutoConfiguration {
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
Environment environment) {
PulsarContainerFactoryCustomizers containerFactoryCustomizers, Environment environment) {
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
@ -187,7 +187,10 @@ public class PulsarAutoConfiguration {
}
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
ConcurrentPulsarListenerContainerFactory<?> containerFactory = new ConcurrentPulsarListenerContainerFactory<>(
pulsarConsumerFactory, containerProperties);
containerFactoryCustomizers.customize(containerFactory);
return containerFactory;
}
@Bean
@ -215,14 +218,18 @@ public class PulsarAutoConfiguration {
@Bean
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
SchemaResolver schemaResolver, Environment environment) {
SchemaResolver schemaResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers,
Environment environment) {
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver);
if (Threading.VIRTUAL.isActive(environment)) {
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-"));
}
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
DefaultPulsarReaderContainerFactory<?> containerFactory = new DefaultPulsarReaderContainerFactory<>(
pulsarReaderFactory, readerContainerProperties);
containerFactoryCustomizers.customize(containerFactory);
return containerFactory;
}
@Configuration(proxyBeanMethods = false)

View File

@ -188,4 +188,11 @@ class PulsarConfiguration {
this.properties.getDefaults().getTopic().getNamespace());
}
@Bean
@ConditionalOnMissingBean
PulsarContainerFactoryCustomizers pulsarContainerFactoryCustomizers(
ObjectProvider<PulsarContainerFactoryCustomizer<?>> customizers) {
return new PulsarContainerFactoryCustomizers(customizers.orderedStream().toList());
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
import org.springframework.pulsar.config.PulsarContainerFactory;
/**
* Callback interface that can be implemented by beans wishing to customize a
* {@link PulsarContainerFactory} before it is fully initialized, in particular to tune
* its configuration.
*
* @param <T> the type of the {@link PulsarContainerFactory}
* @author Chris Bono
* @since 3.4.0
*/
@FunctionalInterface
public interface PulsarContainerFactoryCustomizer<T extends PulsarContainerFactory<?, ?>> {
/**
* Customize the container factory.
* @param containerFactory the {@code PulsarContainerFactory} to customize
*/
void customize(T containerFactory);
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.springframework.boot.util.LambdaSafe;
import org.springframework.pulsar.config.PulsarContainerFactory;
import org.springframework.pulsar.core.PulsarConsumerFactory;
/**
* Invokes the available {@link PulsarContainerFactoryCustomizer} instances in the context
* for a given {@link PulsarConsumerFactory}.
*
* @author Chris Bono
* @since 3.4.0
*/
public class PulsarContainerFactoryCustomizers {
private final List<PulsarContainerFactoryCustomizer<?>> customizers;
public PulsarContainerFactoryCustomizers(List<? extends PulsarContainerFactoryCustomizer<?>> customizers) {
this.customizers = (customizers != null) ? new ArrayList<>(customizers) : Collections.emptyList();
}
/**
* Customize the specified {@link PulsarContainerFactory}. Locates all
* {@link PulsarContainerFactoryCustomizer} beans able to handle the specified
* instance and invoke {@link PulsarContainerFactoryCustomizer#customize} on them.
* @param <T> the type of container factory
* @param containerFactory the container factory to customize
* @return the customized container factory
*/
@SuppressWarnings("unchecked")
public <T extends PulsarContainerFactory<?, ?>> T customize(T containerFactory) {
LambdaSafe.callbacks(PulsarContainerFactoryCustomizer.class, this.customizers, containerFactory)
.withLogger(PulsarContainerFactoryCustomizers.class)
.invoke((customizer) -> customizer.customize(containerFactory));
return containerFactory;
}
}

View File

@ -164,12 +164,15 @@ public class PulsarReactiveAutoConfiguration {
@ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory")
DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainerFactory(
ReactivePulsarConsumerFactory<Object> reactivePulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver) {
TopicResolver topicResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers) {
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties);
DefaultReactivePulsarListenerContainerFactory<?> containerFactory = new DefaultReactivePulsarListenerContainerFactory<>(
reactivePulsarConsumerFactory, containerProperties);
containerFactoryCustomizers.customize(containerFactory);
return containerFactory;
}
@Bean

View File

@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.pulsar;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -72,6 +73,7 @@ import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.test.util.ReflectionTestUtils;
@ -585,6 +587,44 @@ class PulsarAutoConfigurationTests {
});
}
@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class)
.run((context) -> assertThat(context).getBean(ConcurrentPulsarListenerContainerFactory.class)
.hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo"));
}
@TestConfiguration(proxyBeanMethods = false)
static class ListenerContainerFactoryCustomizersConfig {
@Bean
@Order(50)
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerIgnored() {
return (__) -> {
throw new RuntimeException("should-not-have-matched");
};
}
@Bean
@Order(200)
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> customizerFoo() {
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo");
}
@Bean
@Order(100)
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> customizerBar() {
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar");
}
private void appendToSubscriptionName(ConcurrentPulsarListenerContainerFactory<?> containerFactory,
String valueToAppend) {
String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), "");
containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend));
}
}
}
@Nested
@ -617,7 +657,7 @@ class PulsarAutoConfigurationTests {
}
@Test
<T> void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
<T> void whenHasUserDefinedReaderBuilderCustomizersAppliesInCorrectOrder() {
this.contextRunner.withPropertyValues("spring.pulsar.reader.name=fromPropsCustomizer")
.withUserConfiguration(ReaderBuilderCustomizersConfig.class)
.run((context) -> {
@ -654,6 +694,13 @@ class PulsarAutoConfigurationTests {
});
}
@Test
void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() {
this.contextRunner.withUserConfiguration(ReaderContainerFactoryCustomizersConfig.class)
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class)
.hasFieldOrPropertyWithValue("containerProperties.readerListener", ":bar:foo"));
}
@TestConfiguration(proxyBeanMethods = false)
static class ReaderBuilderCustomizersConfig {
@ -671,6 +718,37 @@ class PulsarAutoConfigurationTests {
}
@TestConfiguration(proxyBeanMethods = false)
static class ReaderContainerFactoryCustomizersConfig {
@Bean
@Order(50)
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerIgnored() {
return (__) -> {
throw new RuntimeException("should-not-have-matched");
};
}
@Bean
@Order(200)
PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> customizerFoo() {
return (containerFactory) -> appendToReaderListener(containerFactory, ":foo");
}
@Bean
@Order(100)
PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> customizerBar() {
return (containerFactory) -> appendToReaderListener(containerFactory, ":bar");
}
private void appendToReaderListener(DefaultPulsarReaderContainerFactory<?> containerFactory,
String valueToAppend) {
String name = Objects.toString(containerFactory.getContainerProperties().getReaderListener(), "");
containerFactory.getContainerProperties().setReaderListener(name.concat(valueToAppend));
}
}
}
@Nested

View File

@ -86,6 +86,15 @@ class PulsarConfigurationTests {
.isSameAs(customConnectionDetails));
}
@Test
void whenHasUserDefinedContainerFactoryCustomizersBeanDoesNotAutoConfigureBean() {
PulsarContainerFactoryCustomizers customizers = mock(PulsarContainerFactoryCustomizers.class);
this.contextRunner
.withBean("customContainerFactoryCustomizers", PulsarContainerFactoryCustomizers.class, () -> customizers)
.run((context) -> assertThat(context).getBean(PulsarContainerFactoryCustomizers.class)
.isSameAs(customizers));
}
@Nested
class ClientTests {

View File

@ -0,0 +1,140 @@
/*
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.mockito.BDDMockito;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
import org.springframework.pulsar.config.ListenerContainerFactory;
import org.springframework.pulsar.config.PulsarContainerFactory;
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Unit tests for {@link PulsarContainerFactoryCustomizers}.
*
* @author Chris Bono
*/
class PulsarContainerFactoryCustomizersTests {
@Test
void customizeWithNullCustomizersShouldDoNothing() {
PulsarContainerFactory<?, ?> containerFactory = mock(PulsarContainerFactory.class);
new PulsarContainerFactoryCustomizers(null).customize(containerFactory);
BDDMockito.verifyNoInteractions(containerFactory);
}
@SuppressWarnings("unchecked")
@Test
void customizeSimplePulsarContainerFactory() {
PulsarContainerFactoryCustomizers customizers = new PulsarContainerFactoryCustomizers(
Collections.singletonList(new SimplePulsarContainerFactoryCustomizer()));
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
ConcurrentPulsarListenerContainerFactory<String> pulsarContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(
mock(PulsarConsumerFactory.class), containerProperties);
customizers.customize(pulsarContainerFactory);
assertThat(pulsarContainerFactory.getContainerProperties().getSubscriptionName()).isEqualTo("my-subscription");
}
@Test
void customizeShouldCheckGeneric() {
List<TestCustomizer<?>> list = new ArrayList<>();
list.add(new TestCustomizer<>());
list.add(new TestPulsarListenersContainerFactoryCustomizer());
list.add(new TestConcurrentPulsarListenerContainerFactoryCustomizer());
PulsarContainerFactoryCustomizers customizers = new PulsarContainerFactoryCustomizers(list);
customizers.customize(mock(PulsarContainerFactory.class));
assertThat(list.get(0).getCount()).isOne();
assertThat(list.get(1).getCount()).isZero();
assertThat(list.get(2).getCount()).isZero();
customizers.customize(mock(ConcurrentPulsarListenerContainerFactory.class));
assertThat(list.get(0).getCount()).isEqualTo(2);
assertThat(list.get(1).getCount()).isOne();
assertThat(list.get(2).getCount()).isOne();
customizers.customize(mock(DefaultReactivePulsarListenerContainerFactory.class));
assertThat(list.get(0).getCount()).isEqualTo(3);
assertThat(list.get(1).getCount()).isEqualTo(2);
assertThat(list.get(2).getCount()).isOne();
customizers.customize(mock(DefaultPulsarReaderContainerFactory.class));
assertThat(list.get(0).getCount()).isEqualTo(4);
assertThat(list.get(1).getCount()).isEqualTo(2);
assertThat(list.get(2).getCount()).isOne();
}
static class SimplePulsarContainerFactoryCustomizer
implements PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> {
@Override
public void customize(ConcurrentPulsarListenerContainerFactory<?> containerFactory) {
containerFactory.getContainerProperties().setSubscriptionName("my-subscription");
}
}
/**
* Test customizer that will match all {@link PulsarListenerContainerFactory}.
*/
static class TestCustomizer<T extends PulsarContainerFactory<?, ?>> implements PulsarContainerFactoryCustomizer<T> {
private int count;
@Override
public void customize(T pulsarContainerFactory) {
this.count++;
}
int getCount() {
return this.count;
}
}
/**
* Test customizer that will match both
* {@link ConcurrentPulsarListenerContainerFactory} and
* {@link DefaultReactivePulsarListenerContainerFactory} as they both extend
* {@link ListenerContainerFactory}.
*/
static class TestPulsarListenersContainerFactoryCustomizer extends TestCustomizer<ListenerContainerFactory<?, ?>> {
}
/**
* Test customizer that will match only
* {@link ConcurrentPulsarListenerContainerFactory}.
*/
static class TestConcurrentPulsarListenerContainerFactoryCustomizer
extends TestCustomizer<ConcurrentPulsarListenerContainerFactory<?>> {
}
}

View File

@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.pulsar;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import com.github.benmanes.caffeine.cache.Caffeine;
@ -45,6 +46,7 @@ import org.springframework.boot.test.context.assertj.AssertableApplicationContex
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.PulsarAdministration;
@ -382,6 +384,44 @@ class PulsarReactiveAutoConfigurationTests {
});
}
@Test
void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() {
this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class)
.run((context) -> assertThat(context).getBean(DefaultReactivePulsarListenerContainerFactory.class)
.hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo"));
}
@TestConfiguration(proxyBeanMethods = false)
static class ListenerContainerFactoryCustomizersConfig {
@Bean
@Order(50)
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> customizerIgnored() {
return (__) -> {
throw new RuntimeException("should-not-have-matched");
};
}
@Bean
@Order(200)
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerFoo() {
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo");
}
@Bean
@Order(100)
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerBar() {
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar");
}
private void appendToSubscriptionName(DefaultReactivePulsarListenerContainerFactory<?> containerFactory,
String valueToAppend) {
String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), "");
containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend));
}
}
}
@Nested

View File

@ -150,11 +150,11 @@ include-code::MyBean[]
Spring Boot auto-configuration provides all the components necessary for `PulsarListener`, such as the `PulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying Pulsar consumers.
You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties.
If you need more control over the consumer factory configuration, consider registering one or more `ConsumerBuilderCustomizer` beans.
If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ConsumerBuilderCustomizer` beans.
These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances.
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation.
If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactoryCustomizer<?>>` beans.
[[messaging.pulsar.receiving-reactive]]
== Receiving a Message Reactively
@ -167,11 +167,11 @@ include-code::MyBean[]
Spring Boot auto-configuration provides all the components necessary for `ReactivePulsarListener`, such as the `ReactivePulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying reactive Pulsar consumers.
You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties.
If you need more control over the consumer factory configuration, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans.
If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans.
These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances.
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation.
If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>` beans.
[[messaging.pulsar.reading]]
== Reading a Message
@ -187,10 +187,11 @@ include-code::MyBean[]
The `@PulsarReader` relies on a `PulsarReaderFactory` to create the underlying Pulsar reader.
Spring Boot auto-configuration provides this reader factory which can be customized by setting any of the `spring.pulsar.reader.*` prefixed application properties.
If you need more control over the reader factory configuration, consider registering one or more `ReaderBuilderCustomizer` beans.
If you need more control over the configuration of the reader factory used by the container factory to create readers, consider registering one or more `ReaderBuilderCustomizer` beans.
These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances.
You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation.
If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>>` beans.
[[messaging.pulsar.reading-reactive]]