From 24848ec1bcec59d3fc93ddd77a084b637266db93 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 20 Nov 2018 21:20:21 -0500 Subject: [PATCH] Configurable TcpClient for ReactorNettyTcpClient Issue: SPR-17523 --- .../tcp/reactor/ReactorNettyTcpClient.java | 24 ++++++++++++++++++- src/docs/asciidoc/web/websocket.adoc | 17 +++++-------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 9168e2b24e4..c57deef8471 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -110,19 +110,41 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.loopResources = LoopResources.create("tcp-client-loop"); this.poolResources = ConnectionProvider.elastic("tcp-client-pool"); + this.codec = codec; this.tcpClient = TcpClient.create(this.poolResources) .host(host).port(port) .runOn(this.loopResources, false) .doOnConnected(conn -> this.channelGroup.add(conn.channel())); + } + /** + * A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)} + * that still manages the lifecycle of the {@link TcpClient} and underlying + * resources, but allows for direct configuration of other properties of the + * client through a {@code Function}. + * @param clientConfigurer the configurer function + * @param codec for encoding and decoding the input/output byte streams + * @since 5.1.3 + * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec + */ + public ReactorNettyTcpClient(Function clientConfigurer, ReactorNettyCodec

codec) { + Assert.notNull(codec, "ReactorNettyCodec is required"); + + this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); + this.loopResources = LoopResources.create("tcp-client-loop"); + this.poolResources = ConnectionProvider.elastic("tcp-client-pool"); this.codec = codec; + + this.tcpClient = clientConfigurer.apply(TcpClient + .create(this.poolResources) + .runOn(this.loopResources, false) + .doOnConnected(conn -> this.channelGroup.add(conn.channel()))); } /** * Constructor with an externally created {@link TcpClient} instance whose * lifecycle is expected to be managed externally. - * * @param tcpClient the TcpClient instance to use * @param codec for encoding and decoding the input/output byte streams * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec diff --git a/src/docs/asciidoc/web/websocket.adoc b/src/docs/asciidoc/web/websocket.adoc index bb8b20f1154..5b6a4987e5c 100644 --- a/src/docs/asciidoc/web/websocket.adoc +++ b/src/docs/asciidoc/web/websocket.adoc @@ -1641,10 +1641,10 @@ to receive notifications when the "`system`" connection to the broker is lost an re-established. For example, a Stock Quote service that broadcasts stock quotes can stop trying to send messages when there is no active "`system`" connection. -By default, the STOMP broker relay always connects (and reconnects as needed if -connectivity is lost) to the same host and port. If you wish to supply multiple addresses, +By default, the STOMP broker relay always connects, and reconnects as needed if +connectivity is lost, to the same host and port. If you wish to supply multiple addresses, on each attempt to connect, you can configure a supplier of addresses, instead of a -fixed host and port. The following example shows how to do so: +fixed host and port. The following example shows how to do that: ==== [source,java,indent=0] @@ -1663,14 +1663,9 @@ public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { } private ReactorNettyTcpClient createTcpClient() { - - Consumer> builderConsumer = builder -> { - builder.connectAddress(()-> { - // Select address to connect to ... - }); - }; - - return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec()); + return new ReactorNettyTcpClient<>( + client -> client.addressSupplier(() -> ... ), + new StompReactorNettyCodec()); } } ----