From dd15ff79d7223bb33c8e69558d69deb546aadfe1 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Mon, 24 Jun 2019 15:11:18 +0200 Subject: [PATCH] Register annotated handlers in RSocketRequester Prior to this commit, there would be no easy way to register client RSocket handlers against an `RSocketRequester` instance. The only solution was to gather all handlers and wrap them in a `RSocketMessageHandler` and configure it as an acceptor on the client RSocket. This commit adds a convenience method on the `RSocketRequester` builder to tkae care of this part of the infrastructure. Closes gh-23170 --- .../DefaultRSocketRequesterBuilder.java | 20 +++++++- .../messaging/rsocket/RSocketRequester.java | 12 +++++ ...RSocketServerToClientIntegrationTests.java | 46 ++++++------------- 3 files changed, 45 insertions(+), 33 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 90286cb2bc..52ef55bbde 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -18,17 +18,20 @@ package org.springframework.messaging.rsocket; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.function.Consumer; import java.util.stream.Stream; import io.rsocket.RSocketFactory; +import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; import reactor.core.publisher.Mono; import org.springframework.lang.Nullable; +import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.util.Assert; import org.springframework.util.MimeType; @@ -53,6 +56,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { private List> strategiesConfigurers = new ArrayList<>(); + private List handlers = new ArrayList<>(); @Override public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) { @@ -79,6 +83,12 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { return this; } + @Override + public RSocketRequester.Builder annotatedHandlers(Object... handlers) { + this.handlers.addAll(Arrays.asList(handlers)); + return this; + } + @Override public RSocketRequester.Builder rsocketStrategies(Consumer configurer) { this.strategiesConfigurers.add(configurer); @@ -101,7 +111,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { } private Mono doConnect(ClientTransport transport) { - RSocketStrategies rsocketStrategies = getRSocketStrategies(); Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders"); Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders"); @@ -110,6 +119,15 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { MimeType dataMimeType = getDataMimeType(rsocketStrategies); rsocketFactory.dataMimeType(dataMimeType.toString()); rsocketFactory.metadataMimeType(this.metadataMimeType.toString()); + + if (!this.handlers.isEmpty()) { + RSocketMessageHandler messageHandler = new RSocketMessageHandler(); + messageHandler.setHandlers(this.handlers); + messageHandler.setRSocketStrategies(rsocketStrategies); + messageHandler.afterPropertiesSet(); + rsocketFactory.acceptor(messageHandler.clientAcceptor()); + } + rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY); this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory)); return rsocketFactory.transport(transport) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 8d7488ada1..ab9f22b0f6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -30,6 +30,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.lang.Nullable; +import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.util.MimeType; /** @@ -158,6 +159,17 @@ public interface RSocketRequester { */ RSocketRequester.Builder rsocketStrategies(Consumer configurer); + /** + * Add handlers for processing requests sent by the server. + *

This is a shortcut for registering client handlers (i.e. annotated controllers) + * to a {@link RSocketMessageHandler} and configuring it as an acceptor. + * You can take full control by manually registering an acceptor on the + * {@link RSocketFactory.ClientRSocketFactory} using {@link #rsocketFactory(Consumer)} + * instead. + * @param handlers the client handlers to configure on the requester + */ + RSocketRequester.Builder annotatedHandlers(Object... handlers); + /** * Connect to the RSocket server over TCP. * @param host the server host diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 75cb8666c6..39051efa39 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -17,16 +17,12 @@ package org.springframework.messaging.rsocket; import java.time.Duration; -import java.util.Collections; import io.netty.buffer.PooledByteBufAllocator; -import io.rsocket.Closeable; -import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; -import io.rsocket.util.DefaultPayload; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -51,12 +47,13 @@ import org.springframework.stereotype.Controller; * Client-side handling of requests initiated from the server side. * * @author Rossen Stoyanchev + * @author Brian Clozel */ public class RSocketServerToClientIntegrationTests { private static AnnotationConfigApplicationContext context; - private static Closeable server; + private static CloseableChannel server; @BeforeClass @@ -66,8 +63,8 @@ public class RSocketServerToClientIntegrationTests { server = RSocketFactory.receive() .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(context.getBean("serverMessageHandler", RSocketMessageHandler.class).serverAcceptor()) - .transport(TcpServerTransport.create("localhost", 7000)) + .acceptor(context.getBean(RSocketMessageHandler.class).serverAcceptor()) + .transport(TcpServerTransport.create("localhost", 0)) .start() .block(); } @@ -104,23 +101,21 @@ public class RSocketServerToClientIntegrationTests { ServerController serverController = context.getBean(ServerController.class); serverController.reset(); - RSocket rsocket = null; + RSocketRequester requester = null; try { - rsocket = RSocketFactory.connect() - .metadataMimeType("message/x.rsocket.routing.v0") - .dataMimeType("text/plain") - .setupPayload(DefaultPayload.create("", destination)) - .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(context.getBean("clientMessageHandler", RSocketMessageHandler.class).clientAcceptor()) - .transport(TcpClientTransport.create("localhost", 7000)) - .start() + requester = RSocketRequester.builder() + .annotatedHandlers(new ClientHandler()) + .rsocketStrategies(context.getBean(RSocketStrategies.class)) + .connectTcp("localhost", server.address().getPort()) .block(); + requester.route(destination).data("").send().block(); + serverController.await(Duration.ofSeconds(5)); } finally { - if (rsocket != null) { - rsocket.dispose(); + if (requester != null) { + requester.rsocket().dispose(); } } } @@ -250,24 +245,11 @@ public class RSocketServerToClientIntegrationTests { @Configuration static class RSocketConfig { - @Bean - public ClientHandler clientHandler() { - return new ClientHandler(); - } - @Bean public ServerController serverController() { return new ServerController(); } - @Bean - public RSocketMessageHandler clientMessageHandler() { - RSocketMessageHandler handler = new RSocketMessageHandler(); - handler.setHandlers(Collections.singletonList(clientHandler())); - handler.setRSocketStrategies(rsocketStrategies()); - return handler; - } - @Bean public RSocketMessageHandler serverMessageHandler() { RSocketMessageHandler handler = new RSocketMessageHandler();