ClientRSocketFactoryConfigurer refactoring
This commit is contained in:
parent
027fd78306
commit
2aa3363ba2
|
@ -18,10 +18,9 @@ package org.springframework.messaging.rsocket;
|
||||||
import io.rsocket.RSocketFactory;
|
import io.rsocket.RSocketFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Strategy to apply some configuration to a
|
* Strategy to apply configuration to a client side {@code RSocketFactory}.
|
||||||
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory ClientRSocketFactory}.
|
* that's being prepared by {@link RSocketRequester.Builder} to connect
|
||||||
* It is given to {@link RSocketRequester.Builder} to initialize the
|
* to a server.
|
||||||
* {@code RSocketFactory} that's used to connect.
|
|
||||||
*
|
*
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
* @since 5.2
|
* @since 5.2
|
||||||
|
@ -30,17 +29,7 @@ import io.rsocket.RSocketFactory;
|
||||||
public interface ClientRSocketFactoryConfigurer {
|
public interface ClientRSocketFactoryConfigurer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method is invoked by {@link RSocketRequester.Builder} immediately
|
* Apply configuration to the given {@code ClientRSocketFactory}.
|
||||||
* before the call to {@link #configure}, and can be used by implementations
|
|
||||||
* of this interface that need access to the configured
|
|
||||||
* {@code RSocketStrategies}.
|
|
||||||
*/
|
|
||||||
default void configureWithStrategies(RSocketStrategies strategies) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Configure the given {@code ClientRSocketFactory}.
|
|
||||||
* @param rsocketFactory the factory to configure
|
|
||||||
*/
|
*/
|
||||||
void configure(RSocketFactory.ClientRSocketFactory rsocketFactory);
|
void configure(RSocketFactory.ClientRSocketFactory rsocketFactory);
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
||||||
|
|
||||||
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
|
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
|
||||||
|
|
||||||
private List<ClientRSocketFactoryConfigurer> rsocketFactoryConfigurers = new ArrayList<>();
|
private List<ClientRSocketFactoryConfigurer> rsocketConfigurers = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -85,7 +85,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer) {
|
public RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer) {
|
||||||
this.rsocketFactoryConfigurers.add(configurer);
|
this.rsocketConfigurers.add(configurer);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,10 +118,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
||||||
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
|
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.rsocketFactoryConfigurers.forEach(configurer -> {
|
this.rsocketConfigurers.forEach(configurer -> configurer.configure(rsocketFactory));
|
||||||
configurer.configureWithStrategies(rsocketStrategies);
|
|
||||||
configurer.configure(rsocketFactory);
|
|
||||||
});
|
|
||||||
|
|
||||||
return rsocketFactory.transport(transport)
|
return rsocketFactory.transport(transport)
|
||||||
.start()
|
.start()
|
||||||
|
|
|
@ -164,15 +164,12 @@ public interface RSocketRequester {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback to configure the {@code ClientRSocketFactory} directly.
|
* Callback to configure the {@code ClientRSocketFactory} directly.
|
||||||
* <p>See static factory method
|
* <p>Do not set {@link #dataMimeType(MimeType)} and
|
||||||
* {@link RSocketMessageHandler#clientResponder(Object...)} for
|
* {@link #metadataMimeType(MimeType)} directly on the
|
||||||
* configuring a client side responder with annotated methods.
|
* {@code ClientRSocketFactory}. Use methods on this builder instead
|
||||||
* <p><strong>Note:</strong> Do not set {@link #dataMimeType(MimeType)}
|
* so the {@code RSocketRequester} will have access to them.
|
||||||
* and {@link #metadataMimeType(MimeType)} directly on the
|
* <p>For configuring client side responding, see
|
||||||
* {@code ClientRSocketFactory}. Use the shortcuts on this builder
|
* {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}.
|
||||||
* instead since the created {@code RSocketRequester} needs to be aware
|
|
||||||
* of those settings.
|
|
||||||
* @see RSocketMessageHandler#clientResponder(Object...)
|
|
||||||
*/
|
*/
|
||||||
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);
|
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.util.function.Function;
|
||||||
|
|
||||||
import io.rsocket.ConnectionSetupPayload;
|
import io.rsocket.ConnectionSetupPayload;
|
||||||
import io.rsocket.RSocket;
|
import io.rsocket.RSocket;
|
||||||
import io.rsocket.RSocketFactory;
|
|
||||||
import io.rsocket.SocketAcceptor;
|
import io.rsocket.SocketAcceptor;
|
||||||
import io.rsocket.frame.FrameType;
|
import io.rsocket.frame.FrameType;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
@ -344,18 +343,15 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return an adapter for a client side
|
* Return an adapter for a client side responder that can be used to set
|
||||||
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(BiFunction)
|
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function)}.
|
||||||
* acceptor} that delegate to this {@link RSocketMessageHandler} for
|
* The responder delegates requests to this {@code RSocketMessageHandler}
|
||||||
* handling.
|
* for handling via {@code @MessageMapping} methods.
|
||||||
* <p>The initial {@link ConnectionSetupPayload} can be processed with a
|
* <p>The initial {@link ConnectionSetupPayload} can be accessed through a
|
||||||
* {@link ConnectMapping @ConnectionMapping} method but, unlike the
|
* {@link ConnectMapping @ConnectionMapping} method, but such a method is
|
||||||
* server side, such a method is merely a callback and cannot prevent the
|
* only a callback just before the connection is made and cannot "accept"
|
||||||
* connection unless the method throws an error immediately. Such a method
|
* or prevent the connection. Such a method can also start requests to the
|
||||||
* can also start requests to the server but must do so decoupled from
|
* server but must do so decoupled from handling and the current thread.
|
||||||
* handling and from the current thread.
|
|
||||||
* <p>Subsequent stream requests can be handled with
|
|
||||||
* {@link MessageMapping MessageMapping} methods.
|
|
||||||
*/
|
*/
|
||||||
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientResponder() {
|
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientResponder() {
|
||||||
return (setupPayload, sendingRSocket) -> {
|
return (setupPayload, sendingRSocket) -> {
|
||||||
|
@ -375,17 +371,13 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
||||||
MimeType metaMimeType = StringUtils.hasText(s) ? MimeTypeUtils.parseMimeType(s) : this.defaultMetadataMimeType;
|
MimeType metaMimeType = StringUtils.hasText(s) ? MimeTypeUtils.parseMimeType(s) : this.defaultMetadataMimeType;
|
||||||
Assert.notNull(metaMimeType, "No `metadataMimeType` in ConnectionSetupPayload and no default value");
|
Assert.notNull(metaMimeType, "No `metadataMimeType` in ConnectionSetupPayload and no default value");
|
||||||
|
|
||||||
RSocketStrategies strategies = getRSocketStrategies();
|
RSocketRequester requester = RSocketRequester.wrap(
|
||||||
RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, metaMimeType, strategies);
|
rsocket, dataMimeType, metaMimeType, this.strategies);
|
||||||
|
|
||||||
Assert.state(this.metadataExtractor != null,
|
Assert.state(getRouteMatcher() != null, () -> "No RouteMatcher. Was afterPropertiesSet not called?");
|
||||||
() -> "No MetadataExtractor. Was afterPropertiesSet not called?");
|
|
||||||
|
|
||||||
Assert.state(getRouteMatcher() != null,
|
return new MessagingRSocket(dataMimeType, metaMimeType, this.metadataExtractor,
|
||||||
() -> "No RouteMatcher. Was afterPropertiesSet not called?");
|
requester, this, getRouteMatcher(), this.strategies);
|
||||||
|
|
||||||
return new MessagingRSocket(dataMimeType, metaMimeType, this.metadataExtractor, requester,
|
|
||||||
this, getRouteMatcher(), strategies);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isDataMimeTypeSupported(MimeType dataMimeType) {
|
private boolean isDataMimeTypeSupported(MimeType dataMimeType) {
|
||||||
|
@ -399,39 +391,39 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ClientRSocketFactoryConfigurer clientResponder(Object... handlers) {
|
/**
|
||||||
return new ResponderConfigurer(handlers);
|
* Static factory method for a configurer of a client side responder with
|
||||||
}
|
* annotated handler methods. This is intended to be passed into
|
||||||
|
* {@link RSocketRequester.Builder#rsocketFactory(ClientRSocketFactoryConfigurer)}.
|
||||||
|
* <p>In effect a shortcut to create and initialize
|
||||||
|
* {@code RSocketMessageHandler} with the given strategies and handlers,
|
||||||
|
* and use {@link #clientResponder()} to obtain the responder.
|
||||||
|
* For more advanced scenarios, e.g. discovering handlers through a custom
|
||||||
|
* stereotype annotation, consider declaring {@code RSocketMessageHandler}
|
||||||
|
* as a bean, and then obtain the responder from it.
|
||||||
|
* @param strategies the strategies to set on the created
|
||||||
|
* {@code RSocketMessageHandler}
|
||||||
|
* @param candidateHandlers a list of Objects and/or Classes with annotated
|
||||||
|
* handler methods; used to call {@link #setHandlers(List)} with
|
||||||
|
* on the created {@code RSocketMessageHandler}
|
||||||
|
* @return a configurer that may be passed into
|
||||||
|
* {@link RSocketRequester.Builder#rsocketFactory(ClientRSocketFactoryConfigurer)}
|
||||||
|
*/
|
||||||
|
public static ClientRSocketFactoryConfigurer clientResponder(
|
||||||
|
RSocketStrategies strategies, Object... candidateHandlers) {
|
||||||
|
|
||||||
|
Assert.notEmpty(candidateHandlers, "No handlers");
|
||||||
private static final class ResponderConfigurer implements ClientRSocketFactoryConfigurer {
|
List<Object> handlers = new ArrayList<>(candidateHandlers.length);
|
||||||
|
for (Object obj : candidateHandlers) {
|
||||||
private final List<Object> handlers = new ArrayList<>();
|
handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class<?>) obj) : obj);
|
||||||
|
|
||||||
@Nullable
|
|
||||||
private RSocketStrategies strategies;
|
|
||||||
|
|
||||||
|
|
||||||
private ResponderConfigurer(Object... handlers) {
|
|
||||||
Assert.notEmpty(handlers, "No handlers");
|
|
||||||
for (Object obj : handlers) {
|
|
||||||
this.handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class<?>) obj) : obj);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
return rsocketFactory -> {
|
||||||
public void configureWithStrategies(RSocketStrategies strategies) {
|
|
||||||
this.strategies = strategies;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void configure(RSocketFactory.ClientRSocketFactory factory) {
|
|
||||||
RSocketMessageHandler handler = new RSocketMessageHandler();
|
RSocketMessageHandler handler = new RSocketMessageHandler();
|
||||||
handler.setHandlers(this.handlers);
|
handler.setHandlers(handlers);
|
||||||
handler.setRSocketStrategies(this.strategies);
|
handler.setRSocketStrategies(strategies);
|
||||||
handler.afterPropertiesSet();
|
handler.afterPropertiesSet();
|
||||||
factory.acceptor(handler.clientResponder());
|
rsocketFactory.acceptor(handler.clientResponder());
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,7 +99,6 @@ public class DefaultRSocketRequesterBuilderTests {
|
||||||
|
|
||||||
verify(this.transport).connect(anyInt());
|
verify(this.transport).connect(anyInt());
|
||||||
verify(rsocketStrategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
|
verify(rsocketStrategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
|
||||||
assertThat(this.rsocketFactoryConfigurer.rsocketStrategies()).isNotNull();
|
|
||||||
assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNotNull();
|
assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNotNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,25 +193,14 @@ public class DefaultRSocketRequesterBuilderTests {
|
||||||
|
|
||||||
static class TestRSocketFactoryConfigurer implements ClientRSocketFactoryConfigurer {
|
static class TestRSocketFactoryConfigurer implements ClientRSocketFactoryConfigurer {
|
||||||
|
|
||||||
private RSocketStrategies strategies;
|
|
||||||
|
|
||||||
private RSocketFactory.ClientRSocketFactory rsocketFactory;
|
private RSocketFactory.ClientRSocketFactory rsocketFactory;
|
||||||
|
|
||||||
|
|
||||||
public RSocketStrategies rsocketStrategies() {
|
RSocketFactory.ClientRSocketFactory rsocketFactory() {
|
||||||
return this.strategies;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RSocketFactory.ClientRSocketFactory rsocketFactory() {
|
|
||||||
return this.rsocketFactory;
|
return this.rsocketFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void configureWithStrategies(RSocketStrategies strategies) {
|
|
||||||
this.strategies = strategies;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(RSocketFactory.ClientRSocketFactory rsocketFactory) {
|
public void configure(RSocketFactory.ClientRSocketFactory rsocketFactory) {
|
||||||
this.rsocketFactory = rsocketFactory;
|
this.rsocketFactory = rsocketFactory;
|
||||||
|
|
|
@ -106,15 +106,17 @@ public class RSocketServerToClientIntegrationTests {
|
||||||
ServerController serverController = context.getBean(ServerController.class);
|
ServerController serverController = context.getBean(ServerController.class);
|
||||||
serverController.reset();
|
serverController.reset();
|
||||||
|
|
||||||
|
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
|
||||||
|
ClientRSocketFactoryConfigurer clientResponderConfigurer =
|
||||||
|
RSocketMessageHandler.clientResponder(strategies, new ClientHandler());
|
||||||
|
|
||||||
RSocketRequester requester = null;
|
RSocketRequester requester = null;
|
||||||
try {
|
try {
|
||||||
requester = RSocketRequester.builder()
|
requester = RSocketRequester.builder()
|
||||||
.rsocketFactory(factory -> {
|
.metadataMimeType(MimeTypeUtils.TEXT_PLAIN)
|
||||||
factory.metadataMimeType("text/plain");
|
.rsocketStrategies(strategies)
|
||||||
factory.setupPayload(ByteBufPayload.create("", connectionRoute));
|
.rsocketFactory(clientResponderConfigurer)
|
||||||
})
|
.rsocketFactory(factory -> factory.setupPayload(ByteBufPayload.create("", connectionRoute)))
|
||||||
.rsocketFactory(RSocketMessageHandler.clientResponder(new ClientHandler()))
|
|
||||||
.rsocketStrategies(context.getBean(RSocketStrategies.class))
|
|
||||||
.connectTcp("localhost", server.address().getPort())
|
.connectTcp("localhost", server.address().getPort())
|
||||||
.block();
|
.block();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue