Default MimeType selection for RSocketRequester

Remove the dataMimeType argument on connect methods. Applications can
still configure it through the ClientRSocketFactory but it shouldn't
be necessary as we now choose a default MimeType from the supported
encoders and decoders.

Add an option to provide the RSocketStrategies instance (vs customizing
it) which is expected in Spring config where an RSocketStrategies
instance may be shared between client and server setups.
This commit is contained in:
Rossen Stoyanchev 2019-04-29 15:22:01 -04:00
parent 8888a65079
commit a1ad0285ca
5 changed files with 81 additions and 52 deletions

View File

@ -27,6 +27,7 @@ import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
/** /**
@ -39,6 +40,9 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>(); private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>();
@Nullable
private RSocketStrategies strategies;
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>(); private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
@ -48,6 +52,12 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
return this; return this;
} }
@Override
public RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies) {
this.strategies = strategies;
return this;
}
@Override @Override
public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer) { public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer) {
this.strategiesConfigurers.add(configurer); this.strategiesConfigurers.add(configurer);
@ -55,28 +65,54 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
} }
@Override @Override
public Mono<RSocketRequester> connectTcp(String host, int port, MimeType dataMimeType) { public Mono<RSocketRequester> connectTcp(String host, int port) {
return connect(TcpClientTransport.create(host, port), dataMimeType); return connect(TcpClientTransport.create(host, port));
} }
@Override @Override
public Mono<RSocketRequester> connectWebSocket(URI uri, MimeType dataMimeType) { public Mono<RSocketRequester> connectWebSocket(URI uri) {
return connect(WebsocketClientTransport.create(uri), dataMimeType); return connect(WebsocketClientTransport.create(uri));
} }
@Override @Override
public Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType) { public Mono<RSocketRequester> connect(ClientTransport transport) {
return Mono.defer(() -> { return Mono.defer(() -> {
String mimeType = dataMimeType.toString(); RSocketStrategies strategies = getRSocketStrategies();
RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect().dataMimeType(mimeType); MimeType dataMimeType = getDefaultDataMimeType(strategies);
RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect();
if (dataMimeType != null) {
factory.dataMimeType(dataMimeType.toString());
}
this.factoryConfigurers.forEach(configurer -> configurer.accept(factory)); this.factoryConfigurers.forEach(configurer -> configurer.accept(factory));
RSocketStrategies.Builder builder = RSocketStrategies.builder();
this.strategiesConfigurers.forEach(configurer -> configurer.accept(builder));
return factory.transport(transport).start() return factory.transport(transport).start()
.map(rsocket -> RSocketRequester.create(rsocket, dataMimeType, builder.build())); .map(rsocket -> RSocketRequester.create(rsocket, dataMimeType, strategies));
}); });
} }
private RSocketStrategies getRSocketStrategies() {
if (this.strategiesConfigurers.isEmpty()) {
return this.strategies != null ? this.strategies : RSocketStrategies.builder().build();
}
RSocketStrategies.Builder strategiesBuilder = this.strategies != null ?
this.strategies.mutate() : RSocketStrategies.builder();
this.strategiesConfigurers.forEach(configurer -> configurer.accept(strategiesBuilder));
return strategiesBuilder.build();
}
@Nullable
private MimeType getDefaultDataMimeType(RSocketStrategies strategies) {
return strategies.encoders().stream()
.flatMap(encoder -> encoder.getEncodableMimeTypes().stream())
.filter(MimeType::isConcrete)
.findFirst()
.orElseGet(() ->
strategies.decoders().stream()
.flatMap(encoder -> encoder.getDecodableMimeTypes().stream())
.filter(MimeType::isConcrete)
.findFirst()
.orElse(null));
}
} }

View File

@ -91,15 +91,25 @@ public interface RSocketRequester {
interface Builder { interface Builder {
/** /**
* Configure the {@code ClientRSocketFactory} to customize protocol * Configure the {@code ClientRSocketFactory}.
* options, register RSocket plugins (interceptors), and more. * <p>Note there is typically no need to set a data MimeType explicitly.
* By default a data MimeType is picked by taking the first concrete
* MimeType supported by the configured encoders and decoders.
* @param configurer the configurer to apply * @param configurer the configurer to apply
*/ */
RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer); RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer);
/** /**
* Configure the builder for {@link RSocketStrategies}. * Set the {@link RSocketStrategies} instance.
* <p>The builder starts with an empty {@code RSocketStrategies}. * @param strategies the strategies to use
*/
RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);
/**
* Customize the {@link RSocketStrategies}.
* <p>By default this starts out with an empty builder, i.e.
* {@link RSocketStrategies#builder()}, but the strategies can also be
* set via {@link #rsocketStrategies(RSocketStrategies)}.
* @param configurer the configurer to apply * @param configurer the configurer to apply
*/ */
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer); RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
@ -108,25 +118,23 @@ public interface RSocketRequester {
* Connect to the RSocket server over TCP. * Connect to the RSocket server over TCP.
* @param host the server host * @param host the server host
* @param port the server port * @param port the server port
* @param dataMimeType the data MimeType for the connection
* @return an {@code RSocketRequester} for the connection * @return an {@code RSocketRequester} for the connection
*/ */
Mono<RSocketRequester> connectTcp(String host, int port, MimeType dataMimeType); Mono<RSocketRequester> connectTcp(String host, int port);
/** /**
* Connect to the RSocket server over WebSocket. * Connect to the RSocket server over WebSocket.
* @param uri the RSocket server endpoint URI * @param uri the RSocket server endpoint URI
* @param dataMimeType the data MimeType
* @return an {@code RSocketRequester} for the connection * @return an {@code RSocketRequester} for the connection
*/ */
Mono<RSocketRequester> connectWebSocket(URI uri, MimeType dataMimeType); Mono<RSocketRequester> connectWebSocket(URI uri);
/** /**
* Connect to the RSocket server with the given {@code ClientTransport}. * Connect to the RSocket server with the given {@code ClientTransport}.
* @param transport the client transport to use * @param transport the client transport to use
* @return an {@code RSocketRequester} for the connection * @return an {@code RSocketRequester} for the connection
*/ */
Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType); Mono<RSocketRequester> connect(ClientTransport transport);
} }

View File

@ -28,14 +28,8 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.util.MimeTypeUtils; import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
/** /**
* Unit tests for {@link DefaultRSocketRequesterBuilder}. * Unit tests for {@link DefaultRSocketRequesterBuilder}.
@ -46,6 +40,7 @@ public class DefaultRSocketRequesterBuilderTests {
private ClientTransport transport; private ClientTransport transport;
@Before @Before
public void setup() { public void setup() {
this.transport = mock(ClientTransport.class); this.transport = mock(ClientTransport.class);
@ -57,10 +52,10 @@ public class DefaultRSocketRequesterBuilderTests {
public void shouldApplyCustomizationsAtSubscription() { public void shouldApplyCustomizationsAtSubscription() {
Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class); Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class); Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
Mono<RSocketRequester> requester = RSocketRequester.builder() RSocketRequester.builder()
.rsocketFactory(factoryConfigurer) .rsocketFactory(factoryConfigurer)
.rsocketStrategies(strategiesConfigurer) .rsocketStrategies(strategiesConfigurer)
.connect(this.transport, MimeTypeUtils.APPLICATION_JSON); .connect(this.transport);
verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer); verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer);
} }
@ -69,10 +64,10 @@ public class DefaultRSocketRequesterBuilderTests {
public void shouldApplyCustomizations() { public void shouldApplyCustomizations() {
Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class); Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class); Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester requester = RSocketRequester.builder() RSocketRequester.builder()
.rsocketFactory(factoryConfigurer) .rsocketFactory(factoryConfigurer)
.rsocketStrategies(strategiesConfigurer) .rsocketStrategies(strategiesConfigurer)
.connect(this.transport, MimeTypeUtils.APPLICATION_JSON) .connect(this.transport)
.block(); .block();
verify(this.transport).connect(anyInt()); verify(this.transport).connect(anyInt());
verify(factoryConfigurer).accept(any(RSocketFactory.ClientRSocketFactory.class)); verify(factoryConfigurer).accept(any(RSocketFactory.ClientRSocketFactory.class));

View File

@ -32,7 +32,6 @@ import io.rsocket.RSocket;
import io.rsocket.RSocketFactory; import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RSocketInterceptor; import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.transport.netty.server.TcpServerTransport;
import org.junit.After; import org.junit.After;
@ -60,7 +59,6 @@ import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -78,8 +76,6 @@ public class RSocketBufferLeakTests {
private static CloseableChannel server; private static CloseableChannel server;
private static RSocket client;
private static RSocketRequester requester; private static RSocketRequester requester;
@ -96,21 +92,19 @@ public class RSocketBufferLeakTests {
.start() .start()
.block(); .block();
client = RSocketFactory.connect() requester = RSocketRequester.builder()
.frameDecoder(PayloadDecoder.ZERO_COPY) .rsocketFactory(factory -> {
.addClientPlugin(payloadInterceptor) // intercept outgoing requests factory.frameDecoder(PayloadDecoder.ZERO_COPY);
.dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE) factory.addClientPlugin(payloadInterceptor); // intercept outgoing requests
.transport(TcpClientTransport.create("localhost", 7000)) })
.start() .rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", 7000)
.block(); .block();
requester = RSocketRequester.create(
client, MimeTypeUtils.TEXT_PLAIN, context.getBean(RSocketStrategies.class));
} }
@AfterClass @AfterClass
public static void tearDownOnce() { public static void tearDownOnce() {
client.dispose(); requester.rsocket().dispose();
server.dispose(); server.dispose();
} }

View File

@ -40,7 +40,6 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.util.MimeTypeUtils;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -75,11 +74,8 @@ public class RSocketClientToServerIntegrationTests {
requester = RSocketRequester.builder() requester = RSocketRequester.builder()
.rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY)) .rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY))
.rsocketStrategies(strategies -> strategies .rsocketStrategies(context.getBean(RSocketStrategies.class))
.decoder(StringDecoder.allMimeTypes()) .connectTcp("localhost", 7000)
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)))
.connectTcp("localhost", 7000, MimeTypeUtils.TEXT_PLAIN)
.block(); .block();
} }