Support for LoadbalanceRSocketClient

Closes gh-25333
This commit is contained in:
Rossen Stoyanchev 2020-09-07 16:31:12 +01:00
parent 71ecca7443
commit 30d556b9ef
2 changed files with 62 additions and 15 deletions

View File

@ -28,11 +28,15 @@ import io.rsocket.Payload;
import io.rsocket.core.RSocketClient;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.loadbalance.LoadbalanceRSocketClient;
import io.rsocket.loadbalance.LoadbalanceStrategy;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.DefaultPayload;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ReactiveAdapter;
@ -171,6 +175,25 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
return new DefaultRSocketRequester(client, null, dataMimeType, metaMimeType, strategies);
}
@Override
public RSocketRequester transports(
Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) {
RSocketStrategies strategies = getRSocketStrategies();
MimeType metaMimeType = getMetadataMimeType();
MimeType dataMimeType = getDataMimeType(strategies);
RSocketConnector connector = initConnector(
this.rsocketConnectorConfigurers, metaMimeType, dataMimeType, strategies);
LoadbalanceRSocketClient client = LoadbalanceRSocketClient.builder(targetPublisher)
.connector(connector)
.loadbalanceStrategy(loadbalanceStrategy)
.build();
return new DefaultRSocketRequester(client, null, dataMimeType, metaMimeType, strategies);
}
@Override
@SuppressWarnings("deprecation")
public Mono<RSocketRequester> connectTcp(String host, int port) {

View File

@ -17,12 +17,15 @@
package org.springframework.messaging.rsocket;
import java.net.URI;
import java.util.List;
import java.util.function.Consumer;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketClient;
import io.rsocket.loadbalance.LoadbalanceStrategy;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
@ -116,8 +119,9 @@ public interface RSocketRequester {
}
/**
* Wrap an existing {@link RSocket}. Typically used in client or server
* responders to wrap the {@code RSocket} for the remote side.
* Wrap an existing {@link RSocket}. Typically for internal framework use,
* to wrap the remote {@code RSocket} in a client or server responder, but
* it can also be used to wrap any {@link RSocket}.
*/
static RSocketRequester wrap(
RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
@ -224,36 +228,56 @@ public interface RSocketRequester {
RSocketRequester.Builder apply(Consumer<RSocketRequester.Builder> configurer);
/**
* Build an {@link RSocketRequester} instance for use with a TCP
* transport. Requests are made via {@link io.rsocket.core.RSocketClient}
* which establishes a shared TCP connection to given host and port.
* @param host the host of the server to connect to
* @param port the port of the server to connect to
* Build an {@link RSocketRequester} with an
* {@link io.rsocket.core.RSocketClient} that connects over TCP to the
* given host and port. The requester can be used to make requests
* concurrently. Requests are made over a shared connection that is also
* re-established as needed when further requests are made.
* @param host the host to connect to
* @param port the port to connect to
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester tcp(String host, int port);
/**
* Build an {@link RSocketRequester} instance for use with a WebSocket
* transport. Requests are made via {@link io.rsocket.core.RSocketClient}
* which establishes a shared WebSocket connection to given URL.
* @param uri the URL of the server to connect to
* Build an {@link RSocketRequester} with an
* {@link io.rsocket.core.RSocketClient} that connects over WebSocket to
* the given URL. The requester can be used to make requests
* concurrently. Requests are made over a shared connection that is also
* re-established as needed when further requests are made.
* @param uri the URL to connect to
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester websocket(URI uri);
/**
* Build an {@link RSocketRequester} instance for use with the given
* transport. Requests are made via {@link io.rsocket.core.RSocketClient}
* which establishes a shared connection through the given transport.
* @param transport the transport to use for connecting to the server
* Variant of {@link #tcp(String, int)} and {@link #websocket(URI)}
* with an already initialized {@link ClientTransport}.
* @param transport the transport to connect with
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester transport(ClientTransport transport);
/**
* Build an {@link RSocketRequester} with an
* {@link io.rsocket.loadbalance.LoadbalanceRSocketClient} that will
* connect to one of the given targets selected through the given
* {@link io.rsocket.loadbalance.LoadbalanceRSocketClient}.
* @param targetPublisher a {@code Publisher} that supplies a list of
* target transports to loadbalance against; the given list may be
* periodically updated by the {@code Publisher}.
* @param loadbalanceStrategy the strategy to use for selecting from
* the list of loadbalance targets.
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester transports(
Publisher<List<LoadbalanceTarget>> targetPublisher,
LoadbalanceStrategy loadbalanceStrategy);
/**
* Connect to the server over TCP.
* @param host the server host