Add Spring Pulsar transaction support
Adds auto-config for Spring for Apache Pulsar transactions. Introduces a new `spring.pulsar.transaction.enabled` property which can be used to enable transactions. This feature is opt-in and remains disabled by default. See gh-40189 Co-authored-by: Andy Wilkinson <andy.wilkinson@broadcom.com> Co-authored-by: Phillip Webb <phil.webb@broadcom.com>
This commit is contained in:
parent
07f82744f1
commit
08ad7aa444
|
@ -57,6 +57,8 @@ import org.springframework.pulsar.core.SchemaResolver;
|
|||
import org.springframework.pulsar.core.TopicResolver;
|
||||
import org.springframework.pulsar.listener.PulsarContainerProperties;
|
||||
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
|
||||
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
|
||||
import org.springframework.pulsar.transaction.PulsarTransactionManager;
|
||||
|
||||
/**
|
||||
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
|
||||
|
@ -126,8 +128,11 @@ public class PulsarAutoConfiguration {
|
|||
PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
|
||||
ObjectProvider<ProducerInterceptor> producerInterceptors, SchemaResolver schemaResolver,
|
||||
TopicResolver topicResolver) {
|
||||
return new PulsarTemplate<>(pulsarProducerFactory, producerInterceptors.orderedStream().toList(),
|
||||
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled());
|
||||
PulsarTemplate<?> template = new PulsarTemplate<>(pulsarProducerFactory,
|
||||
producerInterceptors.orderedStream().toList(), schemaResolver, topicResolver,
|
||||
this.properties.getTemplate().isObservationsEnabled());
|
||||
this.propertiesMapper.customizeTemplate(template);
|
||||
return template;
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
@ -142,6 +147,13 @@ public class PulsarAutoConfiguration {
|
|||
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(PulsarAwareTransactionManager.class)
|
||||
@ConditionalOnProperty(prefix = "spring.pulsar.transaction", name = "enabled")
|
||||
public PulsarTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) {
|
||||
return new PulsarTransactionManager(pulsarClient);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>> customizers,
|
||||
ConsumerBuilder<?> builder) {
|
||||
|
@ -153,13 +165,15 @@ public class PulsarAutoConfiguration {
|
|||
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
|
||||
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
|
||||
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
|
||||
TopicResolver topicResolver, Environment environment) {
|
||||
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
|
||||
Environment environment) {
|
||||
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
|
||||
containerProperties.setSchemaResolver(schemaResolver);
|
||||
containerProperties.setTopicResolver(topicResolver);
|
||||
if (Threading.VIRTUAL.isActive(environment)) {
|
||||
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
|
||||
}
|
||||
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
|
||||
this.propertiesMapper.customizeContainerProperties(containerProperties);
|
||||
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
|
||||
}
|
||||
|
|
|
@ -67,6 +67,8 @@ public class PulsarProperties {
|
|||
|
||||
private final Template template = new Template();
|
||||
|
||||
private final Transaction transaction = new Transaction();
|
||||
|
||||
public Client getClient() {
|
||||
return this.client;
|
||||
}
|
||||
|
@ -103,6 +105,10 @@ public class PulsarProperties {
|
|||
return this.template;
|
||||
}
|
||||
|
||||
public Transaction getTransaction() {
|
||||
return this.transaction;
|
||||
}
|
||||
|
||||
public static class Client {
|
||||
|
||||
/**
|
||||
|
@ -868,6 +874,23 @@ public class PulsarProperties {
|
|||
|
||||
}
|
||||
|
||||
public static class Transaction {
|
||||
|
||||
/**
|
||||
* Whether transaction support is enabled.
|
||||
*/
|
||||
private boolean enabled;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return this.enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Authentication {
|
||||
|
||||
/**
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
|
|||
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;
|
||||
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.pulsar.core.PulsarTemplate;
|
||||
import org.springframework.pulsar.listener.PulsarContainerProperties;
|
||||
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
@ -64,6 +65,7 @@ final class PulsarPropertiesMapper {
|
|||
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
|
||||
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
|
||||
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
|
||||
map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction);
|
||||
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
|
||||
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,
|
||||
connectionDetails);
|
||||
|
@ -157,6 +159,10 @@ final class PulsarPropertiesMapper {
|
|||
map.from(properties::getAccessMode).to(producerBuilder::accessMode);
|
||||
}
|
||||
|
||||
<T> void customizeTemplate(PulsarTemplate<T> template) {
|
||||
template.transactions().setEnabled(this.properties.getTransaction().isEnabled());
|
||||
}
|
||||
|
||||
<T> void customizeConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
|
||||
PulsarProperties.Consumer properties = this.properties.getConsumer();
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
|
@ -183,6 +189,7 @@ final class PulsarPropertiesMapper {
|
|||
void customizeContainerProperties(PulsarContainerProperties containerProperties) {
|
||||
customizePulsarContainerConsumerSubscriptionProperties(containerProperties);
|
||||
customizePulsarContainerListenerProperties(containerProperties);
|
||||
containerProperties.transactions().setEnabled(this.properties.getTransaction().isEnabled());
|
||||
}
|
||||
|
||||
private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContainerProperties containerProperties) {
|
||||
|
|
|
@ -70,6 +70,8 @@ import org.springframework.pulsar.core.PulsarTemplate;
|
|||
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
|
||||
import org.springframework.pulsar.core.SchemaResolver;
|
||||
import org.springframework.pulsar.core.TopicResolver;
|
||||
import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
|
||||
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
@ -330,6 +332,13 @@ class PulsarAutoConfigurationTests {
|
|||
.hasFieldOrPropertyWithValue("observationEnabled", false));
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenTransactionEnabledTrueEnablesTransactions() {
|
||||
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true")
|
||||
.run((context) -> assertThat(context.getBean(PulsarTemplate.class).transactions().isEnabled())
|
||||
.isTrue());
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
static class InterceptorTestConfiguration {
|
||||
|
||||
|
@ -525,6 +534,28 @@ class PulsarAutoConfigurationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenTransactionEnabledTrueListenerContainerShouldUseTransactions() {
|
||||
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true").run((context) -> {
|
||||
ConcurrentPulsarListenerContainerFactory<?> factory = context
|
||||
.getBean(ConcurrentPulsarListenerContainerFactory.class);
|
||||
TransactionSettings transactions = factory.getContainerProperties().transactions();
|
||||
assertThat(transactions.isEnabled()).isTrue();
|
||||
assertThat(transactions.getTransactionManager()).isNotNull();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenTransactionEnabledFalseListenerContainerShouldNotUseTransactions() {
|
||||
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=false").run((context) -> {
|
||||
ConcurrentPulsarListenerContainerFactory<?> factory = context
|
||||
.getBean(ConcurrentPulsarListenerContainerFactory.class);
|
||||
TransactionSettings transactions = factory.getContainerProperties().transactions();
|
||||
assertThat(transactions.isEnabled()).isFalse();
|
||||
assertThat(transactions.getTransactionManager()).isNull();
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Nested
|
||||
|
@ -603,4 +634,37 @@ class PulsarAutoConfigurationTests {
|
|||
|
||||
}
|
||||
|
||||
@Nested
|
||||
class TransactionManagerTests {
|
||||
|
||||
private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void whenUserHasDefinedATransactionManagerTheAutoConfigurationBacksOff() {
|
||||
PulsarAwareTransactionManager txnMgr = mock(PulsarAwareTransactionManager.class);
|
||||
this.contextRunner.withBean("customTransactionManager", PulsarAwareTransactionManager.class, () -> txnMgr)
|
||||
.run((context) -> assertThat(context).getBean(PulsarAwareTransactionManager.class).isSameAs(txnMgr));
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenNoPropertiesAreSetTransactionManagerShouldNotBeDefined() {
|
||||
this.contextRunner
|
||||
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenTransactionEnabledFalseTransactionManagerIsNotAutoConfigured() {
|
||||
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=false")
|
||||
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenTransactionEnabledTrueTransactionManagerIsAutoConfigured() {
|
||||
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true")
|
||||
.run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,16 +38,20 @@ import org.apache.pulsar.client.api.SubscriptionType;
|
|||
import org.apache.pulsar.client.impl.AutoClusterFailover;
|
||||
import org.apache.pulsar.common.schema.SchemaType;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
|
||||
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
|
||||
import org.springframework.pulsar.core.PulsarProducerFactory;
|
||||
import org.springframework.pulsar.core.PulsarTemplate;
|
||||
import org.springframework.pulsar.listener.PulsarContainerProperties;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.BDDMockito.then;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
|
||||
/**
|
||||
* Tests for {@link PulsarPropertiesMapper}.
|
||||
|
@ -87,6 +91,26 @@ class PulsarPropertiesMapperTests {
|
|||
then(builder).should().authentication("myclass", authParamString);
|
||||
}
|
||||
|
||||
@Test
|
||||
void customizeClientBuilderWhenTransactionEnabled() {
|
||||
PulsarProperties properties = new PulsarProperties();
|
||||
properties.getTransaction().setEnabled(true);
|
||||
ClientBuilder builder = mock(ClientBuilder.class);
|
||||
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
|
||||
new PropertiesPulsarConnectionDetails(properties));
|
||||
then(builder).should().enableTransaction(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
void customizeClientBuilderWhenTransactionDisabled() {
|
||||
PulsarProperties properties = new PulsarProperties();
|
||||
properties.getTransaction().setEnabled(false);
|
||||
ClientBuilder builder = mock(ClientBuilder.class);
|
||||
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
|
||||
new PropertiesPulsarConnectionDetails(properties));
|
||||
then(builder).should(never()).enableTransaction(anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
void customizeClientBuilderWhenHasConnectionDetails() {
|
||||
PulsarProperties properties = new PulsarProperties();
|
||||
|
@ -120,7 +144,7 @@ class PulsarPropertiesMapperTests {
|
|||
ClientBuilder builder = mock(ClientBuilder.class);
|
||||
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
|
||||
new PropertiesPulsarConnectionDetails(properties));
|
||||
then(builder).should().serviceUrlProvider(Mockito.any(AutoClusterFailover.class));
|
||||
then(builder).should().serviceUrlProvider(any(AutoClusterFailover.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -189,6 +213,16 @@ class PulsarPropertiesMapperTests {
|
|||
then(builder).should().accessMode(ProducerAccessMode.Exclusive);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void customizeTemplate() {
|
||||
PulsarProperties properties = new PulsarProperties();
|
||||
properties.getTransaction().setEnabled(true);
|
||||
PulsarTemplate<Object> template = new PulsarTemplate<>(mock(PulsarProducerFactory.class));
|
||||
new PulsarPropertiesMapper(properties).customizeTemplate(template);
|
||||
assertThat(template.transactions().isEnabled()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void customizeConsumerBuilder() {
|
||||
|
@ -220,11 +254,13 @@ class PulsarPropertiesMapperTests {
|
|||
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
|
||||
properties.getListener().setSchemaType(SchemaType.AVRO);
|
||||
properties.getListener().setObservationEnabled(true);
|
||||
properties.getTransaction().setEnabled(true);
|
||||
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
|
||||
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
|
||||
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
|
||||
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
|
||||
assertThat(containerProperties.isObservationEnabled()).isTrue();
|
||||
assertThat(containerProperties.transactions().isEnabled()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -396,4 +396,17 @@ class PulsarPropertiesTests {
|
|||
|
||||
}
|
||||
|
||||
@Nested
|
||||
class TransactionProperties {
|
||||
|
||||
@Test
|
||||
void bind() {
|
||||
Map<String, String> map = new HashMap<>();
|
||||
map.put("spring.pulsar.transaction.enabled", "true");
|
||||
PulsarProperties.Transaction properties = bindPropeties(map).getTransaction();
|
||||
assertThat(properties.isEnabled()).isTrue();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -215,6 +215,26 @@ TIP: For more details on any of the above components and to discover other avail
|
|||
|
||||
|
||||
|
||||
[[messaging.pulsar.transactions]]
|
||||
== Transaction Support
|
||||
|
||||
Spring for Apache Pulsar supports transactions when using `PulsarTemplate` and `@PulsarListener`.
|
||||
|
||||
NOTE: Transactions are not currently supported when using the reactive variants.
|
||||
|
||||
Setting the configprop:spring.pulsar.transaction.enabled[] property to `true` will:
|
||||
|
||||
* Configure a `PulsarTransactionManager` bean
|
||||
* Enable transaction support for `PulsarTemplate`
|
||||
* Enable transaction support for `@PulsarListener` methods
|
||||
|
||||
The `transactional` attribute of `@PulsarListener` can be used to fine-tune when transactions should be used with listeners.
|
||||
|
||||
For more control of the Spring for Apache Pulsar transaction features you should define your own `PulsarTemplate` and/or `ConcurrentPulsarListenerContainerFactory` beans.
|
||||
You can also define a `PulsarAwareTransactionManager` bean if the default auto-configured `PulsarTransactionManager` is not suitable.
|
||||
|
||||
|
||||
|
||||
[[messaging.pulsar.additional-properties]]
|
||||
== Additional Pulsar Properties
|
||||
|
||||
|
|
Loading…
Reference in New Issue