Explicit handling of void return values
Do give HandlerMethodReturnValueHandler's a chance to handle return values so the RSocket reply header is always set. See gh-21987
This commit is contained in:
parent
fa95b010cb
commit
23b39ad27b
|
|
@ -104,6 +104,10 @@ public abstract class AbstractEncoderMethodReturnValueHandler implements Handler
|
|||
public Mono<Void> handleReturnValue(
|
||||
@Nullable Object returnValue, MethodParameter returnType, Message<?> message) {
|
||||
|
||||
if (returnValue == null) {
|
||||
return handleNoContent(returnType, message);
|
||||
}
|
||||
|
||||
DataBufferFactory bufferFactory = (DataBufferFactory) message.getHeaders()
|
||||
.getOrDefault(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, this.defaultBufferFactory);
|
||||
|
||||
|
|
@ -202,4 +206,13 @@ public abstract class AbstractEncoderMethodReturnValueHandler implements Handler
|
|||
protected abstract Mono<Void> handleEncodedContent(
|
||||
Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message);
|
||||
|
||||
/**
|
||||
* Invoked for a {@code null} return value, which could mean a void method
|
||||
* or method returning an async type parameterized by void.
|
||||
* @param returnType return type of the handler method that produced the data
|
||||
* @param message the input message handled by the handler method
|
||||
* @return completion {@code Mono<Void>} for the handling
|
||||
*/
|
||||
protected abstract Mono<Void> handleNoContent(MethodParameter returnType, Message<?> message);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -182,6 +182,7 @@ class InvocableHelper {
|
|||
logger.debug("Invoking " + invocable.getShortLogMessage());
|
||||
}
|
||||
return invocable.invoke(message)
|
||||
.switchIfEmpty(Mono.defer(() -> handleReturnValue(null, invocable, message)))
|
||||
.flatMap(returnValue -> handleReturnValue(returnValue, invocable, message))
|
||||
.onErrorResume(ex -> {
|
||||
InvocableHandlerMethod exHandler = initExceptionHandlerMethod(handlerMethod, ex);
|
||||
|
|
@ -192,6 +193,7 @@ class InvocableHelper {
|
|||
logger.debug("Invoking " + exHandler.getShortLogMessage());
|
||||
}
|
||||
return exHandler.invoke(message, ex)
|
||||
.switchIfEmpty(Mono.defer(() -> handleReturnValue(null, exHandler, message)))
|
||||
.flatMap(returnValue -> handleReturnValue(returnValue, exHandler, message));
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,6 @@ import org.springframework.core.io.buffer.DataBufferUtils;
|
|||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageDeliveryException;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
|
||||
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
|
||||
|
|
@ -123,6 +122,7 @@ class MessagingRSocket extends AbstractRSocket {
|
|||
private Mono<Void> handle(Payload payload) {
|
||||
Message<?> message = MessageBuilder.createMessage(
|
||||
Mono.fromCallable(() -> wrapPayloadData(payload)), createHeaders(payload, null));
|
||||
|
||||
return this.handler.apply(message);
|
||||
}
|
||||
|
||||
|
|
@ -131,10 +131,11 @@ class MessagingRSocket extends AbstractRSocket {
|
|||
Message<?> message = MessageBuilder.createMessage(
|
||||
payloads.map(this::wrapPayloadData).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release),
|
||||
createHeaders(firstPayload, replyMono));
|
||||
|
||||
return this.handler.apply(message)
|
||||
.thenMany(Flux.defer(() -> replyMono.isTerminated() ?
|
||||
replyMono.flatMapMany(Function.identity()) :
|
||||
Mono.error(new MessageDeliveryException("RSocket request not handled"))));
|
||||
Mono.error(new IllegalStateException("Something went wrong: reply Mono not set"))));
|
||||
}
|
||||
|
||||
private MessageHeaders createHeaders(Payload payload, @Nullable MonoProcessor<?> replyMono) {
|
||||
|
|
|
|||
|
|
@ -21,9 +21,12 @@ import java.util.List;
|
|||
import org.springframework.core.codec.Decoder;
|
||||
import org.springframework.core.codec.Encoder;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageDeliveryException;
|
||||
import org.springframework.messaging.handler.annotation.support.reactive.MessageMappingMessageHandler;
|
||||
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* RSocket-specific extension of {@link MessageMappingMessageHandler}.
|
||||
|
|
@ -105,4 +108,18 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
|||
return handlers;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleNoMatch(@Nullable String destination, Message<?> message) {
|
||||
|
||||
// MessagingRSocket will raise an error anyway if reply Mono is expected
|
||||
// Here we raise a more helpful message a destination is present
|
||||
|
||||
// It is OK if some messages (ConnectionSetupPayload, metadataPush) are not handled
|
||||
// We need a better way to avoid raising errors for those
|
||||
|
||||
if (StringUtils.hasText(destination)) {
|
||||
throw new MessageDeliveryException("No handler for destination '" + destination + "'");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import org.springframework.core.MethodParameter;
|
|||
import org.springframework.core.ReactiveAdapterRegistry;
|
||||
import org.springframework.core.codec.Encoder;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.handler.invocation.reactive.AbstractEncoderMethodReturnValueHandler;
|
||||
import org.springframework.util.Assert;
|
||||
|
|
@ -58,15 +59,28 @@ public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodRetur
|
|||
protected Mono<Void> handleEncodedContent(
|
||||
Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message) {
|
||||
|
||||
Object headerValue = message.getHeaders().get(RESPONSE_HEADER);
|
||||
Assert.notNull(headerValue, "Missing '" + RESPONSE_HEADER + "'");
|
||||
Assert.isInstanceOf(MonoProcessor.class, headerValue, "Expected MonoProcessor");
|
||||
|
||||
MonoProcessor<Flux<Payload>> monoProcessor = (MonoProcessor<Flux<Payload>>) headerValue;
|
||||
monoProcessor.onNext(encodedContent.map(PayloadUtils::createPayload));
|
||||
monoProcessor.onComplete();
|
||||
|
||||
MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
|
||||
Assert.notNull(replyMono, "Missing '" + RESPONSE_HEADER + "'");
|
||||
replyMono.onNext(encodedContent.map(PayloadUtils::createPayload));
|
||||
replyMono.onComplete();
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> handleNoContent(MethodParameter returnType, Message<?> message) {
|
||||
MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
|
||||
if (replyMono != null) {
|
||||
replyMono.onComplete();
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@SuppressWarnings("unchecked")
|
||||
private MonoProcessor<Flux<Payload>> getReplyMono(Message<?> message) {
|
||||
Object headerValue = message.getHeaders().get(RESPONSE_HEADER);
|
||||
Assert.state(headerValue == null || headerValue instanceof MonoProcessor, "Expected MonoProcessor");
|
||||
return (MonoProcessor<Flux<Payload>>) headerValue;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,4 +60,10 @@ public class TestEncoderMethodReturnValueHandler extends AbstractEncoderMethodRe
|
|||
this.encodedContent = encodedContent.cache();
|
||||
return this.encodedContent.then();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> handleNoContent(MethodParameter returnType, Message<?> message) {
|
||||
this.encodedContent = Flux.empty();
|
||||
return Mono.empty();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -111,83 +111,73 @@ public class RSocketClientToServerIntegrationTests {
|
|||
|
||||
@Test
|
||||
public void echo() {
|
||||
|
||||
Flux<String> result = Flux.range(1, 3).concatMap(i ->
|
||||
requester.route("echo").data("Hello " + i).retrieveMono(String.class));
|
||||
|
||||
StepVerifier.create(result)
|
||||
.expectNext("Hello 1")
|
||||
.expectNext("Hello 2")
|
||||
.expectNext("Hello 3")
|
||||
.expectNext("Hello 1").expectNext("Hello 2").expectNext("Hello 3")
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void echoAsync() {
|
||||
|
||||
Flux<String> result = Flux.range(1, 3).concatMap(i ->
|
||||
requester.route("echo-async").data("Hello " + i).retrieveMono(String.class));
|
||||
|
||||
StepVerifier.create(result)
|
||||
.expectNext("Hello 1 async")
|
||||
.expectNext("Hello 2 async")
|
||||
.expectNext("Hello 3 async")
|
||||
.expectNext("Hello 1 async").expectNext("Hello 2 async").expectNext("Hello 3 async")
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void echoStream() {
|
||||
|
||||
Flux<String> result = requester.route("echo-stream").data("Hello").retrieveFlux(String.class);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.expectNext("Hello 0")
|
||||
.expectNextCount(5)
|
||||
.expectNext("Hello 6")
|
||||
.expectNext("Hello 7")
|
||||
.expectNext("Hello 0").expectNextCount(6).expectNext("Hello 7")
|
||||
.thenCancel()
|
||||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void echoChannel() {
|
||||
|
||||
Flux<String> result = requester.route("echo-channel")
|
||||
.data(Flux.range(1, 10).map(i -> "Hello " + i), String.class)
|
||||
.retrieveFlux(String.class);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.expectNext("Hello 1 async")
|
||||
.expectNextCount(7)
|
||||
.expectNext("Hello 9 async")
|
||||
.expectNext("Hello 10 async")
|
||||
.expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async")
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void voidReturnValue() {
|
||||
Flux<String> result = requester.route("void-return-value").data("Hello").retrieveFlux(String.class);
|
||||
StepVerifier.create(result).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void voidReturnValueFromExceptionHandler() {
|
||||
Flux<String> result = requester.route("void-return-value").data("bad").retrieveFlux(String.class);
|
||||
StepVerifier.create(result).verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handleWithThrownException() {
|
||||
|
||||
Mono<String> result = requester.route("thrown-exception").data("a").retrieveMono(String.class);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.expectNext("Invalid input error handled")
|
||||
.verifyComplete();
|
||||
StepVerifier.create(result).expectNext("Invalid input error handled").verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handleWithErrorSignal() {
|
||||
|
||||
Mono<String> result = requester.route("error-signal").data("a").retrieveMono(String.class);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.expectNext("Invalid input error handled")
|
||||
.verifyComplete();
|
||||
StepVerifier.create(result).expectNext("Invalid input error handled").verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noMatchingRoute() {
|
||||
Mono<String> result = requester.route("invalid").data("anything").retrieveMono(String.class);
|
||||
StepVerifier.create(result).verifyErrorMessage("RSocket request not handled");
|
||||
StepVerifier.create(result).verifyErrorMessage("No handler for destination 'invalid'");
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -232,10 +222,22 @@ public class RSocketClientToServerIntegrationTests {
|
|||
return Mono.error(new IllegalArgumentException("Invalid input error"));
|
||||
}
|
||||
|
||||
@MessageMapping("void-return-value")
|
||||
Mono<Void> voidReturnValue(String payload) {
|
||||
return !payload.equals("bad") ?
|
||||
Mono.delay(Duration.ofMillis(10)).then(Mono.empty()) :
|
||||
Mono.error(new IllegalStateException("bad"));
|
||||
}
|
||||
|
||||
@MessageExceptionHandler
|
||||
Mono<String> handleException(IllegalArgumentException ex) {
|
||||
return Mono.delay(Duration.ofMillis(10)).map(aLong -> ex.getMessage() + " handled");
|
||||
}
|
||||
|
||||
@MessageExceptionHandler
|
||||
Mono<Void> handleExceptionWithVoidReturnValue(IllegalStateException ex) {
|
||||
return Mono.delay(Duration.ofMillis(10)).then(Mono.empty());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue