From fa95b010cbcdb9b53672f0e226cafe2e30e1615e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 26 Feb 2019 19:10:02 -0500 Subject: [PATCH] Direct delegation to RSocketMessageHandler Simplify handling by eliminating the use of a message channel. Instead MessageHandlerAcceptor now extends from RSocketMessageHandler and delegates directly to it. See gh-21987 --- .../messaging/ReactiveMessageChannel.java | 38 ------- .../ReactiveSubscribableChannel.java | 42 ------- .../MessageMappingMessageHandler.java | 96 +++++++++------- .../AbstractMethodMessageHandler.java | 53 +++++---- .../rsocket/MessageHandlerAcceptor.java | 77 +++++++++++++ .../messaging/rsocket/MessagingAcceptor.java | 107 ------------------ .../messaging/rsocket/MessagingRSocket.java | 44 ++++--- .../rsocket/RSocketMessageHandler.java | 38 ++----- .../DefaultReactiveMessageChannel.java | 102 ----------------- .../MessageMappingMessageHandlerTests.java | 6 +- .../reactive/MethodMessageHandlerTests.java | 11 +- ...RSocketClientToServerIntegrationTests.java | 23 +--- ...RSocketServerToClientIntegrationTests.java | 53 +++------ 13 files changed, 223 insertions(+), 467 deletions(-) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/ReactiveMessageChannel.java delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/ReactiveSubscribableChannel.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/DefaultReactiveMessageChannel.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/ReactiveMessageChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/ReactiveMessageChannel.java deleted file mode 100644 index 08e6537e85c..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/ReactiveMessageChannel.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2002-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.messaging; - -import reactor.core.publisher.Mono; - -/** - * Contract for reactive, non-blocking sending of messages. - * - * @author Rossen Stoyanchev - * @since 5.2 - */ -public interface ReactiveMessageChannel { - - /** - * Send a {@link Message} to this channel. If the message is sent - * successfully, return {@code true}. Or if not sent due to a non-fatal - * reason, return {@code false}. - * @param message the message to send - * @return completion {@link Mono} returning {@code true} on success, - * {@code false} if not sent, or an error signal. - */ - Mono send(Message message); - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/ReactiveSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/ReactiveSubscribableChannel.java deleted file mode 100644 index c3b792019ed..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/ReactiveSubscribableChannel.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging; - -/** - * {@link MessageChannel} that maintains a registry of subscribers to handle - * messages sent through this channel. - * - * @author Rossen Stoyanchev - * @since 5.2 - */ -public interface ReactiveSubscribableChannel extends ReactiveMessageChannel { - - /** - * Register a message handler. - * @return {@code true} if the handler was subscribed or {@code false} if it - * was already subscribed. - */ - boolean subscribe(ReactiveMessageHandler handler); - - /** - * Un-register a message handler. - * @return {@code true} if the handler was un-registered, or {@code false} - * if was not registered. - */ - boolean unsubscribe(ReactiveMessageHandler handler); - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java index 68518837980..7bdad79d5ad 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java @@ -30,14 +30,12 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.EmbeddedValueResolverAware; -import org.springframework.context.SmartLifecycle; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.codec.Decoder; import org.springframework.core.convert.ConversionService; import org.springframework.format.support.DefaultFormattingConversionService; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.ReactiveSubscribableChannel; import org.springframework.messaging.handler.CompositeMessageCondition; import org.springframework.messaging.handler.DestinationPatternsMessageCondition; import org.springframework.messaging.handler.annotation.MessageMapping; @@ -74,9 +72,11 @@ import org.springframework.validation.Validator; * @see AbstractEncoderMethodReturnValueHandler */ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler - implements SmartLifecycle, EmbeddedValueResolverAware { + implements EmbeddedValueResolverAware { - private final ReactiveSubscribableChannel inboundChannel; + @Nullable + private Predicate> handlerPredicate = + beanType -> AnnotatedElementUtils.hasAnnotation(beanType, Controller.class); private final List> decoders = new ArrayList<>(); @@ -90,20 +90,63 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler AnnotatedElementUtils.hasAnnotation(beanType, Controller.class)); } + /** + * Manually configure handlers to check for {@code @MessageMapping} methods. + *

Note: the given handlers are not required to be + * annotated with {@code @Controller}. Consider also using + * {@link #setAutoDetectDisabled()} if the intent is to use these handlers + * instead of, and not in addition to {@code @Controller} classes. Or + * alternatively use {@link #setHandlerPredicate(Predicate)} to select a + * different set of beans based on a different criteria. + * @param handlers the handlers to register + * @see #setAutoDetectDisabled() + * @see #setHandlerPredicate(Predicate) + */ + public void setHandlers(List handlers) { + for (Object handler : handlers) { + detectHandlerMethods(handler); + } + // Disable auto-detection.. + this.handlerPredicate = null; + } + + /** + * Configure the predicate to use for selecting which Spring beans to check + * for {@code @MessageMapping} methods. When set to {@code null}, + * auto-detection is turned off which is what + * {@link #setAutoDetectDisabled()} does internally. + *

The predicate used by default selects {@code @Controller} classes. + * @see #setHandlers(List) + * @see #setAutoDetectDisabled() + */ + public void setHandlerPredicate(@Nullable Predicate> handlerPredicate) { + this.handlerPredicate = handlerPredicate; + } + + /** + * Return the {@link #setHandlerPredicate configured} handler predicate. + */ + @Nullable + public Predicate> getHandlerPredicate() { + return this.handlerPredicate; + } + + /** + * Disable auto-detection of {@code @MessageMapping} methods, e.g. in + * {@code @Controller}s, by setting {@link #setHandlerPredicate(Predicate) + * setHandlerPredicate(null)}. + */ + public void setAutoDetectDisabled() { + this.handlerPredicate = null; + } + /** * Configure the decoders to use for incoming payloads. */ @@ -203,34 +246,9 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler> initHandlerPredicate() { + return this.handlerPredicate; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java index 0241100714f..971648562ed 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; +import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -47,6 +48,7 @@ import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import org.springframework.util.ObjectUtils; /** * Abstract base class for reactive HandlerMethod-based message handling. @@ -61,7 +63,7 @@ import org.springframework.util.MultiValueMap; * @param the type of the Object that contains information mapping information */ public abstract class AbstractMethodMessageHandler - implements ReactiveMessageHandler, ApplicationContextAware, InitializingBean { + implements ReactiveMessageHandler, ApplicationContextAware, InitializingBean, BeanNameAware { /** * Bean name prefix for target beans behind scoped proxies. Used to exclude those @@ -79,9 +81,6 @@ public abstract class AbstractMethodMessageHandler protected final Log logger = LogFactory.getLog(getClass()); - @Nullable - private Predicate> handlerPredicate; - private ArgumentResolverConfigurer argumentResolverConfigurer = new ArgumentResolverConfigurer(); private ReturnValueHandlerConfigurer returnValueHandlerConfigurer = new ReturnValueHandlerConfigurer(); @@ -91,29 +90,14 @@ public abstract class AbstractMethodMessageHandler @Nullable private ApplicationContext applicationContext; + @Nullable + private String beanName; + private final Map handlerMethods = new LinkedHashMap<>(64); private final MultiValueMap destinationLookup = new LinkedMultiValueMap<>(64); - /** - * Configure a predicate to decide if which beans in the Spring context - * should be checked to see if they have message handling methods. - *

By default this is not set and sub-classes should configure it in - * order to enable auto-detection of message handling methods. - */ - public void setHandlerPredicate(@Nullable Predicate> handlerPredicate) { - this.handlerPredicate = handlerPredicate; - } - - /** - * Return the {@link #setHandlerPredicate configured} handler predicate. - */ - @Nullable - public Predicate> getHandlerPredicate() { - return this.handlerPredicate; - } - /** * Configure custom resolvers for handler method arguments. */ @@ -170,6 +154,16 @@ public abstract class AbstractMethodMessageHandler return this.applicationContext; } + @Override + public void setBeanName(String name) { + this.beanName = name; + } + + public String getBeanName() { + return this.beanName != null ? this.beanName : + getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this); + } + /** * Subclasses can invoke this method to populate the MessagingAdviceBean cache * (e.g. to support "global" {@code @MessageExceptionHandler}). @@ -234,8 +228,9 @@ public abstract class AbstractMethodMessageHandler logger.warn("No ApplicationContext available for detecting beans with message handling methods."); return; } - if (this.handlerPredicate == null) { - logger.warn("'handlerPredicate' not configured: no auto-detection of message handling methods."); + Predicate> handlerPredicate = initHandlerPredicate(); + if (handlerPredicate == null) { + logger.warn("[" + getBeanName() + "] No auto-detection of handler methods (e.g. in @Controller)."); return; } for (String beanName : this.applicationContext.getBeanNamesForType(Object.class)) { @@ -250,13 +245,21 @@ public abstract class AbstractMethodMessageHandler logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex); } } - if (beanType != null && this.handlerPredicate.test(beanType)) { + if (beanType != null && handlerPredicate.test(beanType)) { detectHandlerMethods(beanName); } } } } + /** + * Return the predicate to use to check whether a given Spring bean should + * be introspected for message handling methods. If {@code null} is + * returned, auto-detection is effectively disabled. + */ + @Nullable + protected abstract Predicate> initHandlerPredicate(); + /** * Detect if the given handler has any methods that can handle messages and if * so register it with the extracted mapping information. diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java new file mode 100644 index 00000000000..7ff4ec20bae --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java @@ -0,0 +1,77 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.rsocket; + +import java.util.function.Function; + +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import reactor.core.publisher.Mono; + +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.util.MimeType; + +/** + * Extension of {@link RSocketMessageHandler} that can be plugged directly into + * RSocket to receive connections either on the + * {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function) client} or on the + * {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server} + * side. Requests are handled by delegating to the "super" {@link #handleMessage(Message)}. + * + * @author Rossen Stoyanchev + * @since 5.2 + */ +public final class MessageHandlerAcceptor extends RSocketMessageHandler + implements SocketAcceptor, Function { + + @Nullable + private MimeType defaultDataMimeType; + + + /** + * Configure the default content type to use for data payloads. + *

By default this is not set. However a server acceptor will use the + * content type from the {@link ConnectionSetupPayload}, so this is typically + * required for clients but can also be used on servers as a fallback. + * @param defaultDataMimeType the MimeType to use + */ + public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) { + this.defaultDataMimeType = defaultDataMimeType; + } + + + @Override + public Mono accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { + MessagingRSocket rsocket = createRSocket(sendingRSocket); + // Allow handling of the ConnectionSetupPayload via @MessageMapping methods. + // However, if the handling is to make requests to the client, it's expected + // it will do so decoupled from the handling, e.g. via .subscribe(). + return rsocket.handleConnectionSetupPayload(setupPayload).then(Mono.just(rsocket)); + } + + @Override + public RSocket apply(RSocket sendingRSocket) { + return createRSocket(sendingRSocket); + } + + private MessagingRSocket createRSocket(RSocket rsocket) { + return new MessagingRSocket( + this::handleMessage, rsocket, this.defaultDataMimeType, getRSocketStrategies()); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java deleted file mode 100644 index e4b7e44ad76..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2002-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.messaging.rsocket; - -import java.util.function.Function; -import java.util.function.Predicate; - -import io.rsocket.ConnectionSetupPayload; -import io.rsocket.RSocket; -import io.rsocket.SocketAcceptor; -import reactor.core.publisher.Mono; - -import org.springframework.lang.Nullable; -import org.springframework.messaging.Message; -import org.springframework.messaging.ReactiveMessageChannel; -import org.springframework.util.Assert; -import org.springframework.util.MimeType; - -/** - * RSocket acceptor for - * {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function) client} or - * {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server} - * side use. It wraps requests with a {@link Message} envelope and sends them - * to a {@link ReactiveMessageChannel} for handling, e.g. via - * {@code @MessageMapping} method. - * - * @author Rossen Stoyanchev - * @since 5.2 - */ -public final class MessagingAcceptor implements SocketAcceptor, Function { - - private final ReactiveMessageChannel messageChannel; - - private final RSocketStrategies rsocketStrategies; - - @Nullable - private MimeType defaultDataMimeType; - - - /** - * Constructor with a message channel to send messages to. - * @param messageChannel the message channel to use - *

This assumes a Spring configuration setup with a - * {@code ReactiveMessageChannel} and an {@link RSocketMessageHandler} which - * by default auto-detects {@code @MessageMapping} methods in - * {@code @Controller} classes, but can also be configured with a - * {@link RSocketMessageHandler#setHandlerPredicate(Predicate) handlerPredicate} - * or with handler instances. - */ - public MessagingAcceptor(ReactiveMessageChannel messageChannel) { - this(messageChannel, RSocketStrategies.builder().build()); - } - - /** - * Variant of {@link #MessagingAcceptor(ReactiveMessageChannel)} with an - * {@link RSocketStrategies} for wrapping the sending {@link RSocket} as - * {@link RSocketRequester}. - */ - public MessagingAcceptor(ReactiveMessageChannel messageChannel, RSocketStrategies rsocketStrategies) { - Assert.notNull(messageChannel, "ReactiveMessageChannel is required"); - Assert.notNull(rsocketStrategies, "RSocketStrategies is required"); - this.messageChannel = messageChannel; - this.rsocketStrategies = rsocketStrategies; - } - - - /** - * Configure the default content type to use for data payloads. - *

By default this is not set. However a server acceptor will use the - * content type from the {@link ConnectionSetupPayload}. - * @param defaultDataMimeType the MimeType to use - */ - public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) { - this.defaultDataMimeType = defaultDataMimeType; - } - - - @Override - public Mono accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { - MessagingRSocket rsocket = createRSocket(sendingRSocket); - rsocket.handleConnectionSetupPayload(setupPayload).subscribe(); - return Mono.just(rsocket); - } - - @Override - public RSocket apply(RSocket sendingRSocket) { - return createRSocket(sendingRSocket); - } - - private MessagingRSocket createRSocket(RSocket rsocket) { - return new MessagingRSocket(this.messageChannel, rsocket, this.defaultDataMimeType, this.rsocketStrategies); - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java index 824ff3901d0..63953937393 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java @@ -34,7 +34,6 @@ import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.ReactiveMessageChannel; import org.springframework.messaging.handler.DestinationPatternsMessageCondition; import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler; import org.springframework.messaging.support.MessageBuilder; @@ -45,16 +44,16 @@ import org.springframework.util.MimeTypeUtils; import org.springframework.util.StringUtils; /** - * Package private implementation of {@link RSocket} that is is hooked into an - * RSocket client or server via {@link MessagingAcceptor} to accept and handle - * requests. + * Implementation of {@link RSocket} that wraps incoming requests with a + * {@link Message}, delegates to a {@link Function} for handling, and then + * obtains the response from a "reply" header. * * @author Rossen Stoyanchev * @since 5.2 */ class MessagingRSocket extends AbstractRSocket { - private final ReactiveMessageChannel messageChannel; + private final Function, Mono> handler; private final RSocketRequester requester; @@ -64,19 +63,24 @@ class MessagingRSocket extends AbstractRSocket { private final RSocketStrategies strategies; - MessagingRSocket(ReactiveMessageChannel messageChannel, - RSocket sendingRSocket, @Nullable MimeType defaultDataMimeType, RSocketStrategies strategies) { + MessagingRSocket(Function, Mono> handler, RSocket sendingRSocket, + @Nullable MimeType defaultDataMimeType, RSocketStrategies strategies) { - Assert.notNull(messageChannel, "'messageChannel' is required"); + Assert.notNull(handler, "'handler' is required"); Assert.notNull(sendingRSocket, "'sendingRSocket' is required"); - this.messageChannel = messageChannel; + this.handler = handler; this.requester = RSocketRequester.create(sendingRSocket, defaultDataMimeType, strategies); this.dataMimeType = defaultDataMimeType; this.strategies = strategies; } - + /** + * Wrap the {@link ConnectionSetupPayload} with a {@link Message} and + * delegate to {@link #handle(Payload)} for handling. + * @param payload the connection payload + * @return completion handle for success or error + */ public Mono handleConnectionSetupPayload(ConnectionSetupPayload payload) { if (StringUtils.hasText(payload.dataMimeType())) { this.dataMimeType = MimeTypeUtils.parseMimeType(payload.dataMimeType()); @@ -111,32 +115,26 @@ class MessagingRSocket extends AbstractRSocket { @Override public Mono metadataPush(Payload payload) { - // This won't be very useful until createHeaders starting doing something more with metadata.. + // Not very useful until createHeaders does more with metadata return handle(payload); } private Mono handle(Payload payload) { - Message message = MessageBuilder.createMessage( - Mono.fromCallable(() -> wrapPayloadData(payload)), - createHeaders(payload, null)); - - return this.messageChannel.send(message).flatMap(result -> result ? - Mono.empty() : Mono.error(new MessageDeliveryException("RSocket request not handled"))); + Mono.fromCallable(() -> wrapPayloadData(payload)), createHeaders(payload, null)); + return this.handler.apply(message); } private Flux handleAndReply(Payload firstPayload, Flux payloads) { - MonoProcessor> replyMono = MonoProcessor.create(); - Message message = MessageBuilder.createMessage( payloads.map(this::wrapPayloadData).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release), createHeaders(firstPayload, replyMono)); - - return this.messageChannel.send(message).flatMapMany(result -> - result && replyMono.isTerminated() ? replyMono.flatMapMany(Function.identity()) : - Mono.error(new MessageDeliveryException("RSocket request not handled"))); + return this.handler.apply(message) + .thenMany(Flux.defer(() -> replyMono.isTerminated() ? + replyMono.flatMapMany(Function.identity()) : + Mono.error(new MessageDeliveryException("RSocket request not handled")))); } private MessageHeaders createHeaders(Payload payload, @Nullable MonoProcessor replyMono) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java index 93d5fb43a63..cac3ee2f4a8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java @@ -21,7 +21,6 @@ import java.util.List; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; import org.springframework.lang.Nullable; -import org.springframework.messaging.ReactiveSubscribableChannel; import org.springframework.messaging.handler.annotation.support.reactive.MessageMappingMessageHandler; import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler; import org.springframework.util.Assert; @@ -44,20 +43,6 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { private RSocketStrategies rsocketStrategies; - public RSocketMessageHandler(ReactiveSubscribableChannel inboundChannel) { - super(inboundChannel); - } - - public RSocketMessageHandler(ReactiveSubscribableChannel inboundChannel, List handlers) { - super(inboundChannel); - setHandlerPredicate(null); // disable auto-detection.. - for (Object handler : handlers) { - detectHandlerMethods(handler); - } - } - - - /** * Configure the encoders to use for encoding handler method return values. */ @@ -76,8 +61,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { * Provide configuration in the form of {@link RSocketStrategies}. This is * an alternative to using {@link #setEncoders(List)}, * {@link #setDecoders(List)}, and others directly. It is convenient when - * you also need to configure an {@link RSocketRequester} in which case - * the strategies can be configured once and used in multiple places. + * you also configuring an {@link RSocketRequester} in which case the + * {@link RSocketStrategies} encapsulates required configuration for re-use. * @param rsocketStrategies the strategies to use */ public void setRSocketStrategies(RSocketStrategies rsocketStrategies) { @@ -91,19 +76,18 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { /** * Return the {@code RSocketStrategies} instance provided via * {@link #setRSocketStrategies rsocketStrategies}, or - * otherwise a new instance populated with the configured - * {@link #setEncoders(List) encoders}, {@link #setDecoders(List) decoders} - * and others. + * otherwise initialize it with the configured {@link #setEncoders(List) + * encoders}, {@link #setDecoders(List) decoders}, and others. */ public RSocketStrategies getRSocketStrategies() { - if (this.rsocketStrategies != null) { - return this.rsocketStrategies; + if (this.rsocketStrategies == null) { + this.rsocketStrategies = RSocketStrategies.builder() + .decoder(getDecoders().toArray(new Decoder[0])) + .encoder(getEncoders().toArray(new Encoder[0])) + .reactiveAdapterStrategy(getReactiveAdapterRegistry()) + .build(); } - return RSocketStrategies.builder() - .decoder(getDecoders().toArray(new Decoder[0])) - .encoder(getEncoders().toArray(new Encoder[0])) - .reactiveAdapterStrategy(getReactiveAdapterRegistry()) - .build(); + return this.rsocketStrategies; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/DefaultReactiveMessageChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/DefaultReactiveMessageChannel.java deleted file mode 100644 index a5283cdfe7e..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/DefaultReactiveMessageChannel.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2002-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.messaging.support; - -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import org.springframework.beans.factory.BeanNameAware; -import org.springframework.messaging.Message; -import org.springframework.messaging.ReactiveMessageHandler; -import org.springframework.messaging.ReactiveSubscribableChannel; -import org.springframework.util.ObjectUtils; - -/** - * Default implementation of {@link ReactiveSubscribableChannel}. - * - * @author Rossen Stoyanchev - * @since 5.2 - */ -public class DefaultReactiveMessageChannel implements ReactiveSubscribableChannel, BeanNameAware { - - private static final Mono SUCCESS_RESULT = Mono.just(true); - - private static Log logger = LogFactory.getLog(DefaultReactiveMessageChannel.class); - - - private final Set handlers = new CopyOnWriteArraySet<>(); - - private String beanName; - - - public DefaultReactiveMessageChannel() { - this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this); - } - - - /** - * A message channel uses the bean name primarily for logging purposes. - */ - @Override - public void setBeanName(String name) { - this.beanName = name; - } - - /** - * Return the bean name for this message channel. - */ - public String getBeanName() { - return this.beanName; - } - - - @Override - public boolean subscribe(ReactiveMessageHandler handler) { - boolean result = this.handlers.add(handler); - if (result) { - if (logger.isDebugEnabled()) { - logger.debug(getBeanName() + " added " + handler); - } - } - return result; - } - - - @Override - public boolean unsubscribe(ReactiveMessageHandler handler) { - boolean result = this.handlers.remove(handler); - if (result) { - if (logger.isDebugEnabled()) { - logger.debug(getBeanName() + " removed " + handler); - } - } - return result; - } - - - @Override - public Mono send(Message message) { - return Flux.fromIterable(this.handlers) - .concatMap(handler -> handler.handleMessage(message)) - .then(SUCCESS_RESULT); - } - -} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java index a34ddc8b0c2..af1a643de51 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java @@ -38,7 +38,6 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.messaging.Message; -import org.springframework.messaging.ReactiveSubscribableChannel; import org.springframework.messaging.handler.DestinationPatternsMessageCondition; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; @@ -48,7 +47,6 @@ import org.springframework.stereotype.Controller; import static java.nio.charset.StandardCharsets.*; import static org.junit.Assert.*; -import static org.mockito.Mockito.*; /** * Unit tests for {@link MessageMappingMessageHandler}. @@ -134,9 +132,7 @@ public class MessageMappingMessageHandlerTests { context.registerSingleton("testController", TestController.class); context.refresh(); - ReactiveSubscribableChannel channel = mock(ReactiveSubscribableChannel.class); - - MessageMappingMessageHandler messageHandler = new MessageMappingMessageHandler(channel); + MessageMappingMessageHandler messageHandler = new MessageMappingMessageHandler(); messageHandler.getReturnValueHandlerConfigurer().addCustomHandler(this.returnValueHandler); messageHandler.setApplicationContext(context); messageHandler.setEmbeddedValueResolver(new EmbeddedValueResolver(context.getBeanFactory())); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/MethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/MethodMessageHandlerTests.java index 82a448e14f3..60228767197 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/MethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/MethodMessageHandlerTests.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Predicate; import org.hamcrest.Matchers; import org.junit.Test; @@ -193,11 +194,6 @@ public class MethodMessageHandlerTests { private PathMatcher pathMatcher = new AntPathMatcher(); - public TestMethodMessageHandler() { - setHandlerPredicate(handlerType -> handlerType.getName().endsWith("Controller")); - } - - @Override protected List initArgumentResolvers() { return Collections.emptyList(); @@ -208,6 +204,11 @@ public class MethodMessageHandlerTests { return Collections.singletonList(this.returnValueHandler); } + @Override + protected Predicate> initHandlerPredicate() { + return handlerType -> handlerType.getName().endsWith("Controller"); + } + @Nullable public Object getLastReturnValue() { return this.returnValueHandler.getLastReturnValue(); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index ff13b07100f..a37ce6b2766 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -35,11 +35,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.codec.StringDecoder; -import org.springframework.messaging.ReactiveMessageChannel; -import org.springframework.messaging.ReactiveSubscribableChannel; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; -import org.springframework.messaging.support.DefaultReactiveMessageChannel; import org.springframework.stereotype.Controller; import org.springframework.util.MimeTypeUtils; @@ -69,12 +66,9 @@ public class RSocketClientToServerIntegrationTests { context = new AnnotationConfigApplicationContext(ServerConfig.class); - ReactiveMessageChannel messageChannel = context.getBean(ReactiveMessageChannel.class); - RSocketStrategies rsocketStrategies = context.getBean(RSocketStrategies.class); - server = RSocketFactory.receive() .addServerPlugin(interceptor) - .acceptor(new MessagingAcceptor(messageChannel)) + .acceptor(context.getBean(MessageHandlerAcceptor.class)) .transport(TcpServerTransport.create("localhost", 7000)) .start() .block(); @@ -86,7 +80,7 @@ public class RSocketClientToServerIntegrationTests { .block(); requester = RSocketRequester.create( - client, MimeTypeUtils.TEXT_PLAIN, rsocketStrategies); + client, MimeTypeUtils.TEXT_PLAIN, context.getBean(RSocketStrategies.class)); } @AfterClass @@ -254,15 +248,10 @@ public class RSocketClientToServerIntegrationTests { } @Bean - public ReactiveSubscribableChannel rsocketChannel() { - return new DefaultReactiveMessageChannel(); - } - - @Bean - public RSocketMessageHandler rsocketMessageHandler() { - RSocketMessageHandler handler = new RSocketMessageHandler(rsocketChannel()); - handler.setRSocketStrategies(rsocketStrategies()); - return handler; + public MessageHandlerAcceptor messageHandlerAcceptor() { + MessageHandlerAcceptor acceptor = new MessageHandlerAcceptor(); + acceptor.setRSocketStrategies(rsocketStrategies()); + return acceptor; } @Bean diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 6a31f906dc9..3a05e01cc84 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -17,7 +17,6 @@ package org.springframework.messaging.rsocket; import java.time.Duration; import java.util.Collections; -import java.util.List; import io.rsocket.Closeable; import io.rsocket.RSocket; @@ -40,10 +39,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.codec.StringDecoder; -import org.springframework.messaging.ReactiveMessageChannel; -import org.springframework.messaging.ReactiveSubscribableChannel; import org.springframework.messaging.handler.annotation.MessageMapping; -import org.springframework.messaging.support.DefaultReactiveMessageChannel; import org.springframework.stereotype.Controller; /** @@ -57,23 +53,15 @@ public class RSocketServerToClientIntegrationTests { private static Closeable server; - private static MessagingAcceptor clientAcceptor; - @BeforeClass @SuppressWarnings("ConstantConditions") public static void setupOnce() { - context = new AnnotationConfigApplicationContext(ServerConfig.class); - - ReactiveMessageChannel messageChannel = context.getBean("serverChannel", ReactiveMessageChannel.class); - RSocketStrategies rsocketStrategies = context.getBean(RSocketStrategies.class); - - clientAcceptor = new MessagingAcceptor( - context.getBean("clientChannel", ReactiveMessageChannel.class)); + context = new AnnotationConfigApplicationContext(RSocketConfig.class); server = RSocketFactory.receive() - .acceptor(new MessagingAcceptor(messageChannel, rsocketStrategies)) + .acceptor(context.getBean("serverAcceptor", MessageHandlerAcceptor.class)) .transport(TcpServerTransport.create("localhost", 7000)) .start() .block(); @@ -116,7 +104,7 @@ public class RSocketServerToClientIntegrationTests { rsocket = RSocketFactory.connect() .setupPayload(DefaultPayload.create("", destination)) .dataMimeType("text/plain") - .acceptor(clientAcceptor) + .acceptor(context.getBean("clientAcceptor", MessageHandlerAcceptor.class)) .transport(TcpClientTransport.create("localhost", 7000)) .start() .block(); @@ -212,13 +200,13 @@ public class RSocketServerToClientIntegrationTests { Mono.fromRunnable(testEcho) .doOnError(ex -> result.onError(ex)) .doOnSuccess(o -> result.onComplete()) - .subscribeOn(Schedulers.elastic()) + .subscribeOn(Schedulers.elastic()) // StepVerifier will block .subscribe(); } } - private static class ClientController { + private static class ClientHandler { final ReplayProcessor fireForgetPayloads = ReplayProcessor.create(); @@ -251,11 +239,11 @@ public class RSocketServerToClientIntegrationTests { @Configuration - static class ServerConfig { + static class RSocketConfig { @Bean - public ClientController clientController() { - return new ClientController(); + public ClientHandler clientHandler() { + return new ClientHandler(); } @Bean @@ -264,26 +252,17 @@ public class RSocketServerToClientIntegrationTests { } @Bean - public ReactiveSubscribableChannel clientChannel() { - return new DefaultReactiveMessageChannel(); + public MessageHandlerAcceptor clientAcceptor() { + MessageHandlerAcceptor acceptor = new MessageHandlerAcceptor(); + acceptor.setHandlers(Collections.singletonList(clientHandler())); + acceptor.setAutoDetectDisabled(); + acceptor.setRSocketStrategies(rsocketStrategies()); + return acceptor; } @Bean - public ReactiveSubscribableChannel serverChannel() { - return new DefaultReactiveMessageChannel(); - } - - @Bean - public RSocketMessageHandler clientMessageHandler() { - List handlers = Collections.singletonList(clientController()); - RSocketMessageHandler handler = new RSocketMessageHandler(clientChannel(), handlers); - handler.setRSocketStrategies(rsocketStrategies()); - return handler; - } - - @Bean - public RSocketMessageHandler serverMessageHandler() { - RSocketMessageHandler handler = new RSocketMessageHandler(serverChannel()); + public MessageHandlerAcceptor serverAcceptor() { + MessageHandlerAcceptor handler = new MessageHandlerAcceptor(); handler.setRSocketStrategies(rsocketStrategies()); return handler; }