Direct delegation to RSocketMessageHandler

Simplify handling by eliminating the use of a message channel. Instead
MessageHandlerAcceptor now extends from RSocketMessageHandler and
delegates directly to it.

See gh-21987
This commit is contained in:
Rossen Stoyanchev 2019-02-26 19:10:02 -05:00
parent 555dca9aff
commit fa95b010cb
13 changed files with 223 additions and 467 deletions

View File

@ -1,38 +0,0 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
import reactor.core.publisher.Mono;
/**
* Contract for reactive, non-blocking sending of messages.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
public interface ReactiveMessageChannel {
/**
* Send a {@link Message} to this channel. If the message is sent
* successfully, return {@code true}. Or if not sent due to a non-fatal
* reason, return {@code false}.
* @param message the message to send
* @return completion {@link Mono} returning {@code true} on success,
* {@code false} if not sent, or an error signal.
*/
Mono<Boolean> send(Message<?> message);
}

View File

@ -1,42 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* {@link MessageChannel} that maintains a registry of subscribers to handle
* messages sent through this channel.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
public interface ReactiveSubscribableChannel extends ReactiveMessageChannel {
/**
* Register a message handler.
* @return {@code true} if the handler was subscribed or {@code false} if it
* was already subscribed.
*/
boolean subscribe(ReactiveMessageHandler handler);
/**
* Un-register a message handler.
* @return {@code true} if the handler was un-registered, or {@code false}
* if was not registered.
*/
boolean unsubscribe(ReactiveMessageHandler handler);
}

View File

@ -30,14 +30,12 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EmbeddedValueResolverAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.Decoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.ReactiveSubscribableChannel;
import org.springframework.messaging.handler.CompositeMessageCondition;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.annotation.MessageMapping;
@ -74,9 +72,11 @@ import org.springframework.validation.Validator;
* @see AbstractEncoderMethodReturnValueHandler
*/
public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<CompositeMessageCondition>
implements SmartLifecycle, EmbeddedValueResolverAware {
implements EmbeddedValueResolverAware {
private final ReactiveSubscribableChannel inboundChannel;
@Nullable
private Predicate<Class<?>> handlerPredicate =
beanType -> AnnotatedElementUtils.hasAnnotation(beanType, Controller.class);
private final List<Decoder<?>> decoders = new ArrayList<>();
@ -90,20 +90,63 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
@Nullable
private StringValueResolver valueResolver;
private volatile boolean running = false;
private final Object lifecycleMonitor = new Object();
public MessageMappingMessageHandler(ReactiveSubscribableChannel inboundChannel) {
Assert.notNull(inboundChannel, "`inboundChannel` is required");
this.inboundChannel = inboundChannel;
public MessageMappingMessageHandler() {
this.pathMatcher = new AntPathMatcher();
((AntPathMatcher) this.pathMatcher).setPathSeparator(".");
setHandlerPredicate(beanType -> AnnotatedElementUtils.hasAnnotation(beanType, Controller.class));
}
/**
* Manually configure handlers to check for {@code @MessageMapping} methods.
* <p><strong>Note:</strong> the given handlers are not required to be
* annotated with {@code @Controller}. Consider also using
* {@link #setAutoDetectDisabled()} if the intent is to use these handlers
* instead of, and not in addition to {@code @Controller} classes. Or
* alternatively use {@link #setHandlerPredicate(Predicate)} to select a
* different set of beans based on a different criteria.
* @param handlers the handlers to register
* @see #setAutoDetectDisabled()
* @see #setHandlerPredicate(Predicate)
*/
public void setHandlers(List<Object> handlers) {
for (Object handler : handlers) {
detectHandlerMethods(handler);
}
// Disable auto-detection..
this.handlerPredicate = null;
}
/**
* Configure the predicate to use for selecting which Spring beans to check
* for {@code @MessageMapping} methods. When set to {@code null},
* auto-detection is turned off which is what
* {@link #setAutoDetectDisabled()} does internally.
* <p>The predicate used by default selects {@code @Controller} classes.
* @see #setHandlers(List)
* @see #setAutoDetectDisabled()
*/
public void setHandlerPredicate(@Nullable Predicate<Class<?>> handlerPredicate) {
this.handlerPredicate = handlerPredicate;
}
/**
* Return the {@link #setHandlerPredicate configured} handler predicate.
*/
@Nullable
public Predicate<Class<?>> getHandlerPredicate() {
return this.handlerPredicate;
}
/**
* Disable auto-detection of {@code @MessageMapping} methods, e.g. in
* {@code @Controller}s, by setting {@link #setHandlerPredicate(Predicate)
* setHandlerPredicate(null)}.
*/
public void setAutoDetectDisabled() {
this.handlerPredicate = null;
}
/**
* Configure the decoders to use for incoming payloads.
*/
@ -203,34 +246,9 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
return Collections.emptyList();
}
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
this.inboundChannel.subscribe(this);
this.running = true;
}
}
@Override
public final void stop() {
synchronized (this.lifecycleMonitor) {
this.running = false;
this.inboundChannel.unsubscribe(this);
}
}
@Override
public final void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}
@Override
public final boolean isRunning() {
return this.running;
protected Predicate<Class<?>> initHandlerPredicate() {
return this.handlerPredicate;
}

View File

@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@ -47,6 +48,7 @@ import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
/**
* Abstract base class for reactive HandlerMethod-based message handling.
@ -61,7 +63,7 @@ import org.springframework.util.MultiValueMap;
* @param <T> the type of the Object that contains information mapping information
*/
public abstract class AbstractMethodMessageHandler<T>
implements ReactiveMessageHandler, ApplicationContextAware, InitializingBean {
implements ReactiveMessageHandler, ApplicationContextAware, InitializingBean, BeanNameAware {
/**
* Bean name prefix for target beans behind scoped proxies. Used to exclude those
@ -79,9 +81,6 @@ public abstract class AbstractMethodMessageHandler<T>
protected final Log logger = LogFactory.getLog(getClass());
@Nullable
private Predicate<Class<?>> handlerPredicate;
private ArgumentResolverConfigurer argumentResolverConfigurer = new ArgumentResolverConfigurer();
private ReturnValueHandlerConfigurer returnValueHandlerConfigurer = new ReturnValueHandlerConfigurer();
@ -91,29 +90,14 @@ public abstract class AbstractMethodMessageHandler<T>
@Nullable
private ApplicationContext applicationContext;
@Nullable
private String beanName;
private final Map<T, HandlerMethod> handlerMethods = new LinkedHashMap<>(64);
private final MultiValueMap<String, T> destinationLookup = new LinkedMultiValueMap<>(64);
/**
* Configure a predicate to decide if which beans in the Spring context
* should be checked to see if they have message handling methods.
* <p>By default this is not set and sub-classes should configure it in
* order to enable auto-detection of message handling methods.
*/
public void setHandlerPredicate(@Nullable Predicate<Class<?>> handlerPredicate) {
this.handlerPredicate = handlerPredicate;
}
/**
* Return the {@link #setHandlerPredicate configured} handler predicate.
*/
@Nullable
public Predicate<Class<?>> getHandlerPredicate() {
return this.handlerPredicate;
}
/**
* Configure custom resolvers for handler method arguments.
*/
@ -170,6 +154,16 @@ public abstract class AbstractMethodMessageHandler<T>
return this.applicationContext;
}
@Override
public void setBeanName(String name) {
this.beanName = name;
}
public String getBeanName() {
return this.beanName != null ? this.beanName :
getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
}
/**
* Subclasses can invoke this method to populate the MessagingAdviceBean cache
* (e.g. to support "global" {@code @MessageExceptionHandler}).
@ -234,8 +228,9 @@ public abstract class AbstractMethodMessageHandler<T>
logger.warn("No ApplicationContext available for detecting beans with message handling methods.");
return;
}
if (this.handlerPredicate == null) {
logger.warn("'handlerPredicate' not configured: no auto-detection of message handling methods.");
Predicate<Class<?>> handlerPredicate = initHandlerPredicate();
if (handlerPredicate == null) {
logger.warn("[" + getBeanName() + "] No auto-detection of handler methods (e.g. in @Controller).");
return;
}
for (String beanName : this.applicationContext.getBeanNamesForType(Object.class)) {
@ -250,13 +245,21 @@ public abstract class AbstractMethodMessageHandler<T>
logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
}
}
if (beanType != null && this.handlerPredicate.test(beanType)) {
if (beanType != null && handlerPredicate.test(beanType)) {
detectHandlerMethods(beanName);
}
}
}
}
/**
* Return the predicate to use to check whether a given Spring bean should
* be introspected for message handling methods. If {@code null} is
* returned, auto-detection is effectively disabled.
*/
@Nullable
protected abstract Predicate<Class<?>> initHandlerPredicate();
/**
* Detect if the given handler has any methods that can handle messages and if
* so register it with the extracted mapping information.

View File

@ -0,0 +1,77 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket;
import java.util.function.Function;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.MimeType;
/**
* Extension of {@link RSocketMessageHandler} that can be plugged directly into
* RSocket to receive connections either on the
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function) client} or on the
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server}
* side. Requests are handled by delegating to the "super" {@link #handleMessage(Message)}.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
public final class MessageHandlerAcceptor extends RSocketMessageHandler
implements SocketAcceptor, Function<RSocket, RSocket> {
@Nullable
private MimeType defaultDataMimeType;
/**
* Configure the default content type to use for data payloads.
* <p>By default this is not set. However a server acceptor will use the
* content type from the {@link ConnectionSetupPayload}, so this is typically
* required for clients but can also be used on servers as a fallback.
* @param defaultDataMimeType the MimeType to use
*/
public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
this.defaultDataMimeType = defaultDataMimeType;
}
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
MessagingRSocket rsocket = createRSocket(sendingRSocket);
// Allow handling of the ConnectionSetupPayload via @MessageMapping methods.
// However, if the handling is to make requests to the client, it's expected
// it will do so decoupled from the handling, e.g. via .subscribe().
return rsocket.handleConnectionSetupPayload(setupPayload).then(Mono.just(rsocket));
}
@Override
public RSocket apply(RSocket sendingRSocket) {
return createRSocket(sendingRSocket);
}
private MessagingRSocket createRSocket(RSocket rsocket) {
return new MessagingRSocket(
this::handleMessage, rsocket, this.defaultDataMimeType, getRSocketStrategies());
}
}

View File

@ -1,107 +0,0 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket;
import java.util.function.Function;
import java.util.function.Predicate;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.ReactiveMessageChannel;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
/**
* RSocket acceptor for
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function) client} or
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server}
* side use. It wraps requests with a {@link Message} envelope and sends them
* to a {@link ReactiveMessageChannel} for handling, e.g. via
* {@code @MessageMapping} method.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
public final class MessagingAcceptor implements SocketAcceptor, Function<RSocket, RSocket> {
private final ReactiveMessageChannel messageChannel;
private final RSocketStrategies rsocketStrategies;
@Nullable
private MimeType defaultDataMimeType;
/**
* Constructor with a message channel to send messages to.
* @param messageChannel the message channel to use
* <p>This assumes a Spring configuration setup with a
* {@code ReactiveMessageChannel} and an {@link RSocketMessageHandler} which
* by default auto-detects {@code @MessageMapping} methods in
* {@code @Controller} classes, but can also be configured with a
* {@link RSocketMessageHandler#setHandlerPredicate(Predicate) handlerPredicate}
* or with handler instances.
*/
public MessagingAcceptor(ReactiveMessageChannel messageChannel) {
this(messageChannel, RSocketStrategies.builder().build());
}
/**
* Variant of {@link #MessagingAcceptor(ReactiveMessageChannel)} with an
* {@link RSocketStrategies} for wrapping the sending {@link RSocket} as
* {@link RSocketRequester}.
*/
public MessagingAcceptor(ReactiveMessageChannel messageChannel, RSocketStrategies rsocketStrategies) {
Assert.notNull(messageChannel, "ReactiveMessageChannel is required");
Assert.notNull(rsocketStrategies, "RSocketStrategies is required");
this.messageChannel = messageChannel;
this.rsocketStrategies = rsocketStrategies;
}
/**
* Configure the default content type to use for data payloads.
* <p>By default this is not set. However a server acceptor will use the
* content type from the {@link ConnectionSetupPayload}.
* @param defaultDataMimeType the MimeType to use
*/
public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
this.defaultDataMimeType = defaultDataMimeType;
}
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
MessagingRSocket rsocket = createRSocket(sendingRSocket);
rsocket.handleConnectionSetupPayload(setupPayload).subscribe();
return Mono.just(rsocket);
}
@Override
public RSocket apply(RSocket sendingRSocket) {
return createRSocket(sendingRSocket);
}
private MessagingRSocket createRSocket(RSocket rsocket) {
return new MessagingRSocket(this.messageChannel, rsocket, this.defaultDataMimeType, this.rsocketStrategies);
}
}

View File

@ -34,7 +34,6 @@ import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.ReactiveMessageChannel;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.support.MessageBuilder;
@ -45,16 +44,16 @@ import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
/**
* Package private implementation of {@link RSocket} that is is hooked into an
* RSocket client or server via {@link MessagingAcceptor} to accept and handle
* requests.
* Implementation of {@link RSocket} that wraps incoming requests with a
* {@link Message}, delegates to a {@link Function} for handling, and then
* obtains the response from a "reply" header.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
class MessagingRSocket extends AbstractRSocket {
private final ReactiveMessageChannel messageChannel;
private final Function<Message<?>, Mono<Void>> handler;
private final RSocketRequester requester;
@ -64,19 +63,24 @@ class MessagingRSocket extends AbstractRSocket {
private final RSocketStrategies strategies;
MessagingRSocket(ReactiveMessageChannel messageChannel,
RSocket sendingRSocket, @Nullable MimeType defaultDataMimeType, RSocketStrategies strategies) {
MessagingRSocket(Function<Message<?>, Mono<Void>> handler, RSocket sendingRSocket,
@Nullable MimeType defaultDataMimeType, RSocketStrategies strategies) {
Assert.notNull(messageChannel, "'messageChannel' is required");
Assert.notNull(handler, "'handler' is required");
Assert.notNull(sendingRSocket, "'sendingRSocket' is required");
this.messageChannel = messageChannel;
this.handler = handler;
this.requester = RSocketRequester.create(sendingRSocket, defaultDataMimeType, strategies);
this.dataMimeType = defaultDataMimeType;
this.strategies = strategies;
}
/**
* Wrap the {@link ConnectionSetupPayload} with a {@link Message} and
* delegate to {@link #handle(Payload)} for handling.
* @param payload the connection payload
* @return completion handle for success or error
*/
public Mono<Void> handleConnectionSetupPayload(ConnectionSetupPayload payload) {
if (StringUtils.hasText(payload.dataMimeType())) {
this.dataMimeType = MimeTypeUtils.parseMimeType(payload.dataMimeType());
@ -111,32 +115,26 @@ class MessagingRSocket extends AbstractRSocket {
@Override
public Mono<Void> metadataPush(Payload payload) {
// This won't be very useful until createHeaders starting doing something more with metadata..
// Not very useful until createHeaders does more with metadata
return handle(payload);
}
private Mono<Void> handle(Payload payload) {
Message<?> message = MessageBuilder.createMessage(
Mono.fromCallable(() -> wrapPayloadData(payload)),
createHeaders(payload, null));
return this.messageChannel.send(message).flatMap(result -> result ?
Mono.empty() : Mono.error(new MessageDeliveryException("RSocket request not handled")));
Mono.fromCallable(() -> wrapPayloadData(payload)), createHeaders(payload, null));
return this.handler.apply(message);
}
private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payloads) {
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.create();
Message<?> message = MessageBuilder.createMessage(
payloads.map(this::wrapPayloadData).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release),
createHeaders(firstPayload, replyMono));
return this.messageChannel.send(message).flatMapMany(result ->
result && replyMono.isTerminated() ? replyMono.flatMapMany(Function.identity()) :
Mono.error(new MessageDeliveryException("RSocket request not handled")));
return this.handler.apply(message)
.thenMany(Flux.defer(() -> replyMono.isTerminated() ?
replyMono.flatMapMany(Function.identity()) :
Mono.error(new MessageDeliveryException("RSocket request not handled"))));
}
private MessageHeaders createHeaders(Payload payload, @Nullable MonoProcessor<?> replyMono) {

View File

@ -21,7 +21,6 @@ import java.util.List;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.ReactiveSubscribableChannel;
import org.springframework.messaging.handler.annotation.support.reactive.MessageMappingMessageHandler;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.util.Assert;
@ -44,20 +43,6 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
private RSocketStrategies rsocketStrategies;
public RSocketMessageHandler(ReactiveSubscribableChannel inboundChannel) {
super(inboundChannel);
}
public RSocketMessageHandler(ReactiveSubscribableChannel inboundChannel, List<Object> handlers) {
super(inboundChannel);
setHandlerPredicate(null); // disable auto-detection..
for (Object handler : handlers) {
detectHandlerMethods(handler);
}
}
/**
* Configure the encoders to use for encoding handler method return values.
*/
@ -76,8 +61,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* Provide configuration in the form of {@link RSocketStrategies}. This is
* an alternative to using {@link #setEncoders(List)},
* {@link #setDecoders(List)}, and others directly. It is convenient when
* you also need to configure an {@link RSocketRequester} in which case
* the strategies can be configured once and used in multiple places.
* you also configuring an {@link RSocketRequester} in which case the
* {@link RSocketStrategies} encapsulates required configuration for re-use.
* @param rsocketStrategies the strategies to use
*/
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
@ -91,19 +76,18 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
/**
* Return the {@code RSocketStrategies} instance provided via
* {@link #setRSocketStrategies rsocketStrategies}, or
* otherwise a new instance populated with the configured
* {@link #setEncoders(List) encoders}, {@link #setDecoders(List) decoders}
* and others.
* otherwise initialize it with the configured {@link #setEncoders(List)
* encoders}, {@link #setDecoders(List) decoders}, and others.
*/
public RSocketStrategies getRSocketStrategies() {
if (this.rsocketStrategies != null) {
return this.rsocketStrategies;
if (this.rsocketStrategies == null) {
this.rsocketStrategies = RSocketStrategies.builder()
.decoder(getDecoders().toArray(new Decoder<?>[0]))
.encoder(getEncoders().toArray(new Encoder<?>[0]))
.reactiveAdapterStrategy(getReactiveAdapterRegistry())
.build();
}
return RSocketStrategies.builder()
.decoder(getDecoders().toArray(new Decoder<?>[0]))
.encoder(getEncoders().toArray(new Encoder<?>[0]))
.reactiveAdapterStrategy(getReactiveAdapterRegistry())
.build();
return this.rsocketStrategies;
}

View File

@ -1,102 +0,0 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.messaging.Message;
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.messaging.ReactiveSubscribableChannel;
import org.springframework.util.ObjectUtils;
/**
* Default implementation of {@link ReactiveSubscribableChannel}.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
public class DefaultReactiveMessageChannel implements ReactiveSubscribableChannel, BeanNameAware {
private static final Mono<Boolean> SUCCESS_RESULT = Mono.just(true);
private static Log logger = LogFactory.getLog(DefaultReactiveMessageChannel.class);
private final Set<ReactiveMessageHandler> handlers = new CopyOnWriteArraySet<>();
private String beanName;
public DefaultReactiveMessageChannel() {
this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
}
/**
* A message channel uses the bean name primarily for logging purposes.
*/
@Override
public void setBeanName(String name) {
this.beanName = name;
}
/**
* Return the bean name for this message channel.
*/
public String getBeanName() {
return this.beanName;
}
@Override
public boolean subscribe(ReactiveMessageHandler handler) {
boolean result = this.handlers.add(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug(getBeanName() + " added " + handler);
}
}
return result;
}
@Override
public boolean unsubscribe(ReactiveMessageHandler handler) {
boolean result = this.handlers.remove(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug(getBeanName() + " removed " + handler);
}
}
return result;
}
@Override
public Mono<Boolean> send(Message<?> message) {
return Flux.fromIterable(this.handlers)
.concatMap(handler -> handler.handleMessage(message))
.then(SUCCESS_RESULT);
}
}

View File

@ -38,7 +38,6 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.ReactiveSubscribableChannel;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
@ -48,7 +47,6 @@ import org.springframework.stereotype.Controller;
import static java.nio.charset.StandardCharsets.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Unit tests for {@link MessageMappingMessageHandler}.
@ -134,9 +132,7 @@ public class MessageMappingMessageHandlerTests {
context.registerSingleton("testController", TestController.class);
context.refresh();
ReactiveSubscribableChannel channel = mock(ReactiveSubscribableChannel.class);
MessageMappingMessageHandler messageHandler = new MessageMappingMessageHandler(channel);
MessageMappingMessageHandler messageHandler = new MessageMappingMessageHandler();
messageHandler.getReturnValueHandlerConfigurer().addCustomHandler(this.returnValueHandler);
messageHandler.setApplicationContext(context);
messageHandler.setEmbeddedValueResolver(new EmbeddedValueResolver(context.getBeanFactory()));

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.hamcrest.Matchers;
import org.junit.Test;
@ -193,11 +194,6 @@ public class MethodMessageHandlerTests {
private PathMatcher pathMatcher = new AntPathMatcher();
public TestMethodMessageHandler() {
setHandlerPredicate(handlerType -> handlerType.getName().endsWith("Controller"));
}
@Override
protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers() {
return Collections.emptyList();
@ -208,6 +204,11 @@ public class MethodMessageHandlerTests {
return Collections.singletonList(this.returnValueHandler);
}
@Override
protected Predicate<Class<?>> initHandlerPredicate() {
return handlerType -> handlerType.getName().endsWith("Controller");
}
@Nullable
public Object getLastReturnValue() {
return this.returnValueHandler.getLastReturnValue();

View File

@ -35,11 +35,8 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.messaging.ReactiveMessageChannel;
import org.springframework.messaging.ReactiveSubscribableChannel;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.support.DefaultReactiveMessageChannel;
import org.springframework.stereotype.Controller;
import org.springframework.util.MimeTypeUtils;
@ -69,12 +66,9 @@ public class RSocketClientToServerIntegrationTests {
context = new AnnotationConfigApplicationContext(ServerConfig.class);
ReactiveMessageChannel messageChannel = context.getBean(ReactiveMessageChannel.class);
RSocketStrategies rsocketStrategies = context.getBean(RSocketStrategies.class);
server = RSocketFactory.receive()
.addServerPlugin(interceptor)
.acceptor(new MessagingAcceptor(messageChannel))
.acceptor(context.getBean(MessageHandlerAcceptor.class))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block();
@ -86,7 +80,7 @@ public class RSocketClientToServerIntegrationTests {
.block();
requester = RSocketRequester.create(
client, MimeTypeUtils.TEXT_PLAIN, rsocketStrategies);
client, MimeTypeUtils.TEXT_PLAIN, context.getBean(RSocketStrategies.class));
}
@AfterClass
@ -254,15 +248,10 @@ public class RSocketClientToServerIntegrationTests {
}
@Bean
public ReactiveSubscribableChannel rsocketChannel() {
return new DefaultReactiveMessageChannel();
}
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler(rsocketChannel());
handler.setRSocketStrategies(rsocketStrategies());
return handler;
public MessageHandlerAcceptor messageHandlerAcceptor() {
MessageHandlerAcceptor acceptor = new MessageHandlerAcceptor();
acceptor.setRSocketStrategies(rsocketStrategies());
return acceptor;
}
@Bean

View File

@ -17,7 +17,6 @@ package org.springframework.messaging.rsocket;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import io.rsocket.Closeable;
import io.rsocket.RSocket;
@ -40,10 +39,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.messaging.ReactiveMessageChannel;
import org.springframework.messaging.ReactiveSubscribableChannel;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.support.DefaultReactiveMessageChannel;
import org.springframework.stereotype.Controller;
/**
@ -57,23 +53,15 @@ public class RSocketServerToClientIntegrationTests {
private static Closeable server;
private static MessagingAcceptor clientAcceptor;
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
context = new AnnotationConfigApplicationContext(ServerConfig.class);
ReactiveMessageChannel messageChannel = context.getBean("serverChannel", ReactiveMessageChannel.class);
RSocketStrategies rsocketStrategies = context.getBean(RSocketStrategies.class);
clientAcceptor = new MessagingAcceptor(
context.getBean("clientChannel", ReactiveMessageChannel.class));
context = new AnnotationConfigApplicationContext(RSocketConfig.class);
server = RSocketFactory.receive()
.acceptor(new MessagingAcceptor(messageChannel, rsocketStrategies))
.acceptor(context.getBean("serverAcceptor", MessageHandlerAcceptor.class))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block();
@ -116,7 +104,7 @@ public class RSocketServerToClientIntegrationTests {
rsocket = RSocketFactory.connect()
.setupPayload(DefaultPayload.create("", destination))
.dataMimeType("text/plain")
.acceptor(clientAcceptor)
.acceptor(context.getBean("clientAcceptor", MessageHandlerAcceptor.class))
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();
@ -212,13 +200,13 @@ public class RSocketServerToClientIntegrationTests {
Mono.fromRunnable(testEcho)
.doOnError(ex -> result.onError(ex))
.doOnSuccess(o -> result.onComplete())
.subscribeOn(Schedulers.elastic())
.subscribeOn(Schedulers.elastic()) // StepVerifier will block
.subscribe();
}
}
private static class ClientController {
private static class ClientHandler {
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
@ -251,11 +239,11 @@ public class RSocketServerToClientIntegrationTests {
@Configuration
static class ServerConfig {
static class RSocketConfig {
@Bean
public ClientController clientController() {
return new ClientController();
public ClientHandler clientHandler() {
return new ClientHandler();
}
@Bean
@ -264,26 +252,17 @@ public class RSocketServerToClientIntegrationTests {
}
@Bean
public ReactiveSubscribableChannel clientChannel() {
return new DefaultReactiveMessageChannel();
public MessageHandlerAcceptor clientAcceptor() {
MessageHandlerAcceptor acceptor = new MessageHandlerAcceptor();
acceptor.setHandlers(Collections.singletonList(clientHandler()));
acceptor.setAutoDetectDisabled();
acceptor.setRSocketStrategies(rsocketStrategies());
return acceptor;
}
@Bean
public ReactiveSubscribableChannel serverChannel() {
return new DefaultReactiveMessageChannel();
}
@Bean
public RSocketMessageHandler clientMessageHandler() {
List<Object> handlers = Collections.singletonList(clientController());
RSocketMessageHandler handler = new RSocketMessageHandler(clientChannel(), handlers);
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketMessageHandler serverMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler(serverChannel());
public MessageHandlerAcceptor serverAcceptor() {
MessageHandlerAcceptor handler = new MessageHandlerAcceptor();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}