parent
3cf5db6317
commit
68c99dafcf
|
@ -125,7 +125,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
|
|||
messageHandler.setHandlers(this.handlers);
|
||||
messageHandler.setRSocketStrategies(rsocketStrategies);
|
||||
messageHandler.afterPropertiesSet();
|
||||
rsocketFactory.acceptor(messageHandler.clientAcceptor());
|
||||
rsocketFactory.acceptor(messageHandler.clientResponder());
|
||||
}
|
||||
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
|
||||
this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory));
|
||||
|
|
|
@ -53,7 +53,7 @@ import org.springframework.util.StringUtils;
|
|||
* Extension of {@link MessageMappingMessageHandler} to use as an RSocket
|
||||
* responder by handling incoming streams via {@code @MessageMapping} annotated
|
||||
* methods.
|
||||
* <p>Use {@link #clientAcceptor()} and {@link #serverAcceptor()} to obtain
|
||||
* <p>Use {@link #clientResponder()} and {@link #serverResponder()} to obtain
|
||||
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function) client} or
|
||||
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server}
|
||||
* side adapters.
|
||||
|
@ -270,7 +270,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
|||
* <p>Subsequent stream requests can be handled with
|
||||
* {@link MessageMapping MessageMapping} methods.
|
||||
*/
|
||||
public SocketAcceptor serverAcceptor() {
|
||||
public SocketAcceptor serverResponder() {
|
||||
return (setupPayload, sendingRSocket) -> {
|
||||
MessagingRSocket responder = createResponder(setupPayload, sendingRSocket);
|
||||
return responder.handleConnectionSetupPayload(setupPayload).then(Mono.just(responder));
|
||||
|
@ -291,7 +291,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
|||
* <p>Subsequent stream requests can be handled with
|
||||
* {@link MessageMapping MessageMapping} methods.
|
||||
*/
|
||||
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientAcceptor() {
|
||||
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientResponder() {
|
||||
return (setupPayload, sendingRSocket) -> {
|
||||
MessagingRSocket responder = createResponder(setupPayload, sendingRSocket);
|
||||
responder.handleConnectionSetupPayload(setupPayload).subscribe();
|
||||
|
|
|
@ -79,7 +79,7 @@ public class RSocketBufferLeakTests {
|
|||
server = RSocketFactory.receive()
|
||||
.frameDecoder(PayloadDecoder.ZERO_COPY)
|
||||
.addResponderPlugin(payloadInterceptor) // intercept responding
|
||||
.acceptor(context.getBean(RSocketMessageHandler.class).serverAcceptor())
|
||||
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
|
||||
.transport(TcpServerTransport.create("localhost", 7000))
|
||||
.start()
|
||||
.block();
|
||||
|
|
|
@ -68,7 +68,7 @@ public class RSocketClientToServerIntegrationTests {
|
|||
server = RSocketFactory.receive()
|
||||
.addResponderPlugin(interceptor)
|
||||
.frameDecoder(PayloadDecoder.ZERO_COPY)
|
||||
.acceptor(context.getBean(RSocketMessageHandler.class).serverAcceptor())
|
||||
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
|
||||
.transport(TcpServerTransport.create("localhost", 7000))
|
||||
.start()
|
||||
.block();
|
||||
|
|
|
@ -65,7 +65,7 @@ public class RSocketServerToClientIntegrationTests {
|
|||
|
||||
server = RSocketFactory.receive()
|
||||
.frameDecoder(PayloadDecoder.ZERO_COPY)
|
||||
.acceptor(context.getBean(RSocketMessageHandler.class).serverAcceptor())
|
||||
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
|
||||
.transport(TcpServerTransport.create("localhost", 0))
|
||||
.start()
|
||||
.block();
|
||||
|
|
|
@ -202,7 +202,7 @@ class RSocketClientToServerCoroutinesIntegrationTests {
|
|||
server = RSocketFactory.receive()
|
||||
.addResponderPlugin(interceptor)
|
||||
.frameDecoder(PayloadDecoder.ZERO_COPY)
|
||||
.acceptor(context.getBean(RSocketMessageHandler::class.java).serverAcceptor())
|
||||
.acceptor(context.getBean(RSocketMessageHandler::class.java).serverResponder())
|
||||
.transport(TcpServerTransport.create("localhost", 7000))
|
||||
.start()
|
||||
.block()!!
|
||||
|
|
Loading…
Reference in New Issue