Introduce TransactionalApplicationListener interface (with callback support)

Includes forPayload methods and common adapter classes for programmatic usage.
Aligns default order values for event handling delegates to LOWEST_PRECEDENCE.

Closes gh-24163
This commit is contained in:
Juergen Hoeller 2020-10-22 15:19:32 +02:00
parent cfc3522641
commit 95110d8257
19 changed files with 905 additions and 184 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@
package org.springframework.context;
import java.util.EventListener;
import java.util.function.Consumer;
/**
* Interface to be implemented by application event listeners.
@ -45,4 +46,17 @@ public interface ApplicationListener<E extends ApplicationEvent> extends EventLi
*/
void onApplicationEvent(E event);
/**
* Create a new {@code ApplicationListener} for the given payload consumer.
* @param consumer the event payload consumer
* @param <T> the type of the event payload
* @return a corresponding {@code ApplicationListener} instance
* @since 5.3
* @see PayloadApplicationEvent
*/
static <T> ApplicationListener<PayloadApplicationEvent<T>> forPayload(Consumer<T> consumer) {
return event -> consumer.accept(event.getPayload());
}
}

View File

@ -37,6 +37,7 @@ import org.springframework.context.ApplicationEvent;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.Ordered;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
@ -95,6 +96,12 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
private EventExpressionEvaluator evaluator;
/**
* Construct a new ApplicationListenerMethodAdapter.
* @param beanName the name of the bean to invoke the listener method on
* @param targetClass the target class that the method is declared on
* @param method the listener method to invoke
*/
public ApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {
this.beanName = beanName;
this.method = BridgeMethodResolver.findBridgedMethod(method);
@ -135,7 +142,7 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
private static int resolveOrder(Method method) {
Order ann = AnnotatedElementUtils.findMergedAnnotation(method, Order.class);
return (ann != null ? ann.value() : 0);
return (ann != null ? ann.value() : Ordered.LOWEST_PRECEDENCE);
}
@ -332,6 +339,14 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
return this.applicationContext.getBean(this.beanName);
}
/**
* Return the target listener method.
* @since 5.3
*/
protected Method getTargetMethod() {
return this.targetMethod;
}
/**
* Return the condition to use.
* <p>Matches the {@code condition} attribute of the {@link EventListener}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,6 +29,7 @@ import org.springframework.core.Ordered;
*
* @author Stephane Nicoll
* @since 4.2
* @see ApplicationListenerMethodAdapter
*/
public class DefaultEventListenerFactory implements EventListenerFactory, Ordered {

View File

@ -26,6 +26,7 @@ import org.springframework.aop.framework.ProxyFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.core.Ordered;
import org.springframework.core.ResolvableType;
import org.springframework.core.ResolvableTypeProvider;
import org.springframework.core.annotation.Order;
@ -161,7 +162,7 @@ public class ApplicationListenerMethodAdapterTests extends AbstractApplicationEv
Method method = ReflectionUtils.findMethod(
SampleEvents.class, "handleGenericString", GenericTestEvent.class);
ApplicationListenerMethodAdapter adapter = createTestInstance(method);
assertThat(adapter.getOrder()).isEqualTo(0);
assertThat(adapter.getOrder()).isEqualTo(Ordered.LOWEST_PRECEDENCE);
}
@Test

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,8 +22,11 @@ import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.stereotype.Component;
import static org.assertj.core.api.Assertions.assertThat;
@ -34,14 +37,42 @@ import static org.assertj.core.api.Assertions.assertThat;
public class PayloadApplicationEventTests {
@Test
@SuppressWarnings({ "rawtypes", "resource" })
public void testEventClassWithInterface() {
ApplicationContext ac = new AnnotationConfigApplicationContext(AuditableListener.class);
AuditablePayloadEvent event = new AuditablePayloadEvent<>(this, "xyz");
AuditablePayloadEvent<String> event = new AuditablePayloadEvent<>(this, "xyz");
ac.publishEvent(event);
assertThat(ac.getBean(AuditableListener.class).events.contains(event)).isTrue();
}
@Test
public void testProgrammaticEventListener() {
List<Auditable> events = new ArrayList<>();
ApplicationListener<AuditablePayloadEvent<String>> listener = events::add;
ConfigurableApplicationContext ac = new GenericApplicationContext();
ac.addApplicationListener(listener);
ac.refresh();
AuditablePayloadEvent<String> event = new AuditablePayloadEvent<>(this, "xyz");
ac.publishEvent(event);
assertThat(events.contains(event)).isTrue();
}
@Test
public void testProgrammaticPayloadListener() {
List<String> events = new ArrayList<>();
ApplicationListener<PayloadApplicationEvent<String>> listener = ApplicationListener.forPayload(events::add);
ConfigurableApplicationContext ac = new GenericApplicationContext();
ac.addApplicationListener(listener);
ac.refresh();
AuditablePayloadEvent<String> event = new AuditablePayloadEvent<>(this, "xyz");
ac.publishEvent(event);
assertThat(events.contains(event.getPayload())).isTrue();
}
public interface Auditable {
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.transaction.event;
import java.util.function.Consumer;
import org.springframework.transaction.support.TransactionSynchronization;
/**
@ -24,7 +26,9 @@ import org.springframework.transaction.support.TransactionSynchronization;
* @author Stephane Nicoll
* @author Juergen Hoeller
* @since 4.2
* @see TransactionalEventListener
* @see TransactionalEventListener#phase()
* @see TransactionalApplicationListener#getTransactionPhase()
* @see TransactionalApplicationListener#forPayload(TransactionPhase, Consumer)
*/
public enum TransactionPhase {

View File

@ -0,0 +1,155 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.event;
import java.util.function.Consumer;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.core.Ordered;
import org.springframework.lang.Nullable;
/**
* An {@link ApplicationListener} that is invoked according to a {@link TransactionPhase}.
* This is a programmatic equivalent of the {@link TransactionalEventListener} annotation.
*
* <p>Adding {@link org.springframework.core.Ordered} to your listener implementation
* allows you to prioritize that listener amongst other listeners running before or after
* transaction completion.
*
* <p><b>NOTE: Transactional event listeners only work with thread-bound transactions
* managed by {@link org.springframework.transaction.PlatformTransactionManager}.</b>
* A reactive transaction managed by {@link org.springframework.transaction.ReactiveTransactionManager}
* uses the Reactor context instead of thread-local attributes, so from the perspective of
* an event listener, there is no compatible active transaction that it can participate in.
*
* @author Juergen Hoeller
* @author Oliver Drotbohm
* @since 5.3
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
* @see TransactionalEventListener
* @see TransactionalApplicationListenerAdapter
* @see #forPayload
*/
public interface TransactionalApplicationListener<E extends ApplicationEvent>
extends ApplicationListener<E>, Ordered {
/**
* Return the execution order within transaction synchronizations.
* <p>Default is {@link Ordered#LOWEST_PRECEDENCE}.
* @see org.springframework.transaction.support.TransactionSynchronization#getOrder()
*/
@Override
default int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
/**
* Return the {@link TransactionPhase} in which the listener will be invoked.
* <p>The default phase is {@link TransactionPhase#AFTER_COMMIT}.
*/
default TransactionPhase getTransactionPhase() {
return TransactionPhase.AFTER_COMMIT;
}
/**
* Return an identifier for the listener to be able to refer to it individually.
* <p>It might be necessary for specific completion callback implementations
* to provide a specific id, whereas for other scenarios an empty String
* (as the common default value) is acceptable as well.
* @see #addCallback
*/
default String getListenerId() {
return "";
}
/**
* Add a callback to be invoked on processing within transaction synchronization,
* i.e. when {@link #processEvent} is being triggered during actual transactions.
* @param callback the synchronization callback to apply
*/
void addCallback(SynchronizationCallback callback);
/**
* Immediately process the given {@link ApplicationEvent}. In contrast to
* {@link #onApplicationEvent(ApplicationEvent)}, a call to this method will
* directly process the given event without deferring it to the associated
* {@link #getTransactionPhase() transaction phase}.
* @param event the event to process through the target listener implementation
*/
void processEvent(E event);
/**
* Create a new {@code TransactionalApplicationListener} for the given payload consumer,
* to be applied in the default phase {@link TransactionPhase#AFTER_COMMIT}.
* @param consumer the event payload consumer
* @param <T> the type of the event payload
* @return a corresponding {@code TransactionalApplicationListener} instance
* @see PayloadApplicationEvent#getPayload()
* @see TransactionalApplicationListenerAdapter
*/
static <T> TransactionalApplicationListener<PayloadApplicationEvent<T>> forPayload(Consumer<T> consumer) {
return forPayload(TransactionPhase.AFTER_COMMIT, consumer);
}
/**
* Create a new {@code TransactionalApplicationListener} for the given payload consumer.
* @param phase the transaction phase in which to invoke the listener
* @param consumer the event payload consumer
* @param <T> the type of the event payload
* @return a corresponding {@code TransactionalApplicationListener} instance
* @see PayloadApplicationEvent#getPayload()
* @see TransactionalApplicationListenerAdapter
*/
static <T> TransactionalApplicationListener<PayloadApplicationEvent<T>> forPayload(
TransactionPhase phase, Consumer<T> consumer) {
TransactionalApplicationListenerAdapter<PayloadApplicationEvent<T>> listener =
new TransactionalApplicationListenerAdapter<>(event -> consumer.accept(event.getPayload()));
listener.setTransactionPhase(phase);
return listener;
}
/**
* Callback to be invoked on synchronization-driven event processing,
* wrapping the target listener invocation ({@link #processEvent}).
*
* @see #addCallback
* @see #processEvent
*/
interface SynchronizationCallback {
/**
* Called before transactional event listener invocation.
* @param event the event that transaction synchronization is about to process
*/
default void preProcessEvent(ApplicationEvent event) {
}
/**
* Called after a transactional event listener invocation.
* @param event the event that transaction synchronization finished processing
* @param ex an exception that occurred during listener invocation, if any
*/
default void postProcessEvent(ApplicationEvent event, @Nullable Throwable ex) {
}
}
}

View File

@ -0,0 +1,138 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.event;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
/**
* {@link TransactionalApplicationListener} adapter that delegates the processing of
* an event to a target {@link ApplicationListener} instance. Supports the exact
* same features as any regular {@link ApplicationListener} but is aware of the
* transactional context of the event publisher.
*
* <p>For simple {@link org.springframework.context.PayloadApplicationEvent} handling,
* consider the {@link TransactionalApplicationListener#forPayload} factory methods
* as a convenient alternative to custom usage of this adapter class.
*
* @author Juergen Hoeller
* @since 5.3
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
* @see TransactionalApplicationListener
* @see TransactionalEventListener
* @see TransactionalApplicationListenerMethodAdapter
*/
public class TransactionalApplicationListenerAdapter<E extends ApplicationEvent>
implements TransactionalApplicationListener<E>, Ordered {
private final ApplicationListener<E> targetListener;
private int order = Ordered.LOWEST_PRECEDENCE;
private TransactionPhase transactionPhase = TransactionPhase.AFTER_COMMIT;
private String listenerId = "";
private final List<SynchronizationCallback> callbacks = new CopyOnWriteArrayList<>();
/**
* Construct a new TransactionalApplicationListenerAdapter.
* @param targetListener the actual listener to invoke in the specified transaction phase
* @see #setTransactionPhase
* @see TransactionalApplicationListener#forPayload
*/
public TransactionalApplicationListenerAdapter(ApplicationListener<E> targetListener) {
this.targetListener = targetListener;
}
/**
* Specify the synchronization order for the listener.
*/
public void setOrder(int order) {
this.order = order;
}
/**
* Return the synchronization order for the listener.
*/
@Override
public int getOrder() {
return this.order;
}
/**
* Specify the transaction phase to invoke the listener in.
* <p>The default is {@link TransactionPhase#AFTER_COMMIT}.
*/
public void setTransactionPhase(TransactionPhase transactionPhase) {
this.transactionPhase = transactionPhase;
}
/**
* Return the transaction phase to invoke the listener in.
*/
@Override
public TransactionPhase getTransactionPhase() {
return this.transactionPhase;
}
/**
* Specify an id to identify the listener with.
* <p>The default is an empty String.
*/
public void setListenerId(String listenerId) {
this.listenerId = listenerId;
}
/**
* Return an id to identify the listener with.
*/
@Override
public String getListenerId() {
return this.listenerId;
}
@Override
public void addCallback(SynchronizationCallback callback) {
Assert.notNull(callback, "SynchronizationCallback must not be null");
this.callbacks.add(callback);
}
@Override
public void processEvent(E event) {
this.targetListener.onApplicationEvent(event);
}
@Override
public void onApplicationEvent(E event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
}
}
}

View File

@ -17,14 +17,19 @@
package org.springframework.transaction.event;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.ApplicationListenerMethodAdapter;
import org.springframework.context.event.EventListener;
import org.springframework.context.event.GenericApplicationListener;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
/**
* {@link GenericApplicationListener} adapter that delegates the processing of
@ -38,22 +43,76 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
*
* @author Stephane Nicoll
* @author Juergen Hoeller
* @since 4.2
* @see ApplicationListenerMethodAdapter
* @since 5.3
* @see TransactionalEventListener
* @see TransactionalApplicationListener
* @see TransactionalApplicationListenerAdapter
*/
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
public class TransactionalApplicationListenerMethodAdapter extends ApplicationListenerMethodAdapter
implements TransactionalApplicationListener<ApplicationEvent> {
private final TransactionalEventListener annotation;
private final TransactionPhase transactionPhase;
public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
@Nullable
private volatile String listenerId;
private final List<SynchronizationCallback> callbacks = new CopyOnWriteArrayList<>();
/**
* Construct a new TransactionalApplicationListenerMethodAdapter.
* @param beanName the name of the bean to invoke the listener method on
* @param targetClass the target class that the method is declared on
* @param method the listener method to invoke
*/
public TransactionalApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {
super(beanName, targetClass, method);
TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
TransactionalEventListener ann =
AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
if (ann == null) {
throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method);
}
this.annotation = ann;
this.transactionPhase = ann.phase();
}
@Override
public TransactionPhase getTransactionPhase() {
return this.transactionPhase;
}
@Override
public String getListenerId() {
String id = this.listenerId;
if (id == null) {
id = this.annotation.id();
if (id.isEmpty()) {
id = getDefaultListenerId();
}
this.listenerId = id;
}
return id;
}
/**
* Determine the default id for the target listener, to be applied in case of
* no {@link TransactionalEventListener#id() annotation-specified id value}.
* <p>The default implementation builds a method name with parameter types.
* @see #getListenerId()
*/
protected String getDefaultListenerId() {
Method method = getTargetMethod();
return ClassUtils.getQualifiedMethodName(method) +
"(" + StringUtils.arrayToDelimitedString(method.getParameterTypes(), ",") + ")";
}
@Override
public void addCallback(SynchronizationCallback callback) {
Assert.notNull(callback, "SynchronizationCallback must not be null");
this.callbacks.add(callback);
}
@ -61,8 +120,8 @@ class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerM
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
@ -78,55 +137,4 @@ class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerM
}
}
private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) {
return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase());
}
private static class TransactionSynchronizationEventAdapter implements TransactionSynchronization {
private final ApplicationListenerMethodAdapter listener;
private final ApplicationEvent event;
private final TransactionPhase phase;
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
ApplicationEvent event, TransactionPhase phase) {
this.listener = listener;
this.event = event;
this.phase = phase;
}
@Override
public int getOrder() {
return this.listener.getOrder();
}
@Override
public void beforeCommit(boolean readOnly) {
if (this.phase == TransactionPhase.BEFORE_COMMIT) {
processEvent();
}
}
@Override
public void afterCompletion(int status) {
if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
processEvent();
}
}
protected void processEvent() {
this.listener.processEvent(this.event);
}
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.event;
import java.util.List;
import org.springframework.context.ApplicationEvent;
import org.springframework.transaction.support.TransactionSynchronization;
/**
* {@link TransactionSynchronization} implementation for event processing with a
* {@link TransactionalApplicationListener}.
*
* @author Juergen Hoeller
* @since 5.3
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
*/
class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>
implements TransactionSynchronization {
private final E event;
private final TransactionalApplicationListener<E> listener;
private final List<TransactionalApplicationListener.SynchronizationCallback> callbacks;
public TransactionalApplicationListenerSynchronization(E event, TransactionalApplicationListener<E> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {
this.event = event;
this.listener = listener;
this.callbacks = callbacks;
}
@Override
public int getOrder() {
return this.listener.getOrder();
}
@Override
public void beforeCommit(boolean readOnly) {
if (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
}
@Override
public void afterCompletion(int status) {
TransactionPhase phase = this.listener.getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
}
private void processEventWithCallbacks() {
this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));
try {
this.listener.processEvent(this.event);
}
catch (RuntimeException | Error ex) {
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, ex));
throw ex;
}
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));
}
}

View File

@ -27,6 +27,7 @@ import org.springframework.core.annotation.AliasFor;
/**
* An {@link EventListener} that is invoked according to a {@link TransactionPhase}.
* This is an an annotation-based equivalent of {@link TransactionalApplicationListener}.
*
* <p>If the event is not published within an active transaction, the event is discarded
* unless the {@link #fallbackExecution} flag is explicitly set. If a transaction is
@ -44,7 +45,10 @@ import org.springframework.core.annotation.AliasFor;
*
* @author Stephane Nicoll
* @author Sam Brannen
* @author Oliver Drotbohm
* @since 4.2
* @see TransactionalApplicationListener
* @see TransactionalApplicationListenerMethodAdapter
*/
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@ -60,6 +64,13 @@ public @interface TransactionalEventListener {
*/
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
/**
* An optional identifier to uniquely reference the listener.
* @since 5.3
* @see TransactionalApplicationListener#getListenerId()
*/
String id() default "";
/**
* Whether the event should be processed if no transaction is running.
*/

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,6 +29,7 @@ import org.springframework.core.annotation.AnnotatedElementUtils;
*
* @author Stephane Nicoll
* @since 4.2
* @see TransactionalApplicationListenerMethodAdapter
*/
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
@ -52,7 +53,7 @@ public class TransactionalEventListenerFactory implements EventListenerFactory,
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
return new TransactionalApplicationListenerMethodAdapter(beanName, type, method);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -85,7 +85,7 @@ abstract class TransactionSynchronizationUtils {
public static Mono<Void> triggerBeforeCompletion(Collection<TransactionSynchronization> synchronizations) {
return Flux.fromIterable(synchronizations)
.concatMap(TransactionSynchronization::beforeCompletion).onErrorContinue((t, o) ->
logger.error("TransactionSynchronization.beforeCompletion threw exception", t)).then();
logger.debug("TransactionSynchronization.beforeCompletion threw exception", t)).then();
}
/**
@ -115,7 +115,7 @@ abstract class TransactionSynchronizationUtils {
Collection<TransactionSynchronization> synchronizations, int completionStatus) {
return Flux.fromIterable(synchronizations).concatMap(it -> it.afterCompletion(completionStatus))
.onErrorContinue((t, o) -> logger.error("TransactionSynchronization.afterCompletion threw exception", t)).then();
.onErrorContinue((t, o) -> logger.debug("TransactionSynchronization.afterCompletion threw exception", t)).then();
}

View File

@ -54,6 +54,10 @@ public interface TransactionSynchronization extends Ordered, Flushable {
int STATUS_UNKNOWN = 2;
/**
* Return the execution order for this transaction synchronization.
* <p>Default is {@link Ordered#LOWEST_PRECEDENCE}.
*/
@Override
default int getOrder() {
return Ordered.LOWEST_PRECEDENCE;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -106,8 +106,8 @@ public abstract class TransactionSynchronizationUtils {
try {
synchronization.beforeCompletion();
}
catch (Throwable tsex) {
logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex);
catch (Throwable ex) {
logger.debug("TransactionSynchronization.beforeCompletion threw exception", ex);
}
}
}
@ -170,8 +170,8 @@ public abstract class TransactionSynchronizationUtils {
try {
synchronization.afterCompletion(completionStatus);
}
catch (Throwable tsex) {
logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
catch (Throwable ex) {
logger.debug("TransactionSynchronization.afterCompletion threw exception", ex);
}
}
}

View File

@ -1,106 +0,0 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.event;
import java.lang.reflect.Method;
import org.junit.jupiter.api.Test;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.context.event.ApplicationListenerMethodAdapter;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Stephane Nicoll
*/
public class ApplicationListenerMethodTransactionalAdapterTests {
@Test
public void defaultPhase() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "defaultPhase", String.class);
assertPhase(m, TransactionPhase.AFTER_COMMIT);
}
@Test
public void phaseSet() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "phaseSet", String.class);
assertPhase(m, TransactionPhase.AFTER_ROLLBACK);
}
@Test
public void phaseAndClassesSet() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "phaseAndClassesSet");
assertPhase(m, TransactionPhase.AFTER_COMPLETION);
supportsEventType(true, m, createGenericEventType(String.class));
supportsEventType(true, m, createGenericEventType(Integer.class));
supportsEventType(false, m, createGenericEventType(Double.class));
}
@Test
public void valueSet() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "valueSet");
assertPhase(m, TransactionPhase.AFTER_COMMIT);
supportsEventType(true, m, createGenericEventType(String.class));
supportsEventType(false, m, createGenericEventType(Double.class));
}
private void assertPhase(Method method, TransactionPhase expected) {
assertThat(method).as("Method must not be null").isNotNull();
TransactionalEventListener annotation =
AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
assertThat(annotation.phase()).as("Wrong phase for '" + method + "'").isEqualTo(expected);
}
private void supportsEventType(boolean match, Method method, ResolvableType eventType) {
ApplicationListenerMethodAdapter adapter = createTestInstance(method);
assertThat(adapter.supportsEventType(eventType)).as("Wrong match for event '" + eventType + "' on " + method).isEqualTo(match);
}
private ApplicationListenerMethodTransactionalAdapter createTestInstance(Method m) {
return new ApplicationListenerMethodTransactionalAdapter("test", SampleEvents.class, m);
}
private ResolvableType createGenericEventType(Class<?> payloadType) {
return ResolvableType.forClassWithGenerics(PayloadApplicationEvent.class, payloadType);
}
static class SampleEvents {
@TransactionalEventListener
public void defaultPhase(String data) {
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void phaseSet(String data) {
}
@TransactionalEventListener(classes = {String.class, Integer.class},
phase = TransactionPhase.AFTER_COMPLETION)
public void phaseAndClassesSet() {
}
@TransactionalEventListener(String.class)
public void valueSet() {
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.event;
import org.springframework.context.ApplicationEvent;
import org.springframework.lang.Nullable;
/**
* @author Juergen Hoeller
* @author Oliver Drotbohm
*/
class CapturingSynchronizationCallback implements TransactionalApplicationListener.SynchronizationCallback {
@Nullable
ApplicationEvent preEvent;
@Nullable
ApplicationEvent postEvent;
@Nullable
Throwable ex;
@Override
public void preProcessEvent(ApplicationEvent event) {
this.preEvent = event;
}
@Override
public void postProcessEvent(ApplicationEvent event, @Nullable Throwable ex) {
this.postEvent = event;
this.ex = ex;
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.event;
import org.junit.jupiter.api.Test;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/**
* @author Juergen Hoeller
*/
public class TransactionalApplicationListenerAdapterTests {
@Test
public void invokesCompletionCallbackOnSuccess() {
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
PayloadApplicationEvent<Object> event = new PayloadApplicationEvent<>(this, new Object());
TransactionalApplicationListener<PayloadApplicationEvent<Object>> adapter =
TransactionalApplicationListener.forPayload(p -> {});
adapter.addCallback(callback);
runInTransaction(() -> adapter.onApplicationEvent(event));
assertThat(callback.preEvent).isEqualTo(event);
assertThat(callback.postEvent).isEqualTo(event);
assertThat(callback.ex).isNull();
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.AFTER_COMMIT);
assertThat(adapter.getListenerId()).isEqualTo("");
}
@Test
public void invokesExceptionHandlerOnException() {
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
PayloadApplicationEvent<String> event = new PayloadApplicationEvent<>(this, "event");
RuntimeException ex = new RuntimeException("event");
TransactionalApplicationListener<PayloadApplicationEvent<String>> adapter =
TransactionalApplicationListener.forPayload(
TransactionPhase.BEFORE_COMMIT, p -> {throw ex;});
adapter.addCallback(callback);
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> runInTransaction(() -> adapter.onApplicationEvent(event)))
.withMessage("event");
assertThat(callback.preEvent).isEqualTo(event);
assertThat(callback.postEvent).isEqualTo(event);
assertThat(callback.ex).isEqualTo(ex);
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.BEFORE_COMMIT);
assertThat(adapter.getListenerId()).isEqualTo("");
}
@Test
public void useSpecifiedIdentifier() {
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
PayloadApplicationEvent<String> event = new PayloadApplicationEvent<>(this, "event");
TransactionalApplicationListenerAdapter<PayloadApplicationEvent<String>> adapter =
new TransactionalApplicationListenerAdapter<>(e -> {});
adapter.setTransactionPhase(TransactionPhase.BEFORE_COMMIT);
adapter.setListenerId("identifier");
adapter.addCallback(callback);
runInTransaction(() -> adapter.onApplicationEvent(event));
assertThat(callback.preEvent).isEqualTo(event);
assertThat(callback.postEvent).isEqualTo(event);
assertThat(callback.ex).isNull();
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.BEFORE_COMMIT);
assertThat(adapter.getListenerId()).isEqualTo("identifier");
}
private static void runInTransaction(Runnable runnable) {
TransactionSynchronizationManager.setActualTransactionActive(true);
TransactionSynchronizationManager.initSynchronization();
try {
runnable.run();
TransactionSynchronizationManager.getSynchronizations().forEach(it -> {
it.beforeCommit(false);
it.afterCommit();
it.afterCompletion(TransactionSynchronization.STATUS_COMMITTED);
});
}
finally {
TransactionSynchronizationManager.clearSynchronization();
TransactionSynchronizationManager.setActualTransactionActive(false);
}
}
}

View File

@ -0,0 +1,198 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.event;
import java.lang.reflect.Method;
import org.junit.jupiter.api.Test;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.context.event.ApplicationListenerMethodAdapter;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/**
* @author Stephane Nicoll
* @author Juergen Hoeller
* @author Oliver Drotbohm
*/
public class TransactionalApplicationListenerMethodAdapterTests {
@Test
public void defaultPhase() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "defaultPhase", String.class);
assertPhase(m, TransactionPhase.AFTER_COMMIT);
}
@Test
public void phaseSet() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "phaseSet", String.class);
assertPhase(m, TransactionPhase.AFTER_ROLLBACK);
}
@Test
public void phaseAndClassesSet() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "phaseAndClassesSet");
assertPhase(m, TransactionPhase.AFTER_COMPLETION);
supportsEventType(true, m, createGenericEventType(String.class));
supportsEventType(true, m, createGenericEventType(Integer.class));
supportsEventType(false, m, createGenericEventType(Double.class));
}
@Test
public void valueSet() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "valueSet");
assertPhase(m, TransactionPhase.AFTER_COMMIT);
supportsEventType(true, m, createGenericEventType(String.class));
supportsEventType(false, m, createGenericEventType(Double.class));
}
@Test
public void invokesCompletionCallbackOnSuccess() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "defaultPhase", String.class);
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
PayloadApplicationEvent<Object> event = new PayloadApplicationEvent<>(this, new Object());
TransactionalApplicationListenerMethodAdapter adapter = createTestInstance(m);
adapter.addCallback(callback);
runInTransaction(() -> adapter.onApplicationEvent(event));
assertThat(callback.preEvent).isEqualTo(event);
assertThat(callback.postEvent).isEqualTo(event);
assertThat(callback.ex).isNull();
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.AFTER_COMMIT);
assertThat(adapter.getListenerId()).endsWith("SampleEvents.defaultPhase(class java.lang.String)");
}
@Test
public void invokesExceptionHandlerOnException() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "throwing", String.class);
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
PayloadApplicationEvent<String> event = new PayloadApplicationEvent<>(this, "event");
TransactionalApplicationListenerMethodAdapter adapter = createTestInstance(m);
adapter.addCallback(callback);
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> runInTransaction(() -> adapter.onApplicationEvent(event)))
.withMessage("event");
assertThat(callback.preEvent).isEqualTo(event);
assertThat(callback.postEvent).isEqualTo(event);
assertThat(callback.ex).isInstanceOf(RuntimeException.class);
assertThat(callback.ex.getMessage()).isEqualTo("event");
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.BEFORE_COMMIT);
assertThat(adapter.getListenerId()).isEqualTo(adapter.getDefaultListenerId());
}
@Test
public void usesAnnotatedIdentifier() {
Method m = ReflectionUtils.findMethod(SampleEvents.class, "identified", String.class);
CapturingSynchronizationCallback callback = new CapturingSynchronizationCallback();
PayloadApplicationEvent<String> event = new PayloadApplicationEvent<>(this, "event");
TransactionalApplicationListenerMethodAdapter adapter = createTestInstance(m);
adapter.addCallback(callback);
runInTransaction(() -> adapter.onApplicationEvent(event));
assertThat(callback.preEvent).isEqualTo(event);
assertThat(callback.postEvent).isEqualTo(event);
assertThat(callback.ex).isNull();
assertThat(adapter.getTransactionPhase()).isEqualTo(TransactionPhase.AFTER_COMMIT);
assertThat(adapter.getListenerId()).endsWith("identifier");
}
private static void assertPhase(Method method, TransactionPhase expected) {
assertThat(method).as("Method must not be null").isNotNull();
TransactionalEventListener annotation =
AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
assertThat(annotation.phase()).as("Wrong phase for '" + method + "'").isEqualTo(expected);
}
private static void supportsEventType(boolean match, Method method, ResolvableType eventType) {
ApplicationListenerMethodAdapter adapter = createTestInstance(method);
assertThat(adapter.supportsEventType(eventType)).as("Wrong match for event '" + eventType + "' on " + method).isEqualTo(match);
}
private static TransactionalApplicationListenerMethodAdapter createTestInstance(Method m) {
return new TransactionalApplicationListenerMethodAdapter("test", SampleEvents.class, m) {
@Override
protected Object getTargetBean() {
return new SampleEvents();
}
};
}
private static ResolvableType createGenericEventType(Class<?> payloadType) {
return ResolvableType.forClassWithGenerics(PayloadApplicationEvent.class, payloadType);
}
private static void runInTransaction(Runnable runnable) {
TransactionSynchronizationManager.setActualTransactionActive(true);
TransactionSynchronizationManager.initSynchronization();
try {
runnable.run();
TransactionSynchronizationManager.getSynchronizations().forEach(it -> {
it.beforeCommit(false);
it.afterCommit();
it.afterCompletion(TransactionSynchronization.STATUS_COMMITTED);
});
}
finally {
TransactionSynchronizationManager.clearSynchronization();
TransactionSynchronizationManager.setActualTransactionActive(false);
}
}
static class SampleEvents {
@TransactionalEventListener
public void defaultPhase(String data) {
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void phaseSet(String data) {
}
@TransactionalEventListener(classes = {String.class, Integer.class},
phase = TransactionPhase.AFTER_COMPLETION)
public void phaseAndClassesSet() {
}
@TransactionalEventListener(String.class)
public void valueSet() {
}
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void throwing(String data) {
throw new RuntimeException(data);
}
@TransactionalEventListener(id = "identifier")
public void identified(String data) {
}
}
}