Merge pull request #40189 from onobc

* pr/40189:
  Add Spring Pulsar transaction support

Closes gh-40189
This commit is contained in:
Phillip Webb 2024-04-17 19:32:17 -07:00
commit d55eb5b33b
7 changed files with 182 additions and 5 deletions

View File

@ -57,6 +57,8 @@ import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.pulsar.transaction.PulsarTransactionManager;
/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
@ -126,8 +128,11 @@ public class PulsarAutoConfiguration {
PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
ObjectProvider<ProducerInterceptor> producerInterceptors, SchemaResolver schemaResolver,
TopicResolver topicResolver) {
return new PulsarTemplate<>(pulsarProducerFactory, producerInterceptors.orderedStream().toList(),
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled());
PulsarTemplate<?> template = new PulsarTemplate<>(pulsarProducerFactory,
producerInterceptors.orderedStream().toList(), schemaResolver, topicResolver,
this.properties.getTemplate().isObservationsEnabled());
this.propertiesMapper.customizeTemplate(template);
return template;
}
@Bean
@ -142,6 +147,13 @@ public class PulsarAutoConfiguration {
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
}
@Bean
@ConditionalOnMissingBean(PulsarAwareTransactionManager.class)
@ConditionalOnProperty(prefix = "spring.pulsar.transaction", name = "enabled")
public PulsarTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) {
return new PulsarTransactionManager(pulsarClient);
}
@SuppressWarnings("unchecked")
private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>> customizers,
ConsumerBuilder<?> builder) {
@ -153,13 +165,15 @@ public class PulsarAutoConfiguration {
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver, Environment environment) {
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
Environment environment) {
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
}
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
}

View File

@ -67,6 +67,8 @@ public class PulsarProperties {
private final Template template = new Template();
private final Transaction transaction = new Transaction();
public Client getClient() {
return this.client;
}
@ -103,6 +105,10 @@ public class PulsarProperties {
return this.template;
}
public Transaction getTransaction() {
return this.transaction;
}
public static class Client {
/**
@ -868,6 +874,23 @@ public class PulsarProperties {
}
public static class Transaction {
/**
* Whether transaction support is enabled.
*/
private boolean enabled;
public boolean isEnabled() {
return this.enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
public static class Authentication {
/**

View File

@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.util.StringUtils;
@ -64,6 +65,7 @@ final class PulsarPropertiesMapper {
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction);
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,
connectionDetails);
@ -157,6 +159,10 @@ final class PulsarPropertiesMapper {
map.from(properties::getAccessMode).to(producerBuilder::accessMode);
}
<T> void customizeTemplate(PulsarTemplate<T> template) {
template.transactions().setEnabled(this.properties.getTransaction().isEnabled());
}
<T> void customizeConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
PulsarProperties.Consumer properties = this.properties.getConsumer();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
@ -183,6 +189,7 @@ final class PulsarPropertiesMapper {
void customizeContainerProperties(PulsarContainerProperties containerProperties) {
customizePulsarContainerConsumerSubscriptionProperties(containerProperties);
customizePulsarContainerListenerProperties(containerProperties);
containerProperties.transactions().setEnabled(this.properties.getTransaction().isEnabled());
}
private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContainerProperties containerProperties) {

View File

@ -70,6 +70,8 @@ import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
@ -330,6 +332,13 @@ class PulsarAutoConfigurationTests {
.hasFieldOrPropertyWithValue("observationEnabled", false));
}
@Test
void whenTransactionEnabledTrueEnablesTransactions() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true")
.run((context) -> assertThat(context.getBean(PulsarTemplate.class).transactions().isEnabled())
.isTrue());
}
@Configuration(proxyBeanMethods = false)
static class InterceptorTestConfiguration {
@ -525,6 +534,28 @@ class PulsarAutoConfigurationTests {
});
}
@Test
void whenTransactionEnabledTrueListenerContainerShouldUseTransactions() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true").run((context) -> {
ConcurrentPulsarListenerContainerFactory<?> factory = context
.getBean(ConcurrentPulsarListenerContainerFactory.class);
TransactionSettings transactions = factory.getContainerProperties().transactions();
assertThat(transactions.isEnabled()).isTrue();
assertThat(transactions.getTransactionManager()).isNotNull();
});
}
@Test
void whenTransactionEnabledFalseListenerContainerShouldNotUseTransactions() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=false").run((context) -> {
ConcurrentPulsarListenerContainerFactory<?> factory = context
.getBean(ConcurrentPulsarListenerContainerFactory.class);
TransactionSettings transactions = factory.getContainerProperties().transactions();
assertThat(transactions.isEnabled()).isFalse();
assertThat(transactions.getTransactionManager()).isNull();
});
}
}
@Nested
@ -603,4 +634,37 @@ class PulsarAutoConfigurationTests {
}
@Nested
class TransactionManagerTests {
private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;
@Test
@SuppressWarnings("unchecked")
void whenUserHasDefinedATransactionManagerTheAutoConfigurationBacksOff() {
PulsarAwareTransactionManager txnMgr = mock(PulsarAwareTransactionManager.class);
this.contextRunner.withBean("customTransactionManager", PulsarAwareTransactionManager.class, () -> txnMgr)
.run((context) -> assertThat(context).getBean(PulsarAwareTransactionManager.class).isSameAs(txnMgr));
}
@Test
void whenNoPropertiesAreSetTransactionManagerShouldNotBeDefined() {
this.contextRunner
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
}
@Test
void whenTransactionEnabledFalseTransactionManagerIsNotAutoConfigured() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=false")
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
}
@Test
void whenTransactionEnabledTrueTransactionManagerIsAutoConfigured() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true")
.run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class));
}
}
}

View File

@ -38,16 +38,20 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
/**
* Tests for {@link PulsarPropertiesMapper}.
@ -87,6 +91,26 @@ class PulsarPropertiesMapperTests {
then(builder).should().authentication("myclass", authParamString);
}
@Test
void customizeClientBuilderWhenTransactionEnabled() {
PulsarProperties properties = new PulsarProperties();
properties.getTransaction().setEnabled(true);
ClientBuilder builder = mock(ClientBuilder.class);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should().enableTransaction(true);
}
@Test
void customizeClientBuilderWhenTransactionDisabled() {
PulsarProperties properties = new PulsarProperties();
properties.getTransaction().setEnabled(false);
ClientBuilder builder = mock(ClientBuilder.class);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should(never()).enableTransaction(anyBoolean());
}
@Test
void customizeClientBuilderWhenHasConnectionDetails() {
PulsarProperties properties = new PulsarProperties();
@ -120,7 +144,7 @@ class PulsarPropertiesMapperTests {
ClientBuilder builder = mock(ClientBuilder.class);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should().serviceUrlProvider(Mockito.any(AutoClusterFailover.class));
then(builder).should().serviceUrlProvider(any(AutoClusterFailover.class));
}
@Test
@ -189,6 +213,16 @@ class PulsarPropertiesMapperTests {
then(builder).should().accessMode(ProducerAccessMode.Exclusive);
}
@Test
@SuppressWarnings("unchecked")
void customizeTemplate() {
PulsarProperties properties = new PulsarProperties();
properties.getTransaction().setEnabled(true);
PulsarTemplate<Object> template = new PulsarTemplate<>(mock(PulsarProducerFactory.class));
new PulsarPropertiesMapper(properties).customizeTemplate(template);
assertThat(template.transactions().isEnabled()).isTrue();
}
@Test
@SuppressWarnings("unchecked")
void customizeConsumerBuilder() {
@ -220,11 +254,13 @@ class PulsarPropertiesMapperTests {
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setObservationEnabled(true);
properties.getTransaction().setEnabled(true);
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(containerProperties.isObservationEnabled()).isTrue();
assertThat(containerProperties.transactions().isEnabled()).isTrue();
}
@Test

View File

@ -396,4 +396,17 @@ class PulsarPropertiesTests {
}
@Nested
class TransactionProperties {
@Test
void bind() {
Map<String, String> map = new HashMap<>();
map.put("spring.pulsar.transaction.enabled", "true");
PulsarProperties.Transaction properties = bindPropeties(map).getTransaction();
assertThat(properties.isEnabled()).isTrue();
}
}
}

View File

@ -215,6 +215,26 @@ TIP: For more details on any of the above components and to discover other avail
[[messaging.pulsar.transactions]]
== Transaction Support
Spring for Apache Pulsar supports transactions when using `PulsarTemplate` and `@PulsarListener`.
NOTE: Transactions are not currently supported when using the reactive variants.
Setting the configprop:spring.pulsar.transaction.enabled[] property to `true` will:
* Configure a `PulsarTransactionManager` bean
* Enable transaction support for `PulsarTemplate`
* Enable transaction support for `@PulsarListener` methods
The `transactional` attribute of `@PulsarListener` can be used to fine-tune when transactions should be used with listeners.
For more control of the Spring for Apache Pulsar transaction features you should define your own `PulsarTemplate` and/or `ConcurrentPulsarListenerContainerFactory` beans.
You can also define a `PulsarAwareTransactionManager` bean if the default auto-configured `PulsarTransactionManager` is not suitable.
[[messaging.pulsar.additional-properties]]
== Additional Pulsar Properties