Add responder strategies to RSocketStrategies

RouteMatcher and MetadataExtractor can now be configured on and
accessed through RSocketStrategies. This simplifies configuration for
client and server responders.

See gh-23314
This commit is contained in:
Rossen Stoyanchev 2019-07-24 10:25:59 +01:00
parent a780cad12e
commit 91b040d0bf
5 changed files with 134 additions and 63 deletions

View File

@ -88,6 +88,7 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
@Nullable @Nullable
private Validator validator; private Validator validator;
@Nullable
private RouteMatcher routeMatcher; private RouteMatcher routeMatcher;
private ConversionService conversionService = new DefaultFormattingConversionService(); private ConversionService conversionService = new DefaultFormattingConversionService();
@ -97,9 +98,6 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
public MessageMappingMessageHandler() { public MessageMappingMessageHandler() {
AntPathMatcher pathMatcher = new AntPathMatcher();
pathMatcher.setPathSeparator(".");
this.routeMatcher = new SimpleRouteMatcher(pathMatcher);
setHandlerPredicate(type -> AnnotatedElementUtils.hasAnnotation(type, Controller.class)); setHandlerPredicate(type -> AnnotatedElementUtils.hasAnnotation(type, Controller.class));
} }
@ -150,7 +148,9 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
/** /**
* Return the {@code RouteMatcher} used to map messages to handlers. * Return the {@code RouteMatcher} used to map messages to handlers.
* May be {@code null} before component is initialized.
*/ */
@Nullable
public RouteMatcher getRouteMatcher() { public RouteMatcher getRouteMatcher() {
return this.routeMatcher; return this.routeMatcher;
} }
@ -203,6 +203,12 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
resolvers.add(new PayloadMethodArgumentResolver( resolvers.add(new PayloadMethodArgumentResolver(
getDecoders(), this.validator, getReactiveAdapterRegistry(), true)); getDecoders(), this.validator, getReactiveAdapterRegistry(), true));
if (this.routeMatcher == null) {
AntPathMatcher pathMatcher = new AntPathMatcher();
pathMatcher.setPathSeparator(".");
this.routeMatcher = new SimpleRouteMatcher(pathMatcher);
}
return resolvers; return resolvers;
} }

View File

@ -38,7 +38,11 @@ import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.RouteMatcher;
import org.springframework.util.SimpleRouteMatcher;
/** /**
* Default, package-private {@link RSocketStrategies} implementation. * Default, package-private {@link RSocketStrategies} implementation.
@ -52,18 +56,25 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
private final List<Decoder<?>> decoders; private final List<Decoder<?>> decoders;
private final ReactiveAdapterRegistry adapterRegistry; private final RouteMatcher routeMatcher;
private final MetadataExtractor metadataExtractor;
private final DataBufferFactory bufferFactory; private final DataBufferFactory bufferFactory;
private final ReactiveAdapterRegistry adapterRegistry;
private DefaultRSocketStrategies(List<Encoder<?>> encoders, List<Decoder<?>> decoders, private DefaultRSocketStrategies(List<Encoder<?>> encoders, List<Decoder<?>> decoders,
ReactiveAdapterRegistry adapterRegistry, DataBufferFactory bufferFactory) { RouteMatcher routeMatcher, MetadataExtractor metadataExtractor,
DataBufferFactory bufferFactory, ReactiveAdapterRegistry adapterRegistry) {
this.encoders = Collections.unmodifiableList(encoders); this.encoders = Collections.unmodifiableList(encoders);
this.decoders = Collections.unmodifiableList(decoders); this.decoders = Collections.unmodifiableList(decoders);
this.adapterRegistry = adapterRegistry; this.routeMatcher = routeMatcher;
this.metadataExtractor = metadataExtractor;
this.bufferFactory = bufferFactory; this.bufferFactory = bufferFactory;
this.adapterRegistry = adapterRegistry;
} }
@ -78,8 +89,13 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
} }
@Override @Override
public ReactiveAdapterRegistry reactiveAdapterRegistry() { public RouteMatcher routeMatcher() {
return this.adapterRegistry; return this.routeMatcher;
}
@Override
public MetadataExtractor metadataExtractor() {
return this.metadataExtractor;
} }
@Override @Override
@ -87,6 +103,11 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
return this.bufferFactory; return this.bufferFactory;
} }
@Override
public ReactiveAdapterRegistry reactiveAdapterRegistry() {
return this.adapterRegistry;
}
/** /**
* Default RSocketStrategies.Builder implementation. * Default RSocketStrategies.Builder implementation.
@ -97,6 +118,12 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
private final List<Decoder<?>> decoders = new ArrayList<>(); private final List<Decoder<?>> decoders = new ArrayList<>();
@Nullable
private RouteMatcher routeMatcher;
@Nullable
private MetadataExtractor metadataExtractor;
private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance(); private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
@Nullable @Nullable
@ -151,6 +178,18 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
return this; return this;
} }
@Override
public Builder routeMatcher(RouteMatcher routeMatcher) {
this.routeMatcher = routeMatcher;
return this;
}
@Override
public Builder metadataExtractor(MetadataExtractor metadataExtractor) {
this.metadataExtractor = metadataExtractor;
return this;
}
@Override @Override
public Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry) { public Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry) {
Assert.notNull(registry, "ReactiveAdapterRegistry is required"); Assert.notNull(registry, "ReactiveAdapterRegistry is required");
@ -167,12 +206,27 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
@Override @Override
public RSocketStrategies build() { public RSocketStrategies build() {
return new DefaultRSocketStrategies( return new DefaultRSocketStrategies(
this.encoders, this.decoders, this.adapterRegistry, initBufferFactory()); this.encoders, this.decoders,
this.routeMatcher != null ? this.routeMatcher : initRouteMatcher(),
this.metadataExtractor != null ? this.metadataExtractor : initMetadataExtractor(),
this.bufferFactory != null ? this.bufferFactory : initBufferFactory(),
this.adapterRegistry);
}
private RouteMatcher initRouteMatcher() {
AntPathMatcher pathMatcher = new AntPathMatcher();
pathMatcher.setPathSeparator(".");
return new SimpleRouteMatcher(pathMatcher);
}
private MetadataExtractor initMetadataExtractor() {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
return extractor;
} }
private DataBufferFactory initBufferFactory() { private DataBufferFactory initBufferFactory() {
return this.bufferFactory != null ? this.bufferFactory : return new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
} }
} }

View File

@ -31,7 +31,10 @@ import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
import org.springframework.util.RouteMatcher;
import org.springframework.util.SimpleRouteMatcher;
/** /**
* Access to strategies for use by RSocket requester and responder components. * Access to strategies for use by RSocket requester and responder components.
@ -90,10 +93,14 @@ public interface RSocketStrategies {
} }
/** /**
* Return the configured * Return the configured {@link Builder#routeMatcher(RouteMatcher)}.
* {@link Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) reactiveAdapterRegistry}.
*/ */
ReactiveAdapterRegistry reactiveAdapterRegistry(); RouteMatcher routeMatcher();
/**
* Return the configured {@link Builder#metadataExtractor(MetadataExtractor)}.
*/
MetadataExtractor metadataExtractor();
/** /**
* Return the configured * Return the configured
@ -101,6 +108,12 @@ public interface RSocketStrategies {
*/ */
DataBufferFactory dataBufferFactory(); DataBufferFactory dataBufferFactory();
/**
* Return the configured
* {@link Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) reactiveAdapterRegistry}.
*/
ReactiveAdapterRegistry reactiveAdapterRegistry();
/** /**
* Return a builder to build a new {@code RSocketStrategies} instance. * Return a builder to build a new {@code RSocketStrategies} instance.
@ -149,6 +162,27 @@ public interface RSocketStrategies {
*/ */
Builder decoders(Consumer<List<Decoder<?>>> consumer); Builder decoders(Consumer<List<Decoder<?>>> consumer);
/**
* Configure a {@code RouteMatcher} for matching routes to message
* handlers based on route patterns. This option is applicable to
* client or server responders.
* <p>By default, {@link SimpleRouteMatcher} is used, backed by
* {@link AntPathMatcher} with "." as separator. For better
* efficiency consider using the {@code PathPatternRouteMatcher} from
* {@code spring-web} instead.
*/
Builder routeMatcher(RouteMatcher routeMatcher);
/**
* Configure a {@link MetadataExtractor} to extract the route along with
* other metadata. This option is applicable to client or server
* responders.
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
* route from {@code "message/x.rsocket.routing.v0"} or
* {@code "text/plain"} metadata entries.
*/
Builder metadataExtractor(MetadataExtractor metadataExtractor);
/** /**
* Configure the registry for reactive type support. This can be used to * Configure the registry for reactive type support. This can be used to
* to adapt to, and/or determine the semantics of a given * to adapt to, and/or determine the semantics of a given

View File

@ -25,10 +25,8 @@ import io.rsocket.RSocketFactory;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer; import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.RouteMatcher;
/** /**
* {@link ClientRSocketFactoryConfigurer} to configure and plug in a responder * {@link ClientRSocketFactoryConfigurer} to configure and plug in a responder
@ -44,12 +42,6 @@ public final class AnnotationClientResponderConfigurer implements ClientRSocketF
private final List<Object> handlers = new ArrayList<>(); private final List<Object> handlers = new ArrayList<>();
@Nullable
private RouteMatcher routeMatcher;
@Nullable
private MetadataExtractor extractor;
@Nullable @Nullable
private RSocketStrategies strategies; private RSocketStrategies strategies;
@ -61,24 +53,6 @@ public final class AnnotationClientResponderConfigurer implements ClientRSocketF
} }
/**
* Configure the {@link RouteMatcher} to use. This is used to set
* {@link RSocketMessageHandler#setRouteMatcher(RouteMatcher)}.
*/
public AnnotationClientResponderConfigurer routeMatcher(RouteMatcher routeMatcher) {
this.routeMatcher = routeMatcher;
return this;
}
/**
* Configure the {@link MetadataExtractor} to use. This is used to set
* {@link RSocketMessageHandler#setMetadataExtractor(MetadataExtractor)}.
*/
public AnnotationClientResponderConfigurer metadataExtractor(MetadataExtractor extractor) {
this.extractor = extractor;
return this;
}
/** /**
* Configure handlers to detect {@code @MessasgeMapping} handler methods on. * Configure handlers to detect {@code @MessasgeMapping} handler methods on.
* This is used to set {@link RSocketMessageHandler#setHandlers(List)}. * This is used to set {@link RSocketMessageHandler#setHandlers(List)}.
@ -101,12 +75,6 @@ public final class AnnotationClientResponderConfigurer implements ClientRSocketF
Assert.notEmpty(this.handlers, "No handlers"); Assert.notEmpty(this.handlers, "No handlers");
RSocketMessageHandler messageHandler = new RSocketMessageHandler(); RSocketMessageHandler messageHandler = new RSocketMessageHandler();
messageHandler.setHandlers(this.handlers); messageHandler.setHandlers(this.handlers);
if (this.routeMatcher != null) {
messageHandler.setRouteMatcher(this.routeMatcher);
}
if (this.extractor != null) {
messageHandler.setMetadataExtractor(this.extractor);
}
messageHandler.setRSocketStrategies(this.strategies); messageHandler.setRSocketStrategies(this.strategies);
messageHandler.afterPropertiesSet(); messageHandler.afterPropertiesSet();
factory.acceptor(messageHandler.clientResponder()); factory.acceptor(messageHandler.clientResponder());

View File

@ -114,10 +114,16 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
/** /**
* Provide configuration in the form of {@link RSocketStrategies} instance * Provide configuration in the form of {@link RSocketStrategies} instance
* which can also be re-used to initialize a client-side * which can also be re-used to initialize a client-side
* {@link RSocketRequester}. When this property is set, it also sets * {@link RSocketRequester}.
* {@link #setDecoders(List) decoders}, {@link #setEncoders(List) encoders}, * <p>When this is set, in turn it sets the following:
* and {@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry) * <ul>
* reactiveAdapterRegistry}. * <li>{@link #setDecoders(List)}
* <li>{@link #setEncoders(List)}
* <li>{@link #setRouteMatcher(RouteMatcher)}
* <li>{@link #setMetadataExtractor(MetadataExtractor)}
* <li>{@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry)}
* </ul>
* <p>By default if this is not set, it is initialized from the above.
*/ */
public void setRSocketStrategies(@Nullable RSocketStrategies rsocketStrategies) { public void setRSocketStrategies(@Nullable RSocketStrategies rsocketStrategies) {
this.rsocketStrategies = rsocketStrategies; this.rsocketStrategies = rsocketStrategies;
@ -138,12 +144,10 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
} }
/** /**
* Configure a {@link MetadataExtractor} to extract the route and possibly * Configure a {@link MetadataExtractor} to extract the route along with
* other metadata from the first payload of incoming requests. * other metadata.
* <p>By default this is a {@link DefaultMetadataExtractor} with the * <p>By default this is {@link DefaultMetadataExtractor} extracting a
* configured {@link RSocketStrategies} (and decoders), extracting a route * route from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"}.
* from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"}
* metadata entries.
* @param extractor the extractor to use * @param extractor the extractor to use
*/ */
public void setMetadataExtractor(MetadataExtractor extractor) { public void setMetadataExtractor(MetadataExtractor extractor) {
@ -200,20 +204,25 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());
super.afterPropertiesSet();
if (getMetadataExtractor() == null) {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
setMetadataExtractor(extractor);
}
if (this.rsocketStrategies == null) { if (this.rsocketStrategies == null) {
this.rsocketStrategies = RSocketStrategies.builder() this.rsocketStrategies = RSocketStrategies.builder()
.decoder(getDecoders().toArray(new Decoder<?>[0])) .decoder(getDecoders().toArray(new Decoder<?>[0]))
.encoder(getEncoders().toArray(new Encoder<?>[0])) .encoder(getEncoders().toArray(new Encoder<?>[0]))
.routeMatcher(getRouteMatcher())
.metadataExtractor(getMetadataExtractor())
.reactiveAdapterStrategy(getReactiveAdapterRegistry()) .reactiveAdapterStrategy(getReactiveAdapterRegistry())
.build(); .build();
} }
if (this.metadataExtractor == null) {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
this.metadataExtractor = extractor;
}
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());
super.afterPropertiesSet();
} }
@Override @Override