parent
79c339b03e
commit
b2a4d1c5b1
|
|
@ -23,8 +23,10 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.frame.decoder.PayloadDecoder;
|
||||
import io.rsocket.metadata.WellKnownMimeType;
|
||||
import io.rsocket.transport.ClientTransport;
|
||||
|
|
@ -174,10 +176,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
|
||||
@Override
|
||||
public Mono<RSocketRequester> connect(ClientTransport transport) {
|
||||
return Mono.defer(() -> doConnect(transport));
|
||||
}
|
||||
|
||||
private Mono<RSocketRequester> doConnect(ClientTransport transport) {
|
||||
RSocketStrategies rsocketStrategies = getRSocketStrategies();
|
||||
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders");
|
||||
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders");
|
||||
|
|
@ -186,21 +184,28 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
|
||||
|
||||
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
|
||||
Mono<Payload> setupPayload = getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies);
|
||||
|
||||
Function<Payload, Mono<RSocket>> connectFunction;
|
||||
if (rsocketConnectorPresent) {
|
||||
return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies)
|
||||
.flatMap(payload ->
|
||||
new RSocketConnectorHelper().connect(
|
||||
this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers,
|
||||
metaMimeType, dataMimeType, payload, rsocketStrategies, transport));
|
||||
connectFunction = payload -> new RSocketConnectorHelper().getRSocketMono(
|
||||
this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers,
|
||||
metaMimeType, dataMimeType, setupPayload, rsocketStrategies, transport, payload);
|
||||
}
|
||||
else {
|
||||
return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies)
|
||||
.flatMap(payload ->
|
||||
new RSocketFactoryHelper().connect(
|
||||
this.rsocketFactoryConfigurers, metaMimeType, dataMimeType, payload,
|
||||
rsocketStrategies, transport));
|
||||
connectFunction = payload -> new RSocketFactoryHelper().getRSocketMono(
|
||||
this.rsocketFactoryConfigurers, metaMimeType, dataMimeType,
|
||||
setupPayload, rsocketStrategies, transport, payload);
|
||||
}
|
||||
|
||||
// In RSocket 1.0.2 we can pass a Mono for the setup Payload. Until then we have to
|
||||
// resolve it and then cache the Mono<RSocket> because it may be a ReconnectMono.
|
||||
|
||||
return setupPayload
|
||||
.map(connectFunction)
|
||||
.cache()
|
||||
.flatMap(mono -> mono.map(rsocket ->
|
||||
new DefaultRSocketRequester(rsocket, dataMimeType, metaMimeType, rsocketStrategies)));
|
||||
}
|
||||
|
||||
private RSocketStrategies getRSocketStrategies() {
|
||||
|
|
@ -285,14 +290,13 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
}
|
||||
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private static class RSocketConnectorHelper {
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
Mono<RSocketRequester> connect(
|
||||
List<RSocketConnectorConfigurer> connectorConfigurers,
|
||||
Mono<RSocket> getRSocketMono(List<RSocketConnectorConfigurer> connectorConfigurers,
|
||||
List<ClientRSocketFactoryConfigurer> factoryConfigurers,
|
||||
MimeType metaMimeType, MimeType dataMimeType, Payload setupPayload,
|
||||
RSocketStrategies rsocketStrategies, ClientTransport transport) {
|
||||
MimeType metaMimeType, MimeType dataMimeType, Mono<Payload> setupPayload,
|
||||
RSocketStrategies rsocketStrategies, ClientTransport transport, Payload payload) {
|
||||
|
||||
io.rsocket.core.RSocketConnector connector = io.rsocket.core.RSocketConnector.create();
|
||||
connectorConfigurers.forEach(c -> c.configure(connector));
|
||||
|
|
@ -307,16 +311,13 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
connector.payloadDecoder(PayloadDecoder.ZERO_COPY);
|
||||
}
|
||||
|
||||
if (setupPayload != EMPTY_SETUP_PAYLOAD) {
|
||||
connector.setupPayload(setupPayload);
|
||||
}
|
||||
connector.metadataMimeType(metaMimeType.toString());
|
||||
connector.dataMimeType(dataMimeType.toString());
|
||||
|
||||
return connector
|
||||
.metadataMimeType(metaMimeType.toString())
|
||||
.dataMimeType(dataMimeType.toString())
|
||||
.connect(transport)
|
||||
.map(rsocket -> new DefaultRSocketRequester(
|
||||
rsocket, dataMimeType, metaMimeType, rsocketStrategies));
|
||||
if (setupPayload != EMPTY_SETUP_PAYLOAD) {
|
||||
connector.setupPayload(payload);
|
||||
}
|
||||
return connector.connect(transport);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -324,10 +325,9 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
@SuppressWarnings("deprecation")
|
||||
private static class RSocketFactoryHelper {
|
||||
|
||||
Mono<RSocketRequester> connect(
|
||||
List<ClientRSocketFactoryConfigurer> configurers,
|
||||
MimeType metaMimeType, MimeType dataMimeType, Payload setupPayload,
|
||||
RSocketStrategies rsocketStrategies, ClientTransport transport) {
|
||||
Mono<RSocket> getRSocketMono(List<ClientRSocketFactoryConfigurer> configurers,
|
||||
MimeType metaMimeType, MimeType dataMimeType, Mono<Payload> setupPayload,
|
||||
RSocketStrategies rsocketStrategies, ClientTransport transport, Payload payload) {
|
||||
|
||||
io.rsocket.RSocketFactory.ClientRSocketFactory factory = io.rsocket.RSocketFactory.connect();
|
||||
configurers.forEach(c -> c.configure(factory));
|
||||
|
|
@ -336,16 +336,12 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
|
||||
}
|
||||
|
||||
factory.metadataMimeType(metaMimeType.toString());
|
||||
factory.dataMimeType(dataMimeType.toString());
|
||||
if (setupPayload != EMPTY_SETUP_PAYLOAD) {
|
||||
factory.setupPayload(setupPayload);
|
||||
factory.setupPayload(payload);
|
||||
}
|
||||
|
||||
return factory.metadataMimeType(metaMimeType.toString())
|
||||
.dataMimeType(dataMimeType.toString())
|
||||
.transport(transport)
|
||||
.start()
|
||||
.map(rsocket -> new DefaultRSocketRequester(
|
||||
rsocket, dataMimeType, metaMimeType, rsocketStrategies));
|
||||
return factory.transport(transport).start();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue