@ConnectMapping for RSocket handling
The new annotation helps to differentiate the handling of connection level frames (SETUP and METADATA_PUSH) from the 4 stream requests. Closes gh-23177
This commit is contained in:
parent
507d128e1d
commit
c199cb9054
|
|
@ -96,6 +96,11 @@ public class SimpleRouteMatcher implements RouteMatcher {
|
|||
public String value() {
|
||||
return this.path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return value();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -224,20 +224,34 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
|
|||
return methodCondition;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the mapping condition for the given annotated element.
|
||||
* @param element the element to check
|
||||
* @return the condition, or {@code null}
|
||||
*/
|
||||
@Nullable
|
||||
private CompositeMessageCondition getCondition(AnnotatedElement element) {
|
||||
protected CompositeMessageCondition getCondition(AnnotatedElement element) {
|
||||
MessageMapping annot = AnnotatedElementUtils.findMergedAnnotation(element, MessageMapping.class);
|
||||
if (annot == null || annot.value().length == 0) {
|
||||
return null;
|
||||
}
|
||||
String[] destinations = annot.value();
|
||||
String[] patterns = processDestinations(annot.value());
|
||||
return new CompositeMessageCondition(
|
||||
new DestinationPatternsMessageCondition(patterns, this.routeMatcher));
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve placeholders in the given destinations.
|
||||
* @param destinations the destinations
|
||||
* @return new array with the processed destinations or the same array
|
||||
*/
|
||||
protected String[] processDestinations(String[] destinations) {
|
||||
if (this.valueResolver != null) {
|
||||
destinations = Arrays.stream(annot.value())
|
||||
destinations = Arrays.stream(destinations)
|
||||
.map(s -> this.valueResolver.resolveStringValue(s))
|
||||
.toArray(String[]::new);
|
||||
}
|
||||
return new CompositeMessageCondition(
|
||||
new DestinationPatternsMessageCondition(destinations, this.routeMatcher));
|
||||
return destinations;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -274,12 +274,13 @@ public abstract class AbstractMethodMessageHandler<T>
|
|||
}
|
||||
Predicate<Class<?>> predicate = this.handlerPredicate;
|
||||
if (predicate == null) {
|
||||
logger.warn("[" + getBeanName() + "] Auto-detection of message handling methods is off.");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[" + getBeanName() + "] Skip auto-detection of message handling methods");
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (this.applicationContext == null) {
|
||||
logger.warn("No ApplicationContext available " +
|
||||
"for auto-detection of beans with message handling methods.");
|
||||
logger.warn("No ApplicationContext for auto-detection of beans with message handling methods.");
|
||||
return;
|
||||
}
|
||||
for (String beanName : this.applicationContext.getBeanNamesForType(Object.class)) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.rsocket.annotation;
|
||||
|
||||
import java.lang.annotation.Documented;
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
import io.rsocket.ConnectionSetupPayload;
|
||||
|
||||
/**
|
||||
* Annotation to map the initial {@link ConnectionSetupPayload
|
||||
* ConnectionSetupPayload} and subsequent metadata pushes onto a handler method.
|
||||
*
|
||||
* <p>This is a method-level annotation that can be combined with a type-level
|
||||
* {@link org.springframework.messaging.handler.annotation.MessageMapping @MessageMapping}
|
||||
* for a combined route pattern. It supports the same arguments as
|
||||
* {@code @MessageMapping} but does not support any return values. On a server,
|
||||
* handling can be asynchronous, e.g. with a {@code Mono<Void>} return value,
|
||||
* in which case the connection is accepted if and when handling completes.
|
||||
* On the client side this method is only a callback and does not affect the
|
||||
* establishment of the connection.
|
||||
*
|
||||
* <p><strong>Note:</strong> an {@code @ConnectMapping} method may start
|
||||
* requests to the remote through an
|
||||
* {@link org.springframework.messaging.rsocket.RSocketRequester RSocketRequester}
|
||||
* method argument, but it must do so independent of the handling thread (e.g.
|
||||
* via subscribing on a different thread).
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.2
|
||||
*/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface ConnectMapping {
|
||||
|
||||
/**
|
||||
* Mappings expressed by this annotation to match to the route from the
|
||||
* metadata of the initial {@link ConnectionSetupPayload} or in
|
||||
* subsequent metadata pushes.
|
||||
*
|
||||
* <p>Depending on the configured
|
||||
* {@link org.springframework.util.RouteMatcher RouteMatcher} the pattern may be
|
||||
* {@link org.springframework.util.AntPathMatcher AntPathMatcher} or
|
||||
* {@link org.springframework.web.util.pattern.PathPattern PathPattern} based
|
||||
*
|
||||
* <p>By default this is an empty array in which case it matches all
|
||||
* {@link ConnectionSetupPayload} and metadata pushes.
|
||||
*/
|
||||
String[] value() default {};
|
||||
|
||||
}
|
||||
|
|
@ -24,6 +24,7 @@ import io.rsocket.AbstractRSocket;
|
|||
import io.rsocket.ConnectionSetupPayload;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.frame.FrameType;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
|
@ -103,7 +104,7 @@ class MessagingRSocket extends AbstractRSocket {
|
|||
|
||||
/**
|
||||
* Wrap the {@link ConnectionSetupPayload} with a {@link Message} and
|
||||
* delegate to {@link #handle(Payload)} for handling.
|
||||
* delegate to {@link #handle(Payload, FrameType)} for handling.
|
||||
* @param payload the connection payload
|
||||
* @return completion handle for success or error
|
||||
*/
|
||||
|
|
@ -111,23 +112,23 @@ class MessagingRSocket extends AbstractRSocket {
|
|||
// frameDecoder does not apply to connectionSetupPayload
|
||||
// so retain here since handle expects it..
|
||||
payload.retain();
|
||||
return handle(payload);
|
||||
return handle(payload, FrameType.SETUP);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<Void> fireAndForget(Payload payload) {
|
||||
return handle(payload);
|
||||
return handle(payload, FrameType.REQUEST_FNF);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Payload> requestResponse(Payload payload) {
|
||||
return handleAndReply(payload, Flux.just(payload)).next();
|
||||
return handleAndReply(payload, FrameType.REQUEST_RESPONSE, Flux.just(payload)).next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Payload> requestStream(Payload payload) {
|
||||
return handleAndReply(payload, Flux.just(payload));
|
||||
return handleAndReply(payload, FrameType.REQUEST_STREAM, Flux.just(payload));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -135,19 +136,20 @@ class MessagingRSocket extends AbstractRSocket {
|
|||
return Flux.from(payloads)
|
||||
.switchOnFirst((signal, innerFlux) -> {
|
||||
Payload firstPayload = signal.get();
|
||||
return firstPayload == null ? innerFlux : handleAndReply(firstPayload, innerFlux);
|
||||
return firstPayload == null ? innerFlux :
|
||||
handleAndReply(firstPayload, FrameType.REQUEST_CHANNEL, innerFlux);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> metadataPush(Payload payload) {
|
||||
// Not very useful until createHeaders does more with metadata
|
||||
return handle(payload);
|
||||
return handle(payload, FrameType.METADATA_PUSH);
|
||||
}
|
||||
|
||||
|
||||
private Mono<Void> handle(Payload payload) {
|
||||
MessageHeaders headers = createHeaders(payload, null);
|
||||
private Mono<Void> handle(Payload payload, FrameType frameType) {
|
||||
MessageHeaders headers = createHeaders(payload, frameType, null);
|
||||
DataBuffer dataBuffer = retainDataAndReleasePayload(payload);
|
||||
int refCount = refCount(dataBuffer);
|
||||
Message<?> message = MessageBuilder.createMessage(dataBuffer, headers);
|
||||
|
|
@ -164,9 +166,9 @@ class MessagingRSocket extends AbstractRSocket {
|
|||
((NettyDataBuffer) dataBuffer).getNativeBuffer().refCnt() : 1;
|
||||
}
|
||||
|
||||
private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payloads) {
|
||||
private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType, Flux<Payload> payloads) {
|
||||
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.create();
|
||||
MessageHeaders headers = createHeaders(firstPayload, replyMono);
|
||||
MessageHeaders headers = createHeaders(firstPayload, frameType, replyMono);
|
||||
|
||||
AtomicBoolean read = new AtomicBoolean();
|
||||
Flux<DataBuffer> buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true));
|
||||
|
|
@ -188,7 +190,9 @@ class MessagingRSocket extends AbstractRSocket {
|
|||
return PayloadUtils.retainDataAndReleasePayload(payload, this.bufferFactory);
|
||||
}
|
||||
|
||||
private MessageHeaders createHeaders(Payload payload, @Nullable MonoProcessor<?> replyMono) {
|
||||
private MessageHeaders createHeaders(Payload payload, FrameType frameType,
|
||||
@Nullable MonoProcessor<?> replyMono) {
|
||||
|
||||
MessageHeaderAccessor headers = new MessageHeaderAccessor();
|
||||
headers.setLeaveMutable(true);
|
||||
|
||||
|
|
@ -205,6 +209,7 @@ class MessagingRSocket extends AbstractRSocket {
|
|||
}
|
||||
|
||||
headers.setContentType(this.dataMimeType);
|
||||
headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType);
|
||||
headers.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, this.requester);
|
||||
if (replyMono != null) {
|
||||
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,141 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.messaging.rsocket.annotation.support;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import io.rsocket.frame.FrameType;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.handler.AbstractMessageCondition;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* A condition to assist with mapping onto handler methods based on the RSocket
|
||||
* frame type. This helps to separate the handling of connection-level frame
|
||||
* types, i.e. {@code SETUP} and {@code METADATA_PUSH}, from the handling of
|
||||
* stream requests.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.2
|
||||
*/
|
||||
public class RSocketFrameTypeMessageCondition extends AbstractMessageCondition<RSocketFrameTypeMessageCondition> {
|
||||
|
||||
/** The name of the header that contains the RSocket frame type being processed. */
|
||||
public static final String FRAME_TYPE_HEADER = "rsocketFrameType";
|
||||
|
||||
|
||||
/** Per FrameType cache to return ready instances from getMatchingCondition. */
|
||||
private static final Map<String, RSocketFrameTypeMessageCondition> frameTypeConditionCache;
|
||||
|
||||
static {
|
||||
frameTypeConditionCache = new HashMap<>(FrameType.values().length);
|
||||
for (FrameType type : FrameType.values()) {
|
||||
frameTypeConditionCache.put(type.name(), new RSocketFrameTypeMessageCondition(type));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private final Set<FrameType> frameTypes;
|
||||
|
||||
|
||||
public RSocketFrameTypeMessageCondition(FrameType... frameType) {
|
||||
this(Arrays.asList(frameType));
|
||||
}
|
||||
|
||||
public RSocketFrameTypeMessageCondition(Collection<FrameType> frameTypes) {
|
||||
Assert.notEmpty(frameTypes, "`frameTypes` are required");
|
||||
this.frameTypes = Collections.unmodifiableSet(new LinkedHashSet<>(frameTypes));
|
||||
}
|
||||
|
||||
|
||||
public Set<FrameType> getFrameTypes() {
|
||||
return this.frameTypes;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<?> getContent() {
|
||||
return this.frameTypes;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getToStringInfix() {
|
||||
return " || ";
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the RSocket frame type in the message headers.
|
||||
* @param message the current message
|
||||
* @return the frame type or {@code null} if not found
|
||||
*/
|
||||
@SuppressWarnings("ConstantConditions")
|
||||
@Nullable
|
||||
public static FrameType getFrameType(Message<?> message) {
|
||||
return (FrameType) message.getHeaders().get(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RSocketFrameTypeMessageCondition combine(RSocketFrameTypeMessageCondition other) {
|
||||
if (this.frameTypes.equals(other.frameTypes)) {
|
||||
return other;
|
||||
}
|
||||
Set<FrameType> set = new LinkedHashSet<>(this.frameTypes);
|
||||
set.addAll(other.frameTypes);
|
||||
return new RSocketFrameTypeMessageCondition(set);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RSocketFrameTypeMessageCondition getMatchingCondition(Message<?> message) {
|
||||
FrameType actual = message.getHeaders().get(FRAME_TYPE_HEADER, FrameType.class);
|
||||
if (actual != null) {
|
||||
for (FrameType type : this.frameTypes) {
|
||||
if (actual == type) {
|
||||
return frameTypeConditionCache.get(type.name());
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(RSocketFrameTypeMessageCondition other, Message<?> message) {
|
||||
return other.frameTypes.size() - this.frameTypes.size();
|
||||
}
|
||||
|
||||
|
||||
/** Condition to match the initial SETUP frame and subsequent metadata pushes. */
|
||||
public static final RSocketFrameTypeMessageCondition CONNECT_CONDITION =
|
||||
new RSocketFrameTypeMessageCondition(
|
||||
FrameType.SETUP,
|
||||
FrameType.METADATA_PUSH);
|
||||
|
||||
/** Condition to match one of the 4 stream request types. */
|
||||
public static final RSocketFrameTypeMessageCondition REQUEST_CONDITION =
|
||||
new RSocketFrameTypeMessageCondition(
|
||||
FrameType.REQUEST_FNF,
|
||||
FrameType.REQUEST_RESPONSE,
|
||||
FrameType.REQUEST_STREAM,
|
||||
FrameType.REQUEST_CHANNEL);
|
||||
|
||||
}
|
||||
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
package org.springframework.messaging.rsocket.annotation.support;
|
||||
|
||||
import java.lang.reflect.AnnotatedElement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.BiFunction;
|
||||
|
|
@ -24,18 +25,24 @@ import java.util.function.Function;
|
|||
import io.rsocket.ConnectionSetupPayload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.SocketAcceptor;
|
||||
import io.rsocket.frame.FrameType;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ReactiveAdapterRegistry;
|
||||
import org.springframework.core.annotation.AnnotatedElementUtils;
|
||||
import org.springframework.core.codec.Decoder;
|
||||
import org.springframework.core.codec.Encoder;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageDeliveryException;
|
||||
import org.springframework.messaging.handler.CompositeMessageCondition;
|
||||
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
|
||||
import org.springframework.messaging.handler.annotation.MessageMapping;
|
||||
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler;
|
||||
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
|
||||
import org.springframework.messaging.rsocket.RSocketRequester;
|
||||
import org.springframework.messaging.rsocket.RSocketStrategies;
|
||||
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
|
@ -189,50 +196,84 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
|||
return handlers;
|
||||
}
|
||||
|
||||
|
||||
@Nullable
|
||||
protected CompositeMessageCondition getCondition(AnnotatedElement element) {
|
||||
MessageMapping annot1 = AnnotatedElementUtils.findMergedAnnotation(element, MessageMapping.class);
|
||||
if (annot1 != null && annot1.value().length > 0) {
|
||||
String[] patterns = processDestinations(annot1.value());
|
||||
return new CompositeMessageCondition(
|
||||
RSocketFrameTypeMessageCondition.REQUEST_CONDITION,
|
||||
new DestinationPatternsMessageCondition(patterns, getRouteMatcher()));
|
||||
}
|
||||
ConnectMapping annot2 = AnnotatedElementUtils.findMergedAnnotation(element, ConnectMapping.class);
|
||||
if (annot2 != null) {
|
||||
String[] patterns = processDestinations(annot2.value());
|
||||
return new CompositeMessageCondition(
|
||||
RSocketFrameTypeMessageCondition.CONNECT_CONDITION,
|
||||
new DestinationPatternsMessageCondition(patterns, getRouteMatcher()));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void handleNoMatch(@Nullable RouteMatcher.Route destination, Message<?> message) {
|
||||
|
||||
// MessagingRSocket will raise an error anyway if reply Mono is expected
|
||||
// Here we raise a more helpful message if a destination is present
|
||||
|
||||
// It is OK if some messages (ConnectionSetupPayload, metadataPush) are not handled
|
||||
// This works but would be better to have a more explicit way to differentiate
|
||||
|
||||
if (destination != null && StringUtils.hasText(destination.value())) {
|
||||
throw new MessageDeliveryException("No handler for destination '" + destination.value() + "'");
|
||||
FrameType frameType = RSocketFrameTypeMessageCondition.getFrameType(message);
|
||||
if (frameType == FrameType.SETUP || frameType == FrameType.METADATA_PUSH) {
|
||||
return; // optional handling
|
||||
}
|
||||
if (frameType == FrameType.REQUEST_FNF) {
|
||||
// Can't propagate error to client, so just log
|
||||
logger.warn("No handler for fireAndForget to '" + destination + "'");
|
||||
return;
|
||||
}
|
||||
throw new MessageDeliveryException("No handler for destination '" + destination + "'");
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an adapter for a
|
||||
* Return an adapter for a server side
|
||||
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor)
|
||||
* server acceptor}. The adapter implements a responding {@link RSocket} by
|
||||
* wrapping {@code Payload} data and metadata as {@link Message} and
|
||||
* delegating to this {@link RSocketMessageHandler} to handle and reply.
|
||||
* acceptor} that delegate to this {@link RSocketMessageHandler} for
|
||||
* handling.
|
||||
* <p>The initial {@link ConnectionSetupPayload} can be handled with a
|
||||
* {@link ConnectMapping @ConnectionMapping} method which can be asynchronous
|
||||
* and return {@code Mono<Void>} with an error signal preventing the
|
||||
* connection. Such a method can also start requests to the client but that
|
||||
* must be done decoupled from handling and from the current thread.
|
||||
* <p>Subsequent stream requests can be handled with
|
||||
* {@link MessageMapping MessageMapping} methods.
|
||||
*/
|
||||
public SocketAcceptor serverAcceptor() {
|
||||
return (setupPayload, sendingRSocket) -> {
|
||||
MessagingRSocket rsocket = createRSocket(setupPayload, sendingRSocket);
|
||||
|
||||
// Allow handling of the ConnectionSetupPayload via @MessageMapping methods.
|
||||
// However, if the handling is to make requests to the client, it's expected
|
||||
// it will do so decoupled from the handling, e.g. via .subscribe().
|
||||
return rsocket.handleConnectionSetupPayload(setupPayload).then(Mono.just(rsocket));
|
||||
MessagingRSocket responder = createResponder(setupPayload, sendingRSocket);
|
||||
return responder.handleConnectionSetupPayload(setupPayload).then(Mono.just(responder));
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an adapter for a
|
||||
* Return an adapter for a client side
|
||||
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(BiFunction)
|
||||
* client acceptor}. The adapter implements a responding {@link RSocket} by
|
||||
* wrapping {@code Payload} data and metadata as {@link Message} and
|
||||
* delegating to this {@link RSocketMessageHandler} to handle and reply.
|
||||
* acceptor} that delegate to this {@link RSocketMessageHandler} for
|
||||
* handling.
|
||||
* <p>The initial {@link ConnectionSetupPayload} can be processed with a
|
||||
* {@link ConnectMapping @ConnectionMapping} method but, unlike the
|
||||
* server side, such a method is merely a callback and cannot prevent the
|
||||
* connection unless the method throws an error immediately. Such a method
|
||||
* can also start requests to the server but must do so decoupled from
|
||||
* handling and from the current thread.
|
||||
* <p>Subsequent stream requests can be handled with
|
||||
* {@link MessageMapping MessageMapping} methods.
|
||||
*/
|
||||
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientAcceptor() {
|
||||
return this::createRSocket;
|
||||
return (setupPayload, sendingRSocket) -> {
|
||||
MessagingRSocket responder = createResponder(setupPayload, sendingRSocket);
|
||||
responder.handleConnectionSetupPayload(setupPayload).subscribe();
|
||||
return responder;
|
||||
};
|
||||
}
|
||||
|
||||
private MessagingRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) {
|
||||
private MessagingRSocket createResponder(ConnectionSetupPayload setupPayload, RSocket rsocket) {
|
||||
String s = setupPayload.dataMimeType();
|
||||
MimeType dataMimeType = StringUtils.hasText(s) ? MimeTypeUtils.parseMimeType(s) : this.defaultDataMimeType;
|
||||
Assert.notNull(dataMimeType, "No `dataMimeType` in ConnectionSetupPayload and no default value");
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import io.rsocket.RSocketFactory;
|
|||
import io.rsocket.frame.decoder.PayloadDecoder;
|
||||
import io.rsocket.transport.netty.server.CloseableChannel;
|
||||
import io.rsocket.transport.netty.server.TcpServerTransport;
|
||||
import io.rsocket.util.ByteBufPayload;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
|
@ -40,6 +41,7 @@ import org.springframework.core.codec.CharSequenceEncoder;
|
|||
import org.springframework.core.codec.StringDecoder;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.messaging.handler.annotation.MessageMapping;
|
||||
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
|
||||
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
|
||||
import org.springframework.stereotype.Controller;
|
||||
|
||||
|
|
@ -77,26 +79,26 @@ public class RSocketServerToClientIntegrationTests {
|
|||
|
||||
@Test
|
||||
public void echo() {
|
||||
connectAndVerify("connect.echo");
|
||||
connectAndRunTest("echo");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void echoAsync() {
|
||||
connectAndVerify("connect.echo-async");
|
||||
connectAndRunTest("echo-async");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void echoStream() {
|
||||
connectAndVerify("connect.echo-stream");
|
||||
connectAndRunTest("echo-stream");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void echoChannel() {
|
||||
connectAndVerify("connect.echo-channel");
|
||||
connectAndRunTest("echo-channel");
|
||||
}
|
||||
|
||||
|
||||
private static void connectAndVerify(String destination) {
|
||||
private static void connectAndRunTest(String connectionRoute) {
|
||||
|
||||
ServerController serverController = context.getBean(ServerController.class);
|
||||
serverController.reset();
|
||||
|
|
@ -105,12 +107,15 @@ public class RSocketServerToClientIntegrationTests {
|
|||
try {
|
||||
requester = RSocketRequester.builder()
|
||||
.annotatedHandlers(new ClientHandler())
|
||||
.rsocketFactory(factory -> {
|
||||
factory.metadataMimeType("text/plain");
|
||||
factory.setupPayload(ByteBufPayload.create("", connectionRoute));
|
||||
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
|
||||
})
|
||||
.rsocketStrategies(context.getBean(RSocketStrategies.class))
|
||||
.connectTcp("localhost", server.address().getPort())
|
||||
.block();
|
||||
|
||||
requester.route(destination).data("").send().block();
|
||||
|
||||
serverController.await(Duration.ofSeconds(5));
|
||||
}
|
||||
finally {
|
||||
|
|
@ -138,7 +143,7 @@ public class RSocketServerToClientIntegrationTests {
|
|||
}
|
||||
|
||||
|
||||
@MessageMapping("connect.echo")
|
||||
@ConnectMapping("echo")
|
||||
void echo(RSocketRequester requester) {
|
||||
runTest(() -> {
|
||||
Flux<String> flux = Flux.range(1, 3).concatMap(i ->
|
||||
|
|
@ -153,7 +158,7 @@ public class RSocketServerToClientIntegrationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@MessageMapping("connect.echo-async")
|
||||
@ConnectMapping("echo-async")
|
||||
void echoAsync(RSocketRequester requester) {
|
||||
runTest(() -> {
|
||||
Flux<String> flux = Flux.range(1, 3).concatMap(i ->
|
||||
|
|
@ -168,7 +173,7 @@ public class RSocketServerToClientIntegrationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@MessageMapping("connect.echo-stream")
|
||||
@ConnectMapping("echo-stream")
|
||||
void echoStream(RSocketRequester requester) {
|
||||
runTest(() -> {
|
||||
Flux<String> flux = requester.route("echo-stream").data("Hello").retrieveFlux(String.class);
|
||||
|
|
@ -183,7 +188,7 @@ public class RSocketServerToClientIntegrationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@MessageMapping("connect.echo-channel")
|
||||
@ConnectMapping("echo-channel")
|
||||
void echoChannel(RSocketRequester requester) {
|
||||
runTest(() -> {
|
||||
Flux<String> flux = requester.route("echo-channel")
|
||||
|
|
@ -195,8 +200,7 @@ public class RSocketServerToClientIntegrationTests {
|
|||
.expectNextCount(7)
|
||||
.expectNext("Hello 9 async")
|
||||
.expectNext("Hello 10 async")
|
||||
.thenCancel() // https://github.com/rsocket/rsocket-java/issues/613
|
||||
.verify(Duration.ofSeconds(5));
|
||||
.verifyComplete();
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.rsocket.annotation.support;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import io.rsocket.frame.FrameType;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link RSocketFrameTypeMessageCondition}.
|
||||
* @author Rossen Stoyanchev
|
||||
*/
|
||||
public class RSocketFrameTypeMessageConditionTests {
|
||||
|
||||
@Test
|
||||
public void getMatchingCondition() {
|
||||
Message<?> message = message(FrameType.REQUEST_RESPONSE);
|
||||
RSocketFrameTypeMessageCondition condition = condition(FrameType.REQUEST_FNF, FrameType.REQUEST_RESPONSE);
|
||||
RSocketFrameTypeMessageCondition actual = condition.getMatchingCondition(message);
|
||||
|
||||
assertThat(actual).isNotNull();
|
||||
assertThat(actual.getFrameTypes()).hasSize(1).containsOnly(FrameType.REQUEST_RESPONSE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void compareTo() {
|
||||
Message<byte[]> message = message(null);
|
||||
assertThat(condition(FrameType.SETUP).compareTo(condition(FrameType.SETUP), message)).isEqualTo(0);
|
||||
assertThat(condition(FrameType.SETUP).compareTo(condition(FrameType.METADATA_PUSH), message)).isEqualTo(0);
|
||||
}
|
||||
|
||||
private Message<byte[]> message(@Nullable FrameType frameType) {
|
||||
MessageBuilder<byte[]> builder = MessageBuilder.withPayload(new byte[0]);
|
||||
if (frameType != null) {
|
||||
builder.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private RSocketFrameTypeMessageCondition condition(FrameType... frameType) {
|
||||
return new RSocketFrameTypeMessageCondition(Arrays.asList(frameType));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.messaging.rsocket.annotation.support;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import io.rsocket.frame.FrameType;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.core.codec.CharSequenceEncoder;
|
||||
import org.springframework.core.codec.StringDecoder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.handler.CompositeMessageCondition;
|
||||
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
|
||||
import org.springframework.messaging.handler.HandlerMethod;
|
||||
import org.springframework.messaging.handler.annotation.MessageMapping;
|
||||
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.messaging.support.MessageHeaderAccessor;
|
||||
import org.springframework.util.AntPathMatcher;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.RouteMatcher;
|
||||
import org.springframework.util.SimpleRouteMatcher;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link RSocketMessageHandler}.
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.2
|
||||
*/
|
||||
public class RSocketMessageHandlerTests {
|
||||
|
||||
@Test
|
||||
public void mappings() {
|
||||
testMapping(new SimpleController(), "path");
|
||||
testMapping(new TypeLevelMappingController(), "base.path");
|
||||
testMapping(new HandleAllController());
|
||||
}
|
||||
|
||||
private static void testMapping(Object controller, String... expectedPatterns) {
|
||||
RSocketMessageHandler handler = new RSocketMessageHandler();
|
||||
handler.setDecoders(Collections.singletonList(StringDecoder.allMimeTypes()));
|
||||
handler.setEncoders(Collections.singletonList(CharSequenceEncoder.allMimeTypes()));
|
||||
handler.setHandlers(Collections.singletonList(controller));
|
||||
handler.afterPropertiesSet();
|
||||
|
||||
Map<CompositeMessageCondition, HandlerMethod> map = handler.getHandlerMethods();
|
||||
assertThat(map).hasSize(1);
|
||||
|
||||
CompositeMessageCondition condition = map.entrySet().iterator().next().getKey();
|
||||
RSocketFrameTypeMessageCondition c1 = condition.getCondition(RSocketFrameTypeMessageCondition.class);
|
||||
assertThat(c1.getFrameTypes()).contains(FrameType.SETUP, FrameType.METADATA_PUSH);
|
||||
|
||||
DestinationPatternsMessageCondition c2 = condition.getCondition(DestinationPatternsMessageCondition.class);
|
||||
if (ObjectUtils.isEmpty(expectedPatterns)) {
|
||||
assertThat(c2.getPatterns()).isEmpty();
|
||||
}
|
||||
else {
|
||||
assertThat(c2.getPatterns()).contains(expectedPatterns);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handleNoMatch() {
|
||||
|
||||
testHandleNoMatch(FrameType.SETUP);
|
||||
testHandleNoMatch(FrameType.METADATA_PUSH);
|
||||
testHandleNoMatch(FrameType.REQUEST_FNF);
|
||||
|
||||
assertThatThrownBy(() -> testHandleNoMatch(FrameType.REQUEST_RESPONSE))
|
||||
.hasMessage("No handler for destination 'path'");
|
||||
}
|
||||
|
||||
private static void testHandleNoMatch(FrameType frameType) {
|
||||
RSocketMessageHandler handler = new RSocketMessageHandler();
|
||||
handler.setDecoders(Collections.singletonList(StringDecoder.allMimeTypes()));
|
||||
handler.setEncoders(Collections.singletonList(CharSequenceEncoder.allMimeTypes()));
|
||||
handler.afterPropertiesSet();
|
||||
|
||||
RouteMatcher matcher = new SimpleRouteMatcher(new AntPathMatcher("."));
|
||||
RouteMatcher.Route route = matcher.parseRoute("path");
|
||||
|
||||
MessageHeaderAccessor headers = new MessageHeaderAccessor();
|
||||
headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType);
|
||||
Message<Object> message = MessageBuilder.createMessage("", headers.getMessageHeaders());
|
||||
|
||||
handler.handleNoMatch(route, message);
|
||||
}
|
||||
|
||||
|
||||
private static class SimpleController {
|
||||
|
||||
@ConnectMapping("path")
|
||||
public void handle() {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@MessageMapping("base")
|
||||
private static class TypeLevelMappingController {
|
||||
|
||||
@ConnectMapping("path")
|
||||
public void handleWithPatterns() {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class HandleAllController {
|
||||
|
||||
@ConnectMapping
|
||||
public void handleAll() {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -105,6 +105,12 @@ public class PathPatternRouteMatcher implements RouteMatcher {
|
|||
public String value() {
|
||||
return this.pathContainer.value();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return value();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue