diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 96e6a7bf768..3c3ff332a97 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -28,11 +28,15 @@ import io.rsocket.Payload; import io.rsocket.core.RSocketClient; import io.rsocket.core.RSocketConnector; import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.loadbalance.LoadbalanceRSocketClient; +import io.rsocket.loadbalance.LoadbalanceStrategy; +import io.rsocket.loadbalance.LoadbalanceTarget; import io.rsocket.metadata.WellKnownMimeType; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.util.DefaultPayload; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import org.springframework.core.ReactiveAdapter; @@ -171,6 +175,25 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { return new DefaultRSocketRequester(client, null, dataMimeType, metaMimeType, strategies); } + @Override + public RSocketRequester transports( + Publisher> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) { + + RSocketStrategies strategies = getRSocketStrategies(); + MimeType metaMimeType = getMetadataMimeType(); + MimeType dataMimeType = getDataMimeType(strategies); + + RSocketConnector connector = initConnector( + this.rsocketConnectorConfigurers, metaMimeType, dataMimeType, strategies); + + LoadbalanceRSocketClient client = LoadbalanceRSocketClient.builder(targetPublisher) + .connector(connector) + .loadbalanceStrategy(loadbalanceStrategy) + .build(); + + return new DefaultRSocketRequester(client, null, dataMimeType, metaMimeType, strategies); + } + @Override @SuppressWarnings("deprecation") public Mono connectTcp(String host, int port) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 614931b31ec..a3995e8a6e2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -17,12 +17,15 @@ package org.springframework.messaging.rsocket; import java.net.URI; +import java.util.List; import java.util.function.Consumer; import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.core.RSocketClient; +import io.rsocket.loadbalance.LoadbalanceStrategy; +import io.rsocket.loadbalance.LoadbalanceTarget; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; @@ -116,8 +119,9 @@ public interface RSocketRequester { } /** - * Wrap an existing {@link RSocket}. Typically used in client or server - * responders to wrap the {@code RSocket} for the remote side. + * Wrap an existing {@link RSocket}. Typically for internal framework use, + * to wrap the remote {@code RSocket} in a client or server responder, but + * it can also be used to wrap any {@link RSocket}. */ static RSocketRequester wrap( RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType, @@ -224,36 +228,56 @@ public interface RSocketRequester { RSocketRequester.Builder apply(Consumer configurer); /** - * Build an {@link RSocketRequester} instance for use with a TCP - * transport. Requests are made via {@link io.rsocket.core.RSocketClient} - * which establishes a shared TCP connection to given host and port. - * @param host the host of the server to connect to - * @param port the port of the server to connect to + * Build an {@link RSocketRequester} with an + * {@link io.rsocket.core.RSocketClient} that connects over TCP to the + * given host and port. The requester can be used to make requests + * concurrently. Requests are made over a shared connection that is also + * re-established as needed when further requests are made. + * @param host the host to connect to + * @param port the port to connect to * @return the created {@code RSocketRequester} * @since 5.3 */ RSocketRequester tcp(String host, int port); /** - * Build an {@link RSocketRequester} instance for use with a WebSocket - * transport. Requests are made via {@link io.rsocket.core.RSocketClient} - * which establishes a shared WebSocket connection to given URL. - * @param uri the URL of the server to connect to + * Build an {@link RSocketRequester} with an + * {@link io.rsocket.core.RSocketClient} that connects over WebSocket to + * the given URL. The requester can be used to make requests + * concurrently. Requests are made over a shared connection that is also + * re-established as needed when further requests are made. + * @param uri the URL to connect to * @return the created {@code RSocketRequester} * @since 5.3 */ RSocketRequester websocket(URI uri); /** - * Build an {@link RSocketRequester} instance for use with the given - * transport. Requests are made via {@link io.rsocket.core.RSocketClient} - * which establishes a shared connection through the given transport. - * @param transport the transport to use for connecting to the server + * Variant of {@link #tcp(String, int)} and {@link #websocket(URI)} + * with an already initialized {@link ClientTransport}. + * @param transport the transport to connect with * @return the created {@code RSocketRequester} * @since 5.3 */ RSocketRequester transport(ClientTransport transport); + /** + * Build an {@link RSocketRequester} with an + * {@link io.rsocket.loadbalance.LoadbalanceRSocketClient} that will + * connect to one of the given targets selected through the given + * {@link io.rsocket.loadbalance.LoadbalanceRSocketClient}. + * @param targetPublisher a {@code Publisher} that supplies a list of + * target transports to loadbalance against; the given list may be + * periodically updated by the {@code Publisher}. + * @param loadbalanceStrategy the strategy to use for selecting from + * the list of loadbalance targets. + * @return the created {@code RSocketRequester} + * @since 5.3 + */ + RSocketRequester transports( + Publisher> targetPublisher, + LoadbalanceStrategy loadbalanceStrategy); + /** * Connect to the server over TCP. * @param host the server host