From 7bfe01a0287c5cf56f57e6660f7943351d7f8d9d Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Fri, 5 Jul 2019 16:19:23 +0200 Subject: [PATCH] Support for reactive result values from event listener methods Closes gh-21831 --- spring-context/spring-context.gradle | 8 ++ .../ApplicationListenerMethodAdapter.java | 77 ++++++++++++++++++- .../scheduling/support/TaskUtils.java | 6 +- .../AnnotationDrivenEventListenerTests.java | 76 ++++++++++++++++-- ...ApplicationListenerMethodAdapterTests.java | 15 ++-- .../EventPublicationInterceptorTests.java | 18 ++--- 6 files changed, 169 insertions(+), 31 deletions(-) diff --git a/spring-context/spring-context.gradle b/spring-context/spring-context.gradle index 38b176740a..51b395baf2 100644 --- a/spring-context/spring-context.gradle +++ b/spring-context/spring-context.gradle @@ -2,6 +2,12 @@ description = "Spring Context" apply plugin: "groovy" +dependencyManagement { + imports { + mavenBom "io.projectreactor:reactor-bom:${reactorVersion}" + } +} + dependencies { compile(project(":spring-aop")) compile(project(":spring-beans")) @@ -23,6 +29,8 @@ dependencies { optional("org.hibernate:hibernate-validator:5.4.3.Final") optional("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}") optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}") + optional("org.reactivestreams:reactive-streams") + testCompile("io.projectreactor:reactor-core") testCompile("org.codehaus.groovy:groovy-jsr223:${groovyVersion}") testCompile("org.codehaus.groovy:groovy-test:${groovyVersion}") testCompile("org.codehaus.groovy:groovy-xml:${groovyVersion}") diff --git a/spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java b/spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java index 1e572c0ca8..e352d69ad3 100644 --- a/spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java +++ b/spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java @@ -24,9 +24,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletionStage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.springframework.aop.support.AopUtils; import org.springframework.context.ApplicationContext; @@ -34,20 +37,24 @@ import org.springframework.context.ApplicationEvent; import org.springframework.context.PayloadApplicationEvent; import org.springframework.context.expression.AnnotatedElementKey; import org.springframework.core.BridgeMethodResolver; +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ResolvableType; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.annotation.Order; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; +import org.springframework.util.concurrent.ListenableFuture; /** * {@link GenericApplicationListener} adapter that delegates the processing of * an event to an {@link EventListener} annotated method. * - *

Delegates to {@link #processEvent(ApplicationEvent)} to give sub-classes + *

Delegates to {@link #processEvent(ApplicationEvent)} to give subclasses * a chance to deviate from the default. Unwraps the content of a * {@link PayloadApplicationEvent} if necessary to allow a method declaration * to define any arbitrary event type. If a condition is defined, it is @@ -60,6 +67,10 @@ import org.springframework.util.StringUtils; */ public class ApplicationListenerMethodAdapter implements GenericApplicationListener { + private static final boolean reactiveStreamsPresent = ClassUtils.isPresent( + "org.reactivestreams.Publisher", ApplicationListenerMethodAdapter.class.getClassLoader()); + + protected final Log logger = LogFactory.getLog(getClass()); private final String beanName; @@ -213,6 +224,30 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe } protected void handleResult(Object result) { + if (reactiveStreamsPresent && new ReactiveResultHandler().subscribeToPublisher(result)) { + if (logger.isTraceEnabled()) { + logger.trace("Adapted to reactive result: " + result); + } + } + else if (result instanceof CompletionStage) { + ((CompletionStage) result).whenComplete((event, ex) -> { + if (ex != null) { + handleAsyncError(ex); + } + else if (event != null) { + publishEvent(event); + } + }); + } + else if (result instanceof ListenableFuture) { + ((ListenableFuture) result).addCallback(this::publishEvents, this::handleAsyncError); + } + else { + publishEvents(result); + } + } + + private void publishEvents(Object result) { if (result.getClass().isArray()) { Object[] events = ObjectUtils.toObjectArray(result); for (Object event : events) { @@ -237,6 +272,10 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe } } + protected void handleAsyncError(Throwable t) { + logger.error("Unexpected error occurred in asynchronous listener", t); + } + private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) { if (args == null) { return false; @@ -376,4 +415,40 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe return this.method.toGenericString(); } + + private class ReactiveResultHandler { + + public boolean subscribeToPublisher(Object result) { + ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(result.getClass()); + if (adapter != null) { + adapter.toPublisher(result).subscribe(new EventPublicationSubscriber()); + return true; + } + return false; + } + } + + + private class EventPublicationSubscriber implements Subscriber { + + @Override + public void onSubscribe(Subscription s) { + s.request(Integer.MAX_VALUE); + } + + @Override + public void onNext(Object o) { + publishEvents(o); + } + + @Override + public void onError(Throwable t) { + handleAsyncError(t); + } + + @Override + public void onComplete() { + } + } + } diff --git a/spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java b/spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java index d6aa70ca94..f27ab89c22 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java +++ b/spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -92,9 +92,7 @@ public abstract class TaskUtils { @Override public void handleError(Throwable t) { - if (logger.isErrorEnabled()) { - logger.error("Unexpected error occurred in scheduled task.", t); - } + logger.error("Unexpected error occurred in scheduled task", t); } } diff --git a/spring-context/src/test/java/org/springframework/context/event/AnnotationDrivenEventListenerTests.java b/spring-context/src/test/java/org/springframework/context/event/AnnotationDrivenEventListenerTests.java index 1edfbb0447..c54370ff27 100644 --- a/spring-context/src/test/java/org/springframework/context/event/AnnotationDrivenEventListenerTests.java +++ b/spring-context/src/test/java/org/springframework/context/event/AnnotationDrivenEventListenerTests.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; @@ -32,6 +33,8 @@ import javax.annotation.PostConstruct; import org.junit.After; import org.junit.Ignore; import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.aop.framework.Advised; import org.springframework.aop.support.AopUtils; @@ -61,6 +64,7 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.stereotype.Component; import org.springframework.util.Assert; +import org.springframework.util.concurrent.SettableListenableFuture; import org.springframework.validation.annotation.Validated; import org.springframework.validation.beanvalidation.MethodValidationPostProcessor; @@ -243,7 +247,69 @@ public class AnnotationDrivenEventListenerTests { } @Test - public void eventListenerWorksWithSimpleInterfaceProxy() throws Exception { + public void listenableFutureReply() { + load(TestEventListener.class, ReplyEventListener.class); + SettableListenableFuture future = new SettableListenableFuture<>(); + future.set("dummy"); + AnotherTestEvent event = new AnotherTestEvent(this, future); + ReplyEventListener replyEventListener = this.context.getBean(ReplyEventListener.class); + TestEventListener listener = this.context.getBean(TestEventListener.class); + + this.eventCollector.assertNoEventReceived(listener); + this.eventCollector.assertNoEventReceived(replyEventListener); + this.context.publishEvent(event); + this.eventCollector.assertEvent(replyEventListener, event); + this.eventCollector.assertEvent(listener, "dummy"); // reply + this.eventCollector.assertTotalEventsCount(2); + } + + @Test + public void completableFutureReply() { + load(TestEventListener.class, ReplyEventListener.class); + AnotherTestEvent event = new AnotherTestEvent(this, CompletableFuture.completedFuture("dummy")); + ReplyEventListener replyEventListener = this.context.getBean(ReplyEventListener.class); + TestEventListener listener = this.context.getBean(TestEventListener.class); + + this.eventCollector.assertNoEventReceived(listener); + this.eventCollector.assertNoEventReceived(replyEventListener); + this.context.publishEvent(event); + this.eventCollector.assertEvent(replyEventListener, event); + this.eventCollector.assertEvent(listener, "dummy"); // reply + this.eventCollector.assertTotalEventsCount(2); + } + + @Test + public void monoReply() { + load(TestEventListener.class, ReplyEventListener.class); + AnotherTestEvent event = new AnotherTestEvent(this, Mono.just("dummy")); + ReplyEventListener replyEventListener = this.context.getBean(ReplyEventListener.class); + TestEventListener listener = this.context.getBean(TestEventListener.class); + + this.eventCollector.assertNoEventReceived(listener); + this.eventCollector.assertNoEventReceived(replyEventListener); + this.context.publishEvent(event); + this.eventCollector.assertEvent(replyEventListener, event); + this.eventCollector.assertEvent(listener, "dummy"); // reply + this.eventCollector.assertTotalEventsCount(2); + } + + @Test + public void fluxReply() { + load(TestEventListener.class, ReplyEventListener.class); + AnotherTestEvent event = new AnotherTestEvent(this, Flux.just("dummy1", "dummy2")); + ReplyEventListener replyEventListener = this.context.getBean(ReplyEventListener.class); + TestEventListener listener = this.context.getBean(TestEventListener.class); + + this.eventCollector.assertNoEventReceived(listener); + this.eventCollector.assertNoEventReceived(replyEventListener); + this.context.publishEvent(event); + this.eventCollector.assertEvent(replyEventListener, event); + this.eventCollector.assertEvent(listener, "dummy1", "dummy2"); // reply + this.eventCollector.assertTotalEventsCount(3); + } + + @Test + public void eventListenerWorksWithSimpleInterfaceProxy() { load(ScopedProxyTestBean.class); SimpleService proxy = this.context.getBean(SimpleService.class); @@ -260,7 +326,7 @@ public class AnnotationDrivenEventListenerTests { } @Test - public void eventListenerWorksWithAnnotatedInterfaceProxy() throws Exception { + public void eventListenerWorksWithAnnotatedInterfaceProxy() { load(AnnotatedProxyTestBean.class); AnnotatedSimpleService proxy = this.context.getBean(AnnotatedSimpleService.class); @@ -277,7 +343,7 @@ public class AnnotationDrivenEventListenerTests { } @Test - public void eventListenerWorksWithCglibProxy() throws Exception { + public void eventListenerWorksWithCglibProxy() { load(CglibProxyTestBean.class); CglibProxyTestBean proxy = this.context.getBean(CglibProxyTestBean.class); @@ -294,14 +360,14 @@ public class AnnotationDrivenEventListenerTests { } @Test - public void privateMethodOnCglibProxyFails() throws Exception { + public void privateMethodOnCglibProxyFails() { assertThatExceptionOfType(BeanInitializationException.class).isThrownBy(() -> load(CglibProxyWithPrivateMethod.class)) .withCauseInstanceOf(IllegalStateException.class); } @Test - public void eventListenerWorksWithCustomScope() throws Exception { + public void eventListenerWorksWithCustomScope() { load(CustomScopeTestBean.class); CustomScope customScope = new CustomScope(); this.context.getBeanFactory().registerScope("custom", customScope); diff --git a/spring-context/src/test/java/org/springframework/context/event/ApplicationListenerMethodAdapterTests.java b/spring-context/src/test/java/org/springframework/context/event/ApplicationListenerMethodAdapterTests.java index b8299c67e4..4fd1a32adc 100644 --- a/spring-context/src/test/java/org/springframework/context/event/ApplicationListenerMethodAdapterTests.java +++ b/spring-context/src/test/java/org/springframework/context/event/ApplicationListenerMethodAdapterTests.java @@ -99,8 +99,7 @@ public class ApplicationListenerMethodAdapterTests extends AbstractApplicationEv @Test public void listenerWithSubTypeSeveralGenerics() { - Method method = ReflectionUtils.findMethod(SampleEvents.class, - "handleString", String.class); + Method method = ReflectionUtils.findMethod(SampleEvents.class, "handleString", String.class); supportsEventType(true, method, ResolvableType.forClass(PayloadTestEvent.class)); } @@ -141,23 +140,20 @@ public class ApplicationListenerMethodAdapterTests extends AbstractApplicationEv public void listenerWithTooManyParameters() { Method method = ReflectionUtils.findMethod( SampleEvents.class, "tooManyParameters", String.class, String.class); - assertThatIllegalStateException().isThrownBy(() -> - createTestInstance(method)); + assertThatIllegalStateException().isThrownBy(() -> createTestInstance(method)); } @Test public void listenerWithNoParameter() { Method method = ReflectionUtils.findMethod(SampleEvents.class, "noParameter"); - assertThatIllegalStateException().isThrownBy(() -> - createTestInstance(method)); + assertThatIllegalStateException().isThrownBy(() -> createTestInstance(method)); } @Test public void listenerWithMoreThanOneParameter() { Method method = ReflectionUtils.findMethod( SampleEvents.class, "moreThanOneParameter", String.class, Integer.class); - assertThatIllegalStateException().isThrownBy(() -> - createTestInstance(method)); + assertThatIllegalStateException().isThrownBy(() -> createTestInstance(method)); } @Test @@ -331,7 +327,8 @@ public class ApplicationListenerMethodAdapterTests extends AbstractApplicationEv private void supportsEventType(boolean match, Method method, ResolvableType eventType) { ApplicationListenerMethodAdapter adapter = createTestInstance(method); - assertThat(adapter.supportsEventType(eventType)).as("Wrong match for event '" + eventType + "' on " + method).isEqualTo(match); + assertThat(adapter.supportsEventType(eventType)) + .as("Wrong match for event '" + eventType + "' on " + method).isEqualTo(match); } private void invokeListener(Method method, ApplicationEvent event) { diff --git a/spring-context/src/test/java/org/springframework/context/event/EventPublicationInterceptorTests.java b/spring-context/src/test/java/org/springframework/context/event/EventPublicationInterceptorTests.java index d48cfa27ff..3239a47213 100644 --- a/spring-context/src/test/java/org/springframework/context/event/EventPublicationInterceptorTests.java +++ b/spring-context/src/test/java/org/springframework/context/event/EventPublicationInterceptorTests.java @@ -16,7 +16,6 @@ package org.springframework.context.event; -import org.junit.Before; import org.junit.Test; import org.springframework.aop.framework.ProxyFactory; @@ -42,16 +41,11 @@ import static org.mockito.Mockito.mock; */ public class EventPublicationInterceptorTests { - private ApplicationEventPublisher publisher; + private final ApplicationEventPublisher publisher = mock(ApplicationEventPublisher.class); - @Before - public void setUp() { - this.publisher = mock(ApplicationEventPublisher.class); - } - @Test - public void testWithNoApplicationEventClassSupplied() throws Exception { + public void testWithNoApplicationEventClassSupplied() { EventPublicationInterceptor interceptor = new EventPublicationInterceptor(); interceptor.setApplicationEventPublisher(this.publisher); assertThatIllegalArgumentException().isThrownBy( @@ -59,7 +53,7 @@ public class EventPublicationInterceptorTests { } @Test - public void testWithNonApplicationEventClassSupplied() throws Exception { + public void testWithNonApplicationEventClassSupplied() { EventPublicationInterceptor interceptor = new EventPublicationInterceptor(); interceptor.setApplicationEventPublisher(this.publisher); assertThatIllegalArgumentException().isThrownBy(() -> { @@ -69,7 +63,7 @@ public class EventPublicationInterceptorTests { } @Test - public void testWithAbstractStraightApplicationEventClassSupplied() throws Exception { + public void testWithAbstractStraightApplicationEventClassSupplied() { EventPublicationInterceptor interceptor = new EventPublicationInterceptor(); interceptor.setApplicationEventPublisher(this.publisher); assertThatIllegalArgumentException().isThrownBy(() -> { @@ -79,7 +73,7 @@ public class EventPublicationInterceptorTests { } @Test - public void testWithApplicationEventClassThatDoesntExposeAValidCtor() throws Exception { + public void testWithApplicationEventClassThatDoesntExposeAValidCtor() { EventPublicationInterceptor interceptor = new EventPublicationInterceptor(); interceptor.setApplicationEventPublisher(this.publisher); assertThatIllegalArgumentException().isThrownBy(() -> { @@ -89,7 +83,7 @@ public class EventPublicationInterceptorTests { } @Test - public void testExpectedBehavior() throws Exception { + public void testExpectedBehavior() { TestBean target = new TestBean(); final TestListener listener = new TestListener();