Support for making requests via RSocketClient

Closes gh-25332
This commit is contained in:
Rossen Stoyanchev 2020-07-14 12:07:59 +03:00
parent e4a3c1570d
commit 7c98251142
11 changed files with 328 additions and 235 deletions

View File

@ -22,6 +22,7 @@ import java.util.function.Consumer;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketClient;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -48,8 +49,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
private static final Map<String, Object> EMPTY_HINTS = Collections.emptyMap();
private final RSocket rsocket;
private final RSocketDelegate rsocketDelegate;
private final MimeType dataMimeType;
@ -61,15 +61,15 @@ final class DefaultRSocketRequester implements RSocketRequester {
DefaultRSocketRequester(
RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
RSocketDelegate rsocketDelegate, MimeType dataMimeType, MimeType metadataMimeType,
RSocketStrategies strategies) {
Assert.notNull(rsocket, "RSocket is required");
Assert.notNull(rsocketDelegate, "RSocket or RSocketClient is required");
Assert.notNull(dataMimeType, "'dataMimeType' is required");
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
Assert.notNull(strategies, "RSocketStrategies is required");
this.rsocket = rsocket;
this.rsocketDelegate = rsocketDelegate;
this.dataMimeType = dataMimeType;
this.metadataMimeType = metadataMimeType;
this.strategies = strategies;
@ -77,9 +77,11 @@ final class DefaultRSocketRequester implements RSocketRequester {
}
@Nullable
@Override
public RSocket rsocket() {
return this.rsocket;
return (this.rsocketDelegate instanceof ConnectionRSocketDelegate ?
((ConnectionRSocketDelegate) this.rsocketDelegate).getRSocket() : null);
}
@Override
@ -102,6 +104,10 @@ final class DefaultRSocketRequester implements RSocketRequester {
return new DefaultRequestSpec(metadata, mimeType);
}
@Override
public void dispose() {
this.rsocketDelegate.dispose();
}
private static boolean isVoid(ResolvableType elementType) {
return (Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve()));
@ -250,12 +256,12 @@ final class DefaultRSocketRequester implements RSocketRequester {
@Override
public Mono<Void> sendMetadata() {
return getPayloadMono().flatMap(rsocket::metadataPush);
return rsocketDelegate().metadataPush(getPayloadMono());
}
@Override
public Mono<Void> send() {
return getPayloadMono().flatMap(rsocket::fireAndForget);
return rsocketDelegate().fireAndForget(getPayloadMono());
}
@Override
@ -270,7 +276,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
@SuppressWarnings("unchecked")
private <T> Mono<T> retrieveMono(ResolvableType elementType) {
Mono<Payload> payloadMono = getPayloadMono().flatMap(rsocket::requestResponse);
Mono<Payload> payloadMono = rsocketDelegate().requestResponse(getPayloadMono());
if (isVoid(elementType)) {
return (Mono<T>) payloadMono.then();
@ -295,8 +301,8 @@ final class DefaultRSocketRequester implements RSocketRequester {
private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
Flux<Payload> payloadFlux = (this.payloadFlux != null ?
rsocket.requestChannel(this.payloadFlux) :
getPayloadMono().flatMapMany(rsocket::requestStream));
rsocketDelegate().requestChannel(this.payloadFlux) :
rsocketDelegate().requestStream(getPayloadMono()));
if (isVoid(elementType)) {
return payloadFlux.thenMany(Flux.empty());
@ -307,6 +313,10 @@ final class DefaultRSocketRequester implements RSocketRequester {
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
}
private RSocketDelegate rsocketDelegate() {
return DefaultRSocketRequester.this.rsocketDelegate;
}
private Mono<Payload> getPayloadMono() {
Assert.state(this.payloadFlux == null, "No RSocket interaction with Flux request and Mono response.");
return this.payloadMono != null ? this.payloadMono : firstPayload(emptyBufferMono);
@ -316,4 +326,107 @@ final class DefaultRSocketRequester implements RSocketRequester {
return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory());
}
}
// Contract to avoid a hard dependency on RSocketClient for now.
interface RSocketDelegate {
Mono<Void> fireAndForget(Mono<Payload> payloadMono);
Mono<Payload> requestResponse(Mono<Payload> payloadMono);
Flux<Payload> requestStream(Mono<Payload> payloadMono);
Flux<Payload> requestChannel(Publisher<Payload> payloadPublisher);
Mono<Void> metadataPush(Mono<Payload> payloadMono);
void dispose();
}
static class ConnectionRSocketDelegate implements RSocketDelegate {
private final RSocket rsocket;
public ConnectionRSocketDelegate(RSocket rsocket) {
Assert.notNull(rsocket, "RSocket is required");
this.rsocket = rsocket;
}
public RSocket getRSocket() {
return this.rsocket;
}
@Override
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
return payloadMono.flatMap(this.rsocket::fireAndForget);
}
@Override
public Mono<Payload> requestResponse(Mono<Payload> payloadMono) {
return payloadMono.flatMap(this.rsocket::requestResponse);
}
@Override
public Flux<Payload> requestStream(Mono<Payload> payloadMono) {
return payloadMono.flatMapMany(this.rsocket::requestStream);
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloadPublisher) {
return this.rsocket.requestChannel(payloadPublisher);
}
@Override
public Mono<Void> metadataPush(Mono<Payload> payloadMono) {
return payloadMono.flatMap(this.rsocket::metadataPush);
}
@Override
public void dispose() {
this.rsocket.dispose();
}
}
static class ClientRSocketDelegate implements RSocketDelegate {
private final RSocketClient rsocketClient;
public ClientRSocketDelegate(RSocketClient rsocketClient) {
Assert.notNull(rsocketClient, "RSocketClient is required");
this.rsocketClient = rsocketClient;
}
@Override
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
return this.rsocketClient.fireAndForget(payloadMono);
}
@Override
public Mono<Payload> requestResponse(Mono<Payload> payloadMono) {
return this.rsocketClient.requestResponse(payloadMono);
}
@Override
public Flux<Payload> requestStream(Mono<Payload> payloadMono) {
return this.rsocketClient.requestStream(payloadMono);
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloadPublisher) {
return this.rsocketClient.requestChannel(payloadPublisher);
}
@Override
public Mono<Void> metadataPush(Mono<Payload> payloadMono) {
return this.rsocketClient.metadataPush(payloadMono);
}
@Override
public void dispose() {
this.rsocketClient.dispose();
}
}
}

View File

@ -23,10 +23,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.transport.ClientTransport;
@ -45,7 +44,6 @@ import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@ -59,11 +57,6 @@ import org.springframework.util.MimeTypeUtils;
*/
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
private final static boolean rsocketConnectorPresent =
ClassUtils.isPresent("io.rsocket.core.RSocketConnector",
DefaultRSocketRequesterBuilder.class.getClassLoader());
private static final Map<String, Object> HINTS = Collections.emptyMap();
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
@ -165,47 +158,70 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
}
@Override
public RSocketRequester tcp(String host, int port) {
return transport(TcpClientTransport.create(host, port));
}
@Override
public RSocketRequester websocket(URI uri) {
return transport(WebsocketClientTransport.create(uri));
}
@Override
public RSocketRequester transport(ClientTransport transport) {
RSocketStrategies strategies = getRSocketStrategies();
Assert.isTrue(!strategies.encoders().isEmpty(), "No encoders");
Assert.isTrue(!strategies.decoders().isEmpty(), "No decoders");
MimeType metaMimeType = (this.metadataMimeType != null ? this.metadataMimeType :
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()));
MimeType dataMimeType = getDataMimeType(strategies);
Mono<Payload> setupPayload = getSetupPayload(dataMimeType, metaMimeType, strategies);
RSocketConnector connector = initConnector(
this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers,
metaMimeType, dataMimeType, setupPayload, strategies);
return new DefaultRSocketRequester(
new DefaultRSocketRequester.ClientRSocketDelegate(connector.toRSocketClient(transport)),
dataMimeType, metaMimeType, strategies);
}
@Override
@SuppressWarnings("deprecation")
public Mono<RSocketRequester> connectTcp(String host, int port) {
return connect(TcpClientTransport.create(host, port));
}
@Override
@SuppressWarnings("deprecation")
public Mono<RSocketRequester> connectWebSocket(URI uri) {
return connect(WebsocketClientTransport.create(uri));
}
@Override
@SuppressWarnings("deprecation")
public Mono<RSocketRequester> connect(ClientTransport transport) {
RSocketStrategies rsocketStrategies = getRSocketStrategies();
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders");
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders");
MimeType metaMimeType = this.metadataMimeType != null ? this.metadataMimeType :
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
MimeType metaMimeType = (this.metadataMimeType != null ? this.metadataMimeType :
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()));
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
Mono<Payload> setupPayload = getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies);
Function<Payload, Mono<RSocket>> connectFunction;
if (rsocketConnectorPresent) {
connectFunction = payload -> new RSocketConnectorHelper().getRSocketMono(
this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers,
metaMimeType, dataMimeType, setupPayload, rsocketStrategies, transport, payload);
}
else {
connectFunction = payload -> new RSocketFactoryHelper().getRSocketMono(
this.rsocketFactoryConfigurers, metaMimeType, dataMimeType,
setupPayload, rsocketStrategies, transport, payload);
}
RSocketConnector connector = initConnector(
this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers,
metaMimeType, dataMimeType, setupPayload, rsocketStrategies);
// In RSocket 1.0.2 we can pass a Mono for the setup Payload. Until then we have to
// resolve it and then cache the Mono<RSocket> because it may be a ReconnectMono.
return setupPayload
.map(connectFunction)
.cache()
.flatMap(mono -> mono.map(rsocket ->
new DefaultRSocketRequester(rsocket, dataMimeType, metaMimeType, rsocketStrategies)));
return connector.connect(transport).map(rsocket ->
new DefaultRSocketRequester(
new DefaultRSocketRequester.ConnectionRSocketDelegate(rsocket),
dataMimeType, metaMimeType, rsocketStrategies));
}
private RSocketStrategies getRSocketStrategies() {
@ -289,60 +305,32 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
.doOnDiscard(Payload.class, Payload::release);
}
@SuppressWarnings("deprecation")
private static class RSocketConnectorHelper {
private RSocketConnector initConnector(List<RSocketConnectorConfigurer> connectorConfigurers,
List<ClientRSocketFactoryConfigurer> factoryConfigurers,
MimeType metaMimeType, MimeType dataMimeType, Mono<Payload> setupPayloadMono,
RSocketStrategies rsocketStrategies) {
Mono<RSocket> getRSocketMono(List<RSocketConnectorConfigurer> connectorConfigurers,
List<ClientRSocketFactoryConfigurer> factoryConfigurers,
MimeType metaMimeType, MimeType dataMimeType, Mono<Payload> setupPayload,
RSocketStrategies rsocketStrategies, ClientTransport transport, Payload payload) {
RSocketConnector connector = RSocketConnector.create();
connectorConfigurers.forEach(c -> c.configure(connector));
io.rsocket.core.RSocketConnector connector = io.rsocket.core.RSocketConnector.create();
connectorConfigurers.forEach(c -> c.configure(connector));
if (!factoryConfigurers.isEmpty()) {
io.rsocket.RSocketFactory.ClientRSocketFactory factory =
new io.rsocket.RSocketFactory.ClientRSocketFactory(connector);
factoryConfigurers.forEach(c -> c.configure(factory));
}
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
connector.payloadDecoder(PayloadDecoder.ZERO_COPY);
}
connector.metadataMimeType(metaMimeType.toString());
connector.dataMimeType(dataMimeType.toString());
if (setupPayload != EMPTY_SETUP_PAYLOAD) {
connector.setupPayload(payload);
}
return connector.connect(transport);
if (!factoryConfigurers.isEmpty()) {
io.rsocket.RSocketFactory.ClientRSocketFactory factory =
new io.rsocket.RSocketFactory.ClientRSocketFactory(connector);
factoryConfigurers.forEach(c -> c.configure(factory));
}
}
@SuppressWarnings("deprecation")
private static class RSocketFactoryHelper {
Mono<RSocket> getRSocketMono(List<ClientRSocketFactoryConfigurer> configurers,
MimeType metaMimeType, MimeType dataMimeType, Mono<Payload> setupPayload,
RSocketStrategies rsocketStrategies, ClientTransport transport, Payload payload) {
io.rsocket.RSocketFactory.ClientRSocketFactory factory = io.rsocket.RSocketFactory.connect();
configurers.forEach(c -> c.configure(factory));
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
}
factory.metadataMimeType(metaMimeType.toString());
factory.dataMimeType(dataMimeType.toString());
if (setupPayload != EMPTY_SETUP_PAYLOAD) {
factory.setupPayload(payload);
}
return factory.transport(transport).start();
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
connector.payloadDecoder(PayloadDecoder.ZERO_COPY);
}
connector.metadataMimeType(metaMimeType.toString());
connector.dataMimeType(dataMimeType.toString());
if (setupPayloadMono != EMPTY_SETUP_PAYLOAD) {
connector.setupPayload(setupPayloadMono);
}
return connector;
}
}

View File

@ -48,8 +48,15 @@ import org.springframework.util.MimeType;
public interface RSocketRequester {
/**
* Return the underlying sending RSocket.
* This method returns {@code null} unless the the requester was created
* with a "live" RSocket through one of the (now deprecated) builder connect
* methods or via {@link #wrap(RSocket, MimeType, MimeType, RSocketStrategies)}
* which is mainly for internal use in client and server responder
* implementations. Otherwise in the more common case where there is no
* "live" RSocket, the requester delegates to an
* {@link io.rsocket.RSocketClient}.
*/
@Nullable
RSocket rsocket();
/**
@ -96,6 +103,12 @@ public interface RSocketRequester {
*/
RequestSpec metadata(Object metadata, @Nullable MimeType mimeType);
/**
* Invoke the dispose method on the underlying
* {@link io.rsocket.RSocketClient} or {@link RSocket}.
* @since 5.3
*/
public void dispose();
/**
* Obtain a builder to create a client {@link RSocketRequester} by connecting
@ -113,7 +126,9 @@ public interface RSocketRequester {
RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
RSocketStrategies strategies) {
return new DefaultRSocketRequester(rsocket, dataMimeType, metadataMimeType, strategies);
return new DefaultRSocketRequester(
new DefaultRSocketRequester.ConnectionRSocketDelegate(rsocket),
dataMimeType, metadataMimeType, strategies);
}
@ -236,28 +251,65 @@ public interface RSocketRequester {
*/
RSocketRequester.Builder apply(Consumer<RSocketRequester.Builder> configurer);
/**
* Build an {@link RSocketRequester} instance for use with a TCP
* transport. Requests are made via {@link io.rsocket.RSocketClient}
* which establishes a shared TCP connection to given host and port.
* @param host the host of the server to connect to
* @param port the port of the server to connect to
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester tcp(String host, int port);
/**
* Build an {@link RSocketRequester} instance for use with a WebSocket
* transport. Requests are made via {@link io.rsocket.RSocketClient}
* which establishes a shared WebSocket connection to given URL.
* @param uri the URL of the server to connect to
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester websocket(URI uri);
/**
* Build an {@link RSocketRequester} instance for use with the given
* transport. Requests are made via {@link io.rsocket.RSocketClient}
* which establishes a shared connection through the given transport.
* @param transport the transport to use for connecting to the server
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester transport(ClientTransport transport);
/**
* Connect to the server over TCP.
* @param host the server host
* @param port the server port
* @return an {@code RSocketRequester} for the connection
* @deprecated as of 5.3 in favor of {@link #tcp(String, int)}
* @see TcpClientTransport
*/
@Deprecated
Mono<RSocketRequester> connectTcp(String host, int port);
/**
* Connect to the server over WebSocket.
* @param uri the RSocket server endpoint URI
* @return an {@code RSocketRequester} for the connection
* @deprecated as of 5.3 in favor of {@link #websocket(URI)}
* @see WebsocketClientTransport
*/
@Deprecated
Mono<RSocketRequester> connectWebSocket(URI uri);
/**
* Connect to the server with the given {@code ClientTransport}.
* @param transport the client transport to use
* @return an {@code RSocketRequester} for the connection
* @deprecated as of 5.3 in favor of {@link #transport(ClientTransport)}
*/
@Deprecated
Mono<RSocketRequester> connect(ClientTransport transport);
}

View File

@ -18,9 +18,9 @@ package org.springframework.messaging.rsocket
import io.rsocket.transport.ClientTransport
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.asFlow
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
import reactor.core.publisher.Flux
@ -33,6 +33,7 @@ import java.net.URI
* @author Sebastien Deleuze
* @since 5.2
*/
@Suppress("DEPRECATION")
suspend fun RSocketRequester.Builder.connectAndAwait(transport: ClientTransport): RSocketRequester =
connect(transport).awaitSingle()
@ -42,6 +43,7 @@ suspend fun RSocketRequester.Builder.connectAndAwait(transport: ClientTransport)
* @author Sebastien Deleuze
* @since 5.2
*/
@Suppress("DEPRECATION")
suspend fun RSocketRequester.Builder.connectTcpAndAwait(host: String, port: Int): RSocketRequester =
connectTcp(host, port).awaitSingle()
@ -51,6 +53,7 @@ suspend fun RSocketRequester.Builder.connectTcpAndAwait(host: String, port: Int)
* @author Sebastien Deleuze
* @since 5.2
*/
@Suppress("DEPRECATION")
suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocketRequester =
connectWebSocket(uri).awaitSingle()

View File

@ -55,7 +55,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
/**
* Unit tests for {@link DefaultRSocketRequesterBuilder}.
@ -75,22 +74,10 @@ public class DefaultRSocketRequesterBuilderTests {
public void setup() {
this.transport = mock(ClientTransport.class);
given(this.transport.connect()).willReturn(Mono.just(this.connection));
given(this.transport.maxFrameLength()).willReturn(16777215);
}
@Test
@SuppressWarnings("unchecked")
public void rsocketConnectorConfigurerAppliesAtSubscription() {
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketConnector(this.connectorConfigurer)
.rsocketStrategies(strategiesConfigurer)
.connect(this.transport);
verifyNoInteractions(this.transport);
assertThat(this.connectorConfigurer.connector()).isNull();
}
@Test
@SuppressWarnings({"unchecked", "deprecation"})
public void rsocketConnectorConfigurer() {
@ -100,12 +87,10 @@ public class DefaultRSocketRequesterBuilderTests {
.rsocketConnector(this.connectorConfigurer)
.rsocketFactory(factoryConfigurer)
.rsocketStrategies(strategiesConfigurer)
.connect(this.transport)
.block();
.transport(this.transport);
// RSocketStrategies and RSocketConnector configurers should have been called
verify(this.transport).connect();
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
verify(factoryConfigurer).configure(any(io.rsocket.RSocketFactory.ClientRSocketFactory.class));
assertThat(this.connectorConfigurer.connector()).isNotNull();
@ -113,9 +98,7 @@ public class DefaultRSocketRequesterBuilderTests {
@Test
public void defaultDataMimeType() {
RSocketRequester requester = RSocketRequester.builder()
.connect(this.transport)
.block();
RSocketRequester requester = RSocketRequester.builder().transport(this.transport);
assertThat(requester.dataMimeType())
.as("Default data MimeType, based on the first Decoder")
@ -130,8 +113,7 @@ public class DefaultRSocketRequesterBuilderTests {
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connect(this.transport)
.block();
.transport(this.transport);
assertThat(requester.dataMimeType())
.as("Default data MimeType, based on the first configured, non-default Decoder")
@ -142,12 +124,9 @@ public class DefaultRSocketRequesterBuilderTests {
public void dataMimeTypeExplicitlySet() {
RSocketRequester requester = RSocketRequester.builder()
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.connect(this.transport)
.block();
.transport(this.transport);
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(DefaultConnectionSetupPayload::new)
.block();
ConnectionSetupPayload setupPayload = getConnectionSetupPayload(requester);
assertThat(setupPayload.dataMimeType()).isEqualTo("application/json");
assertThat(requester.dataMimeType()).isEqualTo(MimeTypeUtils.APPLICATION_JSON);
@ -165,12 +144,9 @@ public class DefaultRSocketRequesterBuilderTests {
connector.metadataMimeType("text/plain");
connector.dataMimeType("application/xml");
})
.connect(this.transport)
.block();
.transport(this.transport);
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(DefaultConnectionSetupPayload::new)
.block();
ConnectionSetupPayload setupPayload = getConnectionSetupPayload(requester);
assertThat(setupPayload.dataMimeType()).isEqualTo(dataMimeType.toString());
assertThat(setupPayload.metadataMimeType()).isEqualTo(metaMimeType.toString());
@ -180,17 +156,14 @@ public class DefaultRSocketRequesterBuilderTests {
@Test
public void setupRoute() {
RSocketRequester.builder()
RSocketRequester requester = RSocketRequester.builder()
.dataMimeType(MimeTypeUtils.TEXT_PLAIN)
.metadataMimeType(MimeTypeUtils.TEXT_PLAIN)
.setupRoute("toA")
.setupData("My data")
.connect(this.transport)
.block();
.transport(this.transport);
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(DefaultConnectionSetupPayload::new)
.block();
ConnectionSetupPayload setupPayload = getConnectionSetupPayload(requester);
assertThat(setupPayload.getMetadataUtf8()).isEqualTo("toA");
assertThat(setupPayload.getDataUtf8()).isEqualTo("My data");
@ -203,18 +176,15 @@ public class DefaultRSocketRequesterBuilderTests {
Mono<String> asyncMeta2 = Mono.delay(Duration.ofMillis(1)).map(aLong -> "Async Metadata 2");
Mono<String> data = Mono.delay(Duration.ofMillis(1)).map(aLong -> "Async data");
RSocketRequester.builder()
RSocketRequester requester = RSocketRequester.builder()
.dataMimeType(MimeTypeUtils.TEXT_PLAIN)
.setupRoute("toA")
.setupMetadata(asyncMeta1, new MimeType("text", "x.test.metadata1"))
.setupMetadata(asyncMeta2, new MimeType("text", "x.test.metadata2"))
.setupData(data)
.connect(this.transport)
.block();
.transport(this.transport);
ConnectionSetupPayload payload = Mono.from(this.connection.sentFrames())
.map(DefaultConnectionSetupPayload::new)
.block();
ConnectionSetupPayload setupPayload = getConnectionSetupPayload(requester);
MimeType compositeMimeType =
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
@ -222,11 +192,11 @@ public class DefaultRSocketRequesterBuilderTests {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(StringDecoder.allMimeTypes());
extractor.metadataToExtract(new MimeType("text", "x.test.metadata1"), String.class, "asyncMeta1");
extractor.metadataToExtract(new MimeType("text", "x.test.metadata2"), String.class, "asyncMeta2");
Map<String, Object> metadataValues = extractor.extract(payload, compositeMimeType);
Map<String, Object> metadataValues = extractor.extract(setupPayload, compositeMimeType);
assertThat(metadataValues.get("asyncMeta1")).isEqualTo("Async Metadata 1");
assertThat(metadataValues.get("asyncMeta2")).isEqualTo("Async Metadata 2");
assertThat(payload.getDataUtf8()).isEqualTo("Async data");
assertThat(setupPayload.getDataUtf8()).isEqualTo("Async data");
}
@Test
@ -235,6 +205,12 @@ public class DefaultRSocketRequesterBuilderTests {
testPayloadDecoder(DefaultDataBufferFactory.sharedInstance, PayloadDecoder.DEFAULT);
}
private ConnectionSetupPayload getConnectionSetupPayload(RSocketRequester requester) {
// Trigger connection and sending of SETUP frame
requester.route("any-route").data("any-data").send().block();
return new DefaultConnectionSetupPayload(this.connection.setupFrame());
}
private void testPayloadDecoder(DataBufferFactory bufferFactory, PayloadDecoder payloadDecoder)
throws NoSuchFieldException {
@ -245,8 +221,7 @@ public class DefaultRSocketRequesterBuilderTests {
RSocketRequester.builder()
.rsocketStrategies(strategies)
.rsocketConnector(this.connectorConfigurer)
.connect(this.transport)
.block();
.transport(this.transport);
RSocketConnector connector = this.connectorConfigurer.connector();
assertThat(connector).isNotNull();
@ -260,16 +235,21 @@ public class DefaultRSocketRequesterBuilderTests {
static class MockConnection implements DuplexConnection {
private Publisher<ByteBuf> sentFrames;
private ByteBuf setupFrame;
public Publisher<ByteBuf> sentFrames() {
return this.sentFrames;
public ByteBuf setupFrame() {
return this.setupFrame;
}
@Override
public Mono<Void> send(Publisher<ByteBuf> frames) {
this.sentFrames = frames;
return Mono.empty();
}
@Override
public Mono<Void> sendOne(ByteBuf frame) {
this.setupFrame = frame;
return Mono.empty();
}
@ -285,12 +265,17 @@ public class DefaultRSocketRequesterBuilderTests {
@Override
public Mono<Void> onClose() {
return Mono.empty();
return Mono.never();
}
@Override
public void dispose() {
}
@Override
public boolean isDisposed() {
return false;
}
}

View File

@ -89,13 +89,12 @@ class RSocketBufferLeakTests {
requester = RSocketRequester.builder()
.rsocketConnector(conn -> conn.interceptors(registry -> registry.forRequester(payloadInterceptor)))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", 7000)
.block();
.tcp("localhost", 7000);
}
@AfterAll
void tearDownOnce() {
requester.rsocket().dispose();
requester.dispose();
server.dispose();
context.close();
}

View File

@ -91,13 +91,12 @@ public class RSocketClientToServerIntegrationTests {
requester = RSocketRequester.builder()
.metadataMimeType(metadataMimeType)
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", 7000)
.block();
.tcp("localhost", 7000);
}
@AfterAll
public static void tearDownOnce() {
requester.rsocket().dispose();
requester.dispose();
server.dispose();
}

View File

@ -108,14 +108,16 @@ public class RSocketServerToClientIntegrationTests {
.setupRoute(connectionRoute)
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", server.address().getPort())
.block();
.tcp("localhost", server.address().getPort());
// Make a request to cause a connection to be established.
requester.route("fnf").send().block();
context.getBean(ServerController.class).await(Duration.ofSeconds(5));
}
finally {
if (requester != null) {
requester.rsocket().dispose();
requester.dispose();
}
}
}
@ -199,7 +201,6 @@ public class RSocketServerToClientIntegrationTests {
});
}
private void runTest(Runnable testEcho) {
Mono.fromRunnable(testEcho)
.doOnError(ex -> result.onError(ex))
@ -207,6 +208,10 @@ public class RSocketServerToClientIntegrationTests {
.subscribeOn(Schedulers.boundedElastic()) // StepVerifier will block
.subscribe();
}
@MessageMapping("fnf")
void handleFireAndForget() {
}
}

View File

@ -265,14 +265,13 @@ class RSocketClientToServerCoroutinesIntegrationTests {
requester = RSocketRequester.builder()
.rsocketConnector { connector -> connector.payloadDecoder(PayloadDecoder.ZERO_COPY) }
.rsocketStrategies(context.getBean(RSocketStrategies::class.java))
.connectTcp("localhost", 7000)
.block()!!
.tcp("localhost", 7000)
}
@AfterAll
@JvmStatic
fun tearDownOnce() {
requester.rsocket().dispose()
requester.dispose()
server.dispose()
}
}

View File

@ -39,6 +39,7 @@ class RSocketRequesterExtensionsTests {
private val stringTypeRefMatcher: (ParameterizedTypeReference<*>) -> Boolean = { it.type == String::class.java }
@Test
@Suppress("DEPRECATION")
fun connectAndAwait() {
val requester = mockk<RSocketRequester>()
val builder = mockk<RSocketRequester.Builder>()
@ -49,6 +50,7 @@ class RSocketRequesterExtensionsTests {
}
@Test
@Suppress("DEPRECATION")
fun connectTcpAndAwait() {
val host = "127.0.0.1"
val requester = mockk<RSocketRequester>()
@ -60,6 +62,7 @@ class RSocketRequesterExtensionsTests {
}
@Test
@Suppress("DEPRECATION")
fun connectWebSocketAndAwait() {
val requester = mockk<RSocketRequester>()
val builder = mockk<RSocketRequester.Builder>()

View File

@ -176,80 +176,33 @@ symmetrically, to make requests from clients and to make requests from servers.
[[rsocket-requester-client]]
=== Client Requester
To obtain an `RSocketRequester` on the client side requires connecting to a server along with
preparing and sending the initial RSocket `SETUP` frame. `RSocketRequester` provides a
builder for that. Internally it builds on `io.rsocket.core.RSocketConnector`.
To obtain an `RSocketRequester` on the client side is to connect to a server which involves
sending an RSocket `SETUP` frame with connection settings. `RSocketRequester` provides a
builder that helps to prepare an `io.rsocket.core.RSocketConnector` including connection
settings for the `SETUP` frame.
This is the most basic way to connect with default settings:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.connectTcp("localhost", 7000);
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.connectWebSocket(URI.create("https://example.org:8080/rsocket"));
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
import org.springframework.messaging.rsocket.connectTcpAndAwait
import org.springframework.messaging.rsocket.connectWebSocketAndAwait
val requester = RSocketRequester.builder().tcp("localhost", 7000)
val requester = RSocketRequester.builder()
.connectTcpAndAwait("localhost", 7000)
val requester = RSocketRequester.builder()
.connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket"))
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
----
The above is deferred. To actually connect and use the requester:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
// Connect asynchronously
RSocketRequester.builder().connectTcp("localhost", 7000)
.subscribe(requester -> {
// ...
});
// Or block
RSocketRequester requester = RSocketRequester.builder()
.connectTcp("localhost", 7000)
.block(Duration.ofSeconds(5));
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
// Connect asynchronously
import org.springframework.messaging.rsocket.connectTcpAndAwait
class MyService {
private var requester: RSocketRequester? = null
private suspend fun requester() = requester ?:
RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it }
suspend fun doSomething() = requester().route(...)
}
// Or block
import org.springframework.messaging.rsocket.connectTcpAndAwait
class MyService {
private val requester = runBlocking {
RSocketRequester.builder().connectTcpAndAwait("localhost", 7000)
}
suspend fun doSomething() = requester.route(...)
}
----
The above does not connect immediately. When requests are made, a shared connection is
established transparently and used.
[[rsocket-requester-client-setup]]
@ -291,16 +244,14 @@ can be registered as follows:
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connectTcp("localhost", 7000);
.tcp("localhost", 7000);
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
import org.springframework.messaging.rsocket.connectTcpAndAwait
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
@ -308,7 +259,7 @@ can be registered as follows:
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connectTcpAndAwait("localhost", 7000)
.tcp("localhost", 7000)
----
`RSocketStrategies` is designed for re-use. In some scenarios, e.g. client and server in
@ -334,9 +285,9 @@ infrastructure that's used on a server, but registered programmatically as follo
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); // <2>
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) // <3>
.connectTcp("localhost", 7000);
.tcp("localhost", 7000);
----
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient
route matching.
@ -346,8 +297,6 @@ infrastructure that's used on a server, but registered programmatically as follo
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
import org.springframework.messaging.rsocket.connectTcpAndAwait
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) // <1>
.build()
@ -357,7 +306,7 @@ infrastructure that's used on a server, but registered programmatically as follo
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } // <3>
.connectTcpAndAwait("localhost", 7000)
.tcp("localhost", 7000)
----
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient
route matching.
@ -374,23 +323,22 @@ you can still declare `RSocketMessageHandler` as a Spring bean and then apply as
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.connectTcp("localhost", 7000);
.tcp("localhost", 7000);
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
import org.springframework.beans.factory.getBean
import org.springframework.messaging.rsocket.connectTcpAndAwait
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.connectTcpAndAwait("localhost", 7000)
.tcp("localhost", 7000)
----
For the above you may also need to use `setHandlerPredicate` in `RSocketMessageHandler` to
@ -413,22 +361,21 @@ at that level as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.connectTcp("localhost", 7000);
.tcp("localhost", 7000);
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
import org.springframework.messaging.rsocket.connectTcpAndAwait
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}.connectTcpAndAwait("localhost", 7000)
}
.tcp("localhost", 7000)
----