From ed8c61a8520a4a0015d4a7b104ae1e03ef698bf5 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 20 Apr 2020 09:42:14 +0100 Subject: [PATCH] Upgrade to RSocket 1.0 RC7 snapshots --- build.gradle | 3 +- .../ClientRSocketFactoryConfigurer.java | 10 +- .../DefaultRSocketRequesterBuilder.java | 124 ++++++++++++++---- .../rsocket/RSocketConnectorConfigurer.java | 35 +++++ .../messaging/rsocket/RSocketRequester.java | 26 +++- .../messaging/rsocket/RSocketStrategies.java | 11 +- .../support/RSocketMessageHandler.java | 94 ++++++++++--- .../DefaultRSocketRequesterBuilderTests.java | 81 ++++++------ .../rsocket/RSocketBufferLeakTests.java | 31 +++-- ...RSocketClientToServerIntegrationTests.java | 14 +- ...RSocketServerToClientIntegrationTests.java | 22 ++-- ...lientToServerCoroutinesIntegrationTests.kt | 14 +- 12 files changed, 335 insertions(+), 130 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketConnectorConfigurer.java diff --git a/build.gradle b/build.gradle index bb431bcf5c..ddaef1a2ae 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ configure(allprojects) { project -> mavenBom "com.fasterxml.jackson:jackson-bom:2.10.3" mavenBom "io.netty:netty-bom:4.1.48.Final" mavenBom "io.projectreactor:reactor-bom:Dysprosium-BUILD-SNAPSHOT" - mavenBom "io.rsocket:rsocket-bom:1.0.0-RC6" + mavenBom "io.rsocket:rsocket-bom:1.0.0-RC7-SNAPSHOT" mavenBom "org.eclipse.jetty:jetty-bom:9.4.28.v20200408" mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.72" mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.5" @@ -280,6 +280,7 @@ configure(allprojects) { project -> mavenCentral() maven { url "https://repo.spring.io/libs-spring-framework-build" } maven { url "https://repo.spring.io/snapshot" } + maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" } // RSocket } } configurations.all { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/ClientRSocketFactoryConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/ClientRSocketFactoryConfigurer.java index e297b8dab2..0af2004508 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/ClientRSocketFactoryConfigurer.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/ClientRSocketFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,8 +15,6 @@ */ package org.springframework.messaging.rsocket; -import io.rsocket.RSocketFactory; - /** * Strategy to apply configuration to a client side {@code RSocketFactory}. * that's being prepared by {@link RSocketRequester.Builder} to connect @@ -24,13 +22,17 @@ import io.rsocket.RSocketFactory; * * @author Rossen Stoyanchev * @since 5.2 + * @deprecated as of 5.2.6 following the deprecation of + * {@link io.rsocket.RSocketFactory.ClientRSocketFactory RSocketFactory.ClientRSocketFactory} + * in RSocket 1.0 RC7. Please, use {@link RSocketConnectorConfigurer}. */ @FunctionalInterface +@Deprecated public interface ClientRSocketFactoryConfigurer { /** * Apply configuration to the given {@code ClientRSocketFactory}. */ - void configure(RSocketFactory.ClientRSocketFactory rsocketFactory); + void configure(io.rsocket.RSocketFactory.ClientRSocketFactory rsocketFactory); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 24d8026aa5..1b117c941e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,12 +25,12 @@ import java.util.Map; import java.util.function.Consumer; import io.rsocket.Payload; -import io.rsocket.RSocketFactory; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.metadata.WellKnownMimeType; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.util.DefaultPayload; import reactor.core.publisher.Mono; import org.springframework.core.ReactiveAdapter; @@ -43,6 +43,7 @@ import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -56,10 +57,17 @@ import org.springframework.util.MimeTypeUtils; */ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { + private final static boolean rsocketConnectorPresent = + ClassUtils.isPresent("io.rsocket.core.RSocketConnector", + DefaultRSocketRequesterBuilder.class.getClassLoader()); + + private static final Map HINTS = Collections.emptyMap(); private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; + private static final Payload EMPTY_SETUP_PAYLOAD = DefaultPayload.create(EMPTY_BYTE_ARRAY); + @Nullable private MimeType dataMimeType; @@ -84,7 +92,10 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { private List> strategiesConfigurers = new ArrayList<>(); - private List rsocketConfigurers = new ArrayList<>(); + private List rsocketConnectorConfigurers = new ArrayList<>(); + + @SuppressWarnings("deprecation") + private List rsocketFactoryConfigurers = new ArrayList<>(); @Override @@ -133,8 +144,15 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { } @Override + public RSocketRequester.Builder rsocketConnector(RSocketConnectorConfigurer configurer) { + this.rsocketConnectorConfigurers.add(configurer); + return this; + } + + @Override + @Deprecated public RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer) { - this.rsocketConfigurers.add(configurer); + this.rsocketFactoryConfigurers.add(configurer); return this; } @@ -164,28 +182,25 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders"); Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders"); - RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect(); - this.rsocketConfigurers.forEach(configurer -> configurer.configure(factory)); - - if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { - factory.frameDecoder(PayloadDecoder.ZERO_COPY); - } - MimeType metaMimeType = this.metadataMimeType != null ? this.metadataMimeType : MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()); MimeType dataMimeType = getDataMimeType(rsocketStrategies); - factory.dataMimeType(dataMimeType.toString()); - factory.metadataMimeType(metaMimeType.toString()); - return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies) - .doOnNext(factory::setupPayload) - .then(Mono.defer(() -> - factory.transport(transport) - .start() - .map(rsocket -> new DefaultRSocketRequester( - rsocket, dataMimeType, metaMimeType, rsocketStrategies)) - )); + if (rsocketConnectorPresent) { + return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies) + .flatMap(payload -> + new RSocketConnectorHelper().connect( + this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers, + metaMimeType, dataMimeType, payload, rsocketStrategies, transport)); + } + else { + return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies) + .flatMap(payload -> + new RSocketFactoryHelper().connect( + this.rsocketFactoryConfigurers, metaMimeType, dataMimeType, payload, + rsocketStrategies, transport)); + } } private RSocketStrategies getRSocketStrategies() { @@ -234,7 +249,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { Object data = this.setupData; boolean hasMetadata = (this.setupRoute != null || !CollectionUtils.isEmpty(this.setupMetadata)); if (!hasMetadata && data == null) { - return Mono.empty(); + return Mono.just(EMPTY_SETUP_PAYLOAD); } Mono dataMono = Mono.empty(); @@ -269,4 +284,69 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { .doOnDiscard(Payload.class, Payload::release); } + + private static class RSocketConnectorHelper { + + @SuppressWarnings("deprecation") + Mono connect( + List connectorConfigurers, + List factoryConfigurers, + MimeType metaMimeType, MimeType dataMimeType, Payload setupPayload, + RSocketStrategies rsocketStrategies, ClientTransport transport) { + + io.rsocket.core.RSocketConnector connector = io.rsocket.core.RSocketConnector.create(); + connectorConfigurers.forEach(c -> c.configure(connector)); + + if (!factoryConfigurers.isEmpty()) { + io.rsocket.RSocketFactory.ClientRSocketFactory factory = + new io.rsocket.RSocketFactory.ClientRSocketFactory(connector); + factoryConfigurers.forEach(c -> c.configure(factory)); + } + + if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { + connector.payloadDecoder(PayloadDecoder.ZERO_COPY); + } + + if (setupPayload != EMPTY_SETUP_PAYLOAD) { + connector.setupPayload(setupPayload); + } + + return connector + .metadataMimeType(metaMimeType.toString()) + .dataMimeType(dataMimeType.toString()) + .connect(transport) + .map(rsocket -> new DefaultRSocketRequester( + rsocket, dataMimeType, metaMimeType, rsocketStrategies)); + } + } + + + @SuppressWarnings("deprecation") + private static class RSocketFactoryHelper { + + Mono connect( + List configurers, + MimeType metaMimeType, MimeType dataMimeType, Payload setupPayload, + RSocketStrategies rsocketStrategies, ClientTransport transport) { + + io.rsocket.RSocketFactory.ClientRSocketFactory factory = io.rsocket.RSocketFactory.connect(); + configurers.forEach(c -> c.configure(factory)); + + if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { + factory.frameDecoder(PayloadDecoder.ZERO_COPY); + } + + if (setupPayload != EMPTY_SETUP_PAYLOAD) { + factory.setupPayload(setupPayload); + } + + return factory.metadataMimeType(metaMimeType.toString()) + .dataMimeType(dataMimeType.toString()) + .transport(transport) + .start() + .map(rsocket -> new DefaultRSocketRequester( + rsocket, dataMimeType, metaMimeType, rsocketStrategies)); + } + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketConnectorConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketConnectorConfigurer.java new file mode 100644 index 0000000000..f224764248 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketConnectorConfigurer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.rsocket; + +import io.rsocket.core.RSocketConnector; + +/** + * Strategy to apply configuration to an {@code RSocketConnector}. For use with + * {@link RSocketRequester.Builder#rsocketConnector RSocketRequester.Builder}. + * + * @author Rossen Stoyanchev + * @since 5.2.6 + */ +@FunctionalInterface +public interface RSocketConnectorConfigurer { + + /** + * Apply configuration to the given {@code RSocketConnector}. + */ + void configure(RSocketConnector connector); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 3cccb61408..78cd8b3f02 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -185,6 +185,26 @@ public interface RSocketRequester { */ RSocketRequester.Builder rsocketStrategies(Consumer configurer); + /** + * Callback to configure the {@code RSocketConnector} directly. + *
    + *
  • The data and metadata mime types cannot be set directly + * on the {@code RSocketConnector} and will be overridden. Use the + * shortcuts {@link #dataMimeType(MimeType)} and + * {@link #metadataMimeType(MimeType)} on this builder instead. + *
  • The frame decoder also cannot be set directly and instead is set + * to match the configured {@code DataBufferFactory}. + *
  • For the + * {@link io.rsocket.core.RSocketConnector#setupPayload(Payload) + * setupPayload}, consider using methods on this builder to specify the + * route, other metadata, and data as Object values to be encoded. + *
  • To configure client side responding, see + * {@link RSocketMessageHandler#responder(RSocketStrategies, Object...)}. + *
+ * @since 5.2.6 + */ + RSocketRequester.Builder rsocketConnector(RSocketConnectorConfigurer configurer); + /** * Callback to configure the {@code ClientRSocketFactory} directly. *
    @@ -201,7 +221,11 @@ public interface RSocketRequester { *
  • To configure client side responding, see * {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}. *
+ * @deprecated as of 5.2.6 following the deprecation of + * {@link io.rsocket.RSocketFactory.ClientRSocketFactory RSocketFactory.ClientRSocketFactory} + * in RSocket 1.0 RC7. Please, use {@link #rsocketConnector(RSocketConnectorConfigurer)}. */ + @Deprecated RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer); /** diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index bd3b81c428..332603561a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,6 @@ import java.util.List; import java.util.function.Consumer; import io.rsocket.Payload; -import io.rsocket.RSocketFactory.ClientRSocketFactory; -import io.rsocket.RSocketFactory.ServerRSocketFactory; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ResolvableType; @@ -200,9 +198,10 @@ public interface RSocketStrategies { * configured * for zero copy. For client setup, {@link RSocketRequester.Builder} * adapts automatically to the {@code DataBufferFactory} configured - * here, and sets the frame decoder in {@link ClientRSocketFactory - * ClientRSocketFactory} accordingly. For server setup, the - * {@link ServerRSocketFactory ServerRSocketFactory} must be configured + * here, and sets the frame decoder in + * {@link io.rsocket.core.RSocketConnector RSocketConnector} + * accordingly. For server setup, the + * {@link io.rsocket.core.RSocketServer RSocketServer} must be configured * accordingly for zero copy too. *

If using {@link DefaultDataBufferFactory} instead, there is no * need for related config changes in RSocket. diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java index 17aa48e99b..919e392d8d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,6 @@ import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler; import org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver; import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler; -import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer; import org.springframework.messaging.rsocket.MetadataExtractor; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; @@ -70,13 +69,13 @@ import org.springframework.util.StringUtils; * {@link #setHandlerPredicate(Predicate) handlerPredicate}. Given an instance * of this class, you can then use {@link #responder()} to obtain a * {@link SocketAcceptor} adapter to register with the - * {@link io.rsocket.RSocketFactory}. + * {@link io.rsocket.core.RSocketServer}. * - *

For client scenarios, possibly in the same process as a server, consider + *

For a client, possibly in the same process as a server, consider * consider using the static factory method - * {@link #clientResponder(RSocketStrategies, Object...)} to obtain a client - * responder to be registered with an - * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory + * {@link #responder(RSocketStrategies, Object...)} to obtain a client + * responder to be registered via + * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector * RSocketRequester.Builder}. * *

For {@code @MessageMapping} methods, this class automatically determines @@ -393,13 +392,13 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { } /** - * Return an adapter for a {@link SocketAcceptor} that delegates to this - * {@code RSocketMessageHandler} instance. The adapter can be plugged in as a - * {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(SocketAcceptor) client} or - * {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server} - * side responder. - *

The initial {@link ConnectionSetupPayload} can be handled with a - * {@link ConnectMapping @ConnectionMapping} method which can be asynchronous + * Return an RSocket {@link SocketAcceptor} backed by this + * {@code RSocketMessageHandler} instance that can be plugged in as a + * {@link io.rsocket.core.RSocketConnector#acceptor(SocketAcceptor) client} or + * {@link io.rsocket.core.RSocketServer#acceptor(SocketAcceptor) server} + * RSocket responder. + *

The initial {@link ConnectionSetupPayload} is handled through + * {@link ConnectMapping @ConnectionMapping} methods that can be asynchronous * and return {@code Mono} with an error signal preventing the * connection. Such a method can also start requests to the client but that * must be done decoupled from handling and from the current thread. @@ -445,10 +444,61 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { return false; } + /** + * Static factory method to create an RSocket {@link SocketAcceptor} + * backed by handlers with annotated methods. Effectively a shortcut for: + *

+	 * RSocketMessageHandler handler = new RSocketMessageHandler();
+	 * handler.setHandlers(handlers);
+	 * handler.setRSocketStrategies(strategies);
+	 * handler.afterPropertiesSet();
+	 *
+	 * SocketAcceptor acceptor = handler.responder();
+	 * 
+ *

This is intended for programmatic creation and registration of a + * client-side responder. For example: + *

+	 * SocketAcceptor responder =
+	 *         RSocketMessageHandler.responder(strategies, new ClientHandler());
+	 *
+	 * RSocketRequester.builder()
+	 *         .rsocketConnector(connector -> connector.acceptor(responder))
+	 *         .connectTcp("localhost", server.address().getPort());
+	 * 
+ * + *

Note that the given handlers do not need to have any stereotype + * annotations such as {@code @Controller} which helps to avoid overlap with + * server side handlers that may be used in the same application. However, + * for more advanced scenarios, e.g. discovering handlers through a custom + * stereotype annotation, consider declaring {@code RSocketMessageHandler} + * as a bean, and then obtain the responder from it. + * + * @param strategies the strategies to set on the created + * {@code RSocketMessageHandler} + * @param candidateHandlers a list of Objects and/or Classes with annotated + * handler methods; used to call {@link #setHandlers(List)} with + * on the created {@code RSocketMessageHandler} + * @return a configurer that may be passed into + * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector} + * @since 5.2.6 + */ + public static SocketAcceptor responder(RSocketStrategies strategies, Object... candidateHandlers) { + Assert.notEmpty(candidateHandlers, "No handlers"); + List handlers = new ArrayList<>(candidateHandlers.length); + for (Object obj : candidateHandlers) { + handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class) obj) : obj); + } + RSocketMessageHandler handler = new RSocketMessageHandler(); + handler.setHandlers(handlers); + handler.setRSocketStrategies(strategies); + handler.afterPropertiesSet(); + return handler.responder(); + } + /** * Static factory method for a configurer of a client side responder with * annotated handler methods. This is intended to be passed into - * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory(ClientRSocketFactoryConfigurer)}. + * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory}. *

In effect a shortcut to create and initialize * {@code RSocketMessageHandler} with the given strategies and handlers, * use {@link #responder()} to obtain the responder, and plug that into @@ -462,9 +512,13 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { * handler methods; used to call {@link #setHandlers(List)} with * on the created {@code RSocketMessageHandler} * @return a configurer that may be passed into - * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory(ClientRSocketFactoryConfigurer)} + * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory} + * @deprecated as of 5.2.6 following the deprecation of + * {@link io.rsocket.RSocketFactory.ClientRSocketFactory RSocketFactory.ClientRSocketFactory} + * in RSocket 1.0 RC7. */ - public static ClientRSocketFactoryConfigurer clientResponder( + @Deprecated + public static org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer clientResponder( RSocketStrategies strategies, Object... candidateHandlers) { Assert.notEmpty(candidateHandlers, "No handlers"); @@ -473,12 +527,14 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class) obj) : obj); } - return rsocketFactory -> { + return factory -> { RSocketMessageHandler handler = new RSocketMessageHandler(); handler.setHandlers(handlers); handler.setRSocketStrategies(strategies); handler.afterPropertiesSet(); - rsocketFactory.acceptor(handler.responder()); + factory.acceptor(handler.responder()); }; } } + + diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java index a21f4f91d1..000ebcff0f 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ import io.netty.buffer.ByteBufAllocator; import io.rsocket.ConnectionSetupPayload; import io.rsocket.DuplexConnection; import io.rsocket.RSocketFactory; +import io.rsocket.core.DefaultConnectionSetupPayload; +import io.rsocket.core.RSocketConnector; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.metadata.WellKnownMimeType; import io.rsocket.transport.ClientTransport; @@ -68,7 +70,7 @@ public class DefaultRSocketRequesterBuilderTests { private final MockConnection connection = new MockConnection(); - private final TestRSocketFactoryConfigurer rsocketFactoryConfigurer = new TestRSocketFactoryConfigurer(); + private final TestRSocketConnectorConfigurer connectorConfigurer = new TestRSocketConnectorConfigurer(); @BeforeEach @@ -80,32 +82,35 @@ public class DefaultRSocketRequesterBuilderTests { @Test @SuppressWarnings("unchecked") - public void rsocketFactoryConfigurerAppliesAtSubscription() { + public void rsocketConnectorConfigurerAppliesAtSubscription() { Consumer strategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() - .rsocketFactory(this.rsocketFactoryConfigurer) + .rsocketConnector(this.connectorConfigurer) .rsocketStrategies(strategiesConfigurer) .connect(this.transport); verifyNoInteractions(this.transport); - assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNull(); + assertThat(this.connectorConfigurer.connector()).isNull(); } @Test - @SuppressWarnings("unchecked") - public void rsocketFactoryConfigurer() { - Consumer rsocketStrategiesConfigurer = mock(Consumer.class); + @SuppressWarnings({"unchecked", "deprecation"}) + public void rsocketConnectorConfigurer() { + ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class); + Consumer strategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() - .rsocketFactory(this.rsocketFactoryConfigurer) - .rsocketStrategies(rsocketStrategiesConfigurer) + .rsocketConnector(this.connectorConfigurer) + .rsocketFactory(factoryConfigurer) + .rsocketStrategies(strategiesConfigurer) .connect(this.transport) .block(); - // RSocketStrategies and RSocketFactory configurers should have been called + // RSocketStrategies and RSocketConnector configurers should have been called verify(this.transport).connect(anyInt()); - verify(rsocketStrategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); - assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNotNull(); + verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); + verify(factoryConfigurer).configure(any(RSocketFactory.ClientRSocketFactory.class)); + assertThat(this.connectorConfigurer.connector()).isNotNull(); } @Test @@ -143,7 +148,7 @@ public class DefaultRSocketRequesterBuilderTests { .block(); ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) - .map(ConnectionSetupPayload::create) + .map(DefaultConnectionSetupPayload::new) .block(); assertThat(setupPayload.dataMimeType()).isEqualTo("application/json"); @@ -151,22 +156,22 @@ public class DefaultRSocketRequesterBuilderTests { } @Test - public void mimeTypesCannotBeChangedAtRSocketFactoryLevel() { + public void mimeTypesCannotBeChangedAtRSocketConnectorLevel() { MimeType dataMimeType = MimeTypeUtils.APPLICATION_JSON; MimeType metaMimeType = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString()); RSocketRequester requester = RSocketRequester.builder() .metadataMimeType(metaMimeType) .dataMimeType(dataMimeType) - .rsocketFactory(factory -> { - factory.metadataMimeType("text/plain"); - factory.dataMimeType("application/xml"); + .rsocketConnector(connector -> { + connector.metadataMimeType("text/plain"); + connector.dataMimeType("application/xml"); }) .connect(this.transport) .block(); ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) - .map(ConnectionSetupPayload::create) + .map(DefaultConnectionSetupPayload::new) .block(); assertThat(setupPayload.dataMimeType()).isEqualTo(dataMimeType.toString()); @@ -186,7 +191,7 @@ public class DefaultRSocketRequesterBuilderTests { .block(); ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) - .map(ConnectionSetupPayload::create) + .map(DefaultConnectionSetupPayload::new) .block(); assertThat(setupPayload.getMetadataUtf8()).isEqualTo("toA"); @@ -210,7 +215,7 @@ public class DefaultRSocketRequesterBuilderTests { .block(); ConnectionSetupPayload payload = Mono.from(this.connection.sentFrames()) - .map(ConnectionSetupPayload::create) + .map(DefaultConnectionSetupPayload::new) .block(); MimeType compositeMimeType = @@ -228,11 +233,11 @@ public class DefaultRSocketRequesterBuilderTests { @Test public void frameDecoderMatchesDataBufferFactory() throws Exception { - testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY); - testFrameDecoder(new DefaultDataBufferFactory(), PayloadDecoder.DEFAULT); + testPayloadDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY); + testPayloadDecoder(new DefaultDataBufferFactory(), PayloadDecoder.DEFAULT); } - private void testFrameDecoder(DataBufferFactory bufferFactory, PayloadDecoder frameDecoder) + private void testPayloadDecoder(DataBufferFactory bufferFactory, PayloadDecoder payloadDecoder) throws NoSuchFieldException { RSocketStrategies strategies = RSocketStrategies.builder() @@ -241,17 +246,17 @@ public class DefaultRSocketRequesterBuilderTests { RSocketRequester.builder() .rsocketStrategies(strategies) - .rsocketFactory(this.rsocketFactoryConfigurer) + .rsocketConnector(this.connectorConfigurer) .connect(this.transport) .block(); - RSocketFactory.ClientRSocketFactory factory = this.rsocketFactoryConfigurer.rsocketFactory(); - assertThat(factory).isNotNull(); + RSocketConnector connector = this.connectorConfigurer.connector(); + assertThat(connector).isNotNull(); - Field field = RSocketFactory.ClientRSocketFactory.class.getDeclaredField("payloadDecoder"); + Field field = RSocketConnector.class.getDeclaredField("payloadDecoder"); ReflectionUtils.makeAccessible(field); - PayloadDecoder decoder = (PayloadDecoder) ReflectionUtils.getField(field, factory); - assertThat(decoder).isSameAs(frameDecoder); + PayloadDecoder decoder = (PayloadDecoder) ReflectionUtils.getField(field, connector); + assertThat(decoder).isSameAs(payloadDecoder); } @@ -286,19 +291,17 @@ public class DefaultRSocketRequesterBuilderTests { } - static class TestRSocketFactoryConfigurer implements ClientRSocketFactoryConfigurer { + static class TestRSocketConnectorConfigurer implements RSocketConnectorConfigurer { - private RSocketFactory.ClientRSocketFactory rsocketFactory; + private RSocketConnector connector; - - RSocketFactory.ClientRSocketFactory rsocketFactory() { - return this.rsocketFactory; + RSocketConnector connector() { + return this.connector; } - @Override - public void configure(RSocketFactory.ClientRSocketFactory rsocketFactory) { - this.rsocketFactory = rsocketFactory; + public void configure(RSocketConnector connector) { + this.connector = connector; } } @@ -307,7 +310,6 @@ public class DefaultRSocketRequesterBuilderTests { private final MimeType mimeType; - TestJsonDecoder(MimeType mimeType) { this.mimeType = mimeType; } @@ -344,5 +346,4 @@ public class DefaultRSocketRequesterBuilderTests { throw new UnsupportedOperationException(); } } - } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java index f9ad8b4b19..cb63e828ca 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java @@ -25,8 +25,8 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.ReferenceCounted; import io.rsocket.AbstractRSocket; import io.rsocket.RSocket; -import io.rsocket.RSocketFactory; import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; import io.rsocket.exceptions.ApplicationErrorException; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.plugins.RSocketInterceptor; @@ -80,16 +80,14 @@ class RSocketBufferLeakTests { RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); SocketAcceptor responder = messageHandler.responder(); - server = RSocketFactory.receive() - .frameDecoder(PayloadDecoder.ZERO_COPY) - .addResponderPlugin(payloadInterceptor) // intercept responding - .acceptor(responder) - .transport(TcpServerTransport.create("localhost", 7000)) - .start() + server = RSocketServer.create(responder) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .interceptors(registry -> registry.forResponder(payloadInterceptor)) // intercept responding + .bind(TcpServerTransport.create("localhost", 7000)) .block(); requester = RSocketRequester.builder() - .rsocketFactory(factory -> factory.addRequesterPlugin(payloadInterceptor)) + .rsocketConnector(conn -> conn.interceptors(registry -> registry.forRequester(payloadInterceptor))) .rsocketStrategies(context.getBean(RSocketStrategies.class)) .connectTcp("localhost", 7000) .block(); @@ -157,6 +155,18 @@ class RSocketBufferLeakTests { StepVerifier.create(result).expectError(ApplicationErrorException.class).verify(Duration.ofSeconds(5)); } + @Test + public void echoChannel() { + Flux result = requester.route("echo-channel") + .data(Flux.range(1, 10).map(i -> "Hello " + i), String.class) + .retrieveFlux(String.class); + + StepVerifier.create(result) + .expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async") + .thenCancel() // https://github.com/rsocket/rsocket-java/issues/613 + .verify(Duration.ofSeconds(5)); + } + @Controller static class ServerController { @@ -192,6 +202,11 @@ class RSocketBufferLeakTests { Mono ignoreInput() { return Mono.delay(Duration.ofMillis(10)).map(l -> "bar"); } + + @MessageMapping("echo-channel") + Flux echoChannel(Flux payloads) { + return payloads.delayElements(Duration.ofMillis(10)).map(payload -> payload + " async"); + } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index db15789167..195f664456 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket; import java.time.Duration; -import io.rsocket.RSocketFactory; import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.metadata.WellKnownMimeType; import io.rsocket.transport.netty.server.CloseableChannel; @@ -71,12 +71,10 @@ public class RSocketClientToServerIntegrationTests { RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); SocketAcceptor responder = messageHandler.responder(); - server = RSocketFactory.receive() - .addResponderPlugin(interceptor) - .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(responder) - .transport(TcpServerTransport.create("localhost", 7000)) - .start() + server = RSocketServer.create(responder) + .interceptors(registry -> registry.forResponder(interceptor)) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create("localhost", 7000)) .block(); requester = RSocketRequester.builder() diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 7efccafc6f..b651c19db5 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket; import java.time.Duration; -import io.rsocket.RSocketFactory; import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; @@ -62,11 +62,9 @@ public class RSocketServerToClientIntegrationTests { RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); SocketAcceptor responder = messageHandler.responder(); - server = RSocketFactory.receive() - .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(responder) - .transport(TcpServerTransport.create("localhost", 0)) - .start() + server = RSocketServer.create(responder) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create("localhost", 0)) .block(); } @@ -99,23 +97,21 @@ public class RSocketServerToClientIntegrationTests { private static void connectAndRunTest(String connectionRoute) { - ServerController serverController = context.getBean(ServerController.class); - serverController.reset(); + context.getBean(ServerController.class).reset(); RSocketStrategies strategies = context.getBean(RSocketStrategies.class); - ClientRSocketFactoryConfigurer clientResponderConfigurer = - RSocketMessageHandler.clientResponder(strategies, new ClientHandler()); + SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler()); RSocketRequester requester = null; try { requester = RSocketRequester.builder() .setupRoute(connectionRoute) .rsocketStrategies(strategies) - .rsocketFactory(clientResponderConfigurer) + .rsocketConnector(connector -> connector.acceptor(responder)) .connectTcp("localhost", server.address().getPort()) .block(); - serverController.await(Duration.ofSeconds(5)); + context.getBean(ServerController.class).await(Duration.ofSeconds(5)); } finally { if (requester != null) { diff --git a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt index d72da8a94f..23dff780b6 100644 --- a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt +++ b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ package org.springframework.messaging.rsocket import io.netty.buffer.PooledByteBufAllocator -import io.rsocket.RSocketFactory +import io.rsocket.core.RSocketServer import io.rsocket.frame.decoder.PayloadDecoder import io.rsocket.transport.netty.server.CloseableChannel import io.rsocket.transport.netty.server.TcpServerTransport @@ -257,15 +257,13 @@ class RSocketClientToServerCoroutinesIntegrationTests { fun setupOnce() { context = AnnotationConfigApplicationContext(ServerConfig::class.java) - server = RSocketFactory.receive() - .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(context.getBean(RSocketMessageHandler::class.java).responder()) - .transport(TcpServerTransport.create("localhost", 7000)) - .start() + server = RSocketServer.create(context.getBean(RSocketMessageHandler::class.java).responder()) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create("localhost", 7000)) .block()!! requester = RSocketRequester.builder() - .rsocketFactory { factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY) } + .rsocketConnector { connector -> connector.payloadDecoder(PayloadDecoder.ZERO_COPY) } .rsocketStrategies(context.getBean(RSocketStrategies::class.java)) .connectTcp("localhost", 7000) .block()!!