Register annotated handlers in RSocketRequester

Prior to this commit, there would be no easy way to register client
RSocket handlers against an `RSocketRequester` instance. The only
solution was to gather all handlers and wrap them in a
`RSocketMessageHandler` and configure it as an acceptor on the client
RSocket.

This commit adds a convenience method on the `RSocketRequester` builder
to tkae care of this part of the infrastructure.

Closes gh-23170
This commit is contained in:
Brian Clozel 2019-06-24 15:11:18 +02:00
parent d5a2bdee8d
commit dd15ff79d7
3 changed files with 45 additions and 33 deletions

View File

@ -18,17 +18,20 @@ package org.springframework.messaging.rsocket;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Stream; import java.util.stream.Stream;
import io.rsocket.RSocketFactory; import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
@ -53,6 +56,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>(); private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
private List<Object> handlers = new ArrayList<>();
@Override @Override
public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) { public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) {
@ -79,6 +83,12 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
return this; return this;
} }
@Override
public RSocketRequester.Builder annotatedHandlers(Object... handlers) {
this.handlers.addAll(Arrays.asList(handlers));
return this;
}
@Override @Override
public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer) { public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer) {
this.strategiesConfigurers.add(configurer); this.strategiesConfigurers.add(configurer);
@ -101,7 +111,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
} }
private Mono<RSocketRequester> doConnect(ClientTransport transport) { private Mono<RSocketRequester> doConnect(ClientTransport transport) {
RSocketStrategies rsocketStrategies = getRSocketStrategies(); RSocketStrategies rsocketStrategies = getRSocketStrategies();
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders"); Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders");
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders"); Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders");
@ -110,6 +119,15 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
MimeType dataMimeType = getDataMimeType(rsocketStrategies); MimeType dataMimeType = getDataMimeType(rsocketStrategies);
rsocketFactory.dataMimeType(dataMimeType.toString()); rsocketFactory.dataMimeType(dataMimeType.toString());
rsocketFactory.metadataMimeType(this.metadataMimeType.toString()); rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
if (!this.handlers.isEmpty()) {
RSocketMessageHandler messageHandler = new RSocketMessageHandler();
messageHandler.setHandlers(this.handlers);
messageHandler.setRSocketStrategies(rsocketStrategies);
messageHandler.afterPropertiesSet();
rsocketFactory.acceptor(messageHandler.clientAcceptor());
}
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory)); this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory));
return rsocketFactory.transport(transport) return rsocketFactory.transport(transport)

View File

@ -30,6 +30,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
/** /**
@ -158,6 +159,17 @@ public interface RSocketRequester {
*/ */
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer); RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
/**
* Add handlers for processing requests sent by the server.
* <p>This is a shortcut for registering client handlers (i.e. annotated controllers)
* to a {@link RSocketMessageHandler} and configuring it as an acceptor.
* You can take full control by manually registering an acceptor on the
* {@link RSocketFactory.ClientRSocketFactory} using {@link #rsocketFactory(Consumer)}
* instead.
* @param handlers the client handlers to configure on the requester
*/
RSocketRequester.Builder annotatedHandlers(Object... handlers);
/** /**
* Connect to the RSocket server over TCP. * Connect to the RSocket server over TCP.
* @param host the server host * @param host the server host

View File

@ -17,16 +17,12 @@
package org.springframework.messaging.rsocket; package org.springframework.messaging.rsocket;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.Closeable;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory; import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -51,12 +47,13 @@ import org.springframework.stereotype.Controller;
* Client-side handling of requests initiated from the server side. * Client-side handling of requests initiated from the server side.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @author Brian Clozel
*/ */
public class RSocketServerToClientIntegrationTests { public class RSocketServerToClientIntegrationTests {
private static AnnotationConfigApplicationContext context; private static AnnotationConfigApplicationContext context;
private static Closeable server; private static CloseableChannel server;
@BeforeClass @BeforeClass
@ -66,8 +63,8 @@ public class RSocketServerToClientIntegrationTests {
server = RSocketFactory.receive() server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY) .frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(context.getBean("serverMessageHandler", RSocketMessageHandler.class).serverAcceptor()) .acceptor(context.getBean(RSocketMessageHandler.class).serverAcceptor())
.transport(TcpServerTransport.create("localhost", 7000)) .transport(TcpServerTransport.create("localhost", 0))
.start() .start()
.block(); .block();
} }
@ -104,23 +101,21 @@ public class RSocketServerToClientIntegrationTests {
ServerController serverController = context.getBean(ServerController.class); ServerController serverController = context.getBean(ServerController.class);
serverController.reset(); serverController.reset();
RSocket rsocket = null; RSocketRequester requester = null;
try { try {
rsocket = RSocketFactory.connect() requester = RSocketRequester.builder()
.metadataMimeType("message/x.rsocket.routing.v0") .annotatedHandlers(new ClientHandler())
.dataMimeType("text/plain") .rsocketStrategies(context.getBean(RSocketStrategies.class))
.setupPayload(DefaultPayload.create("", destination)) .connectTcp("localhost", server.address().getPort())
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(context.getBean("clientMessageHandler", RSocketMessageHandler.class).clientAcceptor())
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block(); .block();
requester.route(destination).data("").send().block();
serverController.await(Duration.ofSeconds(5)); serverController.await(Duration.ofSeconds(5));
} }
finally { finally {
if (rsocket != null) { if (requester != null) {
rsocket.dispose(); requester.rsocket().dispose();
} }
} }
} }
@ -250,24 +245,11 @@ public class RSocketServerToClientIntegrationTests {
@Configuration @Configuration
static class RSocketConfig { static class RSocketConfig {
@Bean
public ClientHandler clientHandler() {
return new ClientHandler();
}
@Bean @Bean
public ServerController serverController() { public ServerController serverController() {
return new ServerController(); return new ServerController();
} }
@Bean
public RSocketMessageHandler clientMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setHandlers(Collections.singletonList(clientHandler()));
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean @Bean
public RSocketMessageHandler serverMessageHandler() { public RSocketMessageHandler serverMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler(); RSocketMessageHandler handler = new RSocketMessageHandler();