diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 99c450274c9..ee9a4dce6bd 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -18,7 +18,6 @@ package org.springframework.messaging.rsocket; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.function.Consumer; import java.util.stream.Stream; @@ -31,7 +30,6 @@ import io.rsocket.transport.netty.client.WebsocketClientTransport; import reactor.core.publisher.Mono; import org.springframework.lang.Nullable; -import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.util.Assert; import org.springframework.util.MimeType; @@ -56,7 +54,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { private List> strategiesConfigurers = new ArrayList<>(); - private List handlers = new ArrayList<>(); @Override public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) { @@ -83,12 +80,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { return this; } - @Override - public RSocketRequester.Builder annotatedHandlers(Object... handlers) { - this.handlers.addAll(Arrays.asList(handlers)); - return this; - } - @Override public RSocketRequester.Builder rsocketStrategies(Consumer configurer) { this.strategiesConfigurers.add(configurer); @@ -120,13 +111,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { rsocketFactory.dataMimeType(dataMimeType.toString()); rsocketFactory.metadataMimeType(this.metadataMimeType.toString()); - if (!this.handlers.isEmpty()) { - RSocketMessageHandler messageHandler = new RSocketMessageHandler(); - messageHandler.setHandlers(this.handlers); - messageHandler.setRSocketStrategies(rsocketStrategies); - messageHandler.afterPropertiesSet(); - rsocketFactory.acceptor(messageHandler.clientResponder()); - } rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY); this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory)); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 142a41dd18c..01bd0038fa2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -30,7 +30,6 @@ import reactor.core.publisher.Mono; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.lang.Nullable; -import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.util.MimeType; /** @@ -65,7 +64,6 @@ public interface RSocketRequester { */ MimeType metadataMimeType(); - /** * Begin to specify a new request with the given route to a remote handler. *

If the connection is set to use composite metadata, the route is @@ -158,6 +156,7 @@ public interface RSocketRequester { /** * Set the {@link RSocketStrategies} to use for access to encoders, * decoders, and a factory for {@code DataBuffer's}. + * @param strategies the codecs strategies to use */ RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies); @@ -170,17 +169,6 @@ public interface RSocketRequester { */ RSocketRequester.Builder rsocketStrategies(Consumer configurer); - /** - * Add handlers for processing requests sent by the server. - *

This is a shortcut for registering client handlers (i.e. annotated controllers) - * to a {@link RSocketMessageHandler} and configuring it as an acceptor. - * You can take full control by manually registering an acceptor on the - * {@link io.rsocket.RSocketFactory.ClientRSocketFactory} using - * {@link #rsocketFactory(Consumer)} instead. - * @param handlers the client handlers to configure on the requester - */ - RSocketRequester.Builder annotatedHandlers(Object... handlers); - /** * Connect to the RSocket server over TCP. * @param host the server host diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/ClientResponderFactory.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/ClientResponderFactory.java new file mode 100644 index 00000000000..ba76c2913c4 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/ClientResponderFactory.java @@ -0,0 +1,99 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.rsocket.annotation.support; + +import java.util.function.Consumer; + +import io.rsocket.RSocketFactory; + +import org.springframework.messaging.handler.invocation.reactive.ArgumentResolverConfigurer; +import org.springframework.messaging.handler.invocation.reactive.ReturnValueHandlerConfigurer; +import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.util.RouteMatcher; + +/** + * Build and configure a responder on a {@link RSocketFactory.ClientRSocketFactory} in order + * to handle requests sent by the RSocket server to the client. + *

This can be configured as a responder on a {@link org.springframework.messaging.rsocket.RSocketRequester} + * being built by passing it as an argument to the + * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory} method. + * + * @author Brian Clozel + * @since 5.2 + * @see org.springframework.messaging.rsocket.RSocketRequester + */ +public interface ClientResponderFactory extends Consumer { + + /** + * Create a new {@link ClientResponderFactory.Config} for handling requests with annotated handlers. + */ + static ClientResponderFactory.Config create() { + return new DefaultClientResponderFactory(); + } + + /** + * Configure the client responder with infrastructure options + * to be applied on the resulting {@link RSocketMessageHandler}. + */ + interface Config { + + /** + * Set the {@link RSocketStrategies} to use for access to encoders, + * decoders, and a factory for {@code DataBuffer's}. + * @param strategies the codecs strategies to use + */ + Config strategies(RSocketStrategies strategies); + + /** + * Set the {@link RouteMatcher} to use for matching incoming requests. + *

If none is set, then the responder will use a default + * {@link org.springframework.util.SimpleRouteMatcher} instance backed + * by and {@link org.springframework.util.AntPathMatcher}. + * @param routeMatcher the route matcher to use with the responder + */ + Config routeMatcher(RouteMatcher routeMatcher); + + /** + * Set the {@link MetadataExtractor} to use for extracting information + * from metadata frames. + * @param extractor the metadata extractor to use + */ + Config metadataExtractor(MetadataExtractor extractor); + + /** + * Set the {@link ReturnValueHandlerConfigurer} for configuring + * return value handlers. + * @param configurer the configurer to use + */ + Config returnValueHandler(ReturnValueHandlerConfigurer configurer); + + /** + * Set the {@link ArgumentResolverConfigurer} for configuring + * argument resolvers. + * @param configurer the configurer to use + */ + Config argumentResolver(ArgumentResolverConfigurer configurer); + + /** + * Set the annotated handlers in charge of processing the incoming RSocket requests. + * @param handlers the annotated handlers + */ + ClientResponderFactory handlers(Object... handlers); + + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/DefaultClientResponderFactory.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/DefaultClientResponderFactory.java new file mode 100644 index 00000000000..ba80b0f418a --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/DefaultClientResponderFactory.java @@ -0,0 +1,116 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.rsocket.annotation.support; + +import java.util.Arrays; +import java.util.List; + +import io.rsocket.RSocketFactory; + +import org.springframework.lang.Nullable; +import org.springframework.messaging.handler.invocation.reactive.ArgumentResolverConfigurer; +import org.springframework.messaging.handler.invocation.reactive.ReturnValueHandlerConfigurer; +import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.util.Assert; +import org.springframework.util.RouteMatcher; + +/** + * Default implementation of {@link ClientResponderFactory}. + * + * @author Brian Clozel + */ +class DefaultClientResponderFactory implements ClientResponderFactory, ClientResponderFactory.Config { + + private List handlers; + + @Nullable + private RSocketStrategies strategies; + + @Nullable + private RouteMatcher routeMatcher; + + @Nullable + private MetadataExtractor extractor; + + @Nullable + private ReturnValueHandlerConfigurer returnValueHandlerConfigurer; + + @Nullable + private ArgumentResolverConfigurer argumentResolverConfigurer; + + @Override + public ClientResponderFactory handlers(Object... handlers) { + Assert.notEmpty(handlers, "handlers should not be empty"); + this.handlers = Arrays.asList(handlers); + return this; + } + + @Override + public ClientResponderFactory.Config strategies(RSocketStrategies strategies) { + this.strategies = strategies; + return this; + } + + @Override + public ClientResponderFactory.Config routeMatcher(RouteMatcher routeMatcher) { + this.routeMatcher = routeMatcher; + return this; + } + + @Override + public ClientResponderFactory.Config metadataExtractor(MetadataExtractor extractor) { + this.extractor = extractor; + return this; + } + + @Override + public ClientResponderFactory.Config returnValueHandler(ReturnValueHandlerConfigurer configurer) { + this.returnValueHandlerConfigurer = configurer; + return this; + } + + @Override + public ClientResponderFactory.Config argumentResolver(ArgumentResolverConfigurer configurer) { + this.argumentResolverConfigurer = configurer; + return this; + } + + @Override + public void accept(RSocketFactory.ClientRSocketFactory clientRSocketFactory) { + Assert.notEmpty(this.handlers, "handlers should not be empty"); + RSocketMessageHandler messageHandler = new RSocketMessageHandler(); + messageHandler.setHandlers(this.handlers); + if (this.strategies != null) { + messageHandler.setRSocketStrategies(this.strategies); + } + if (this.routeMatcher != null) { + messageHandler.setRouteMatcher(this.routeMatcher); + } + if (this.extractor != null) { + messageHandler.setMetadataExtractor(this.extractor); + } + if (this.returnValueHandlerConfigurer != null) { + messageHandler.setReturnValueHandlerConfigurer(this.returnValueHandlerConfigurer); + } + if (this.argumentResolverConfigurer != null) { + messageHandler.setArgumentResolverConfigurer(this.argumentResolverConfigurer); + } + messageHandler.afterPropertiesSet(); + clientRSocketFactory.acceptor(messageHandler.clientResponder()); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 32d78e578ce..076e1446ad9 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -42,6 +42,7 @@ import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.rsocket.annotation.ConnectMapping; +import org.springframework.messaging.rsocket.annotation.support.ClientResponderFactory; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.stereotype.Controller; @@ -102,17 +103,22 @@ public class RSocketServerToClientIntegrationTests { ServerController serverController = context.getBean(ServerController.class); serverController.reset(); + RSocketStrategies rSocketStrategies = context.getBean(RSocketStrategies.class); + + ClientResponderFactory clientResponder = ClientResponderFactory.create() + .strategies(rSocketStrategies) + .handlers(new ClientHandler()); RSocketRequester requester = null; try { requester = RSocketRequester.builder() - .annotatedHandlers(new ClientHandler()) .rsocketFactory(factory -> { factory.metadataMimeType("text/plain"); factory.setupPayload(ByteBufPayload.create("", connectionRoute)); factory.frameDecoder(PayloadDecoder.ZERO_COPY); }) - .rsocketStrategies(context.getBean(RSocketStrategies.class)) + .rsocketFactory(clientResponder) + .rsocketStrategies(rSocketStrategies) .connectTcp("localhost", server.address().getPort()) .block();