Provide access to underlying RSocketClient

The recently introduced support for RSocketClient in commit
7c98251142 did not expose the underlying
client in order to avoid a hard dependency on RSocket 1.1 for the time
being. However such access appears to be necessary, e.g. for Spring
Integration, where the connection needs to be established (warmed up)
ahead of actual requests.

See gh-25332
This commit is contained in:
Rossen Stoyanchev 2020-07-22 14:57:33 +03:00
parent f9ba58eab9
commit 9fe1feea9a
8 changed files with 43 additions and 153 deletions

View File

@ -23,6 +23,7 @@ import java.util.function.Consumer;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketClient;
import io.rsocket.core.RSocketClientAdapter;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -49,7 +50,10 @@ final class DefaultRSocketRequester implements RSocketRequester {
private static final Map<String, Object> EMPTY_HINTS = Collections.emptyMap();
private final RSocketDelegate rsocketDelegate;
private final RSocketClient rsocketClient;
@Nullable
private final RSocket rsocket;
private final MimeType dataMimeType;
@ -61,15 +65,16 @@ final class DefaultRSocketRequester implements RSocketRequester {
DefaultRSocketRequester(
RSocketDelegate rsocketDelegate, MimeType dataMimeType, MimeType metadataMimeType,
RSocketStrategies strategies) {
@Nullable RSocketClient rsocketClient, @Nullable RSocket rsocket,
MimeType dataMimeType, MimeType metadataMimeType, RSocketStrategies strategies) {
Assert.notNull(rsocketDelegate, "RSocket or RSocketClient is required");
Assert.isTrue(rsocketClient != null || rsocket != null, "RSocketClient or RSocket is required");
Assert.notNull(dataMimeType, "'dataMimeType' is required");
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
Assert.notNull(strategies, "RSocketStrategies is required");
this.rsocketDelegate = rsocketDelegate;
this.rsocketClient = (rsocketClient != null ? rsocketClient : new RSocketClientAdapter(rsocket));
this.rsocket = rsocket;
this.dataMimeType = dataMimeType;
this.metadataMimeType = metadataMimeType;
this.strategies = strategies;
@ -77,11 +82,15 @@ final class DefaultRSocketRequester implements RSocketRequester {
}
@Override
public RSocketClient rsocketClient() {
return this.rsocketClient;
}
@Nullable
@Override
public RSocket rsocket() {
return (this.rsocketDelegate instanceof ConnectionRSocketDelegate ?
((ConnectionRSocketDelegate) this.rsocketDelegate).getRSocket() : null);
return this.rsocket;
}
@Override
@ -104,11 +113,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
return new DefaultRequestSpec(metadata, mimeType);
}
@Override
public void dispose() {
this.rsocketDelegate.dispose();
}
private static boolean isVoid(ResolvableType elementType) {
return (Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve()));
}
@ -256,12 +260,12 @@ final class DefaultRSocketRequester implements RSocketRequester {
@Override
public Mono<Void> sendMetadata() {
return rsocketDelegate().metadataPush(getPayloadMono());
return rsocketClient.metadataPush(getPayloadMono());
}
@Override
public Mono<Void> send() {
return rsocketDelegate().fireAndForget(getPayloadMono());
return rsocketClient.fireAndForget(getPayloadMono());
}
@Override
@ -276,7 +280,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
@SuppressWarnings("unchecked")
private <T> Mono<T> retrieveMono(ResolvableType elementType) {
Mono<Payload> payloadMono = rsocketDelegate().requestResponse(getPayloadMono());
Mono<Payload> payloadMono = rsocketClient.requestResponse(getPayloadMono());
if (isVoid(elementType)) {
return (Mono<T>) payloadMono.then();
@ -301,8 +305,8 @@ final class DefaultRSocketRequester implements RSocketRequester {
private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
Flux<Payload> payloadFlux = (this.payloadFlux != null ?
rsocketDelegate().requestChannel(this.payloadFlux) :
rsocketDelegate().requestStream(getPayloadMono()));
rsocketClient.requestChannel(this.payloadFlux) :
rsocketClient.requestStream(getPayloadMono()));
if (isVoid(elementType)) {
return payloadFlux.thenMany(Flux.empty());
@ -313,10 +317,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
}
private RSocketDelegate rsocketDelegate() {
return DefaultRSocketRequester.this.rsocketDelegate;
}
private Mono<Payload> getPayloadMono() {
Assert.state(this.payloadFlux == null, "No RSocket interaction with Flux request and Mono response.");
return this.payloadMono != null ? this.payloadMono : firstPayload(emptyBufferMono);
@ -327,106 +327,4 @@ final class DefaultRSocketRequester implements RSocketRequester {
}
}
// Contract to avoid a hard dependency on RSocketClient for now.
interface RSocketDelegate {
Mono<Void> fireAndForget(Mono<Payload> payloadMono);
Mono<Payload> requestResponse(Mono<Payload> payloadMono);
Flux<Payload> requestStream(Mono<Payload> payloadMono);
Flux<Payload> requestChannel(Publisher<Payload> payloadPublisher);
Mono<Void> metadataPush(Mono<Payload> payloadMono);
void dispose();
}
static class ConnectionRSocketDelegate implements RSocketDelegate {
private final RSocket rsocket;
public ConnectionRSocketDelegate(RSocket rsocket) {
Assert.notNull(rsocket, "RSocket is required");
this.rsocket = rsocket;
}
public RSocket getRSocket() {
return this.rsocket;
}
@Override
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
return payloadMono.flatMap(this.rsocket::fireAndForget);
}
@Override
public Mono<Payload> requestResponse(Mono<Payload> payloadMono) {
return payloadMono.flatMap(this.rsocket::requestResponse);
}
@Override
public Flux<Payload> requestStream(Mono<Payload> payloadMono) {
return payloadMono.flatMapMany(this.rsocket::requestStream);
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloadPublisher) {
return this.rsocket.requestChannel(payloadPublisher);
}
@Override
public Mono<Void> metadataPush(Mono<Payload> payloadMono) {
return payloadMono.flatMap(this.rsocket::metadataPush);
}
@Override
public void dispose() {
this.rsocket.dispose();
}
}
static class ClientRSocketDelegate implements RSocketDelegate {
private final RSocketClient rsocketClient;
public ClientRSocketDelegate(RSocketClient rsocketClient) {
Assert.notNull(rsocketClient, "RSocketClient is required");
this.rsocketClient = rsocketClient;
}
@Override
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
return this.rsocketClient.fireAndForget(payloadMono);
}
@Override
public Mono<Payload> requestResponse(Mono<Payload> payloadMono) {
return this.rsocketClient.requestResponse(payloadMono);
}
@Override
public Flux<Payload> requestStream(Mono<Payload> payloadMono) {
return this.rsocketClient.requestStream(payloadMono);
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloadPublisher) {
return this.rsocketClient.requestChannel(payloadPublisher);
}
@Override
public Mono<Void> metadataPush(Mono<Payload> payloadMono) {
return this.rsocketClient.metadataPush(payloadMono);
}
@Override
public void dispose() {
this.rsocketClient.dispose();
}
}
}

View File

@ -184,8 +184,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
metaMimeType, dataMimeType, setupPayload, strategies);
return new DefaultRSocketRequester(
new DefaultRSocketRequester.ClientRSocketDelegate(connector.toRSocketClient(transport)),
dataMimeType, metaMimeType, strategies);
connector.toRSocketClient(transport), null, dataMimeType, metaMimeType, strategies);
}
@Override
@ -219,9 +218,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
metaMimeType, dataMimeType, setupPayload, rsocketStrategies);
return connector.connect(transport).map(rsocket ->
new DefaultRSocketRequester(
new DefaultRSocketRequester.ConnectionRSocketDelegate(rsocket),
dataMimeType, metaMimeType, rsocketStrategies));
new DefaultRSocketRequester(null, rsocket, dataMimeType, metaMimeType, rsocketStrategies));
}
private RSocketStrategies getRSocketStrategies() {

View File

@ -22,6 +22,7 @@ import java.util.function.Consumer;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketClient;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
@ -48,13 +49,16 @@ import org.springframework.util.MimeType;
public interface RSocketRequester {
/**
* This method returns {@code null} unless the the requester was created
* with a "live" RSocket through one of the (now deprecated) builder connect
* methods or via {@link #wrap(RSocket, MimeType, MimeType, RSocketStrategies)}
* which is mainly for internal use in client and server responder
* implementations. Otherwise in the more common case where there is no
* "live" RSocket, the requester delegates to an
* {@link io.rsocket.RSocketClient}.
* Return the underlying {@link RSocketClient} used to make requests with.
* @since 5.3
*/
RSocketClient rsocketClient();
/**
* Return the underlying {@link RSocket} if the requester was created with a
* "live" RSocket via {@link #wrap(RSocket, MimeType, MimeType, RSocketStrategies)}
* or via one of the (deprecated) connect methods on the
* {@code RSocketRequester} builder, or otherwise return {@code null}.
*/
@Nullable
RSocket rsocket();
@ -103,13 +107,6 @@ public interface RSocketRequester {
*/
RequestSpec metadata(Object metadata, @Nullable MimeType mimeType);
/**
* Invoke the dispose method on the underlying
* {@link io.rsocket.RSocketClient} or {@link RSocket}.
* @since 5.3
*/
public void dispose();
/**
* Obtain a builder to create a client {@link RSocketRequester} by connecting
* to an RSocket server.
@ -126,9 +123,7 @@ public interface RSocketRequester {
RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
RSocketStrategies strategies) {
return new DefaultRSocketRequester(
new DefaultRSocketRequester.ConnectionRSocketDelegate(rsocket),
dataMimeType, metadataMimeType, strategies);
return new DefaultRSocketRequester(null, rsocket, dataMimeType, metadataMimeType, strategies);
}

View File

@ -206,8 +206,8 @@ public class DefaultRSocketRequesterBuilderTests {
}
private ConnectionSetupPayload getConnectionSetupPayload(RSocketRequester requester) {
// Trigger connection and sending of SETUP frame
requester.route("any-route").data("any-data").send().block();
// Trigger connection establishment
requester.rsocketClient().source().block();
return new DefaultConnectionSetupPayload(this.connection.setupFrame());
}

View File

@ -94,7 +94,7 @@ class RSocketBufferLeakTests {
@AfterAll
void tearDownOnce() {
requester.dispose();
requester.rsocketClient().dispose();
server.dispose();
context.close();
}

View File

@ -96,7 +96,7 @@ public class RSocketClientToServerIntegrationTests {
@AfterAll
public static void tearDownOnce() {
requester.dispose();
requester.rsocketClient().dispose();
server.dispose();
}

View File

@ -110,14 +110,14 @@ public class RSocketServerToClientIntegrationTests {
.rsocketConnector(connector -> connector.acceptor(responder))
.tcp("localhost", server.address().getPort());
// Make a request to cause a connection to be established.
requester.route("fnf").send().block();
// Trigger connection establishment.
requester.rsocketClient().source().block();
context.getBean(ServerController.class).await(Duration.ofSeconds(5));
}
finally {
if (requester != null) {
requester.dispose();
requester.rsocketClient().dispose();
}
}
}

View File

@ -271,7 +271,7 @@ class RSocketClientToServerCoroutinesIntegrationTests {
@AfterAll
@JvmStatic
fun tearDownOnce() {
requester.dispose()
requester.rsocketClient().dispose()
server.dispose()
}
}