diff --git a/spring-context/spring-context.gradle b/spring-context/spring-context.gradle
index 38b176740a3..51b395baf27 100644
--- a/spring-context/spring-context.gradle
+++ b/spring-context/spring-context.gradle
@@ -2,6 +2,12 @@ description = "Spring Context"
apply plugin: "groovy"
+dependencyManagement {
+ imports {
+ mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
+ }
+}
+
dependencies {
compile(project(":spring-aop"))
compile(project(":spring-beans"))
@@ -23,6 +29,8 @@ dependencies {
optional("org.hibernate:hibernate-validator:5.4.3.Final")
optional("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
+ optional("org.reactivestreams:reactive-streams")
+ testCompile("io.projectreactor:reactor-core")
testCompile("org.codehaus.groovy:groovy-jsr223:${groovyVersion}")
testCompile("org.codehaus.groovy:groovy-test:${groovyVersion}")
testCompile("org.codehaus.groovy:groovy-xml:${groovyVersion}")
diff --git a/spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java b/spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java
index 1e572c0ca8e..e352d69ad34 100644
--- a/spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java
+++ b/spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java
@@ -24,9 +24,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletionStage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
import org.springframework.aop.support.AopUtils;
import org.springframework.context.ApplicationContext;
@@ -34,20 +37,24 @@ import org.springframework.context.ApplicationEvent;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.core.BridgeMethodResolver;
+import org.springframework.core.ReactiveAdapter;
+import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.Order;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
+import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
+import org.springframework.util.concurrent.ListenableFuture;
/**
* {@link GenericApplicationListener} adapter that delegates the processing of
* an event to an {@link EventListener} annotated method.
*
- *
Delegates to {@link #processEvent(ApplicationEvent)} to give sub-classes
+ *
Delegates to {@link #processEvent(ApplicationEvent)} to give subclasses
* a chance to deviate from the default. Unwraps the content of a
* {@link PayloadApplicationEvent} if necessary to allow a method declaration
* to define any arbitrary event type. If a condition is defined, it is
@@ -60,6 +67,10 @@ import org.springframework.util.StringUtils;
*/
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {
+ private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
+ "org.reactivestreams.Publisher", ApplicationListenerMethodAdapter.class.getClassLoader());
+
+
protected final Log logger = LogFactory.getLog(getClass());
private final String beanName;
@@ -213,6 +224,30 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
}
protected void handleResult(Object result) {
+ if (reactiveStreamsPresent && new ReactiveResultHandler().subscribeToPublisher(result)) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Adapted to reactive result: " + result);
+ }
+ }
+ else if (result instanceof CompletionStage) {
+ ((CompletionStage>) result).whenComplete((event, ex) -> {
+ if (ex != null) {
+ handleAsyncError(ex);
+ }
+ else if (event != null) {
+ publishEvent(event);
+ }
+ });
+ }
+ else if (result instanceof ListenableFuture) {
+ ((ListenableFuture>) result).addCallback(this::publishEvents, this::handleAsyncError);
+ }
+ else {
+ publishEvents(result);
+ }
+ }
+
+ private void publishEvents(Object result) {
if (result.getClass().isArray()) {
Object[] events = ObjectUtils.toObjectArray(result);
for (Object event : events) {
@@ -237,6 +272,10 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
}
}
+ protected void handleAsyncError(Throwable t) {
+ logger.error("Unexpected error occurred in asynchronous listener", t);
+ }
+
private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
if (args == null) {
return false;
@@ -376,4 +415,40 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
return this.method.toGenericString();
}
+
+ private class ReactiveResultHandler {
+
+ public boolean subscribeToPublisher(Object result) {
+ ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(result.getClass());
+ if (adapter != null) {
+ adapter.toPublisher(result).subscribe(new EventPublicationSubscriber());
+ return true;
+ }
+ return false;
+ }
+ }
+
+
+ private class EventPublicationSubscriber implements Subscriber