Add client responder configuration
Prior to this commit, the `RSocketRequester.Builder` would allow to configure directly annotated handlers for processing server requests. This lead to a package tangle where the `o.s.messaging.rsocket` would use classes from `o.s.messaging.rsocket.annotation.support` package. This commit introduces the `ClientResponderFactory` interface for configuring a responder on the client RSocket factory. Its goal is to be compatible with future changes with a functional variant for RSocket handlers. Closes gh-23170
This commit is contained in:
parent
d6e3394b81
commit
e7ecb83449
|
@ -18,7 +18,6 @@ package org.springframework.messaging.rsocket;
|
|||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -31,7 +30,6 @@ import io.rsocket.transport.netty.client.WebsocketClientTransport;
|
|||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
|
@ -56,7 +54,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
|
||||
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
|
||||
|
||||
private List<Object> handlers = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) {
|
||||
|
@ -83,12 +80,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RSocketRequester.Builder annotatedHandlers(Object... handlers) {
|
||||
this.handlers.addAll(Arrays.asList(handlers));
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer) {
|
||||
this.strategiesConfigurers.add(configurer);
|
||||
|
@ -120,13 +111,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
rsocketFactory.dataMimeType(dataMimeType.toString());
|
||||
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
|
||||
|
||||
if (!this.handlers.isEmpty()) {
|
||||
RSocketMessageHandler messageHandler = new RSocketMessageHandler();
|
||||
messageHandler.setHandlers(this.handlers);
|
||||
messageHandler.setRSocketStrategies(rsocketStrategies);
|
||||
messageHandler.afterPropertiesSet();
|
||||
rsocketFactory.acceptor(messageHandler.clientResponder());
|
||||
}
|
||||
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
|
||||
this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory));
|
||||
|
||||
|
|
|
@ -30,7 +30,6 @@ import reactor.core.publisher.Mono;
|
|||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.core.ReactiveAdapterRegistry;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
|
@ -65,7 +64,6 @@ public interface RSocketRequester {
|
|||
*/
|
||||
MimeType metadataMimeType();
|
||||
|
||||
|
||||
/**
|
||||
* Begin to specify a new request with the given route to a remote handler.
|
||||
* <p>If the connection is set to use composite metadata, the route is
|
||||
|
@ -158,6 +156,7 @@ public interface RSocketRequester {
|
|||
/**
|
||||
* Set the {@link RSocketStrategies} to use for access to encoders,
|
||||
* decoders, and a factory for {@code DataBuffer's}.
|
||||
* @param strategies the codecs strategies to use
|
||||
*/
|
||||
RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);
|
||||
|
||||
|
@ -170,17 +169,6 @@ public interface RSocketRequester {
|
|||
*/
|
||||
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
|
||||
|
||||
/**
|
||||
* Add handlers for processing requests sent by the server.
|
||||
* <p>This is a shortcut for registering client handlers (i.e. annotated controllers)
|
||||
* to a {@link RSocketMessageHandler} and configuring it as an acceptor.
|
||||
* You can take full control by manually registering an acceptor on the
|
||||
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory} using
|
||||
* {@link #rsocketFactory(Consumer)} instead.
|
||||
* @param handlers the client handlers to configure on the requester
|
||||
*/
|
||||
RSocketRequester.Builder annotatedHandlers(Object... handlers);
|
||||
|
||||
/**
|
||||
* Connect to the RSocket server over TCP.
|
||||
* @param host the server host
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.rsocket.annotation.support;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import io.rsocket.RSocketFactory;
|
||||
|
||||
import org.springframework.messaging.handler.invocation.reactive.ArgumentResolverConfigurer;
|
||||
import org.springframework.messaging.handler.invocation.reactive.ReturnValueHandlerConfigurer;
|
||||
import org.springframework.messaging.rsocket.RSocketStrategies;
|
||||
import org.springframework.util.RouteMatcher;
|
||||
|
||||
/**
|
||||
* Build and configure a responder on a {@link RSocketFactory.ClientRSocketFactory} in order
|
||||
* to handle requests sent by the RSocket server to the client.
|
||||
* <p>This can be configured as a responder on a {@link org.springframework.messaging.rsocket.RSocketRequester}
|
||||
* being built by passing it as an argument to the
|
||||
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory} method.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
* @since 5.2
|
||||
* @see org.springframework.messaging.rsocket.RSocketRequester
|
||||
*/
|
||||
public interface ClientResponderFactory extends Consumer<RSocketFactory.ClientRSocketFactory> {
|
||||
|
||||
/**
|
||||
* Create a new {@link ClientResponderFactory.Config} for handling requests with annotated handlers.
|
||||
*/
|
||||
static ClientResponderFactory.Config create() {
|
||||
return new DefaultClientResponderFactory();
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the client responder with infrastructure options
|
||||
* to be applied on the resulting {@link RSocketMessageHandler}.
|
||||
*/
|
||||
interface Config {
|
||||
|
||||
/**
|
||||
* Set the {@link RSocketStrategies} to use for access to encoders,
|
||||
* decoders, and a factory for {@code DataBuffer's}.
|
||||
* @param strategies the codecs strategies to use
|
||||
*/
|
||||
Config strategies(RSocketStrategies strategies);
|
||||
|
||||
/**
|
||||
* Set the {@link RouteMatcher} to use for matching incoming requests.
|
||||
* <p>If none is set, then the responder will use a default
|
||||
* {@link org.springframework.util.SimpleRouteMatcher} instance backed
|
||||
* by and {@link org.springframework.util.AntPathMatcher}.
|
||||
* @param routeMatcher the route matcher to use with the responder
|
||||
*/
|
||||
Config routeMatcher(RouteMatcher routeMatcher);
|
||||
|
||||
/**
|
||||
* Set the {@link MetadataExtractor} to use for extracting information
|
||||
* from metadata frames.
|
||||
* @param extractor the metadata extractor to use
|
||||
*/
|
||||
Config metadataExtractor(MetadataExtractor extractor);
|
||||
|
||||
/**
|
||||
* Set the {@link ReturnValueHandlerConfigurer} for configuring
|
||||
* return value handlers.
|
||||
* @param configurer the configurer to use
|
||||
*/
|
||||
Config returnValueHandler(ReturnValueHandlerConfigurer configurer);
|
||||
|
||||
/**
|
||||
* Set the {@link ArgumentResolverConfigurer} for configuring
|
||||
* argument resolvers.
|
||||
* @param configurer the configurer to use
|
||||
*/
|
||||
Config argumentResolver(ArgumentResolverConfigurer configurer);
|
||||
|
||||
/**
|
||||
* Set the annotated handlers in charge of processing the incoming RSocket requests.
|
||||
* @param handlers the annotated handlers
|
||||
*/
|
||||
ClientResponderFactory handlers(Object... handlers);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.rsocket.annotation.support;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import io.rsocket.RSocketFactory;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.handler.invocation.reactive.ArgumentResolverConfigurer;
|
||||
import org.springframework.messaging.handler.invocation.reactive.ReturnValueHandlerConfigurer;
|
||||
import org.springframework.messaging.rsocket.RSocketStrategies;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.RouteMatcher;
|
||||
|
||||
/**
|
||||
* Default implementation of {@link ClientResponderFactory}.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
*/
|
||||
class DefaultClientResponderFactory implements ClientResponderFactory, ClientResponderFactory.Config {
|
||||
|
||||
private List<Object> handlers;
|
||||
|
||||
@Nullable
|
||||
private RSocketStrategies strategies;
|
||||
|
||||
@Nullable
|
||||
private RouteMatcher routeMatcher;
|
||||
|
||||
@Nullable
|
||||
private MetadataExtractor extractor;
|
||||
|
||||
@Nullable
|
||||
private ReturnValueHandlerConfigurer returnValueHandlerConfigurer;
|
||||
|
||||
@Nullable
|
||||
private ArgumentResolverConfigurer argumentResolverConfigurer;
|
||||
|
||||
@Override
|
||||
public ClientResponderFactory handlers(Object... handlers) {
|
||||
Assert.notEmpty(handlers, "handlers should not be empty");
|
||||
this.handlers = Arrays.asList(handlers);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponderFactory.Config strategies(RSocketStrategies strategies) {
|
||||
this.strategies = strategies;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponderFactory.Config routeMatcher(RouteMatcher routeMatcher) {
|
||||
this.routeMatcher = routeMatcher;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponderFactory.Config metadataExtractor(MetadataExtractor extractor) {
|
||||
this.extractor = extractor;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponderFactory.Config returnValueHandler(ReturnValueHandlerConfigurer configurer) {
|
||||
this.returnValueHandlerConfigurer = configurer;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponderFactory.Config argumentResolver(ArgumentResolverConfigurer configurer) {
|
||||
this.argumentResolverConfigurer = configurer;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(RSocketFactory.ClientRSocketFactory clientRSocketFactory) {
|
||||
Assert.notEmpty(this.handlers, "handlers should not be empty");
|
||||
RSocketMessageHandler messageHandler = new RSocketMessageHandler();
|
||||
messageHandler.setHandlers(this.handlers);
|
||||
if (this.strategies != null) {
|
||||
messageHandler.setRSocketStrategies(this.strategies);
|
||||
}
|
||||
if (this.routeMatcher != null) {
|
||||
messageHandler.setRouteMatcher(this.routeMatcher);
|
||||
}
|
||||
if (this.extractor != null) {
|
||||
messageHandler.setMetadataExtractor(this.extractor);
|
||||
}
|
||||
if (this.returnValueHandlerConfigurer != null) {
|
||||
messageHandler.setReturnValueHandlerConfigurer(this.returnValueHandlerConfigurer);
|
||||
}
|
||||
if (this.argumentResolverConfigurer != null) {
|
||||
messageHandler.setArgumentResolverConfigurer(this.argumentResolverConfigurer);
|
||||
}
|
||||
messageHandler.afterPropertiesSet();
|
||||
clientRSocketFactory.acceptor(messageHandler.clientResponder());
|
||||
}
|
||||
|
||||
}
|
|
@ -42,6 +42,7 @@ import org.springframework.core.codec.StringDecoder;
|
|||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.messaging.handler.annotation.MessageMapping;
|
||||
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
|
||||
import org.springframework.messaging.rsocket.annotation.support.ClientResponderFactory;
|
||||
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
|
||||
import org.springframework.stereotype.Controller;
|
||||
|
||||
|
@ -102,17 +103,22 @@ public class RSocketServerToClientIntegrationTests {
|
|||
|
||||
ServerController serverController = context.getBean(ServerController.class);
|
||||
serverController.reset();
|
||||
RSocketStrategies rSocketStrategies = context.getBean(RSocketStrategies.class);
|
||||
|
||||
ClientResponderFactory clientResponder = ClientResponderFactory.create()
|
||||
.strategies(rSocketStrategies)
|
||||
.handlers(new ClientHandler());
|
||||
|
||||
RSocketRequester requester = null;
|
||||
try {
|
||||
requester = RSocketRequester.builder()
|
||||
.annotatedHandlers(new ClientHandler())
|
||||
.rsocketFactory(factory -> {
|
||||
factory.metadataMimeType("text/plain");
|
||||
factory.setupPayload(ByteBufPayload.create("", connectionRoute));
|
||||
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
|
||||
})
|
||||
.rsocketStrategies(context.getBean(RSocketStrategies.class))
|
||||
.rsocketFactory(clientResponder)
|
||||
.rsocketStrategies(rSocketStrategies)
|
||||
.connectTcp("localhost", server.address().getPort())
|
||||
.block();
|
||||
|
||||
|
|
Loading…
Reference in New Issue