Support for transactional listeners with reactive transactions

TransactionalApplicationListener and TransactionalEventListener automatically detect a reactive TransactionContext as the event source and register the synchronization accordingly. TransactionalEventPublisher is a convenient delegate for publishing corresponding events with the current TransactionContext as event source. This can also serve as a guideline for similar reactive event purposes.

Closes gh-27515
Closes gh-21025
Closes gh-30244
This commit is contained in:
Juergen Hoeller 2023-08-01 23:27:38 +02:00
parent a9d100eeee
commit 450cc212a2
10 changed files with 787 additions and 64 deletions

View File

@ -57,10 +57,14 @@ attribute of the annotation to `true`.
[NOTE]
====
`@TransactionalEventListener` only works with thread-bound transactions managed by
`PlatformTransactionManager`. A reactive transaction managed by `ReactiveTransactionManager`
uses the Reactor context instead of thread-local attributes, so from the perspective of
an event listener, there is no compatible active transaction that it can participate in.
As of 6.1, `@TransactionalEventListener` can work with thread-bound transactions managed by
`PlatformTransactionManager` as well as reactive transactions managed by `ReactiveTransactionManager`.
For the former, listeners are guaranteed to see the current thread-bound transaction.
Since the latter uses the Reactor context instead of thread-local variables, the transaction
context needs to be included in the published event instance as the event source.
See the
{api-spring-framework}/transaction/reactive/TransactionalEventPublisher.html[`TransactionalEventPublisher`]
javadoc for details.
====

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,6 +29,7 @@ package org.springframework.context;
* @see org.springframework.context.ApplicationEvent
* @see org.springframework.context.event.ApplicationEventMulticaster
* @see org.springframework.context.event.EventPublicationInterceptor
* @see org.springframework.transaction.event.TransactionalApplicationListener
*/
@FunctionalInterface
public interface ApplicationEventPublisher {
@ -42,8 +43,21 @@ public interface ApplicationEventPublisher {
* or even immediate execution at all. Event listeners are encouraged
* to be as efficient as possible, individually using asynchronous
* execution for longer-running and potentially blocking operations.
* <p>For usage in a reactive call stack, include event publication
* as a simple hand-off:
* {@code Mono.fromRunnable(() -> eventPublisher.publishEvent(...))}.
* As with any asynchronous execution, thread-local data is not going
* to be available for reactive listener methods. All state which is
* necessary to process the event needs to be included in the event
* instance itself.
* <p>For the convenient inclusion of the current transaction context
* in a reactive hand-off, consider using
* {@link org.springframework.transaction.reactive.TransactionalEventPublisher#publishEvent(Function)}.
* For thread-bound transactions, this is not necessary since the
* state will be implicitly available through thread-local storage.
* @param event the event to publish
* @see #publishEvent(Object)
* @see ApplicationListener#supportsAsyncExecution()
* @see org.springframework.context.event.ContextRefreshedEvent
* @see org.springframework.context.event.ContextClosedEvent
*/
@ -61,6 +75,11 @@ public interface ApplicationEventPublisher {
* or even immediate execution at all. Event listeners are encouraged
* to be as efficient as possible, individually using asynchronous
* execution for longer-running and potentially blocking operations.
* <p>For the convenient inclusion of the current transaction context
* in a reactive hand-off, consider using
* {@link org.springframework.transaction.reactive.TransactionalEventPublisher#publishEvent(Object)}.
* For thread-bound transactions, this is not necessary since the
* state will be implicitly available through thread-local storage.
* @param event the event to publish
* @since 4.2
* @see #publishEvent(ApplicationEvent)

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -32,12 +32,13 @@ import org.springframework.lang.Nullable;
* allows you to prioritize that listener amongst other listeners running before or after
* transaction completion.
*
* <p><b>NOTE: Transactional event listeners only work with thread-bound transactions
* managed by a {@link org.springframework.transaction.PlatformTransactionManager
* PlatformTransactionManager}.</b> A reactive transaction managed by a
* {@link org.springframework.transaction.ReactiveTransactionManager ReactiveTransactionManager}
* uses the Reactor context instead of thread-local variables, so from the perspective of
* an event listener, there is no compatible active transaction that it can participate in.
* <p>As of 6.1, transactional event listeners can work with thread-bound transactions managed
* by a {@link org.springframework.transaction.PlatformTransactionManager} as well as reactive
* transactions managed by a {@link org.springframework.transaction.ReactiveTransactionManager}.
* For the former, listeners are guaranteed to see the current thread-bound transaction.
* Since the latter uses the Reactor context instead of thread-local variables, the transaction
* context needs to be included in the published event instance as the event source:
* see {@link org.springframework.transaction.reactive.TransactionalEventPublisher}.
*
* @author Juergen Hoeller
* @author Oliver Drotbohm
@ -60,6 +61,16 @@ public interface TransactionalApplicationListener<E extends ApplicationEvent>
return Ordered.LOWEST_PRECEDENCE;
}
/**
* Transaction-synchronized listeners do not support asynchronous execution,
* only their target listener ({@link #processEvent}) potentially does.
* @since 6.1
*/
@Override
default boolean supportsAsyncExecution() {
return false;
}
/**
* Return an identifier for the listener to be able to refer to it individually.
* <p>It might be necessary for specific completion callback implementations

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,7 +22,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
/**
@ -128,11 +127,7 @@ public class TransactionalApplicationListenerAdapter<E extends ApplicationEvent>
@Override
public void onApplicationEvent(E event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
}
TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks);
}
}

View File

@ -25,7 +25,6 @@ import org.springframework.context.event.ApplicationListenerMethodAdapter;
import org.springframework.context.event.EventListener;
import org.springframework.context.event.GenericApplicationListener;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
/**
@ -87,10 +86,10 @@ public class TransactionalApplicationListenerMethodAdapter extends ApplicationLi
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
if (TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks)) {
if (logger.isDebugEnabled()) {
logger.debug("Registered transaction synchronization for " + event);
}
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,19 +18,21 @@ package org.springframework.transaction.event;
import java.util.List;
import reactor.core.publisher.Mono;
import org.springframework.context.ApplicationEvent;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.core.Ordered;
import org.springframework.transaction.reactive.TransactionContext;
/**
* {@link TransactionSynchronization} implementation for event processing with a
* {@code TransactionSynchronization} implementations for event processing with a
* {@link TransactionalApplicationListener}.
*
* @author Juergen Hoeller
* @since 5.3
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
*/
class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>
implements TransactionSynchronization {
abstract class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent> implements Ordered {
private final E event;
@ -53,28 +55,11 @@ class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent
return this.listener.getOrder();
}
@Override
public void beforeCommit(boolean readOnly) {
if (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
public TransactionPhase getTransactionPhase() {
return this.listener.getTransactionPhase();
}
@Override
public void afterCompletion(int status) {
TransactionPhase phase = this.listener.getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
}
private void processEventWithCallbacks() {
public void processEventWithCallbacks() {
this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));
try {
this.listener.processEvent(this.event);
@ -86,4 +71,94 @@ class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));
}
public static <E extends ApplicationEvent> boolean register(
E event, TransactionalApplicationListener<E> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {
if (org.springframework.transaction.support.TransactionSynchronizationManager.isSynchronizationActive() &&
org.springframework.transaction.support.TransactionSynchronizationManager.isActualTransactionActive()) {
org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(
new PlatformSynchronization<>(event, listener, callbacks));
return true;
}
else if (event.getSource() instanceof TransactionContext txContext) {
org.springframework.transaction.reactive.TransactionSynchronizationManager rtsm =
new org.springframework.transaction.reactive.TransactionSynchronizationManager(txContext);
if (rtsm.isSynchronizationActive() && rtsm.isActualTransactionActive()) {
rtsm.registerSynchronization(new ReactiveSynchronization<>(event, listener, callbacks));
return true;
}
}
return false;
}
private static class PlatformSynchronization<AE extends ApplicationEvent>
extends TransactionalApplicationListenerSynchronization<AE>
implements org.springframework.transaction.support.TransactionSynchronization {
public PlatformSynchronization(AE event, TransactionalApplicationListener<AE> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {
super(event, listener, callbacks);
}
@Override
public void beforeCommit(boolean readOnly) {
if (getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
}
@Override
public void afterCompletion(int status) {
TransactionPhase phase = getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
}
}
private static class ReactiveSynchronization<AE extends ApplicationEvent>
extends TransactionalApplicationListenerSynchronization<AE>
implements org.springframework.transaction.reactive.TransactionSynchronization {
public ReactiveSynchronization(AE event, TransactionalApplicationListener<AE> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {
super(event, listener, callbacks);
}
@Override
public Mono<Void> beforeCommit(boolean readOnly) {
if (getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
return Mono.fromRunnable(this::processEventWithCallbacks);
}
return Mono.empty();
}
@Override
public Mono<Void> afterCompletion(int status) {
TransactionPhase phase = getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
return Mono.fromRunnable(this::processEventWithCallbacks);
}
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
return Mono.fromRunnable(this::processEventWithCallbacks);
}
else if (phase == TransactionPhase.AFTER_COMPLETION) {
return Mono.fromRunnable(this::processEventWithCallbacks);
}
return Mono.empty();
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,12 +37,13 @@ import org.springframework.core.annotation.AliasFor;
* method allows you to prioritize that listener amongst other listeners running before
* or after transaction completion.
*
* <p><b>NOTE: Transactional event listeners only work with thread-bound transactions
* managed by a {@link org.springframework.transaction.PlatformTransactionManager
* PlatformTransactionManager}.</b> A reactive transaction managed by a
* {@link org.springframework.transaction.ReactiveTransactionManager ReactiveTransactionManager}
* uses the Reactor context instead of thread-local variables, so from the perspective of
* an event listener, there is no compatible active transaction that it can participate in.
* <p>As of 6.1, transactional event listeners can work with thread-bound transactions managed
* by a {@link org.springframework.transaction.PlatformTransactionManager} as well as reactive
* transactions managed by a {@link org.springframework.transaction.ReactiveTransactionManager}.
* For the former, listeners are guaranteed to see the current thread-bound transaction.
* Since the latter uses the Reactor context instead of thread-local variables, the transaction
* context needs to be included in the published event instance as the event source:
* see {@link org.springframework.transaction.reactive.TransactionalEventPublisher}.
*
* <p><strong>WARNING:</strong> if the {@code TransactionPhase} is set to
* {@link TransactionPhase#AFTER_COMMIT AFTER_COMMIT} (the default),

View File

@ -0,0 +1,89 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.PayloadApplicationEvent;
/**
* A delegate for publishing transactional events in a reactive setup.
* Includes the current Reactor-managed {@link TransactionContext} as
* a source object for every {@link ApplicationEvent} to be published.
*
* <p>This delegate is just a convenience. The current {@link TransactionContext}
* can be directly included as the event source as well, and then published
* through an {@link ApplicationEventPublisher} such as the Spring
* {@link org.springframework.context.ApplicationContext}:
*
* <pre class="code">
* TransactionContextManager.currentContext()
* .map(source -> new PayloadApplicationEvent<>(source, "myPayload"))
* .doOnSuccess(this.eventPublisher::publishEvent)
* </pre>
*
* @author Juergen Hoeller
* @since 6.1
* @see #publishEvent(Function)
* @see #publishEvent(Object)
* @see ApplicationEventPublisher
*/
public class TransactionalEventPublisher {
private final ApplicationEventPublisher eventPublisher;
/**
* Create a new delegate for publishing transactional events in a reactive setup.
* @param eventPublisher the actual event publisher to use,
* typically a Spring {@link org.springframework.context.ApplicationContext}
*/
public TransactionalEventPublisher(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
/**
* Publish an event created through the given function which maps the transaction
* source object (the {@link TransactionContext}) to the event instance.
* @param eventCreationFunction a function mapping the source object to the event instance,
* e.g. {@code source -> new PayloadApplicationEvent<>(source, "myPayload")}
* @return the Reactor {@link Mono} for the transactional event publication
*/
public Mono<Void> publishEvent(Function<TransactionContext, ApplicationEvent> eventCreationFunction) {
return TransactionContextManager.currentContext().map(eventCreationFunction)
.doOnSuccess(this.eventPublisher::publishEvent).then();
}
/**
* Publish an event created for the given payload.
* @param payload the payload to publish as an event
* @return the Reactor {@link Mono} for the transactional event publication
*/
public Mono<Void> publishEvent(Object payload) {
if (payload instanceof ApplicationEvent) {
return Mono.error(new IllegalArgumentException("Cannot publish ApplicationEvent with transactional " +
"source - publish payload object or use publishEvent(Function<Object, ApplicationEvent>"));
}
return publishEvent(source -> new PayloadApplicationEvent<>(source, payload));
}
}

View File

@ -0,0 +1,508 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.event;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalEventPublisher;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.testfixture.ReactiveCallCountingTransactionManager;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.springframework.transaction.event.TransactionPhase.AFTER_COMMIT;
import static org.springframework.transaction.event.TransactionPhase.AFTER_COMPLETION;
import static org.springframework.transaction.event.TransactionPhase.AFTER_ROLLBACK;
import static org.springframework.transaction.event.TransactionPhase.BEFORE_COMMIT;
/**
* Integration tests for {@link TransactionalEventListener} support
* with reactive transactions.
*
* @author Juergen Hoeller
* @since 6.1
*/
public class ReactiveTransactionalEventListenerTests {
private ConfigurableApplicationContext context;
private EventCollector eventCollector;
private TransactionalOperator transactionalOperator;
@AfterEach
public void closeContext() {
if (this.context != null) {
this.context.close();
}
}
@Test
public void immediately() {
load(ImmediateTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("test").then(Mono.fromRunnable(() -> {
getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "test");
getEventCollector().assertTotalEventsCount(1);
}))).blockFirst();
getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "test");
getEventCollector().assertTotalEventsCount(1);
}
@Test
public void immediatelyImpactsCurrentTransaction() {
load(ImmediateTestListener.class, BeforeCommitTestListener.class);
assertThatIllegalStateException().isThrownBy(() ->
this.transactionalOperator.execute(status -> publishEvent("FAIL").then(Mono.fromRunnable(() -> {
throw new AssertionError("Should have thrown an exception at this point");
}))).blockFirst())
.withMessageContaining("Test exception")
.withMessageContaining(EventCollector.IMMEDIATELY);
getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "FAIL");
getEventCollector().assertTotalEventsCount(1);
}
@Test
public void afterCompletionCommit() {
load(AfterCompletionTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("test")
.then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst();
getEventCollector().assertEvents(EventCollector.AFTER_COMPLETION, "test");
getEventCollector().assertTotalEventsCount(1); // After rollback not invoked
}
@Test
public void afterCompletionRollback() {
load(AfterCompletionTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("test").then(Mono.fromRunnable(() -> {
getEventCollector().assertNoEventReceived();
status.setRollbackOnly();
}))).blockFirst();
getEventCollector().assertEvents(EventCollector.AFTER_COMPLETION, "test");
getEventCollector().assertTotalEventsCount(1); // After rollback not invoked
}
@Test
public void afterCommit() {
load(AfterCompletionExplicitTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("test")
.then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst();
getEventCollector().assertEvents(EventCollector.AFTER_COMMIT, "test");
getEventCollector().assertTotalEventsCount(1); // After rollback not invoked
}
@Test
public void afterCommitWithTransactionalComponentListenerProxiedViaDynamicProxy() {
load(TransactionalComponentTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("SKIP")
.then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst();
getEventCollector().assertNoEventReceived();
}
@Test
public void afterRollback() {
load(AfterCompletionExplicitTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("test").then(Mono.fromRunnable(() -> {
getEventCollector().assertNoEventReceived();
status.setRollbackOnly();
}))).blockFirst();
getEventCollector().assertEvents(EventCollector.AFTER_ROLLBACK, "test");
getEventCollector().assertTotalEventsCount(1); // After commit not invoked
}
@Test
public void beforeCommit() {
load(BeforeCommitTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("test")
.then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst();
getEventCollector().assertEvents(EventCollector.BEFORE_COMMIT, "test");
getEventCollector().assertTotalEventsCount(1);
}
@Test
public void noTransaction() {
load(BeforeCommitTestListener.class, AfterCompletionTestListener.class,
AfterCompletionExplicitTestListener.class);
publishEvent("test");
getEventCollector().assertTotalEventsCount(0);
}
@Test
public void transactionDemarcationWithNotSupportedPropagation() {
load(BeforeCommitTestListener.class, AfterCompletionTestListener.class);
getContext().getBean(TestBean.class).notSupported().block();
getEventCollector().assertTotalEventsCount(0);
}
@Test
public void transactionDemarcationWithSupportsPropagationAndNoTransaction() {
load(BeforeCommitTestListener.class, AfterCompletionTestListener.class);
getContext().getBean(TestBean.class).supports().block();
getEventCollector().assertTotalEventsCount(0);
}
@Test
public void transactionDemarcationWithSupportsPropagationAndExistingTransaction() {
load(BeforeCommitTestListener.class, AfterCompletionTestListener.class);
this.transactionalOperator.execute(status -> getContext().getBean(TestBean.class).supports()
.then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst();
getEventCollector().assertTotalEventsCount(2);
}
@Test
public void transactionDemarcationWithRequiredPropagation() {
load(BeforeCommitTestListener.class, AfterCompletionTestListener.class);
getContext().getBean(TestBean.class).required().block();
getEventCollector().assertTotalEventsCount(2);
}
@Test
public void noTransactionWithFallbackExecution() {
load(FallbackExecutionTestListener.class);
getContext().publishEvent("test");
this.eventCollector.assertEvents(EventCollector.BEFORE_COMMIT, "test");
this.eventCollector.assertEvents(EventCollector.AFTER_COMMIT, "test");
this.eventCollector.assertEvents(EventCollector.AFTER_ROLLBACK, "test");
this.eventCollector.assertEvents(EventCollector.AFTER_COMPLETION, "test");
getEventCollector().assertTotalEventsCount(4);
}
@Test
public void conditionFoundOnTransactionalEventListener() {
load(ImmediateTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("SKIP")
.then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst();
getEventCollector().assertNoEventReceived();
}
@Test
public void afterCommitMetaAnnotation() {
load(AfterCommitMetaAnnotationTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("test")
.then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst();
getEventCollector().assertEvents(EventCollector.AFTER_COMMIT, "test");
getEventCollector().assertTotalEventsCount(1);
}
@Test
public void conditionFoundOnMetaAnnotation() {
load(AfterCommitMetaAnnotationTestListener.class);
this.transactionalOperator.execute(status -> publishEvent("SKIP")
.then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst();
getEventCollector().assertNoEventReceived();
}
protected EventCollector getEventCollector() {
return this.eventCollector;
}
protected ConfigurableApplicationContext getContext() {
return this.context;
}
private void load(Class<?>... classes) {
List<Class<?>> allClasses = new ArrayList<>();
allClasses.add(BasicConfiguration.class);
allClasses.addAll(Arrays.asList(classes));
doLoad(allClasses.toArray(new Class<?>[0]));
}
private void doLoad(Class<?>... classes) {
this.context = new AnnotationConfigApplicationContext(classes);
this.eventCollector = this.context.getBean(EventCollector.class);
this.transactionalOperator = this.context.getBean(TransactionalOperator.class);
}
private Mono<Void> publishEvent(Object event) {
return new TransactionalEventPublisher(getContext()).publishEvent(event);
}
@Configuration
@EnableTransactionManagement
static class BasicConfiguration {
@Bean
public EventCollector eventCollector() {
return new EventCollector();
}
@Bean
public TestBean testBean(ApplicationEventPublisher eventPublisher) {
return new TestBean(eventPublisher);
}
@Bean
public ReactiveCallCountingTransactionManager transactionManager() {
return new ReactiveCallCountingTransactionManager();
}
@Bean
public TransactionalOperator transactionTemplate() {
return TransactionalOperator.create(transactionManager());
}
}
static class EventCollector {
public static final String IMMEDIATELY = "IMMEDIATELY";
public static final String BEFORE_COMMIT = "BEFORE_COMMIT";
public static final String AFTER_COMPLETION = "AFTER_COMPLETION";
public static final String AFTER_COMMIT = "AFTER_COMMIT";
public static final String AFTER_ROLLBACK = "AFTER_ROLLBACK";
public static final String[] ALL_PHASES = {IMMEDIATELY, BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK};
private final MultiValueMap<String, Object> events = new LinkedMultiValueMap<>();
public void addEvent(String phase, Object event) {
this.events.add(phase, event);
}
public List<Object> getEvents(String phase) {
return this.events.getOrDefault(phase, Collections.emptyList());
}
public void assertNoEventReceived(String... phases) {
if (phases.length == 0) { // All values if none set
phases = ALL_PHASES;
}
for (String phase : phases) {
List<Object> eventsForPhase = getEvents(phase);
assertThat(eventsForPhase.size()).as("Expected no events for phase '" + phase + "' " +
"but got " + eventsForPhase + ":").isEqualTo(0);
}
}
public void assertEvents(String phase, Object... expected) {
List<Object> actual = getEvents(phase);
assertThat(actual.size()).as("wrong number of events for phase '" + phase + "'").isEqualTo(expected.length);
for (int i = 0; i < expected.length; i++) {
assertThat(actual.get(i)).as("Wrong event for phase '" + phase + "' at index " + i).isEqualTo(expected[i]);
}
}
public void assertTotalEventsCount(int number) {
int size = 0;
for (Map.Entry<String, List<Object>> entry : this.events.entrySet()) {
size += entry.getValue().size();
}
assertThat(size).as("Wrong number of total events (" + this.events.size() + ") " +
"registered phase(s)").isEqualTo(number);
}
}
static class TestBean {
private final ApplicationEventPublisher eventPublisher;
TestBean(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public Mono<Void> notSupported() {
return new TransactionalEventPublisher(this.eventPublisher).publishEvent("test");
}
@Transactional(propagation = Propagation.SUPPORTS)
public Mono<Void> supports() {
return new TransactionalEventPublisher(this.eventPublisher).publishEvent("test");
}
@Transactional(propagation = Propagation.REQUIRED)
public Mono<Void> required() {
return new TransactionalEventPublisher(this.eventPublisher).publishEvent("test");
}
}
static abstract class BaseTransactionalTestListener {
static final String FAIL_MSG = "FAIL";
@Autowired
private EventCollector eventCollector;
public void handleEvent(String phase, String data) {
this.eventCollector.addEvent(phase, data);
if (FAIL_MSG.equals(data)) {
throw new IllegalStateException("Test exception on phase '" + phase + "'");
}
}
}
@Component
static class ImmediateTestListener extends BaseTransactionalTestListener {
@EventListener(condition = "!'SKIP'.equals(#data)")
public void handleImmediately(String data) {
handleEvent(EventCollector.IMMEDIATELY, data);
}
}
@Component
static class AfterCompletionTestListener extends BaseTransactionalTestListener {
@TransactionalEventListener(phase = AFTER_COMPLETION)
public void handleAfterCompletion(String data) {
handleEvent(EventCollector.AFTER_COMPLETION, data);
}
}
@Component
static class AfterCompletionExplicitTestListener extends BaseTransactionalTestListener {
@TransactionalEventListener(phase = AFTER_COMMIT)
public void handleAfterCommit(String data) {
handleEvent(EventCollector.AFTER_COMMIT, data);
}
@TransactionalEventListener(phase = AFTER_ROLLBACK)
public void handleAfterRollback(String data) {
handleEvent(EventCollector.AFTER_ROLLBACK, data);
}
}
@Transactional
@Component
interface TransactionalComponentTestListenerInterface {
// Cannot use #data in condition due to dynamic proxy.
@TransactionalEventListener(condition = "!'SKIP'.equals(#p0)")
void handleAfterCommit(String data);
}
static class TransactionalComponentTestListener extends BaseTransactionalTestListener implements
TransactionalComponentTestListenerInterface {
@Override
public void handleAfterCommit(String data) {
handleEvent(EventCollector.AFTER_COMMIT, data);
}
}
@Component
static class BeforeCommitTestListener extends BaseTransactionalTestListener {
@TransactionalEventListener(phase = BEFORE_COMMIT)
@Order(15)
public void handleBeforeCommit(String data) {
handleEvent(EventCollector.BEFORE_COMMIT, data);
}
}
@Component
static class FallbackExecutionTestListener extends BaseTransactionalTestListener {
@TransactionalEventListener(phase = BEFORE_COMMIT, fallbackExecution = true)
public void handleBeforeCommit(String data) {
handleEvent(EventCollector.BEFORE_COMMIT, data);
}
@TransactionalEventListener(phase = AFTER_COMMIT, fallbackExecution = true)
public void handleAfterCommit(String data) {
handleEvent(EventCollector.AFTER_COMMIT, data);
}
@TransactionalEventListener(phase = AFTER_ROLLBACK, fallbackExecution = true)
public void handleAfterRollback(String data) {
handleEvent(EventCollector.AFTER_ROLLBACK, data);
}
@TransactionalEventListener(phase = AFTER_COMPLETION, fallbackExecution = true)
public void handleAfterCompletion(String data) {
handleEvent(EventCollector.AFTER_COMPLETION, data);
}
}
@TransactionalEventListener(phase = AFTER_COMMIT, condition = "!'SKIP'.equals(#p0)")
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface AfterCommitEventListener {
}
@Component
static class AfterCommitMetaAnnotationTestListener extends BaseTransactionalTestListener {
@AfterCommitEventListener
public void handleAfterCommit(String data) {
handleEvent(EventCollector.AFTER_COMMIT, data);
}
}
static class EventTransactionSynchronization implements TransactionSynchronization {
private final int order;
EventTransactionSynchronization(int order) {
this.order = order;
}
@Override
public int getOrder() {
return order;
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -36,7 +36,9 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.annotation.Order;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Propagation;
@ -57,6 +59,7 @@ import static org.springframework.transaction.event.TransactionPhase.BEFORE_COMM
/**
* Integration tests for {@link TransactionalEventListener} support
* with thread-bound transactions.
*
* @author Stephane Nicoll
* @author Sam Brannen
@ -87,7 +90,6 @@ public class TransactionalEventListenerTests {
getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "test");
getEventCollector().assertTotalEventsCount(1);
return null;
});
getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "test");
getEventCollector().assertTotalEventsCount(1);
@ -115,7 +117,6 @@ public class TransactionalEventListenerTests {
getContext().publishEvent("test");
getEventCollector().assertNoEventReceived();
return null;
});
getEventCollector().assertEvents(EventCollector.AFTER_COMPLETION, "test");
getEventCollector().assertTotalEventsCount(1); // After rollback not invoked
@ -129,7 +130,6 @@ public class TransactionalEventListenerTests {
getEventCollector().assertNoEventReceived();
status.setRollbackOnly();
return null;
});
getEventCollector().assertEvents(EventCollector.AFTER_COMPLETION, "test");
getEventCollector().assertTotalEventsCount(1); // After rollback not invoked
@ -142,7 +142,6 @@ public class TransactionalEventListenerTests {
getContext().publishEvent("test");
getEventCollector().assertNoEventReceived();
return null;
});
getEventCollector().assertEvents(EventCollector.AFTER_COMMIT, "test");
getEventCollector().assertTotalEventsCount(1); // After rollback not invoked
@ -172,6 +171,19 @@ public class TransactionalEventListenerTests {
getEventCollector().assertTotalEventsCount(1); // After commit not invoked
}
@Test
public void afterRollbackWithCustomExecutor() {
load(AfterCompletionExplicitTestListener.class, MulticasterWithCustomExecutor.class);
this.transactionTemplate.execute(status -> {
getContext().publishEvent("test");
getEventCollector().assertNoEventReceived();
status.setRollbackOnly();
return null;
});
getEventCollector().assertEvents(EventCollector.AFTER_ROLLBACK, "test");
getEventCollector().assertTotalEventsCount(1); // After commit not invoked
}
@Test
public void beforeCommit() {
load(BeforeCommitTestListener.class);
@ -307,13 +319,12 @@ public class TransactionalEventListenerTests {
}
@Test
public void afterCommitMetaAnnotation() throws Exception {
public void afterCommitMetaAnnotation() {
load(AfterCommitMetaAnnotationTestListener.class);
this.transactionTemplate.execute(status -> {
getContext().publishEvent("test");
getEventCollector().assertNoEventReceived();
return null;
});
getEventCollector().assertEvents(EventCollector.AFTER_COMMIT, "test");
getEventCollector().assertTotalEventsCount(1);
@ -326,7 +337,6 @@ public class TransactionalEventListenerTests {
getContext().publishEvent("SKIP");
getEventCollector().assertNoEventReceived();
return null;
});
getEventCollector().assertNoEventReceived();
}
@ -380,6 +390,18 @@ public class TransactionalEventListenerTests {
}
@Configuration
static class MulticasterWithCustomExecutor {
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return multicaster;
}
}
static class EventCollector {
public static final String IMMEDIATELY = "IMMEDIATELY";