Polish "Add Pulsar container factory customizer infrastructure"
See gh-42182
This commit is contained in:
parent
5cbe0e84f9
commit
5b25a37a36
|
|
@ -178,7 +178,7 @@ public class PulsarAutoConfiguration {
|
||||||
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
|
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
|
||||||
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
|
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
|
||||||
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
|
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
|
||||||
PulsarContainerFactoryCustomizers containerFactoryCustomizers, Environment environment) {
|
Environment environment, PulsarContainerFactoryCustomizers containerFactoryCustomizers) {
|
||||||
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
|
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
|
||||||
containerProperties.setSchemaResolver(schemaResolver);
|
containerProperties.setSchemaResolver(schemaResolver);
|
||||||
containerProperties.setTopicResolver(topicResolver);
|
containerProperties.setTopicResolver(topicResolver);
|
||||||
|
|
@ -218,8 +218,8 @@ public class PulsarAutoConfiguration {
|
||||||
@Bean
|
@Bean
|
||||||
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
|
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
|
||||||
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
|
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
|
||||||
SchemaResolver schemaResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers,
|
SchemaResolver schemaResolver, Environment environment,
|
||||||
Environment environment) {
|
PulsarContainerFactoryCustomizers containerFactoryCustomizers) {
|
||||||
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
|
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
|
||||||
readerContainerProperties.setSchemaResolver(schemaResolver);
|
readerContainerProperties.setSchemaResolver(schemaResolver);
|
||||||
if (Threading.VIRTUAL.isActive(environment)) {
|
if (Threading.VIRTUAL.isActive(environment)) {
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,6 @@ package org.springframework.boot.autoconfigure.pulsar;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
|
@ -600,8 +599,8 @@ class PulsarAutoConfigurationTests {
|
||||||
@Bean
|
@Bean
|
||||||
@Order(50)
|
@Order(50)
|
||||||
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerIgnored() {
|
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerIgnored() {
|
||||||
return (__) -> {
|
return (containerFactory) -> {
|
||||||
throw new RuntimeException("should-not-have-matched");
|
throw new IllegalStateException("should-not-have-matched");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -619,8 +618,9 @@ class PulsarAutoConfigurationTests {
|
||||||
|
|
||||||
private void appendToSubscriptionName(ConcurrentPulsarListenerContainerFactory<?> containerFactory,
|
private void appendToSubscriptionName(ConcurrentPulsarListenerContainerFactory<?> containerFactory,
|
||||||
String valueToAppend) {
|
String valueToAppend) {
|
||||||
String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), "");
|
String subscriptionName = containerFactory.getContainerProperties().getSubscriptionName();
|
||||||
containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend));
|
String updatedValue = (subscriptionName != null) ? subscriptionName + valueToAppend : valueToAppend;
|
||||||
|
containerFactory.getContainerProperties().setSubscriptionName(updatedValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -724,8 +724,8 @@ class PulsarAutoConfigurationTests {
|
||||||
@Bean
|
@Bean
|
||||||
@Order(50)
|
@Order(50)
|
||||||
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerIgnored() {
|
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerIgnored() {
|
||||||
return (__) -> {
|
return (containerFactory) -> {
|
||||||
throw new RuntimeException("should-not-have-matched");
|
throw new IllegalStateException("should-not-have-matched");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -743,8 +743,9 @@ class PulsarAutoConfigurationTests {
|
||||||
|
|
||||||
private void appendToReaderListener(DefaultPulsarReaderContainerFactory<?> containerFactory,
|
private void appendToReaderListener(DefaultPulsarReaderContainerFactory<?> containerFactory,
|
||||||
String valueToAppend) {
|
String valueToAppend) {
|
||||||
String name = Objects.toString(containerFactory.getContainerProperties().getReaderListener(), "");
|
Object readerListener = containerFactory.getContainerProperties().getReaderListener();
|
||||||
containerFactory.getContainerProperties().setReaderListener(name.concat(valueToAppend));
|
String updatedValue = (readerListener != null) ? readerListener + valueToAppend : valueToAppend;
|
||||||
|
containerFactory.getContainerProperties().setReaderListener(updatedValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.BDDMockito;
|
|
||||||
|
|
||||||
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
|
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
|
||||||
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
|
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
|
||||||
|
|
@ -33,10 +32,11 @@ import org.springframework.pulsar.listener.PulsarContainerProperties;
|
||||||
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
|
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.BDDMockito.then;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for {@link PulsarContainerFactoryCustomizers}.
|
* Tests for {@link PulsarContainerFactoryCustomizers}.
|
||||||
*
|
*
|
||||||
* @author Chris Bono
|
* @author Chris Bono
|
||||||
*/
|
*/
|
||||||
|
|
@ -46,11 +46,11 @@ class PulsarContainerFactoryCustomizersTests {
|
||||||
void customizeWithNullCustomizersShouldDoNothing() {
|
void customizeWithNullCustomizersShouldDoNothing() {
|
||||||
PulsarContainerFactory<?, ?> containerFactory = mock(PulsarContainerFactory.class);
|
PulsarContainerFactory<?, ?> containerFactory = mock(PulsarContainerFactory.class);
|
||||||
new PulsarContainerFactoryCustomizers(null).customize(containerFactory);
|
new PulsarContainerFactoryCustomizers(null).customize(containerFactory);
|
||||||
BDDMockito.verifyNoInteractions(containerFactory);
|
then(containerFactory).shouldHaveNoInteractions();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
@Test
|
||||||
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||||
void customizeSimplePulsarContainerFactory() {
|
void customizeSimplePulsarContainerFactory() {
|
||||||
PulsarContainerFactoryCustomizers customizers = new PulsarContainerFactoryCustomizers(
|
PulsarContainerFactoryCustomizers customizers = new PulsarContainerFactoryCustomizers(
|
||||||
Collections.singletonList(new SimplePulsarContainerFactoryCustomizer()));
|
Collections.singletonList(new SimplePulsarContainerFactoryCustomizer()));
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ package org.springframework.boot.autoconfigure.pulsar;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
|
@ -397,8 +396,8 @@ class PulsarReactiveAutoConfigurationTests {
|
||||||
@Bean
|
@Bean
|
||||||
@Order(50)
|
@Order(50)
|
||||||
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> customizerIgnored() {
|
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> customizerIgnored() {
|
||||||
return (__) -> {
|
return (containerFactory) -> {
|
||||||
throw new RuntimeException("should-not-have-matched");
|
throw new IllegalStateException("should-not-have-matched");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -416,8 +415,9 @@ class PulsarReactiveAutoConfigurationTests {
|
||||||
|
|
||||||
private void appendToSubscriptionName(DefaultReactivePulsarListenerContainerFactory<?> containerFactory,
|
private void appendToSubscriptionName(DefaultReactivePulsarListenerContainerFactory<?> containerFactory,
|
||||||
String valueToAppend) {
|
String valueToAppend) {
|
||||||
String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), "");
|
String subscriptionName = containerFactory.getContainerProperties().getSubscriptionName();
|
||||||
containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend));
|
String updatedValue = (subscriptionName != null) ? subscriptionName + valueToAppend : valueToAppend;
|
||||||
|
containerFactory.getContainerProperties().setSubscriptionName(updatedValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -150,11 +150,11 @@ include-code::MyBean[]
|
||||||
Spring Boot auto-configuration provides all the components necessary for `PulsarListener`, such as the `PulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying Pulsar consumers.
|
Spring Boot auto-configuration provides all the components necessary for `PulsarListener`, such as the `PulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying Pulsar consumers.
|
||||||
You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties.
|
You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties.
|
||||||
|
|
||||||
If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ConsumerBuilderCustomizer` beans.
|
If you need more control over the configuration of the consumer factory, consider registering one or more `ConsumerBuilderCustomizer` beans.
|
||||||
These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances.
|
These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances.
|
||||||
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation.
|
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation.
|
||||||
|
|
||||||
If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactoryCustomizer<?>>` beans.
|
If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>` beans.
|
||||||
|
|
||||||
[[messaging.pulsar.receiving-reactive]]
|
[[messaging.pulsar.receiving-reactive]]
|
||||||
== Receiving a Message Reactively
|
== Receiving a Message Reactively
|
||||||
|
|
@ -167,7 +167,7 @@ include-code::MyBean[]
|
||||||
Spring Boot auto-configuration provides all the components necessary for `ReactivePulsarListener`, such as the `ReactivePulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying reactive Pulsar consumers.
|
Spring Boot auto-configuration provides all the components necessary for `ReactivePulsarListener`, such as the `ReactivePulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying reactive Pulsar consumers.
|
||||||
You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties.
|
You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties.
|
||||||
|
|
||||||
If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans.
|
If you need more control over the configuration of the consumer factory, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans.
|
||||||
These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances.
|
These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances.
|
||||||
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation.
|
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation.
|
||||||
|
|
||||||
|
|
@ -187,7 +187,7 @@ include-code::MyBean[]
|
||||||
The `@PulsarReader` relies on a `PulsarReaderFactory` to create the underlying Pulsar reader.
|
The `@PulsarReader` relies on a `PulsarReaderFactory` to create the underlying Pulsar reader.
|
||||||
Spring Boot auto-configuration provides this reader factory which can be customized by setting any of the `spring.pulsar.reader.*` prefixed application properties.
|
Spring Boot auto-configuration provides this reader factory which can be customized by setting any of the `spring.pulsar.reader.*` prefixed application properties.
|
||||||
|
|
||||||
If you need more control over the configuration of the reader factory used by the container factory to create readers, consider registering one or more `ReaderBuilderCustomizer` beans.
|
If you need more control over the configuration of the reader factory, consider registering one or more `ReaderBuilderCustomizer` beans.
|
||||||
These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances.
|
These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances.
|
||||||
You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation.
|
You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue