Support DestinationVariable on RSocket handlers

Prior to this commit, the pattern destination variables were not set in
the message headers prior to calling the handler. In this case, the
`DestinationVariableMethodArgumentResolver` could not get the
destination variables from the message headers and resolve those as
handler arguments.

This commit mutates the message headers if the message destination
contains patterns.

Fixes gh-22776
This commit is contained in:
Brian Clozel 2019-04-09 22:56:08 +02:00
parent a0826a20c3
commit cd69a4a03b
4 changed files with 47 additions and 4 deletions

View File

@ -24,9 +24,12 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
@ -39,6 +42,7 @@ import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.CompositeMessageCondition;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.HandlerMethod;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.support.AnnotationExceptionHandlerMethodResolver;
import org.springframework.messaging.handler.invocation.AbstractExceptionHandlerMethodResolver;
@ -46,9 +50,11 @@ import org.springframework.messaging.handler.invocation.reactive.AbstractEncoder
import org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Controller;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.PathMatcher;
import org.springframework.util.StringValueResolver;
import org.springframework.validation.Validator;
@ -311,4 +317,19 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
return new AnnotationExceptionHandlerMethodResolver(beanType);
}
@Override
protected Mono<Void> handleMatch(CompositeMessageCondition mapping, HandlerMethod handlerMethod, Message<?> message) {
Set<String> patterns = mapping.getCondition(DestinationPatternsMessageCondition.class).getPatterns();
if (!CollectionUtils.isEmpty(patterns)) {
String pattern = patterns.iterator().next();
String destination = getDestination(message);
Map<String, String> vars = getPathMatcher().extractUriTemplateVariables(pattern, destination);
if (!CollectionUtils.isEmpty(vars)) {
MessageHeaderAccessor mha = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
Assert.state(mha != null && mha.isMutable(), "Mutable MessageHeaderAccessor required");
mha.setHeader(DestinationVariableMethodArgumentResolver.DESTINATION_TEMPLATE_VARIABLES_HEADER, vars);
}
}
return super.handleMatch(mapping, handlerMethod, message);
}
}

View File

@ -381,7 +381,11 @@ public abstract class AbstractMethodMessageHandler<T>
// handleNoMatch would have been invoked already
return Mono.empty();
}
HandlerMethod handlerMethod = match.getHandlerMethod().createWithResolvedBean();
return handleMatch(match.mapping, match.handlerMethod, message);
}
protected Mono<Void> handleMatch(T mapping, HandlerMethod handlerMethod, Message<?> message) {
handlerMethod = handlerMethod.createWithResolvedBean();
return this.invocableHelper.handleMessage(handlerMethod, message);
}

View File

@ -180,6 +180,7 @@ class MessagingRSocket extends AbstractRSocket {
private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor<?> replyMono) {
MessageHeaderAccessor headers = new MessageHeaderAccessor();
headers.setLeaveMutable(true);
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination);
if (this.dataMimeType != null) {
headers.setContentType(this.dataMimeType);

View File

@ -39,10 +39,13 @@ import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.invocation.reactive.TestEncoderMethodReturnValueHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Controller;
import static java.nio.charset.StandardCharsets.*;
@ -89,6 +92,13 @@ public class MessageMappingMessageHandlerTests {
verifyOutputContent(Collections.singletonList("abcdef::response"));
}
@Test
public void handleWithDestinationVariable() {
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
messsageHandler.handleMessage(message("destination.test", "abcdef")).block(Duration.ofSeconds(5));
verifyOutputContent(Collections.singletonList("test::abcdef::response"));
}
@Test
public void handleException() {
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
@ -143,9 +153,11 @@ public class MessageMappingMessageHandlerTests {
}
private Message<?> message(String destination, String... content) {
return new GenericMessage<>(
Flux.fromIterable(Arrays.asList(content)).map(payload -> toDataBuffer(payload)),
Collections.singletonMap(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination));
Flux<DataBuffer> payload = Flux.fromIterable(Arrays.asList(content)).map(parts -> toDataBuffer(parts));
MessageHeaderAccessor headers = new MessageHeaderAccessor();
headers.setLeaveMutable(true);
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination);
return MessageBuilder.createMessage(payload, headers.getMessageHeaders());
}
private DataBuffer toDataBuffer(String payload) {
@ -181,6 +193,11 @@ public class MessageMappingMessageHandlerTests {
return payload + "::response";
}
@MessageMapping("destination.{variable}")
String handleWithDestinationVariable(@DestinationVariable String variable, String payload) {
return variable + "::" + payload + "::response";
}
@MessageMapping("exception")
String handleAndThrow() {
throw new IllegalArgumentException("rejected");