diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java index 54cec200e3b..771ef26bb28 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java @@ -323,6 +323,7 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler vars = getPathMatcher().extractUriTemplateVariables(pattern, destination); if (!CollectionUtils.isEmpty(vars)) { MessageHeaderAccessor mha = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); @@ -332,4 +333,5 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler payloadFlux; - DefaultResponseSpec(Mono payloadMono) { this.payloadMono = payloadMono; this.payloadFlux = null; @@ -206,10 +203,9 @@ final class DefaultRSocketRequester implements RSocketRequester { this.payloadFlux = payloadFlux; } - @Override public Mono send() { - Assert.notNull(this.payloadMono, "No RSocket interaction model for one-way send with Flux."); + Assert.state(this.payloadMono != null, "No RSocket interaction model for one-way send with Flux"); return this.payloadMono.flatMap(rsocket::fireAndForget); } @@ -235,9 +231,7 @@ final class DefaultRSocketRequester implements RSocketRequester { @SuppressWarnings("unchecked") private Mono retrieveMono(ResolvableType elementType) { - Assert.notNull(this.payloadMono, - "No RSocket interaction model for Flux request to Mono response."); - + Assert.notNull(this.payloadMono, "No RSocket interaction model for Flux request to Mono response."); Mono payloadMono = this.payloadMono.flatMap(rsocket::requestResponse); if (isVoid(elementType)) { @@ -251,7 +245,6 @@ final class DefaultRSocketRequester implements RSocketRequester { @SuppressWarnings("unchecked") private Flux retrieveFlux(ResolvableType elementType) { - Flux payloadFlux = this.payloadMono != null ? this.payloadMono.flatMapMany(rsocket::requestStream) : rsocket.requestChannel(this.payloadFlux); @@ -261,7 +254,6 @@ final class DefaultRSocketRequester implements RSocketRequester { } Decoder decoder = strategies.decoder(elementType, dataMimeType); - return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer -> (T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 5e49daa12af..373b4b4a979 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -27,7 +27,6 @@ import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; import reactor.core.publisher.Mono; -import org.springframework.lang.Nullable; import org.springframework.util.MimeType; /** @@ -38,12 +37,11 @@ import org.springframework.util.MimeType; */ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { - @Nullable private List> factoryConfigurers = new ArrayList<>(); - @Nullable private List> strategiesConfigurers = new ArrayList<>(); + @Override public RSocketRequester.Builder rsocketFactory(Consumer configurer) { this.factoryConfigurers.add(configurer); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java index 1de258de66a..ba72af81019 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java @@ -47,8 +47,7 @@ final class DefaultRSocketStrategies implements RSocketStrategies { private final DataBufferFactory bufferFactory; - private DefaultRSocketStrategies( - List> encoders, List> decoders, + private DefaultRSocketStrategies(List> encoders, List> decoders, ReactiveAdapterRegistry adapterRegistry, DataBufferFactory bufferFactory) { this.encoders = Collections.unmodifiableList(encoders); @@ -93,7 +92,6 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @Nullable private DataBufferFactory dataBufferFactory; - public DefaultRSocketStrategiesBuilder() { } @@ -104,7 +102,6 @@ final class DefaultRSocketStrategies implements RSocketStrategies { this.dataBufferFactory = other.dataBufferFactory(); } - @Override public Builder encoder(Encoder... encoders) { this.encoders.addAll(Arrays.asList(encoders)); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java index 1e168eaed6a..600633c9498 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java @@ -59,6 +59,7 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler @Override public Mono accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { MessagingRSocket rsocket = createRSocket(sendingRSocket); + // Allow handling of the ConnectionSetupPayload via @MessageMapping methods. // However, if the handling is to make requests to the client, it's expected // it will do so decoupled from the handling, e.g. via .subscribe(). @@ -71,8 +72,7 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler } private MessagingRSocket createRSocket(RSocket rsocket) { - return new MessagingRSocket( - this::handleMessage, rsocket, this.defaultDataMimeType, getRSocketStrategies()); + return new MessagingRSocket(this::handleMessage, rsocket, this.defaultDataMimeType, getRSocketStrategies()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 911751f6770..47e006ea55a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -42,12 +42,24 @@ import org.springframework.util.MimeType; */ public interface RSocketRequester { - /** * Return the underlying RSocket used to make requests. */ RSocket rsocket(); + // For now we treat metadata as a simple string that is the route. + // This will change after the resolution of: + // https://github.com/rsocket/rsocket-java/issues/568 + + /** + * Entry point to prepare a new request to the given route. + *

For requestChannel interactions, i.e. Flux-to-Flux the metadata is + * attached to the first request payload. + * @param route the routing destination + * @return a spec for further defining and executing the reuqest + */ + RequestSpec route(String route); + /** * Create a new {@code RSocketRequester} from the given {@link RSocket} and @@ -68,20 +80,6 @@ public interface RSocketRequester { return new DefaultRSocketRequesterBuilder(); } - // For now we treat metadata as a simple string that is the route. - // This will change after the resolution of: - // https://github.com/rsocket/rsocket-java/issues/568 - - /** - * Entry point to prepare a new request to the given route. - * - *

For requestChannel interactions, i.e. Flux-to-Flux the metadata is - * attached to the first request payload. - * - * @param route the routing destination - * @return a spec for further defining and executing the reuqest - */ - RequestSpec route(String route); /** * A mutable builder for creating a client {@link RSocketRequester}. @@ -129,7 +127,6 @@ public interface RSocketRequester { * @return a mono containing the connected {@code RSocketRequester} */ Mono connectWebSocket(URI uri, MimeType dataMimeType); - }