Add RSocketRequest.Builder in Spring Messaging
Prior to this commit, `RSocketRequester` would have a single `RSocketRequester.create` static method taking a fully built `RSocket` as an argument. Developers need to build an `RSocket` instance using the `RSocketFactory` and then use it to create a requester. To help developers set up a requester, this commit adds a new `RSocketRequester.Builder` interface and implementation. The `RSocket` building phase and codecs configuration are part of a single call chain. Subscribing to the returned `Mono<RSocketRequester>` will configure and connect to the remote RSocket server. This design should be improved in gh-22798, since we will need to support metadata in a broader fashion. Closes gh-22806
This commit is contained in:
parent
900abfce47
commit
02904121a3
|
|
@ -16,6 +16,7 @@ dependencies {
|
|||
optional(project(":spring-oxm"))
|
||||
optional("io.projectreactor.netty:reactor-netty")
|
||||
optional("io.rsocket:rsocket-core:${rsocketVersion}")
|
||||
optional("io.rsocket:rsocket-transport-netty:${rsocketVersion}")
|
||||
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
|
||||
optional("javax.xml.bind:jaxb-api:2.3.1")
|
||||
testCompile("javax.inject:javax.inject-tck:1")
|
||||
|
|
@ -29,7 +30,6 @@ dependencies {
|
|||
testCompile("org.apache.activemq:activemq-stomp:5.8.0")
|
||||
testCompile("io.projectreactor:reactor-test")
|
||||
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
|
||||
testCompile("io.rsocket:rsocket-transport-netty:${rsocketVersion}")
|
||||
testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
|
||||
testCompile("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
|
||||
testCompile("org.xmlunit:xmlunit-matchers:2.6.2")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.rsocket;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.transport.ClientTransport;
|
||||
import io.rsocket.transport.netty.client.TcpClientTransport;
|
||||
import io.rsocket.transport.netty.client.WebsocketClientTransport;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* Default implementation of {@link RSocketRequester.Builder}.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
* @since 5.2
|
||||
*/
|
||||
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
||||
|
||||
@Nullable
|
||||
private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>();
|
||||
|
||||
@Nullable
|
||||
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer) {
|
||||
this.factoryConfigurers.add(configurer);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer) {
|
||||
this.strategiesConfigurers.add(configurer);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType) {
|
||||
return Mono.defer(() -> {
|
||||
RSocketStrategies.Builder strategiesBuilder = RSocketStrategies.builder();
|
||||
this.strategiesConfigurers.forEach(configurer -> configurer.accept(strategiesBuilder));
|
||||
RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect()
|
||||
.dataMimeType(dataMimeType.toString());
|
||||
this.factoryConfigurers.forEach(configurer -> configurer.accept(clientFactory));
|
||||
return clientFactory.transport(transport).start()
|
||||
.map(rsocket -> RSocketRequester.create(rsocket, dataMimeType, strategiesBuilder.build()));
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<RSocketRequester> connectTcp(String host, int port, MimeType dataMimeType) {
|
||||
return connect(TcpClientTransport.create(host, port), dataMimeType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<RSocketRequester> connectWebSocket(URI uri, MimeType dataMimeType) {
|
||||
return connect(WebsocketClientTransport.create(uri), dataMimeType);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -16,7 +16,12 @@
|
|||
|
||||
package org.springframework.messaging.rsocket;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.transport.ClientTransport;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
|
@ -32,6 +37,7 @@ import org.springframework.util.MimeType;
|
|||
* methods specify routing and other metadata.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Brian Clozel
|
||||
* @since 5.2
|
||||
*/
|
||||
public interface RSocketRequester {
|
||||
|
|
@ -55,6 +61,61 @@ public interface RSocketRequester {
|
|||
return new DefaultRSocketRequester(rsocket, dataMimeType, strategies);
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a {@code RSocketRequester} builder.
|
||||
*/
|
||||
static RSocketRequester.Builder builder() {
|
||||
return new DefaultRSocketRequesterBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* A mutable builder for creating a client {@link RSocketRequester}.
|
||||
*/
|
||||
interface Builder {
|
||||
|
||||
/**
|
||||
* Configure the client {@code RSocketFactory}. This is useful for
|
||||
* customizing protocol options and add RSocket plugins.
|
||||
* @param configurer the configurer to apply
|
||||
*/
|
||||
RSocketRequester.Builder rsocketFactory(
|
||||
Consumer<RSocketFactory.ClientRSocketFactory> configurer);
|
||||
|
||||
/**
|
||||
* Configure the builder for {@link RSocketStrategies}.
|
||||
* <p>The builder starts with an empty {@code RSocketStrategies}.
|
||||
* @param configurer the configurer to apply
|
||||
*/
|
||||
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
|
||||
|
||||
/**
|
||||
* Configure the {@code ClientTransport} for the RSocket connection
|
||||
* and connect to the RSocket server
|
||||
* @param transport the chosen client transport
|
||||
* @return a mono containing the connected {@code RSocketRequester}
|
||||
*/
|
||||
Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType);
|
||||
|
||||
/**
|
||||
* Connect to the RSocket server over TCP transport using the
|
||||
* provided connection parameters
|
||||
* @param host the RSocket server host
|
||||
* @param port the RSocket server port
|
||||
* @param dataMimeType the data MimeType
|
||||
* @return a mono containing the connected {@code RSocketRequester}
|
||||
*/
|
||||
Mono<RSocketRequester> connectTcp(String host, int port, MimeType dataMimeType);
|
||||
|
||||
/**
|
||||
* Connect to the RSocket server over WebSocket transport using the
|
||||
* provided connection parameters
|
||||
* @param uri the RSocket server endpoint URI
|
||||
* @param dataMimeType the data MimeType
|
||||
* @return a mono containing the connected {@code RSocketRequester}
|
||||
*/
|
||||
Mono<RSocketRequester> connectWebSocket(URI uri, MimeType dataMimeType);
|
||||
|
||||
}
|
||||
|
||||
// For now we treat metadata as a simple string that is the route.
|
||||
// This will change after the resolution of:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.rsocket;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.rsocket.DuplexConnection;
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.transport.ClientTransport;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link DefaultRSocketRequesterBuilder}.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
*/
|
||||
public class DefaultRSocketRequesterBuilderTests {
|
||||
|
||||
private ClientTransport transport;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
this.transport = mock(ClientTransport.class);
|
||||
when(this.transport.connect(anyInt())).thenReturn(Mono.just(new MockConnection()));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void shouldApplyCustomizationsAtSubscription() {
|
||||
Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class);
|
||||
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
|
||||
Mono<RSocketRequester> requester = RSocketRequester.builder()
|
||||
.rsocketFactory(factoryConfigurer)
|
||||
.rsocketStrategies(strategiesConfigurer)
|
||||
.connect(this.transport, MimeTypeUtils.APPLICATION_JSON);
|
||||
verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void shouldApplyCustomizations() {
|
||||
Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class);
|
||||
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
|
||||
RSocketRequester requester = RSocketRequester.builder()
|
||||
.rsocketFactory(factoryConfigurer)
|
||||
.rsocketStrategies(strategiesConfigurer)
|
||||
.connect(this.transport, MimeTypeUtils.APPLICATION_JSON)
|
||||
.block();
|
||||
verify(this.transport).connect(anyInt());
|
||||
verify(factoryConfigurer).accept(any(RSocketFactory.ClientRSocketFactory.class));
|
||||
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
|
||||
}
|
||||
|
||||
static class MockConnection implements DuplexConnection {
|
||||
|
||||
@Override
|
||||
public Mono<Void> send(Publisher<ByteBuf> frames) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<ByteBuf> receive() {
|
||||
return Flux.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> onClose() {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -19,10 +19,8 @@ package org.springframework.messaging.rsocket;
|
|||
import java.time.Duration;
|
||||
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.RSocketFactory;
|
||||
import io.rsocket.frame.decoder.PayloadDecoder;
|
||||
import io.rsocket.transport.netty.client.TcpClientTransport;
|
||||
import io.rsocket.transport.netty.server.CloseableChannel;
|
||||
import io.rsocket.transport.netty.server.TcpServerTransport;
|
||||
import org.junit.AfterClass;
|
||||
|
|
@ -59,8 +57,6 @@ public class RSocketClientToServerIntegrationTests {
|
|||
|
||||
private static FireAndForgetCountingInterceptor interceptor = new FireAndForgetCountingInterceptor();
|
||||
|
||||
private static RSocket client;
|
||||
|
||||
private static RSocketRequester requester;
|
||||
|
||||
|
||||
|
|
@ -77,20 +73,19 @@ public class RSocketClientToServerIntegrationTests {
|
|||
.start()
|
||||
.block();
|
||||
|
||||
client = RSocketFactory.connect()
|
||||
.dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE)
|
||||
.frameDecoder(PayloadDecoder.ZERO_COPY)
|
||||
.transport(TcpClientTransport.create("localhost", 7000))
|
||||
.start()
|
||||
requester = RSocketRequester.builder()
|
||||
.rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY))
|
||||
.rsocketStrategies(strategies -> strategies
|
||||
.decoder(StringDecoder.allMimeTypes())
|
||||
.encoder(CharSequenceEncoder.allMimeTypes())
|
||||
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)))
|
||||
.connectTcp("localhost", 7000, MimeTypeUtils.TEXT_PLAIN)
|
||||
.block();
|
||||
|
||||
requester = RSocketRequester.create(
|
||||
client, MimeTypeUtils.TEXT_PLAIN, context.getBean(RSocketStrategies.class));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownOnce() {
|
||||
client.dispose();
|
||||
requester.rsocket().dispose();
|
||||
server.dispose();
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue