Upgrade to RSocket 1.0 RC7 snapshots

This commit is contained in:
Rossen Stoyanchev 2020-04-20 09:42:14 +01:00
parent 376434eb75
commit ed8c61a852
12 changed files with 335 additions and 130 deletions

View File

@ -29,7 +29,7 @@ configure(allprojects) { project ->
mavenBom "com.fasterxml.jackson:jackson-bom:2.10.3"
mavenBom "io.netty:netty-bom:4.1.48.Final"
mavenBom "io.projectreactor:reactor-bom:Dysprosium-BUILD-SNAPSHOT"
mavenBom "io.rsocket:rsocket-bom:1.0.0-RC6"
mavenBom "io.rsocket:rsocket-bom:1.0.0-RC7-SNAPSHOT"
mavenBom "org.eclipse.jetty:jetty-bom:9.4.28.v20200408"
mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.72"
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.5"
@ -280,6 +280,7 @@ configure(allprojects) { project ->
mavenCentral()
maven { url "https://repo.spring.io/libs-spring-framework-build" }
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" } // RSocket
}
}
configurations.all {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,8 +15,6 @@
*/
package org.springframework.messaging.rsocket;
import io.rsocket.RSocketFactory;
/**
* Strategy to apply configuration to a client side {@code RSocketFactory}.
* that's being prepared by {@link RSocketRequester.Builder} to connect
@ -24,13 +22,17 @@ import io.rsocket.RSocketFactory;
*
* @author Rossen Stoyanchev
* @since 5.2
* @deprecated as of 5.2.6 following the deprecation of
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory RSocketFactory.ClientRSocketFactory}
* in RSocket 1.0 RC7. Please, use {@link RSocketConnectorConfigurer}.
*/
@FunctionalInterface
@Deprecated
public interface ClientRSocketFactoryConfigurer {
/**
* Apply configuration to the given {@code ClientRSocketFactory}.
*/
void configure(RSocketFactory.ClientRSocketFactory rsocketFactory);
void configure(io.rsocket.RSocketFactory.ClientRSocketFactory rsocketFactory);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,12 +25,12 @@ import java.util.Map;
import java.util.function.Consumer;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Mono;
import org.springframework.core.ReactiveAdapter;
@ -43,6 +43,7 @@ import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@ -56,10 +57,17 @@ import org.springframework.util.MimeTypeUtils;
*/
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
private final static boolean rsocketConnectorPresent =
ClassUtils.isPresent("io.rsocket.core.RSocketConnector",
DefaultRSocketRequesterBuilder.class.getClassLoader());
private static final Map<String, Object> HINTS = Collections.emptyMap();
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private static final Payload EMPTY_SETUP_PAYLOAD = DefaultPayload.create(EMPTY_BYTE_ARRAY);
@Nullable
private MimeType dataMimeType;
@ -84,7 +92,10 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
private List<ClientRSocketFactoryConfigurer> rsocketConfigurers = new ArrayList<>();
private List<RSocketConnectorConfigurer> rsocketConnectorConfigurers = new ArrayList<>();
@SuppressWarnings("deprecation")
private List<ClientRSocketFactoryConfigurer> rsocketFactoryConfigurers = new ArrayList<>();
@Override
@ -133,8 +144,15 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
}
@Override
public RSocketRequester.Builder rsocketConnector(RSocketConnectorConfigurer configurer) {
this.rsocketConnectorConfigurers.add(configurer);
return this;
}
@Override
@Deprecated
public RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer) {
this.rsocketConfigurers.add(configurer);
this.rsocketFactoryConfigurers.add(configurer);
return this;
}
@ -164,28 +182,25 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders");
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders");
RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect();
this.rsocketConfigurers.forEach(configurer -> configurer.configure(factory));
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
}
MimeType metaMimeType = this.metadataMimeType != null ? this.metadataMimeType :
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
factory.dataMimeType(dataMimeType.toString());
factory.metadataMimeType(metaMimeType.toString());
return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies)
.doOnNext(factory::setupPayload)
.then(Mono.defer(() ->
factory.transport(transport)
.start()
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, metaMimeType, rsocketStrategies))
));
if (rsocketConnectorPresent) {
return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies)
.flatMap(payload ->
new RSocketConnectorHelper().connect(
this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers,
metaMimeType, dataMimeType, payload, rsocketStrategies, transport));
}
else {
return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies)
.flatMap(payload ->
new RSocketFactoryHelper().connect(
this.rsocketFactoryConfigurers, metaMimeType, dataMimeType, payload,
rsocketStrategies, transport));
}
}
private RSocketStrategies getRSocketStrategies() {
@ -234,7 +249,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
Object data = this.setupData;
boolean hasMetadata = (this.setupRoute != null || !CollectionUtils.isEmpty(this.setupMetadata));
if (!hasMetadata && data == null) {
return Mono.empty();
return Mono.just(EMPTY_SETUP_PAYLOAD);
}
Mono<DataBuffer> dataMono = Mono.empty();
@ -269,4 +284,69 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
.doOnDiscard(Payload.class, Payload::release);
}
private static class RSocketConnectorHelper {
@SuppressWarnings("deprecation")
Mono<RSocketRequester> connect(
List<RSocketConnectorConfigurer> connectorConfigurers,
List<ClientRSocketFactoryConfigurer> factoryConfigurers,
MimeType metaMimeType, MimeType dataMimeType, Payload setupPayload,
RSocketStrategies rsocketStrategies, ClientTransport transport) {
io.rsocket.core.RSocketConnector connector = io.rsocket.core.RSocketConnector.create();
connectorConfigurers.forEach(c -> c.configure(connector));
if (!factoryConfigurers.isEmpty()) {
io.rsocket.RSocketFactory.ClientRSocketFactory factory =
new io.rsocket.RSocketFactory.ClientRSocketFactory(connector);
factoryConfigurers.forEach(c -> c.configure(factory));
}
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
connector.payloadDecoder(PayloadDecoder.ZERO_COPY);
}
if (setupPayload != EMPTY_SETUP_PAYLOAD) {
connector.setupPayload(setupPayload);
}
return connector
.metadataMimeType(metaMimeType.toString())
.dataMimeType(dataMimeType.toString())
.connect(transport)
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, metaMimeType, rsocketStrategies));
}
}
@SuppressWarnings("deprecation")
private static class RSocketFactoryHelper {
Mono<RSocketRequester> connect(
List<ClientRSocketFactoryConfigurer> configurers,
MimeType metaMimeType, MimeType dataMimeType, Payload setupPayload,
RSocketStrategies rsocketStrategies, ClientTransport transport) {
io.rsocket.RSocketFactory.ClientRSocketFactory factory = io.rsocket.RSocketFactory.connect();
configurers.forEach(c -> c.configure(factory));
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
}
if (setupPayload != EMPTY_SETUP_PAYLOAD) {
factory.setupPayload(setupPayload);
}
return factory.metadataMimeType(metaMimeType.toString())
.dataMimeType(dataMimeType.toString())
.transport(transport)
.start()
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, metaMimeType, rsocketStrategies));
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket;
import io.rsocket.core.RSocketConnector;
/**
* Strategy to apply configuration to an {@code RSocketConnector}. For use with
* {@link RSocketRequester.Builder#rsocketConnector RSocketRequester.Builder}.
*
* @author Rossen Stoyanchev
* @since 5.2.6
*/
@FunctionalInterface
public interface RSocketConnectorConfigurer {
/**
* Apply configuration to the given {@code RSocketConnector}.
*/
void configure(RSocketConnector connector);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -185,6 +185,26 @@ public interface RSocketRequester {
*/
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
/**
* Callback to configure the {@code RSocketConnector} directly.
* <ul>
* <li>The data and metadata mime types cannot be set directly
* on the {@code RSocketConnector} and will be overridden. Use the
* shortcuts {@link #dataMimeType(MimeType)} and
* {@link #metadataMimeType(MimeType)} on this builder instead.
* <li>The frame decoder also cannot be set directly and instead is set
* to match the configured {@code DataBufferFactory}.
* <li>For the
* {@link io.rsocket.core.RSocketConnector#setupPayload(Payload)
* setupPayload}, consider using methods on this builder to specify the
* route, other metadata, and data as Object values to be encoded.
* <li>To configure client side responding, see
* {@link RSocketMessageHandler#responder(RSocketStrategies, Object...)}.
* </ul>
* @since 5.2.6
*/
RSocketRequester.Builder rsocketConnector(RSocketConnectorConfigurer configurer);
/**
* Callback to configure the {@code ClientRSocketFactory} directly.
* <ul>
@ -201,7 +221,11 @@ public interface RSocketRequester {
* <li>To configure client side responding, see
* {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}.
* </ul>
* @deprecated as of 5.2.6 following the deprecation of
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory RSocketFactory.ClientRSocketFactory}
* in RSocket 1.0 RC7. Please, use {@link #rsocketConnector(RSocketConnectorConfigurer)}.
*/
@Deprecated
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,8 +20,6 @@ import java.util.List;
import java.util.function.Consumer;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory.ClientRSocketFactory;
import io.rsocket.RSocketFactory.ServerRSocketFactory;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
@ -200,9 +198,10 @@ public interface RSocketStrategies {
* <a href="https://github.com/rsocket/rsocket-java#zero-copy">configured</a>
* for zero copy. For client setup, {@link RSocketRequester.Builder}
* adapts automatically to the {@code DataBufferFactory} configured
* here, and sets the frame decoder in {@link ClientRSocketFactory
* ClientRSocketFactory} accordingly. For server setup, the
* {@link ServerRSocketFactory ServerRSocketFactory} must be configured
* here, and sets the frame decoder in
* {@link io.rsocket.core.RSocketConnector RSocketConnector}
* accordingly. For server setup, the
* {@link io.rsocket.core.RSocketServer RSocketServer} must be configured
* accordingly for zero copy too.
* <p>If using {@link DefaultDataBufferFactory} instead, there is no
* need for related config changes in RSocket.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -48,7 +48,6 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler;
import org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
@ -70,13 +69,13 @@ import org.springframework.util.StringUtils;
* {@link #setHandlerPredicate(Predicate) handlerPredicate}. Given an instance
* of this class, you can then use {@link #responder()} to obtain a
* {@link SocketAcceptor} adapter to register with the
* {@link io.rsocket.RSocketFactory}.
* {@link io.rsocket.core.RSocketServer}.
*
* <p>For client scenarios, possibly in the same process as a server, consider
* <p>For a client, possibly in the same process as a server, consider
* consider using the static factory method
* {@link #clientResponder(RSocketStrategies, Object...)} to obtain a client
* responder to be registered with an
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory
* {@link #responder(RSocketStrategies, Object...)} to obtain a client
* responder to be registered via
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector
* RSocketRequester.Builder}.
*
* <p>For {@code @MessageMapping} methods, this class automatically determines
@ -393,13 +392,13 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
}
/**
* Return an adapter for a {@link SocketAcceptor} that delegates to this
* {@code RSocketMessageHandler} instance. The adapter can be plugged in as a
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(SocketAcceptor) client} or
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server}
* side responder.
* <p>The initial {@link ConnectionSetupPayload} can be handled with a
* {@link ConnectMapping @ConnectionMapping} method which can be asynchronous
* Return an RSocket {@link SocketAcceptor} backed by this
* {@code RSocketMessageHandler} instance that can be plugged in as a
* {@link io.rsocket.core.RSocketConnector#acceptor(SocketAcceptor) client} or
* {@link io.rsocket.core.RSocketServer#acceptor(SocketAcceptor) server}
* RSocket responder.
* <p>The initial {@link ConnectionSetupPayload} is handled through
* {@link ConnectMapping @ConnectionMapping} methods that can be asynchronous
* and return {@code Mono<Void>} with an error signal preventing the
* connection. Such a method can also start requests to the client but that
* must be done decoupled from handling and from the current thread.
@ -445,10 +444,61 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
return false;
}
/**
* Static factory method to create an RSocket {@link SocketAcceptor}
* backed by handlers with annotated methods. Effectively a shortcut for:
* <pre class="code">
* RSocketMessageHandler handler = new RSocketMessageHandler();
* handler.setHandlers(handlers);
* handler.setRSocketStrategies(strategies);
* handler.afterPropertiesSet();
*
* SocketAcceptor acceptor = handler.responder();
* </pre>
* <p>This is intended for programmatic creation and registration of a
* client-side responder. For example:
* <pre class="code">
* SocketAcceptor responder =
* RSocketMessageHandler.responder(strategies, new ClientHandler());
*
* RSocketRequester.builder()
* .rsocketConnector(connector -> connector.acceptor(responder))
* .connectTcp("localhost", server.address().getPort());
* </pre>
*
* <p>Note that the given handlers do not need to have any stereotype
* annotations such as {@code @Controller} which helps to avoid overlap with
* server side handlers that may be used in the same application. However,
* for more advanced scenarios, e.g. discovering handlers through a custom
* stereotype annotation, consider declaring {@code RSocketMessageHandler}
* as a bean, and then obtain the responder from it.
*
* @param strategies the strategies to set on the created
* {@code RSocketMessageHandler}
* @param candidateHandlers a list of Objects and/or Classes with annotated
* handler methods; used to call {@link #setHandlers(List)} with
* on the created {@code RSocketMessageHandler}
* @return a configurer that may be passed into
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector}
* @since 5.2.6
*/
public static SocketAcceptor responder(RSocketStrategies strategies, Object... candidateHandlers) {
Assert.notEmpty(candidateHandlers, "No handlers");
List<Object> handlers = new ArrayList<>(candidateHandlers.length);
for (Object obj : candidateHandlers) {
handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class<?>) obj) : obj);
}
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setHandlers(handlers);
handler.setRSocketStrategies(strategies);
handler.afterPropertiesSet();
return handler.responder();
}
/**
* Static factory method for a configurer of a client side responder with
* annotated handler methods. This is intended to be passed into
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory(ClientRSocketFactoryConfigurer)}.
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory}.
* <p>In effect a shortcut to create and initialize
* {@code RSocketMessageHandler} with the given strategies and handlers,
* use {@link #responder()} to obtain the responder, and plug that into
@ -462,9 +512,13 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* handler methods; used to call {@link #setHandlers(List)} with
* on the created {@code RSocketMessageHandler}
* @return a configurer that may be passed into
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory(ClientRSocketFactoryConfigurer)}
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory}
* @deprecated as of 5.2.6 following the deprecation of
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory RSocketFactory.ClientRSocketFactory}
* in RSocket 1.0 RC7.
*/
public static ClientRSocketFactoryConfigurer clientResponder(
@Deprecated
public static org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer clientResponder(
RSocketStrategies strategies, Object... candidateHandlers) {
Assert.notEmpty(candidateHandlers, "No handlers");
@ -473,12 +527,14 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class<?>) obj) : obj);
}
return rsocketFactory -> {
return factory -> {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setHandlers(handlers);
handler.setRSocketStrategies(strategies);
handler.afterPropertiesSet();
rsocketFactory.acceptor(handler.responder());
factory.acceptor(handler.responder());
};
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,6 +28,8 @@ import io.netty.buffer.ByteBufAllocator;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketFactory;
import io.rsocket.core.DefaultConnectionSetupPayload;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.transport.ClientTransport;
@ -68,7 +70,7 @@ public class DefaultRSocketRequesterBuilderTests {
private final MockConnection connection = new MockConnection();
private final TestRSocketFactoryConfigurer rsocketFactoryConfigurer = new TestRSocketFactoryConfigurer();
private final TestRSocketConnectorConfigurer connectorConfigurer = new TestRSocketConnectorConfigurer();
@BeforeEach
@ -80,32 +82,35 @@ public class DefaultRSocketRequesterBuilderTests {
@Test
@SuppressWarnings("unchecked")
public void rsocketFactoryConfigurerAppliesAtSubscription() {
public void rsocketConnectorConfigurerAppliesAtSubscription() {
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketFactory(this.rsocketFactoryConfigurer)
.rsocketConnector(this.connectorConfigurer)
.rsocketStrategies(strategiesConfigurer)
.connect(this.transport);
verifyNoInteractions(this.transport);
assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNull();
assertThat(this.connectorConfigurer.connector()).isNull();
}
@Test
@SuppressWarnings("unchecked")
public void rsocketFactoryConfigurer() {
Consumer<RSocketStrategies.Builder> rsocketStrategiesConfigurer = mock(Consumer.class);
@SuppressWarnings({"unchecked", "deprecation"})
public void rsocketConnectorConfigurer() {
ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketFactory(this.rsocketFactoryConfigurer)
.rsocketStrategies(rsocketStrategiesConfigurer)
.rsocketConnector(this.connectorConfigurer)
.rsocketFactory(factoryConfigurer)
.rsocketStrategies(strategiesConfigurer)
.connect(this.transport)
.block();
// RSocketStrategies and RSocketFactory configurers should have been called
// RSocketStrategies and RSocketConnector configurers should have been called
verify(this.transport).connect(anyInt());
verify(rsocketStrategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNotNull();
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
verify(factoryConfigurer).configure(any(RSocketFactory.ClientRSocketFactory.class));
assertThat(this.connectorConfigurer.connector()).isNotNull();
}
@Test
@ -143,7 +148,7 @@ public class DefaultRSocketRequesterBuilderTests {
.block();
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(ConnectionSetupPayload::create)
.map(DefaultConnectionSetupPayload::new)
.block();
assertThat(setupPayload.dataMimeType()).isEqualTo("application/json");
@ -151,22 +156,22 @@ public class DefaultRSocketRequesterBuilderTests {
}
@Test
public void mimeTypesCannotBeChangedAtRSocketFactoryLevel() {
public void mimeTypesCannotBeChangedAtRSocketConnectorLevel() {
MimeType dataMimeType = MimeTypeUtils.APPLICATION_JSON;
MimeType metaMimeType = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString());
RSocketRequester requester = RSocketRequester.builder()
.metadataMimeType(metaMimeType)
.dataMimeType(dataMimeType)
.rsocketFactory(factory -> {
factory.metadataMimeType("text/plain");
factory.dataMimeType("application/xml");
.rsocketConnector(connector -> {
connector.metadataMimeType("text/plain");
connector.dataMimeType("application/xml");
})
.connect(this.transport)
.block();
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(ConnectionSetupPayload::create)
.map(DefaultConnectionSetupPayload::new)
.block();
assertThat(setupPayload.dataMimeType()).isEqualTo(dataMimeType.toString());
@ -186,7 +191,7 @@ public class DefaultRSocketRequesterBuilderTests {
.block();
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(ConnectionSetupPayload::create)
.map(DefaultConnectionSetupPayload::new)
.block();
assertThat(setupPayload.getMetadataUtf8()).isEqualTo("toA");
@ -210,7 +215,7 @@ public class DefaultRSocketRequesterBuilderTests {
.block();
ConnectionSetupPayload payload = Mono.from(this.connection.sentFrames())
.map(ConnectionSetupPayload::create)
.map(DefaultConnectionSetupPayload::new)
.block();
MimeType compositeMimeType =
@ -228,11 +233,11 @@ public class DefaultRSocketRequesterBuilderTests {
@Test
public void frameDecoderMatchesDataBufferFactory() throws Exception {
testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY);
testFrameDecoder(new DefaultDataBufferFactory(), PayloadDecoder.DEFAULT);
testPayloadDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY);
testPayloadDecoder(new DefaultDataBufferFactory(), PayloadDecoder.DEFAULT);
}
private void testFrameDecoder(DataBufferFactory bufferFactory, PayloadDecoder frameDecoder)
private void testPayloadDecoder(DataBufferFactory bufferFactory, PayloadDecoder payloadDecoder)
throws NoSuchFieldException {
RSocketStrategies strategies = RSocketStrategies.builder()
@ -241,17 +246,17 @@ public class DefaultRSocketRequesterBuilderTests {
RSocketRequester.builder()
.rsocketStrategies(strategies)
.rsocketFactory(this.rsocketFactoryConfigurer)
.rsocketConnector(this.connectorConfigurer)
.connect(this.transport)
.block();
RSocketFactory.ClientRSocketFactory factory = this.rsocketFactoryConfigurer.rsocketFactory();
assertThat(factory).isNotNull();
RSocketConnector connector = this.connectorConfigurer.connector();
assertThat(connector).isNotNull();
Field field = RSocketFactory.ClientRSocketFactory.class.getDeclaredField("payloadDecoder");
Field field = RSocketConnector.class.getDeclaredField("payloadDecoder");
ReflectionUtils.makeAccessible(field);
PayloadDecoder decoder = (PayloadDecoder) ReflectionUtils.getField(field, factory);
assertThat(decoder).isSameAs(frameDecoder);
PayloadDecoder decoder = (PayloadDecoder) ReflectionUtils.getField(field, connector);
assertThat(decoder).isSameAs(payloadDecoder);
}
@ -286,19 +291,17 @@ public class DefaultRSocketRequesterBuilderTests {
}
static class TestRSocketFactoryConfigurer implements ClientRSocketFactoryConfigurer {
static class TestRSocketConnectorConfigurer implements RSocketConnectorConfigurer {
private RSocketFactory.ClientRSocketFactory rsocketFactory;
private RSocketConnector connector;
RSocketFactory.ClientRSocketFactory rsocketFactory() {
return this.rsocketFactory;
RSocketConnector connector() {
return this.connector;
}
@Override
public void configure(RSocketFactory.ClientRSocketFactory rsocketFactory) {
this.rsocketFactory = rsocketFactory;
public void configure(RSocketConnector connector) {
this.connector = connector;
}
}
@ -307,7 +310,6 @@ public class DefaultRSocketRequesterBuilderTests {
private final MimeType mimeType;
TestJsonDecoder(MimeType mimeType) {
this.mimeType = mimeType;
}
@ -344,5 +346,4 @@ public class DefaultRSocketRequesterBuilderTests {
throw new UnsupportedOperationException();
}
}
}

View File

@ -25,8 +25,8 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.ReferenceCounted;
import io.rsocket.AbstractRSocket;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RSocketInterceptor;
@ -80,16 +80,14 @@ class RSocketBufferLeakTests {
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.responder();
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.addResponderPlugin(payloadInterceptor) // intercept responding
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
server = RSocketServer.create(responder)
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.interceptors(registry -> registry.forResponder(payloadInterceptor)) // intercept responding
.bind(TcpServerTransport.create("localhost", 7000))
.block();
requester = RSocketRequester.builder()
.rsocketFactory(factory -> factory.addRequesterPlugin(payloadInterceptor))
.rsocketConnector(conn -> conn.interceptors(registry -> registry.forRequester(payloadInterceptor)))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", 7000)
.block();
@ -157,6 +155,18 @@ class RSocketBufferLeakTests {
StepVerifier.create(result).expectError(ApplicationErrorException.class).verify(Duration.ofSeconds(5));
}
@Test
public void echoChannel() {
Flux<String> result = requester.route("echo-channel")
.data(Flux.range(1, 10).map(i -> "Hello " + i), String.class)
.retrieveFlux(String.class);
StepVerifier.create(result)
.expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async")
.thenCancel() // https://github.com/rsocket/rsocket-java/issues/613
.verify(Duration.ofSeconds(5));
}
@Controller
static class ServerController {
@ -192,6 +202,11 @@ class RSocketBufferLeakTests {
Mono<String> ignoreInput() {
return Mono.delay(Duration.ofMillis(10)).map(l -> "bar");
}
@MessageMapping("echo-channel")
Flux<String> echoChannel(Flux<String> payloads) {
return payloads.delayElements(Duration.ofMillis(10)).map(payload -> payload + " async");
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket;
import java.time.Duration;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.transport.netty.server.CloseableChannel;
@ -71,12 +71,10 @@ public class RSocketClientToServerIntegrationTests {
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.responder();
server = RSocketFactory.receive()
.addResponderPlugin(interceptor)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
server = RSocketServer.create(responder)
.interceptors(registry -> registry.forResponder(interceptor))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create("localhost", 7000))
.block();
requester = RSocketRequester.builder()

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket;
import java.time.Duration;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
@ -62,11 +62,9 @@ public class RSocketServerToClientIntegrationTests {
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.responder();
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 0))
.start()
server = RSocketServer.create(responder)
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create("localhost", 0))
.block();
}
@ -99,23 +97,21 @@ public class RSocketServerToClientIntegrationTests {
private static void connectAndRunTest(String connectionRoute) {
ServerController serverController = context.getBean(ServerController.class);
serverController.reset();
context.getBean(ServerController.class).reset();
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
ClientRSocketFactoryConfigurer clientResponderConfigurer =
RSocketMessageHandler.clientResponder(strategies, new ClientHandler());
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
RSocketRequester requester = null;
try {
requester = RSocketRequester.builder()
.setupRoute(connectionRoute)
.rsocketStrategies(strategies)
.rsocketFactory(clientResponderConfigurer)
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", server.address().getPort())
.block();
serverController.await(Duration.ofSeconds(5));
context.getBean(ServerController.class).await(Duration.ofSeconds(5));
}
finally {
if (requester != null) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,7 @@
package org.springframework.messaging.rsocket
import io.netty.buffer.PooledByteBufAllocator
import io.rsocket.RSocketFactory
import io.rsocket.core.RSocketServer
import io.rsocket.frame.decoder.PayloadDecoder
import io.rsocket.transport.netty.server.CloseableChannel
import io.rsocket.transport.netty.server.TcpServerTransport
@ -257,15 +257,13 @@ class RSocketClientToServerCoroutinesIntegrationTests {
fun setupOnce() {
context = AnnotationConfigApplicationContext(ServerConfig::class.java)
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(context.getBean(RSocketMessageHandler::class.java).responder())
.transport(TcpServerTransport.create("localhost", 7000))
.start()
server = RSocketServer.create(context.getBean(RSocketMessageHandler::class.java).responder())
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create("localhost", 7000))
.block()!!
requester = RSocketRequester.builder()
.rsocketFactory { factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY) }
.rsocketConnector { connector -> connector.payloadDecoder(PayloadDecoder.ZERO_COPY) }
.rsocketStrategies(context.getBean(RSocketStrategies::class.java))
.connectTcp("localhost", 7000)
.block()!!