diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java new file mode 100644 index 00000000000..3fdd678d575 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.handler.invocation; + +import reactor.core.publisher.Mono; + +import org.springframework.core.MethodParameter; +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.messaging.support.MonoToListenableFutureAdapter; +import org.springframework.util.concurrent.ListenableFuture; + +/** + * Support for single-value reactive types (like {@code Mono} or {@code Single}) as a + * return value type. + * + * @author Sebastien Deleuze + * @since 5.1 + */ +public class ReactiveReturnValueHandler extends AbstractAsyncReturnValueHandler { + + private final ReactiveAdapterRegistry adapterRegistry; + + + public ReactiveReturnValueHandler() { + this(ReactiveAdapterRegistry.getSharedInstance()); + } + + public ReactiveReturnValueHandler(ReactiveAdapterRegistry adapterRegistry) { + this.adapterRegistry = adapterRegistry; + } + + + @Override + public boolean supportsReturnType(MethodParameter returnType) { + return this.adapterRegistry.getAdapter(returnType.getParameterType()) != null; + } + + @Override + public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { + ReactiveAdapter adapter = this.adapterRegistry.getAdapter(returnType.getParameterType(), returnValue); + return !adapter.isMultiValue() && !adapter.isNoValue(); + } + + @Override + public ListenableFuture toListenableFuture(Object returnValue, MethodParameter returnType) { + ReactiveAdapter adapter = this.adapterRegistry.getAdapter(returnType.getParameterType(), returnValue); + return new MonoToListenableFutureAdapter<>(Mono.from(adapter.toPublisher(returnValue))); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java index 4b5aa03c536..256f6d50fd5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java @@ -60,6 +60,7 @@ import org.springframework.messaging.handler.invocation.HandlerMethodArgumentRes import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler; import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandlerComposite; import org.springframework.messaging.handler.invocation.ListenableFutureReturnValueHandler; +import org.springframework.messaging.handler.invocation.ReactiveReturnValueHandler; import org.springframework.messaging.simp.SimpAttributesContextHolder; import org.springframework.messaging.simp.SimpLogging; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; @@ -337,6 +338,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan handlers.add(new ListenableFutureReturnValueHandler()); handlers.add(new CompletableFutureReturnValueHandler()); + handlers.add(new ReactiveReturnValueHandler()); // Annotation-based return value types diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java similarity index 98% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java rename to spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java index 29e73651c77..9e3870556b6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.tcp.reactor; +package org.springframework.messaging.support; import java.time.Duration; import java.util.concurrent.ExecutionException; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java similarity index 88% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java rename to spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java index ffa405bd84f..712644f55ee 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.tcp.reactor; +package org.springframework.messaging.support; import reactor.core.publisher.Mono; @@ -29,7 +29,7 @@ import org.springframework.lang.Nullable; * @since 5.0 * @param the object type */ -class MonoToListenableFutureAdapter extends AbstractMonoToListenableFutureAdapter { +public class MonoToListenableFutureAdapter extends AbstractMonoToListenableFutureAdapter { public MonoToListenableFutureAdapter(Mono mono) { super(mono); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index d2e66ba5c5c..8ad4774b0d2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -49,6 +49,7 @@ import reactor.netty.tcp.TcpClient; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; +import org.springframework.messaging.support.MonoToListenableFutureAdapter; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 49a3067c5fa..25ec53d67d4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -23,6 +23,7 @@ import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import org.springframework.messaging.Message; +import org.springframework.messaging.support.MonoToListenableFutureAdapter; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.util.concurrent.ListenableFuture; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java index 702c6db6f8c..3118a451987 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java @@ -31,12 +31,18 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxProcessor; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import org.springframework.context.support.StaticApplicationContext; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.handler.HandlerMethod; @@ -336,6 +342,61 @@ public class SimpAnnotationMethodMessageHandlerTests { assertTrue(controller.exceptionCaught); } + @Test + public void monoSuccess() { + Message emptyMessage = MessageBuilder.withPayload(new byte[0]).build(); + given(this.channel.send(any(Message.class))).willReturn(true); + given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(emptyMessage); + + ReactiveController controller = new ReactiveController(); + this.messageHandler.registerHandler(controller); + this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/")); + + Message message = createMessage("/app1/mono"); + this.messageHandler.handleMessage(message); + + assertNotNull(controller.mono); + controller.mono.onNext("foo"); + verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class)); + assertEquals("foo", this.payloadCaptor.getValue()); + } + + @Test + public void monoFailure() { + Message emptyMessage = MessageBuilder.withPayload(new byte[0]).build(); + given(this.channel.send(any(Message.class))).willReturn(true); + given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(emptyMessage); + + ReactiveController controller = new ReactiveController(); + this.messageHandler.registerHandler(controller); + this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/")); + + Message message = createMessage("/app1/mono"); + this.messageHandler.handleMessage(message); + + controller.mono.onError(new IllegalStateException()); + assertTrue(controller.exceptionCaught); + } + + @Test + public void fluxNotHandled() { + Message emptyMessage = MessageBuilder.withPayload(new byte[0]).build(); + given(this.channel.send(any(Message.class))).willReturn(true); + given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(emptyMessage); + + ReactiveController controller = new ReactiveController(); + this.messageHandler.registerHandler(controller); + this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/")); + + Message message = createMessage("/app1/flux"); + this.messageHandler.handleMessage(message); + + assertNotNull(controller.flux); + controller.flux.onNext("foo"); + + verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class)); + } + @Test public void placeholder() throws Exception { Message message = createMessage("/pre/myValue"); @@ -542,6 +603,33 @@ public class SimpAnnotationMethodMessageHandlerTests { } } + @Controller + private static class ReactiveController { + + private MonoProcessor mono; + + private FluxProcessor flux; + + private boolean exceptionCaught = false; + + @MessageMapping("mono") + public Mono handleMono() { + this.mono = MonoProcessor.create(); + return this.mono; + } + + @MessageMapping("flux") + public Flux handleFlux() { + this.flux = EmitterProcessor.create(); + return this.flux; + } + + @MessageExceptionHandler(IllegalStateException.class) + public void handleValidationException() { + this.exceptionCaught = true; + } + } + private static class StringTestValidator implements Validator {