diff --git a/framework-docs/modules/ROOT/pages/data-access/transaction/event.adoc b/framework-docs/modules/ROOT/pages/data-access/transaction/event.adoc index b4edfd8c4e..8d34460168 100644 --- a/framework-docs/modules/ROOT/pages/data-access/transaction/event.adoc +++ b/framework-docs/modules/ROOT/pages/data-access/transaction/event.adoc @@ -57,10 +57,14 @@ attribute of the annotation to `true`. [NOTE] ==== -`@TransactionalEventListener` only works with thread-bound transactions managed by -`PlatformTransactionManager`. A reactive transaction managed by `ReactiveTransactionManager` -uses the Reactor context instead of thread-local attributes, so from the perspective of -an event listener, there is no compatible active transaction that it can participate in. +As of 6.1, `@TransactionalEventListener` can work with thread-bound transactions managed by +`PlatformTransactionManager` as well as reactive transactions managed by `ReactiveTransactionManager`. +For the former, listeners are guaranteed to see the current thread-bound transaction. +Since the latter uses the Reactor context instead of thread-local variables, the transaction +context needs to be included in the published event instance as the event source. +See the +{api-spring-framework}/transaction/reactive/TransactionalEventPublisher.html[`TransactionalEventPublisher`] +javadoc for details. ==== diff --git a/spring-context/src/main/java/org/springframework/context/ApplicationEventPublisher.java b/spring-context/src/main/java/org/springframework/context/ApplicationEventPublisher.java index 779b73c1c4..30cdd4e4d4 100644 --- a/spring-context/src/main/java/org/springframework/context/ApplicationEventPublisher.java +++ b/spring-context/src/main/java/org/springframework/context/ApplicationEventPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ package org.springframework.context; * @see org.springframework.context.ApplicationEvent * @see org.springframework.context.event.ApplicationEventMulticaster * @see org.springframework.context.event.EventPublicationInterceptor + * @see org.springframework.transaction.event.TransactionalApplicationListener */ @FunctionalInterface public interface ApplicationEventPublisher { @@ -42,8 +43,21 @@ public interface ApplicationEventPublisher { * or even immediate execution at all. Event listeners are encouraged * to be as efficient as possible, individually using asynchronous * execution for longer-running and potentially blocking operations. + *

For usage in a reactive call stack, include event publication + * as a simple hand-off: + * {@code Mono.fromRunnable(() -> eventPublisher.publishEvent(...))}. + * As with any asynchronous execution, thread-local data is not going + * to be available for reactive listener methods. All state which is + * necessary to process the event needs to be included in the event + * instance itself. + *

For the convenient inclusion of the current transaction context + * in a reactive hand-off, consider using + * {@link org.springframework.transaction.reactive.TransactionalEventPublisher#publishEvent(Function)}. + * For thread-bound transactions, this is not necessary since the + * state will be implicitly available through thread-local storage. * @param event the event to publish * @see #publishEvent(Object) + * @see ApplicationListener#supportsAsyncExecution() * @see org.springframework.context.event.ContextRefreshedEvent * @see org.springframework.context.event.ContextClosedEvent */ @@ -61,6 +75,11 @@ public interface ApplicationEventPublisher { * or even immediate execution at all. Event listeners are encouraged * to be as efficient as possible, individually using asynchronous * execution for longer-running and potentially blocking operations. + *

For the convenient inclusion of the current transaction context + * in a reactive hand-off, consider using + * {@link org.springframework.transaction.reactive.TransactionalEventPublisher#publishEvent(Object)}. + * For thread-bound transactions, this is not necessary since the + * state will be implicitly available through thread-local storage. * @param event the event to publish * @since 4.2 * @see #publishEvent(ApplicationEvent) diff --git a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListener.java b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListener.java index e829fe3042..0ba5818c14 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListener.java +++ b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,12 +32,13 @@ import org.springframework.lang.Nullable; * allows you to prioritize that listener amongst other listeners running before or after * transaction completion. * - *

NOTE: Transactional event listeners only work with thread-bound transactions - * managed by a {@link org.springframework.transaction.PlatformTransactionManager - * PlatformTransactionManager}. A reactive transaction managed by a - * {@link org.springframework.transaction.ReactiveTransactionManager ReactiveTransactionManager} - * uses the Reactor context instead of thread-local variables, so from the perspective of - * an event listener, there is no compatible active transaction that it can participate in. + *

As of 6.1, transactional event listeners can work with thread-bound transactions managed + * by a {@link org.springframework.transaction.PlatformTransactionManager} as well as reactive + * transactions managed by a {@link org.springframework.transaction.ReactiveTransactionManager}. + * For the former, listeners are guaranteed to see the current thread-bound transaction. + * Since the latter uses the Reactor context instead of thread-local variables, the transaction + * context needs to be included in the published event instance as the event source: + * see {@link org.springframework.transaction.reactive.TransactionalEventPublisher}. * * @author Juergen Hoeller * @author Oliver Drotbohm @@ -60,6 +61,16 @@ public interface TransactionalApplicationListener return Ordered.LOWEST_PRECEDENCE; } + /** + * Transaction-synchronized listeners do not support asynchronous execution, + * only their target listener ({@link #processEvent}) potentially does. + * @since 6.1 + */ + @Override + default boolean supportsAsyncExecution() { + return false; + } + /** * Return an identifier for the listener to be able to refer to it individually. *

It might be necessary for specific completion callback implementations diff --git a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerAdapter.java b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerAdapter.java index 18ce10c3a5..77ab522ba6 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerAdapter.java +++ b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.core.Ordered; -import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; /** @@ -128,11 +127,7 @@ public class TransactionalApplicationListenerAdapter @Override public void onApplicationEvent(E event) { - if (TransactionSynchronizationManager.isSynchronizationActive() && - TransactionSynchronizationManager.isActualTransactionActive()) { - TransactionSynchronizationManager.registerSynchronization( - new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks)); - } + TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks); } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerMethodAdapter.java b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerMethodAdapter.java index a7830e4761..4b59f70c41 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerMethodAdapter.java +++ b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerMethodAdapter.java @@ -25,7 +25,6 @@ import org.springframework.context.event.ApplicationListenerMethodAdapter; import org.springframework.context.event.EventListener; import org.springframework.context.event.GenericApplicationListener; import org.springframework.core.annotation.AnnotatedElementUtils; -import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; /** @@ -87,10 +86,10 @@ public class TransactionalApplicationListenerMethodAdapter extends ApplicationLi @Override public void onApplicationEvent(ApplicationEvent event) { - if (TransactionSynchronizationManager.isSynchronizationActive() && - TransactionSynchronizationManager.isActualTransactionActive()) { - TransactionSynchronizationManager.registerSynchronization( - new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks)); + if (TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks)) { + if (logger.isDebugEnabled()) { + logger.debug("Registered transaction synchronization for " + event); + } } else if (this.annotation.fallbackExecution()) { if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) { diff --git a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerSynchronization.java index d8fc358752..2def91e493 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerSynchronization.java +++ b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalApplicationListenerSynchronization.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,19 +18,21 @@ package org.springframework.transaction.event; import java.util.List; +import reactor.core.publisher.Mono; + import org.springframework.context.ApplicationEvent; -import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.core.Ordered; +import org.springframework.transaction.reactive.TransactionContext; /** - * {@link TransactionSynchronization} implementation for event processing with a + * {@code TransactionSynchronization} implementations for event processing with a * {@link TransactionalApplicationListener}. * * @author Juergen Hoeller * @since 5.3 * @param the specific {@code ApplicationEvent} subclass to listen to */ -class TransactionalApplicationListenerSynchronization - implements TransactionSynchronization { +abstract class TransactionalApplicationListenerSynchronization implements Ordered { private final E event; @@ -53,28 +55,11 @@ class TransactionalApplicationListenerSynchronization callback.preProcessEvent(this.event)); try { this.listener.processEvent(this.event); @@ -86,4 +71,94 @@ class TransactionalApplicationListenerSynchronization callback.postProcessEvent(this.event, null)); } + + public static boolean register( + E event, TransactionalApplicationListener listener, + List callbacks) { + + if (org.springframework.transaction.support.TransactionSynchronizationManager.isSynchronizationActive() && + org.springframework.transaction.support.TransactionSynchronizationManager.isActualTransactionActive()) { + org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization( + new PlatformSynchronization<>(event, listener, callbacks)); + return true; + } + else if (event.getSource() instanceof TransactionContext txContext) { + org.springframework.transaction.reactive.TransactionSynchronizationManager rtsm = + new org.springframework.transaction.reactive.TransactionSynchronizationManager(txContext); + if (rtsm.isSynchronizationActive() && rtsm.isActualTransactionActive()) { + rtsm.registerSynchronization(new ReactiveSynchronization<>(event, listener, callbacks)); + return true; + } + } + return false; + } + + + private static class PlatformSynchronization + extends TransactionalApplicationListenerSynchronization + implements org.springframework.transaction.support.TransactionSynchronization { + + public PlatformSynchronization(AE event, TransactionalApplicationListener listener, + List callbacks) { + + super(event, listener, callbacks); + } + + @Override + public void beforeCommit(boolean readOnly) { + if (getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) { + processEventWithCallbacks(); + } + } + + @Override + public void afterCompletion(int status) { + TransactionPhase phase = getTransactionPhase(); + if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) { + processEventWithCallbacks(); + } + else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) { + processEventWithCallbacks(); + } + else if (phase == TransactionPhase.AFTER_COMPLETION) { + processEventWithCallbacks(); + } + } + } + + + private static class ReactiveSynchronization + extends TransactionalApplicationListenerSynchronization + implements org.springframework.transaction.reactive.TransactionSynchronization { + + public ReactiveSynchronization(AE event, TransactionalApplicationListener listener, + List callbacks) { + + super(event, listener, callbacks); + } + + @Override + public Mono beforeCommit(boolean readOnly) { + if (getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) { + return Mono.fromRunnable(this::processEventWithCallbacks); + } + return Mono.empty(); + } + + @Override + public Mono afterCompletion(int status) { + TransactionPhase phase = getTransactionPhase(); + if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) { + return Mono.fromRunnable(this::processEventWithCallbacks); + } + else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) { + return Mono.fromRunnable(this::processEventWithCallbacks); + } + else if (phase == TransactionPhase.AFTER_COMPLETION) { + return Mono.fromRunnable(this::processEventWithCallbacks); + } + return Mono.empty(); + } + } + } diff --git a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalEventListener.java b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalEventListener.java index f70022e7cd..3ade90efc8 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalEventListener.java +++ b/spring-tx/src/main/java/org/springframework/transaction/event/TransactionalEventListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,12 +37,13 @@ import org.springframework.core.annotation.AliasFor; * method allows you to prioritize that listener amongst other listeners running before * or after transaction completion. * - *

NOTE: Transactional event listeners only work with thread-bound transactions - * managed by a {@link org.springframework.transaction.PlatformTransactionManager - * PlatformTransactionManager}. A reactive transaction managed by a - * {@link org.springframework.transaction.ReactiveTransactionManager ReactiveTransactionManager} - * uses the Reactor context instead of thread-local variables, so from the perspective of - * an event listener, there is no compatible active transaction that it can participate in. + *

As of 6.1, transactional event listeners can work with thread-bound transactions managed + * by a {@link org.springframework.transaction.PlatformTransactionManager} as well as reactive + * transactions managed by a {@link org.springframework.transaction.ReactiveTransactionManager}. + * For the former, listeners are guaranteed to see the current thread-bound transaction. + * Since the latter uses the Reactor context instead of thread-local variables, the transaction + * context needs to be included in the published event instance as the event source: + * see {@link org.springframework.transaction.reactive.TransactionalEventPublisher}. * *

WARNING: if the {@code TransactionPhase} is set to * {@link TransactionPhase#AFTER_COMMIT AFTER_COMMIT} (the default), diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalEventPublisher.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalEventPublisher.java new file mode 100644 index 0000000000..9a778e66c3 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalEventPublisher.java @@ -0,0 +1,89 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import java.util.function.Function; + +import reactor.core.publisher.Mono; + +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.PayloadApplicationEvent; + +/** + * A delegate for publishing transactional events in a reactive setup. + * Includes the current Reactor-managed {@link TransactionContext} as + * a source object for every {@link ApplicationEvent} to be published. + * + *

This delegate is just a convenience. The current {@link TransactionContext} + * can be directly included as the event source as well, and then published + * through an {@link ApplicationEventPublisher} such as the Spring + * {@link org.springframework.context.ApplicationContext}: + * + *

+ * TransactionContextManager.currentContext()
+ *     .map(source -> new PayloadApplicationEvent<>(source, "myPayload"))
+ *     .doOnSuccess(this.eventPublisher::publishEvent)
+ * 
+ * + * @author Juergen Hoeller + * @since 6.1 + * @see #publishEvent(Function) + * @see #publishEvent(Object) + * @see ApplicationEventPublisher + */ +public class TransactionalEventPublisher { + + private final ApplicationEventPublisher eventPublisher; + + + /** + * Create a new delegate for publishing transactional events in a reactive setup. + * @param eventPublisher the actual event publisher to use, + * typically a Spring {@link org.springframework.context.ApplicationContext} + */ + public TransactionalEventPublisher(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + + /** + * Publish an event created through the given function which maps the transaction + * source object (the {@link TransactionContext}) to the event instance. + * @param eventCreationFunction a function mapping the source object to the event instance, + * e.g. {@code source -> new PayloadApplicationEvent<>(source, "myPayload")} + * @return the Reactor {@link Mono} for the transactional event publication + */ + public Mono publishEvent(Function eventCreationFunction) { + return TransactionContextManager.currentContext().map(eventCreationFunction) + .doOnSuccess(this.eventPublisher::publishEvent).then(); + } + + /** + * Publish an event created for the given payload. + * @param payload the payload to publish as an event + * @return the Reactor {@link Mono} for the transactional event publication + */ + public Mono publishEvent(Object payload) { + if (payload instanceof ApplicationEvent) { + return Mono.error(new IllegalArgumentException("Cannot publish ApplicationEvent with transactional " + + "source - publish payload object or use publishEvent(Function")); + } + return publishEvent(source -> new PayloadApplicationEvent<>(source, payload)); + } + +} diff --git a/spring-tx/src/test/java/org/springframework/transaction/event/ReactiveTransactionalEventListenerTests.java b/spring-tx/src/test/java/org/springframework/transaction/event/ReactiveTransactionalEventListenerTests.java new file mode 100644 index 0000000000..b5f827065c --- /dev/null +++ b/spring-tx/src/test/java/org/springframework/transaction/event/ReactiveTransactionalEventListenerTests.java @@ -0,0 +1,508 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.event; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.reactive.TransactionalEventPublisher; +import org.springframework.transaction.reactive.TransactionalOperator; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.testfixture.ReactiveCallCountingTransactionManager; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.springframework.transaction.event.TransactionPhase.AFTER_COMMIT; +import static org.springframework.transaction.event.TransactionPhase.AFTER_COMPLETION; +import static org.springframework.transaction.event.TransactionPhase.AFTER_ROLLBACK; +import static org.springframework.transaction.event.TransactionPhase.BEFORE_COMMIT; + +/** + * Integration tests for {@link TransactionalEventListener} support + * with reactive transactions. + * + * @author Juergen Hoeller + * @since 6.1 + */ +public class ReactiveTransactionalEventListenerTests { + + private ConfigurableApplicationContext context; + + private EventCollector eventCollector; + + private TransactionalOperator transactionalOperator; + + + @AfterEach + public void closeContext() { + if (this.context != null) { + this.context.close(); + } + } + + + @Test + public void immediately() { + load(ImmediateTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("test").then(Mono.fromRunnable(() -> { + getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "test"); + getEventCollector().assertTotalEventsCount(1); + }))).blockFirst(); + getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "test"); + getEventCollector().assertTotalEventsCount(1); + } + + @Test + public void immediatelyImpactsCurrentTransaction() { + load(ImmediateTestListener.class, BeforeCommitTestListener.class); + assertThatIllegalStateException().isThrownBy(() -> + this.transactionalOperator.execute(status -> publishEvent("FAIL").then(Mono.fromRunnable(() -> { + throw new AssertionError("Should have thrown an exception at this point"); + }))).blockFirst()) + .withMessageContaining("Test exception") + .withMessageContaining(EventCollector.IMMEDIATELY); + + getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "FAIL"); + getEventCollector().assertTotalEventsCount(1); + } + + @Test + public void afterCompletionCommit() { + load(AfterCompletionTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("test") + .then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst(); + getEventCollector().assertEvents(EventCollector.AFTER_COMPLETION, "test"); + getEventCollector().assertTotalEventsCount(1); // After rollback not invoked + } + + @Test + public void afterCompletionRollback() { + load(AfterCompletionTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("test").then(Mono.fromRunnable(() -> { + getEventCollector().assertNoEventReceived(); + status.setRollbackOnly(); + }))).blockFirst(); + getEventCollector().assertEvents(EventCollector.AFTER_COMPLETION, "test"); + getEventCollector().assertTotalEventsCount(1); // After rollback not invoked + } + + @Test + public void afterCommit() { + load(AfterCompletionExplicitTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("test") + .then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst(); + getEventCollector().assertEvents(EventCollector.AFTER_COMMIT, "test"); + getEventCollector().assertTotalEventsCount(1); // After rollback not invoked + } + + @Test + public void afterCommitWithTransactionalComponentListenerProxiedViaDynamicProxy() { + load(TransactionalComponentTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("SKIP") + .then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst(); + getEventCollector().assertNoEventReceived(); + } + + @Test + public void afterRollback() { + load(AfterCompletionExplicitTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("test").then(Mono.fromRunnable(() -> { + getEventCollector().assertNoEventReceived(); + status.setRollbackOnly(); + }))).blockFirst(); + getEventCollector().assertEvents(EventCollector.AFTER_ROLLBACK, "test"); + getEventCollector().assertTotalEventsCount(1); // After commit not invoked + } + + @Test + public void beforeCommit() { + load(BeforeCommitTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("test") + .then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst(); + getEventCollector().assertEvents(EventCollector.BEFORE_COMMIT, "test"); + getEventCollector().assertTotalEventsCount(1); + } + + @Test + public void noTransaction() { + load(BeforeCommitTestListener.class, AfterCompletionTestListener.class, + AfterCompletionExplicitTestListener.class); + publishEvent("test"); + getEventCollector().assertTotalEventsCount(0); + } + + @Test + public void transactionDemarcationWithNotSupportedPropagation() { + load(BeforeCommitTestListener.class, AfterCompletionTestListener.class); + getContext().getBean(TestBean.class).notSupported().block(); + getEventCollector().assertTotalEventsCount(0); + } + + @Test + public void transactionDemarcationWithSupportsPropagationAndNoTransaction() { + load(BeforeCommitTestListener.class, AfterCompletionTestListener.class); + getContext().getBean(TestBean.class).supports().block(); + getEventCollector().assertTotalEventsCount(0); + } + + @Test + public void transactionDemarcationWithSupportsPropagationAndExistingTransaction() { + load(BeforeCommitTestListener.class, AfterCompletionTestListener.class); + this.transactionalOperator.execute(status -> getContext().getBean(TestBean.class).supports() + .then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst(); + getEventCollector().assertTotalEventsCount(2); + } + + @Test + public void transactionDemarcationWithRequiredPropagation() { + load(BeforeCommitTestListener.class, AfterCompletionTestListener.class); + getContext().getBean(TestBean.class).required().block(); + getEventCollector().assertTotalEventsCount(2); + } + + @Test + public void noTransactionWithFallbackExecution() { + load(FallbackExecutionTestListener.class); + getContext().publishEvent("test"); + this.eventCollector.assertEvents(EventCollector.BEFORE_COMMIT, "test"); + this.eventCollector.assertEvents(EventCollector.AFTER_COMMIT, "test"); + this.eventCollector.assertEvents(EventCollector.AFTER_ROLLBACK, "test"); + this.eventCollector.assertEvents(EventCollector.AFTER_COMPLETION, "test"); + getEventCollector().assertTotalEventsCount(4); + } + + @Test + public void conditionFoundOnTransactionalEventListener() { + load(ImmediateTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("SKIP") + .then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst(); + getEventCollector().assertNoEventReceived(); + } + + @Test + public void afterCommitMetaAnnotation() { + load(AfterCommitMetaAnnotationTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("test") + .then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst(); + getEventCollector().assertEvents(EventCollector.AFTER_COMMIT, "test"); + getEventCollector().assertTotalEventsCount(1); + } + + @Test + public void conditionFoundOnMetaAnnotation() { + load(AfterCommitMetaAnnotationTestListener.class); + this.transactionalOperator.execute(status -> publishEvent("SKIP") + .then(Mono.fromRunnable(() -> getEventCollector().assertNoEventReceived()))).blockFirst(); + getEventCollector().assertNoEventReceived(); + } + + + protected EventCollector getEventCollector() { + return this.eventCollector; + } + + protected ConfigurableApplicationContext getContext() { + return this.context; + } + + private void load(Class... classes) { + List> allClasses = new ArrayList<>(); + allClasses.add(BasicConfiguration.class); + allClasses.addAll(Arrays.asList(classes)); + doLoad(allClasses.toArray(new Class[0])); + } + + private void doLoad(Class... classes) { + this.context = new AnnotationConfigApplicationContext(classes); + this.eventCollector = this.context.getBean(EventCollector.class); + this.transactionalOperator = this.context.getBean(TransactionalOperator.class); + } + + private Mono publishEvent(Object event) { + return new TransactionalEventPublisher(getContext()).publishEvent(event); + } + + + @Configuration + @EnableTransactionManagement + static class BasicConfiguration { + + @Bean + public EventCollector eventCollector() { + return new EventCollector(); + } + + @Bean + public TestBean testBean(ApplicationEventPublisher eventPublisher) { + return new TestBean(eventPublisher); + } + + @Bean + public ReactiveCallCountingTransactionManager transactionManager() { + return new ReactiveCallCountingTransactionManager(); + } + + @Bean + public TransactionalOperator transactionTemplate() { + return TransactionalOperator.create(transactionManager()); + } + } + + + static class EventCollector { + + public static final String IMMEDIATELY = "IMMEDIATELY"; + + public static final String BEFORE_COMMIT = "BEFORE_COMMIT"; + + public static final String AFTER_COMPLETION = "AFTER_COMPLETION"; + + public static final String AFTER_COMMIT = "AFTER_COMMIT"; + + public static final String AFTER_ROLLBACK = "AFTER_ROLLBACK"; + + public static final String[] ALL_PHASES = {IMMEDIATELY, BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK}; + + private final MultiValueMap events = new LinkedMultiValueMap<>(); + + public void addEvent(String phase, Object event) { + this.events.add(phase, event); + } + + public List getEvents(String phase) { + return this.events.getOrDefault(phase, Collections.emptyList()); + } + + public void assertNoEventReceived(String... phases) { + if (phases.length == 0) { // All values if none set + phases = ALL_PHASES; + } + for (String phase : phases) { + List eventsForPhase = getEvents(phase); + assertThat(eventsForPhase.size()).as("Expected no events for phase '" + phase + "' " + + "but got " + eventsForPhase + ":").isEqualTo(0); + } + } + + public void assertEvents(String phase, Object... expected) { + List actual = getEvents(phase); + assertThat(actual.size()).as("wrong number of events for phase '" + phase + "'").isEqualTo(expected.length); + for (int i = 0; i < expected.length; i++) { + assertThat(actual.get(i)).as("Wrong event for phase '" + phase + "' at index " + i).isEqualTo(expected[i]); + } + } + + public void assertTotalEventsCount(int number) { + int size = 0; + for (Map.Entry> entry : this.events.entrySet()) { + size += entry.getValue().size(); + } + assertThat(size).as("Wrong number of total events (" + this.events.size() + ") " + + "registered phase(s)").isEqualTo(number); + } + } + + + static class TestBean { + + private final ApplicationEventPublisher eventPublisher; + + TestBean(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + @Transactional(propagation = Propagation.NOT_SUPPORTED) + public Mono notSupported() { + return new TransactionalEventPublisher(this.eventPublisher).publishEvent("test"); + } + + @Transactional(propagation = Propagation.SUPPORTS) + public Mono supports() { + return new TransactionalEventPublisher(this.eventPublisher).publishEvent("test"); + } + + @Transactional(propagation = Propagation.REQUIRED) + public Mono required() { + return new TransactionalEventPublisher(this.eventPublisher).publishEvent("test"); + } + } + + + static abstract class BaseTransactionalTestListener { + + static final String FAIL_MSG = "FAIL"; + + @Autowired + private EventCollector eventCollector; + + public void handleEvent(String phase, String data) { + this.eventCollector.addEvent(phase, data); + if (FAIL_MSG.equals(data)) { + throw new IllegalStateException("Test exception on phase '" + phase + "'"); + } + } + } + + + @Component + static class ImmediateTestListener extends BaseTransactionalTestListener { + + @EventListener(condition = "!'SKIP'.equals(#data)") + public void handleImmediately(String data) { + handleEvent(EventCollector.IMMEDIATELY, data); + } + } + + + @Component + static class AfterCompletionTestListener extends BaseTransactionalTestListener { + + @TransactionalEventListener(phase = AFTER_COMPLETION) + public void handleAfterCompletion(String data) { + handleEvent(EventCollector.AFTER_COMPLETION, data); + } + } + + + @Component + static class AfterCompletionExplicitTestListener extends BaseTransactionalTestListener { + + @TransactionalEventListener(phase = AFTER_COMMIT) + public void handleAfterCommit(String data) { + handleEvent(EventCollector.AFTER_COMMIT, data); + } + + @TransactionalEventListener(phase = AFTER_ROLLBACK) + public void handleAfterRollback(String data) { + handleEvent(EventCollector.AFTER_ROLLBACK, data); + } + } + + + @Transactional + @Component + interface TransactionalComponentTestListenerInterface { + + // Cannot use #data in condition due to dynamic proxy. + @TransactionalEventListener(condition = "!'SKIP'.equals(#p0)") + void handleAfterCommit(String data); + } + + + static class TransactionalComponentTestListener extends BaseTransactionalTestListener implements + TransactionalComponentTestListenerInterface { + + @Override + public void handleAfterCommit(String data) { + handleEvent(EventCollector.AFTER_COMMIT, data); + } + } + + + @Component + static class BeforeCommitTestListener extends BaseTransactionalTestListener { + + @TransactionalEventListener(phase = BEFORE_COMMIT) + @Order(15) + public void handleBeforeCommit(String data) { + handleEvent(EventCollector.BEFORE_COMMIT, data); + } + } + + + @Component + static class FallbackExecutionTestListener extends BaseTransactionalTestListener { + + @TransactionalEventListener(phase = BEFORE_COMMIT, fallbackExecution = true) + public void handleBeforeCommit(String data) { + handleEvent(EventCollector.BEFORE_COMMIT, data); + } + + @TransactionalEventListener(phase = AFTER_COMMIT, fallbackExecution = true) + public void handleAfterCommit(String data) { + handleEvent(EventCollector.AFTER_COMMIT, data); + } + + @TransactionalEventListener(phase = AFTER_ROLLBACK, fallbackExecution = true) + public void handleAfterRollback(String data) { + handleEvent(EventCollector.AFTER_ROLLBACK, data); + } + + @TransactionalEventListener(phase = AFTER_COMPLETION, fallbackExecution = true) + public void handleAfterCompletion(String data) { + handleEvent(EventCollector.AFTER_COMPLETION, data); + } + } + + + @TransactionalEventListener(phase = AFTER_COMMIT, condition = "!'SKIP'.equals(#p0)") + @Target(ElementType.METHOD) + @Retention(RetentionPolicy.RUNTIME) + @interface AfterCommitEventListener { + } + + + @Component + static class AfterCommitMetaAnnotationTestListener extends BaseTransactionalTestListener { + + @AfterCommitEventListener + public void handleAfterCommit(String data) { + handleEvent(EventCollector.AFTER_COMMIT, data); + } + } + + + static class EventTransactionSynchronization implements TransactionSynchronization { + + private final int order; + + EventTransactionSynchronization(int order) { + this.order = order; + } + + @Override + public int getOrder() { + return order; + } + } + +} diff --git a/spring-tx/src/test/java/org/springframework/transaction/event/TransactionalEventListenerTests.java b/spring-tx/src/test/java/org/springframework/transaction/event/TransactionalEventListenerTests.java index 22ed99a2b9..725e60cf36 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/event/TransactionalEventListenerTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/event/TransactionalEventListenerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,9 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; +import org.springframework.context.event.SimpleApplicationEventMulticaster; import org.springframework.core.annotation.Order; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.Propagation; @@ -57,6 +59,7 @@ import static org.springframework.transaction.event.TransactionPhase.BEFORE_COMM /** * Integration tests for {@link TransactionalEventListener} support + * with thread-bound transactions. * * @author Stephane Nicoll * @author Sam Brannen @@ -87,7 +90,6 @@ public class TransactionalEventListenerTests { getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "test"); getEventCollector().assertTotalEventsCount(1); return null; - }); getEventCollector().assertEvents(EventCollector.IMMEDIATELY, "test"); getEventCollector().assertTotalEventsCount(1); @@ -115,7 +117,6 @@ public class TransactionalEventListenerTests { getContext().publishEvent("test"); getEventCollector().assertNoEventReceived(); return null; - }); getEventCollector().assertEvents(EventCollector.AFTER_COMPLETION, "test"); getEventCollector().assertTotalEventsCount(1); // After rollback not invoked @@ -129,7 +130,6 @@ public class TransactionalEventListenerTests { getEventCollector().assertNoEventReceived(); status.setRollbackOnly(); return null; - }); getEventCollector().assertEvents(EventCollector.AFTER_COMPLETION, "test"); getEventCollector().assertTotalEventsCount(1); // After rollback not invoked @@ -142,7 +142,6 @@ public class TransactionalEventListenerTests { getContext().publishEvent("test"); getEventCollector().assertNoEventReceived(); return null; - }); getEventCollector().assertEvents(EventCollector.AFTER_COMMIT, "test"); getEventCollector().assertTotalEventsCount(1); // After rollback not invoked @@ -172,6 +171,19 @@ public class TransactionalEventListenerTests { getEventCollector().assertTotalEventsCount(1); // After commit not invoked } + @Test + public void afterRollbackWithCustomExecutor() { + load(AfterCompletionExplicitTestListener.class, MulticasterWithCustomExecutor.class); + this.transactionTemplate.execute(status -> { + getContext().publishEvent("test"); + getEventCollector().assertNoEventReceived(); + status.setRollbackOnly(); + return null; + }); + getEventCollector().assertEvents(EventCollector.AFTER_ROLLBACK, "test"); + getEventCollector().assertTotalEventsCount(1); // After commit not invoked + } + @Test public void beforeCommit() { load(BeforeCommitTestListener.class); @@ -307,13 +319,12 @@ public class TransactionalEventListenerTests { } @Test - public void afterCommitMetaAnnotation() throws Exception { + public void afterCommitMetaAnnotation() { load(AfterCommitMetaAnnotationTestListener.class); this.transactionTemplate.execute(status -> { getContext().publishEvent("test"); getEventCollector().assertNoEventReceived(); return null; - }); getEventCollector().assertEvents(EventCollector.AFTER_COMMIT, "test"); getEventCollector().assertTotalEventsCount(1); @@ -326,7 +337,6 @@ public class TransactionalEventListenerTests { getContext().publishEvent("SKIP"); getEventCollector().assertNoEventReceived(); return null; - }); getEventCollector().assertNoEventReceived(); } @@ -380,6 +390,18 @@ public class TransactionalEventListenerTests { } + @Configuration + static class MulticasterWithCustomExecutor { + + @Bean + public SimpleApplicationEventMulticaster applicationEventMulticaster() { + SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster(); + multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor()); + return multicaster; + } + } + + static class EventCollector { public static final String IMMEDIATELY = "IMMEDIATELY";