Support ListenableFuture in @MessageMapping handler methods
This commit introduces support for asynchronous return values thanks to the new AsyncHandlerMethodReturnValueHandler interface. Out of the box support for ListenableFuture is also provided. Issue: SPR-12168
This commit is contained in:
parent
792b7b9d11
commit
d3db99c201
|
|
@ -258,7 +258,7 @@ public class HandlerMethod {
|
|||
/**
|
||||
* A MethodParameter with HandlerMethod-specific behavior.
|
||||
*/
|
||||
private class HandlerMethodParameter extends MethodParameter {
|
||||
protected class HandlerMethodParameter extends MethodParameter {
|
||||
|
||||
public HandlerMethodParameter(int index) {
|
||||
super(HandlerMethod.this.bridgedMethod, index);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.handler.invocation;
|
||||
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
/**
|
||||
* Abstract base class for {@link AsyncHandlerMethodReturnValueHandler} implementations
|
||||
* only intended for asynchronous return value handling.
|
||||
*
|
||||
* @author Sebastien Deleuze
|
||||
* @since 4.2
|
||||
*/
|
||||
public abstract class AbstractAsyncReturnValueHandler implements AsyncHandlerMethodReturnValueHandler {
|
||||
|
||||
@Override
|
||||
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) throws Exception {
|
||||
throw new UnsupportedOperationException("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
@ -48,6 +48,8 @@ import org.springframework.util.CollectionUtils;
|
|||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
|
||||
/**
|
||||
* Abstract base class for HandlerMethod-based message handling. Provides most of
|
||||
|
|
@ -462,7 +464,15 @@ public abstract class AbstractMethodMessageHandler<T>
|
|||
if (void.class == returnType.getParameterType()) {
|
||||
return;
|
||||
}
|
||||
this.returnValueHandlers.handleReturnValue(returnValue, returnType, message);
|
||||
if (this.returnValueHandlers.isAsyncReturnValue(returnValue, returnType)) {
|
||||
ListenableFuture<?> future = this.returnValueHandlers.toListenableFuture(returnValue, returnType);
|
||||
if (future != null) {
|
||||
future.addCallback(new ReturnValueListenableFutureCallback(returnType, invocable, message));
|
||||
}
|
||||
}
|
||||
else {
|
||||
this.returnValueHandlers.handleReturnValue(returnValue, returnType, message);
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
processHandlerMethodException(handlerMethod, ex, message);
|
||||
|
|
@ -584,4 +594,42 @@ public abstract class AbstractMethodMessageHandler<T>
|
|||
}
|
||||
}
|
||||
|
||||
private class ReturnValueListenableFutureCallback implements ListenableFutureCallback<Object> {
|
||||
|
||||
private final MethodParameter returnType;
|
||||
|
||||
private final InvocableHandlerMethod handlerMethod;
|
||||
|
||||
private final Message<?> message;
|
||||
|
||||
|
||||
public ReturnValueListenableFutureCallback(MethodParameter returnType,
|
||||
InvocableHandlerMethod handlerMethod, Message<?> message) {
|
||||
|
||||
this.returnType = returnType;
|
||||
this.handlerMethod = handlerMethod;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Object result) {
|
||||
try {
|
||||
returnValueHandlers.handleReturnValue(result, handlerMethod.getAsyncReturnValueType(result), this.message);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
handleFailure(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable ex) {
|
||||
handleFailure(ex);
|
||||
}
|
||||
|
||||
private void handleFailure(Throwable ex) {
|
||||
Exception cause = (ex instanceof Exception ? (Exception) ex : new RuntimeException(ex));
|
||||
processHandlerMethodException(this.handlerMethod, cause, this.message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.messaging.handler.invocation;
|
||||
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* An extension of {@link HandlerMethodReturnValueHandler} for handling async
|
||||
* return value types.
|
||||
*
|
||||
* <p>Implementations only intended for asynchronous return value handling can extend
|
||||
* {@link AbstractAsyncReturnValueHandler}.</p>
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.2
|
||||
* @see AbstractAsyncReturnValueHandler
|
||||
*/
|
||||
public interface AsyncHandlerMethodReturnValueHandler extends HandlerMethodReturnValueHandler {
|
||||
|
||||
/**
|
||||
* Whether the return value type represents a value that will be produced
|
||||
* asynchronously. If this method returns {@code true}, the
|
||||
* {@link #toListenableFuture(Object, MethodParameter)} will be invoked next.
|
||||
* @param returnValue the value returned from the handler method
|
||||
* @param returnType the type of the return value. This type must have
|
||||
* previously been passed to
|
||||
* {@link #supportsReturnType(org.springframework.core.MethodParameter)}
|
||||
* and it must have returned {@code true}
|
||||
* @return true if the return value type represents an async value.
|
||||
*/
|
||||
boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType);
|
||||
|
||||
/**
|
||||
* Adapt the given asynchronous return value to a ListenableFuture.
|
||||
* Implementations can return an instance of
|
||||
* {@link org.springframework.util.concurrent.SettableListenableFuture} and
|
||||
* then set it to an Object (success) or a Throwable (failure) to complete
|
||||
* handling.
|
||||
* @param returnValue the value returned from the handler method
|
||||
* @param returnType the type of the return value. This type must have
|
||||
* previously been passed to
|
||||
* {@link #supportsReturnType(org.springframework.core.MethodParameter)}
|
||||
* and it must have returned {@code true}
|
||||
* @return a ListenableFuture
|
||||
*/
|
||||
ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType);
|
||||
|
||||
}
|
||||
|
|
@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* A HandlerMethodReturnValueHandler that wraps and delegates to others.
|
||||
|
|
@ -33,7 +34,7 @@ import org.springframework.util.Assert;
|
|||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
*/
|
||||
public class HandlerMethodReturnValueHandlerComposite implements HandlerMethodReturnValueHandler {
|
||||
public class HandlerMethodReturnValueHandlerComposite implements AsyncHandlerMethodReturnValueHandler {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(HandlerMethodReturnValueHandlerComposite.class);
|
||||
|
||||
|
|
@ -100,4 +101,22 @@ public class HandlerMethodReturnValueHandlerComposite implements HandlerMethodRe
|
|||
handler.handleReturnValue(returnValue, returnType, message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
|
||||
HandlerMethodReturnValueHandler handler = getReturnValueHandler(returnType);
|
||||
if (handler != null && handler instanceof AsyncHandlerMethodReturnValueHandler) {
|
||||
if (((AsyncHandlerMethodReturnValueHandler) handler).isAsyncReturnValue(returnValue, returnType)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
|
||||
HandlerMethodReturnValueHandler handler = getReturnValueHandler(returnType);
|
||||
Assert.isTrue(handler != null && handler instanceof AsyncHandlerMethodReturnValueHandler);
|
||||
return ((AsyncHandlerMethodReturnValueHandler) handler).toListenableFuture(returnValue, returnType);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,14 +18,17 @@ package org.springframework.messaging.handler.invocation;
|
|||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.springframework.core.DefaultParameterNameDiscoverer;
|
||||
import org.springframework.core.GenericTypeResolver;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.ParameterNameDiscoverer;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.handler.HandlerMethod;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
|
|
@ -242,4 +245,35 @@ public class InvocableHandlerMethod extends HandlerMethod {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
MethodParameter getAsyncReturnValueType(Object returnValue) {
|
||||
return new AsyncResultMethodParameter(returnValue);
|
||||
}
|
||||
|
||||
private class AsyncResultMethodParameter extends HandlerMethodParameter {
|
||||
|
||||
private final Object returnValue;
|
||||
|
||||
private final ResolvableType returnType;
|
||||
|
||||
public AsyncResultMethodParameter(Object returnValue) {
|
||||
super(-1);
|
||||
this.returnValue = returnValue;
|
||||
this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<?> getParameterType() {
|
||||
if (this.returnValue != null) {
|
||||
return this.returnValue.getClass();
|
||||
}
|
||||
Assert.isTrue(!ResolvableType.NONE.equals(this.returnType), "Expected Future-like type with generic parameter");
|
||||
return this.returnType.getRawClass();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getGenericParameterType() {
|
||||
return this.returnType.getType();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.handler.invocation;
|
||||
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* An {@link AsyncHandlerMethodReturnValueHandler} for {@link ListenableFuture} return type handling.
|
||||
*
|
||||
* @author Sebastien Deleuze
|
||||
* @since 4.2
|
||||
*/
|
||||
public class ListenableFutureReturnValueHandler extends AbstractAsyncReturnValueHandler {
|
||||
|
||||
@Override
|
||||
public boolean supportsReturnType(MethodParameter returnType) {
|
||||
return ListenableFuture.class.isAssignableFrom(returnType.getParameterType());
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
|
||||
return (ListenableFuture<?>)returnValue;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -52,6 +52,7 @@ import org.springframework.messaging.handler.invocation.AbstractExceptionHandler
|
|||
import org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler;
|
||||
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
|
||||
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
|
||||
import org.springframework.messaging.handler.invocation.ListenableFutureReturnValueHandler;
|
||||
import org.springframework.messaging.simp.SimpAttributesContextHolder;
|
||||
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
|
||||
import org.springframework.messaging.simp.SimpMessageMappingInfo;
|
||||
|
|
@ -314,6 +315,10 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
|
|||
protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
|
||||
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<HandlerMethodReturnValueHandler>();
|
||||
|
||||
// Single-purpose return value types
|
||||
ListenableFutureReturnValueHandler lfh = new ListenableFutureReturnValueHandler();
|
||||
handlers.add(lfh);
|
||||
|
||||
// Annotation-based return value types
|
||||
SendToMethodReturnValueHandler sth = new SendToMethodReturnValueHandler(this.brokerTemplate, true);
|
||||
sth.setHeaderInitializer(this.headerInitializer);
|
||||
|
|
|
|||
|
|
@ -25,12 +25,19 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import org.mockito.Captor;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import org.mockito.Mock;
|
||||
|
||||
import org.springframework.context.support.StaticApplicationContext;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.handler.annotation.DestinationVariable;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.messaging.handler.annotation.Headers;
|
||||
|
|
@ -48,12 +55,15 @@ import org.springframework.messaging.simp.annotation.SubscribeMapping;
|
|||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.util.AntPathMatcher;
|
||||
import org.springframework.util.concurrent.ListenableFutureTask;
|
||||
import org.springframework.validation.Errors;
|
||||
import org.springframework.validation.Validator;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
/**
|
||||
* Test fixture for
|
||||
|
|
@ -61,6 +71,7 @@ import static org.junit.Assert.*;
|
|||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Brian Clozel
|
||||
* @author Sebastien Deleuze
|
||||
*/
|
||||
public class SimpAnnotationMethodMessageHandlerTests {
|
||||
|
||||
|
|
@ -70,13 +81,24 @@ public class SimpAnnotationMethodMessageHandlerTests {
|
|||
|
||||
private TestController testController;
|
||||
|
||||
@Mock
|
||||
private SubscribableChannel channel;
|
||||
|
||||
@Mock
|
||||
private MessageConverter converter;
|
||||
|
||||
@Captor
|
||||
private ArgumentCaptor<Object> payloadCaptor;
|
||||
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
SubscribableChannel channel = Mockito.mock(SubscribableChannel.class);
|
||||
SimpMessageSendingOperations brokerTemplate = new SimpMessagingTemplate(channel);
|
||||
MockitoAnnotations.initMocks(this);
|
||||
|
||||
this.messageHandler = new TestSimpAnnotationMethodMessageHandler(brokerTemplate, channel, channel);
|
||||
SimpMessagingTemplate brokerTemplate = new SimpMessagingTemplate(this.channel);
|
||||
brokerTemplate.setMessageConverter(converter);
|
||||
|
||||
this.messageHandler = new TestSimpAnnotationMethodMessageHandler(brokerTemplate, this.channel, this.channel);
|
||||
this.messageHandler.setApplicationContext(new StaticApplicationContext());
|
||||
this.messageHandler.setValidator(new StringTestValidator(TEST_INVALID_VALUE));
|
||||
this.messageHandler.afterPropertiesSet();
|
||||
|
|
@ -225,6 +247,53 @@ public class SimpAnnotationMethodMessageHandlerTests {
|
|||
assertEquals("handleFoo", controller.method);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void listenableFutureSuccess() {
|
||||
|
||||
given(this.channel.send(any(Message.class))).willReturn(true);
|
||||
given(this.converter.toMessage(anyObject(), any(MessageHeaders.class)))
|
||||
.willReturn((Message) MessageBuilder.withPayload(new byte[0]).build());
|
||||
|
||||
|
||||
ListenableFutureController controller = new ListenableFutureController();
|
||||
this.messageHandler.registerHandler(controller);
|
||||
this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));
|
||||
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create();
|
||||
headers.setSessionId("session1");
|
||||
headers.setSessionAttributes(new HashMap<>());
|
||||
headers.setDestination("/app1/listenable-future/success");
|
||||
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
|
||||
this.messageHandler.handleMessage(message);
|
||||
|
||||
assertNotNull(controller.future);
|
||||
controller.future.run();
|
||||
verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
|
||||
assertEquals("foo", this.payloadCaptor.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void listenableFutureFailure() {
|
||||
|
||||
given(this.channel.send(any(Message.class))).willReturn(true);
|
||||
given(this.converter.toMessage(anyObject(), any(MessageHeaders.class)))
|
||||
.willReturn((Message) MessageBuilder.withPayload(new byte[0]).build());
|
||||
|
||||
ListenableFutureController controller = new ListenableFutureController();
|
||||
this.messageHandler.registerHandler(controller);
|
||||
this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));
|
||||
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create();
|
||||
headers.setSessionId("session1");
|
||||
headers.setSessionAttributes(new HashMap<>());
|
||||
headers.setDestination("/app1/listenable-future/failure");
|
||||
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
|
||||
this.messageHandler.handleMessage(message);
|
||||
|
||||
controller.future.run();
|
||||
assertTrue(controller.exceptionCatched);
|
||||
}
|
||||
|
||||
|
||||
private static class TestSimpAnnotationMethodMessageHandler extends SimpAnnotationMethodMessageHandler {
|
||||
|
||||
|
|
@ -316,6 +385,34 @@ public class SimpAnnotationMethodMessageHandlerTests {
|
|||
}
|
||||
}
|
||||
|
||||
@Controller
|
||||
@MessageMapping("listenable-future")
|
||||
private static class ListenableFutureController {
|
||||
|
||||
private ListenableFutureTask<String> future;
|
||||
private boolean exceptionCatched = false;
|
||||
|
||||
@MessageMapping("success")
|
||||
public ListenableFutureTask<String> handleListenableFuture() {
|
||||
this.future = new ListenableFutureTask<String>(() -> "foo");
|
||||
return this.future;
|
||||
}
|
||||
|
||||
@MessageMapping("failure")
|
||||
public ListenableFutureTask<String> handleListenableFutureException() {
|
||||
this.future = new ListenableFutureTask<String>(() -> {
|
||||
throw new IllegalStateException();
|
||||
});
|
||||
return this.future;
|
||||
}
|
||||
|
||||
@MessageExceptionHandler(IllegalStateException.class)
|
||||
public void handleValidationException() {
|
||||
this.exceptionCatched = true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static class StringTestValidator implements Validator {
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue