diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 9168e2b24e4..c57deef8471 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -110,19 +110,41 @@ public class ReactorNettyTcpClient
implements TcpOperations
{
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
+ this.codec = codec;
this.tcpClient = TcpClient.create(this.poolResources)
.host(host).port(port)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
+ }
+ /**
+ * A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
+ * that still manages the lifecycle of the {@link TcpClient} and underlying
+ * resources, but allows for direct configuration of other properties of the
+ * client through a {@code Function codec) {
+ Assert.notNull(codec, "ReactorNettyCodec is required");
+
+ this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
+ this.loopResources = LoopResources.create("tcp-client-loop");
+ this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
+
+ this.tcpClient = clientConfigurer.apply(TcpClient
+ .create(this.poolResources)
+ .runOn(this.loopResources, false)
+ .doOnConnected(conn -> this.channelGroup.add(conn.channel())));
}
/**
* Constructor with an externally created {@link TcpClient} instance whose
* lifecycle is expected to be managed externally.
- *
* @param tcpClient the TcpClient instance to use
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
diff --git a/src/docs/asciidoc/web/websocket.adoc b/src/docs/asciidoc/web/websocket.adoc
index bb8b20f1154..5b6a4987e5c 100644
--- a/src/docs/asciidoc/web/websocket.adoc
+++ b/src/docs/asciidoc/web/websocket.adoc
@@ -1641,10 +1641,10 @@ to receive notifications when the "`system`" connection to the broker is lost an
re-established. For example, a Stock Quote service that broadcasts stock quotes can
stop trying to send messages when there is no active "`system`" connection.
-By default, the STOMP broker relay always connects (and reconnects as needed if
-connectivity is lost) to the same host and port. If you wish to supply multiple addresses,
+By default, the STOMP broker relay always connects, and reconnects as needed if
+connectivity is lost, to the same host and port. If you wish to supply multiple addresses,
on each attempt to connect, you can configure a supplier of addresses, instead of a
-fixed host and port. The following example shows how to do so:
+fixed host and port. The following example shows how to do that:
====
[source,java,indent=0]
@@ -1663,14 +1663,9 @@ public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
}
private ReactorNettyTcpClient