From 870f61fd8ea75ff46f0892c9ccff347fa6d8b6f3 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Wed, 30 Nov 2016 12:46:01 +0000 Subject: [PATCH] update STOMP support to reactor-netty --- build.gradle | 15 +- .../simp/stomp/Reactor2StompCodec.java | 107 ------ ...t.java => ReactorNettyTcpStompClient.java} | 104 +++-- .../stomp/StompBrokerRelayMessageHandler.java | 17 +- ...bstractMonoToListenableFutureAdapter.java} | 58 ++- ...ava => MonoToListenableFutureAdapter.java} | 12 +- .../tcp/reactor/Reactor2TcpClient.java | 362 ------------------ .../tcp/reactor/ReactorNettyTcpClient.java | 264 +++++++++++++ ...on.java => ReactorNettyTcpConnection.java} | 48 ++- ...a => ReactorNettyTcpStompClientTests.java} | 10 +- ...CodecTests.java => StompDecoderTests.java} | 105 +---- .../simp/stomp/StompEncoderTests.java | 93 +++++ 12 files changed, 498 insertions(+), 697 deletions(-) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java rename spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/{Reactor2TcpStompClient.java => ReactorNettyTcpStompClient.java} (51%) rename spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/{AbstractPromiseToListenableFutureAdapter.java => AbstractMonoToListenableFutureAdapter.java} (70%) rename spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/{PassThroughPromiseToListenableFutureAdapter.java => MonoToListenableFutureAdapter.java} (72%) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java rename spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/{Reactor2TcpConnection.java => ReactorNettyTcpConnection.java} (57%) rename spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/{Reactor2TcpStompClientTests.java => ReactorNettyTcpStompClientTests.java} (95%) rename spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/{StompCodecTests.java => StompDecoderTests.java} (70%) create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java diff --git a/build.gradle b/build.gradle index 37d9b017124..d56e831e138 100644 --- a/build.gradle +++ b/build.gradle @@ -78,7 +78,6 @@ configure(allprojects) { project -> ext.protobufVersion = "3.1.0" ext.quartzVersion = "2.2.3" ext.reactivestreamsVersion = "1.0.0" - ext.reactorVersion = "2.0.8.RELEASE" ext.reactorCoreVersion = '3.0.3.RELEASE' ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT' ext.romeVersion = "1.7.0" @@ -578,12 +577,8 @@ project("spring-messaging") { compile(project(":spring-core")) compile(project(":spring-context")) optional(project(":spring-oxm")) - optional("io.projectreactor:reactor-core:${reactorVersion}") { - force = true // enforce 2.0.x - } - optional("io.projectreactor:reactor-net:${reactorVersion}") { - exclude group: "io.netty", module: "netty-all" - } + optional("io.projectreactor:reactor-core:${reactorCoreVersion}") + optional("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}") optional("io.netty:netty-all:${nettyVersion}") optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" @@ -1003,10 +998,8 @@ project("spring-websocket") { optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}") - testCompile("io.projectreactor:reactor-core:${reactorVersion}") { - force = true // enforce 2.0.x - } - testCompile("io.projectreactor:reactor-net:${reactorVersion}") + testCompile("io.projectreactor:reactor-core:${reactorCoreVersion}") + testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}") testCompile("io.netty:netty-all:${nettyVersion}") testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}") testRuntime("org.jboss.xnio:xnio-nio:${xnioVersion}") diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java deleted file mode 100644 index 91d8c4851b7..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.stomp; - -import java.nio.ByteBuffer; - -import reactor.fn.Consumer; -import reactor.fn.Function; -import reactor.io.buffer.Buffer; -import reactor.io.codec.Codec; - -import org.springframework.messaging.Message; -import org.springframework.util.Assert; - -/** - * A Reactor TCP {@link Codec} for sending and receiving STOMP messages. - * - * @author Andy Wilkinson - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class Reactor2StompCodec extends Codec, Message> { - - private final Function, Buffer> encodingFunction; - - private final StompDecoder stompDecoder; - - - public Reactor2StompCodec() { - this(new StompEncoder(), new StompDecoder()); - } - - public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) { - Assert.notNull(encoder, "StompEncoder is required"); - Assert.notNull(decoder, "StompDecoder is required"); - this.encodingFunction = new EncodingFunction(encoder); - this.stompDecoder = decoder; - } - - - @Override - public Function> decoder(final Consumer> messageConsumer) { - return new DecodingFunction(this.stompDecoder, messageConsumer); - } - - @Override - public Function, Buffer> encoder() { - return this.encodingFunction; - } - - @Override - public Buffer apply(Message message) { - return this.encodingFunction.apply(message); - } - - - private static class EncodingFunction implements Function, Buffer> { - - private final StompEncoder encoder; - - public EncodingFunction(StompEncoder encoder) { - this.encoder = encoder; - } - - @Override - public Buffer apply(Message message) { - byte[] bytes = this.encoder.encode(message); - return new Buffer(ByteBuffer.wrap(bytes)); - } - } - - - private static class DecodingFunction implements Function> { - - private final StompDecoder decoder; - - private final Consumer> messageConsumer; - - public DecodingFunction(StompDecoder decoder, Consumer> next) { - this.decoder = decoder; - this.messageConsumer = next; - } - - @Override - public Message apply(Buffer buffer) { - for (Message message : this.decoder.decode(buffer.byteBuffer())) { - this.messageConsumer.accept(message); - } - return null; - } - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java similarity index 51% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java index e7eda79eb30..64d072f9eda 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java @@ -16,63 +16,54 @@ package org.springframework.messaging.simp.stomp; -import java.util.Collections; import java.util.List; -import java.util.Properties; +import java.util.function.BiConsumer; +import java.util.function.Function; -import reactor.Environment; -import reactor.core.config.ConfigurationReader; -import reactor.core.config.DispatcherConfiguration; -import reactor.core.config.DispatcherType; -import reactor.core.config.ReactorConfiguration; -import reactor.io.net.NetStreams; -import reactor.io.net.Spec.TcpClientSpec; +import io.netty.buffer.ByteBuf; +import reactor.core.scheduler.Schedulers; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; +import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.concurrent.ListenableFuture; /** * A STOMP over TCP client that uses - * {@link Reactor2TcpClient}. + * {@link ReactorNettyTcpClient}. * * @author Rossen Stoyanchev * @since 4.2 */ -public class Reactor2TcpStompClient extends StompClientSupport { +public class ReactorNettyTcpStompClient extends StompClientSupport { private final TcpOperations tcpClient; - /** * Create an instance with host "127.0.0.1" and port 61613. */ - public Reactor2TcpStompClient() { + public ReactorNettyTcpStompClient() { this("127.0.0.1", 61613); } + /** * Create an instance with the given host and port. * @param host the host * @param port the port */ - public Reactor2TcpStompClient(final String host, final int port) { - ConfigurationReader reader = new StompClientDispatcherConfigReader(); - Environment environment = new Environment(reader).assignErrorJournal(); - StompTcpClientSpecFactory factory = new StompTcpClientSpecFactory(environment, host, port); - this.tcpClient = new Reactor2TcpClient<>(factory); + public ReactorNettyTcpStompClient(final String host, final int port) { + this.tcpClient = create(host, port, new StompDecoder()); } /** * Create an instance with a pre-configured TCP client. * @param tcpClient the client to use */ - public Reactor2TcpStompClient(TcpOperations tcpClient) { + public ReactorNettyTcpStompClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; } - /** * Connect and notify the given {@link StompSessionHandler} when connected * on the STOMP level. @@ -83,6 +74,7 @@ public class Reactor2TcpStompClient extends StompClientSupport { return connect(null, handler); } + /** * An overloaded version of {@link #connect(StompSessionHandler)} that * accepts headers to use for the STOMP CONNECT frame. @@ -103,48 +95,54 @@ public class Reactor2TcpStompClient extends StompClientSupport { this.tcpClient.shutdown(); } - /** - * A ConfigurationReader with a thread pool-based dispatcher. + * Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for + * encoding, decoding and hand-off. + * + * @param relayHost target host + * @param relayPort target port + * @param decoder {@link StompDecoder} to use + * @return a new {@link TcpOperations} */ - private static class StompClientDispatcherConfigReader implements ConfigurationReader { - - @Override - public ReactorConfiguration read() { - String dispatcherName = "StompClient"; - DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP; - DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0); - List configList = Collections.singletonList(config); - return new ReactorConfiguration(configList, dispatcherName, new Properties()); - } + protected static TcpOperations create(String relayHost, + int relayPort, + StompDecoder decoder) { + return new ReactorNettyTcpClient<>(relayHost, + relayPort, + new ReactorNettyTcpClient.MessageHandlerConfiguration<>(new DecodingFunction( + decoder), + new EncodingConsumer(new StompEncoder()), + 128, + Schedulers.newParallel("StompClient"))); } + private static final class EncodingConsumer + implements BiConsumer> { - private static class StompTcpClientSpecFactory - implements NetStreams.TcpClientFactory, Message> { + private final StompEncoder encoder; - private final Environment environment; - - private final String host; - - private final int port; - - public StompTcpClientSpecFactory(Environment environment, String host, int port) { - this.environment = environment; - this.host = host; - this.port = port; + public EncodingConsumer(StompEncoder encoder) { + this.encoder = encoder; } @Override - public TcpClientSpec, Message> apply( - TcpClientSpec, Message> tcpClientSpec) { - - return tcpClientSpec - .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder())) - .env(this.environment) - .dispatcher(this.environment.getCachedDispatchers("StompClient").get()) - .connect(this.host, this.port); + public void accept(ByteBuf byteBuf, Message message) { + byteBuf.writeBytes(encoder.encode(message)); } } + private static final class DecodingFunction + implements Function>> { + + private final StompDecoder decoder; + + public DecodingFunction(StompDecoder decoder) { + this.decoder = decoder; + } + + @Override + public List> apply(ByteBuf buffer) { + return this.decoder.decode(buffer.nioBuffer()); + } + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index a4d16467835..b8b9c98a989 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -40,7 +40,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; +import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -335,7 +335,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Configure a TCP client for managing TCP connections to the STOMP broker. - * By default {@link Reactor2TcpClient} is used. + * By default {@link ReactorNettyTcpClient} is used. */ public void setTcpClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; @@ -387,8 +387,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (this.tcpClient == null) { StompDecoder decoder = new StompDecoder(); decoder.setHeaderInitializer(getHeaderInitializer()); - Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder); - this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec); + + this.tcpClient = ReactorNettyTcpStompClient.create(this.relayHost, this.relayPort, decoder); } if (logger.isInfoEnabled()) { @@ -970,15 +970,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } - - private static class StompTcpClientFactory { - - public TcpOperations create(String relayHost, int relayPort, Reactor2StompCodec codec) { - return new Reactor2TcpClient<>(relayHost, relayPort, codec); - } - } - - private static class VoidCallable implements Callable { @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java similarity index 70% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java index 7e4a77ad343..2b5ab32d479 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java @@ -16,12 +16,14 @@ package org.springframework.messaging.tcp.reactor; +import java.time.Duration; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import reactor.fn.Consumer; -import reactor.rx.Promise; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import org.springframework.util.Assert; import org.springframework.util.concurrent.FailureCallback; @@ -31,28 +33,24 @@ import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; import org.springframework.util.concurrent.SuccessCallback; /** - * Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting + * Adapts a reactor {@link Mono} to {@link ListenableFuture} optionally converting * the result Object type {@code } to the expected target type {@code }. * * @author Rossen Stoyanchev * @since 4.0 - * @param the type of object expected from the {@link Promise} + * @param the type of object expected from the {@link Mono} * @param the type of object expected from the {@link ListenableFuture} */ -abstract class AbstractPromiseToListenableFutureAdapter implements ListenableFuture { +abstract class AbstractMonoToListenableFutureAdapter + implements ListenableFuture { - private final Promise promise; + private final MonoProcessor promise; private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry<>(); - - protected AbstractPromiseToListenableFutureAdapter(Promise promise) { - Assert.notNull(promise, "Promise must not be null"); - this.promise = promise; - - this.promise.onSuccess(new Consumer() { - @Override - public void accept(S result) { + protected AbstractMonoToListenableFutureAdapter(Mono promise) { + Assert.notNull(promise, "Mono must not be null"); + this.promise = promise.doOnSuccess(result -> { T adapted; try { adapted = adapt(result); @@ -62,46 +60,44 @@ abstract class AbstractPromiseToListenableFutureAdapter implements Listena return; } registry.success(adapted); - } - }); - - this.promise.onError(new Consumer() { - @Override - public void accept(Throwable ex) { - registry.failure(ex); - } - }); + }) + .doOnError(registry::failure) + .subscribe(); } @Override public T get() throws InterruptedException { - S result = this.promise.await(); + S result = this.promise.block(); return adapt(result); } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - S result = this.promise.await(timeout, unit); - if (!this.promise.isComplete()) { - throw new TimeoutException(); - } + Objects.requireNonNull(unit, "unit"); + S result = this.promise.block(Duration.ofMillis(TimeUnit.MILLISECONDS.convert( + timeout, + unit))); return adapt(result); } @Override public boolean cancel(boolean mayInterruptIfRunning) { - return false; + if (isCancelled()) { + return false; + } + this.promise.cancel(); + return true; } @Override public boolean isCancelled() { - return false; + return this.promise.isCancelled(); } @Override public boolean isDone() { - return this.promise.isComplete(); + return this.promise.isTerminated(); } @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java similarity index 72% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java index 3028307f31c..8ecab4a974d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java @@ -16,20 +16,22 @@ package org.springframework.messaging.tcp.reactor; -import reactor.rx.Promise; +import reactor.core.publisher.Mono; /** - * A Promise-to-ListenableFutureAdapter where the source and the target from + * A Mono-to-ListenableFutureAdapter where the source and the target from * the Promise and the ListenableFuture respectively are of the same type. * * @author Rossen Stoyanchev + * @author Stephane Maldini * @since 4.0 */ -class PassThroughPromiseToListenableFutureAdapter extends AbstractPromiseToListenableFutureAdapter { +class MonoToListenableFutureAdapter extends + AbstractMonoToListenableFutureAdapter { - public PassThroughPromiseToListenableFutureAdapter(Promise promise) { - super(promise); + public MonoToListenableFutureAdapter(Mono mono) { + super(mono); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java deleted file mode 100644 index 4ff624314e5..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.tcp.reactor; - -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import org.reactivestreams.Publisher; -import reactor.Environment; -import reactor.core.config.ConfigurationReader; -import reactor.core.config.DispatcherConfiguration; -import reactor.core.config.ReactorConfiguration; -import reactor.core.support.NamedDaemonThreadFactory; -import reactor.fn.Consumer; -import reactor.fn.Function; -import reactor.fn.tuple.Tuple; -import reactor.fn.tuple.Tuple2; -import reactor.io.buffer.Buffer; -import reactor.io.codec.Codec; -import reactor.io.net.ChannelStream; -import reactor.io.net.NetStreams; -import reactor.io.net.NetStreams.TcpClientFactory; -import reactor.io.net.ReactorChannelHandler; -import reactor.io.net.Reconnect; -import reactor.io.net.Spec.TcpClientSpec; -import reactor.io.net.config.ClientSocketOptions; -import reactor.io.net.impl.netty.NettyClientSocketOptions; -import reactor.io.net.impl.netty.tcp.NettyTcpClient; -import reactor.io.net.tcp.TcpClient; -import reactor.rx.Promise; -import reactor.rx.Promises; -import reactor.rx.Stream; -import reactor.rx.Streams; -import reactor.rx.action.Signal; - -import org.springframework.messaging.Message; -import org.springframework.messaging.tcp.ReconnectStrategy; -import org.springframework.messaging.tcp.TcpConnectionHandler; -import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.util.Assert; -import org.springframework.util.ReflectionUtils; -import org.springframework.util.concurrent.ListenableFuture; - -/** - * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} - * based on the TCP client support of the Reactor project. - * - *

This implementation wraps N (Reactor) clients for N {@link #connect} calls, - * i.e. a separate (Reactor) client instance for each connection. - * - * @author Rossen Stoyanchev - * @author Stephane Maldini - * @since 4.2 - */ -public class Reactor2TcpClient

implements TcpOperations

{ - - @SuppressWarnings("rawtypes") - public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - - private static final Method eventLoopGroupMethod = initEventLoopGroupMethod(); - - - private final EventLoopGroup eventLoopGroup; - - private final Environment environment; - - private final TcpClientFactory, Message

> tcpClientSpecFactory; - - private final List, Message

>> tcpClients = - new ArrayList<>(); - - private boolean stopping; - - - /** - * A constructor that creates a {@link TcpClientSpec TcpClientSpec} factory - * with a default {@link reactor.core.dispatch.SynchronousDispatcher}, i.e. - * relying on Netty threads. The number of Netty threads can be tweaked with - * the {@code reactor.tcp.ioThreadCount} System property. The network I/O - * threads will be shared amongst the active clients. - *

Also see the constructor accepting a ready Reactor - * {@link TcpClientSpec} {@link Function} factory. - * @param host the host to connect to - * @param port the port to connect to - * @param codec the codec to use for encoding and decoding the TCP stream - */ - public Reactor2TcpClient(final String host, final int port, final Codec, Message

> codec) { - // Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup - final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup(); - this.eventLoopGroup = nioEventLoopGroup; - this.environment = new Environment(new SynchronousDispatcherConfigReader()); - - this.tcpClientSpecFactory = new TcpClientFactory, Message

>() { - @Override - public TcpClientSpec, Message

> apply(TcpClientSpec, Message

> spec) { - return spec - .env(environment) - .codec(codec) - .connect(host, port) - .options(createClientSocketOptions()); - } - - private ClientSocketOptions createClientSocketOptions() { - return (ClientSocketOptions) ReflectionUtils.invokeMethod(eventLoopGroupMethod, - new NettyClientSocketOptions(), nioEventLoopGroup); - } - }; - } - - /** - * A constructor with a pre-configured {@link TcpClientSpec} {@link Function} - * factory. This might be used to add SSL or specific network parameters to - * the generated client configuration. - *

NOTE: if the client is configured with a thread-creating - * dispatcher, you are responsible for cleaning them, e.g. using - * {@link reactor.core.Dispatcher#shutdown}. - * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation - */ - public Reactor2TcpClient(TcpClientFactory, Message

> tcpClientSpecFactory) { - Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); - this.tcpClientSpecFactory = tcpClientSpecFactory; - this.eventLoopGroup = null; - this.environment = null; - } - - - private static NioEventLoopGroup initEventLoopGroup() { - int ioThreadCount; - try { - ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); - } - catch (Throwable ex) { - ioThreadCount = -1; - } - if (ioThreadCount <= 0) { - ioThreadCount = Runtime.getRuntime().availableProcessors(); - } - return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io")); - } - - - @Override - public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { - Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - - final TcpClient, Message

> tcpClient; - final Runnable cleanupTask; - synchronized (this.tcpClients) { - if (this.stopping) { - IllegalStateException ex = new IllegalStateException("Shutting down."); - connectionHandler.afterConnectFailure(ex); - return new PassThroughPromiseToListenableFutureAdapter<>(Promises.error(ex)); - } - tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); - this.tcpClients.add(tcpClient); - cleanupTask = new Runnable() { - @Override - public void run() { - synchronized (tcpClients) { - tcpClients.remove(tcpClient); - } - } - }; - } - - Promise promise = tcpClient.start( - new MessageChannelStreamHandler<>(connectionHandler, cleanupTask)); - - return new PassThroughPromiseToListenableFutureAdapter<>( - promise.onError(new Consumer() { - @Override - public void accept(Throwable ex) { - cleanupTask.run(); - connectionHandler.afterConnectFailure(ex); - } - }) - ); - } - - @Override - public ListenableFuture connect(TcpConnectionHandler

connectionHandler, ReconnectStrategy strategy) { - Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - Assert.notNull(strategy, "ReconnectStrategy must not be null"); - - final TcpClient, Message

> tcpClient; - Runnable cleanupTask; - synchronized (this.tcpClients) { - if (this.stopping) { - IllegalStateException ex = new IllegalStateException("Shutting down."); - connectionHandler.afterConnectFailure(ex); - return new PassThroughPromiseToListenableFutureAdapter<>(Promises.error(ex)); - } - tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); - this.tcpClients.add(tcpClient); - cleanupTask = new Runnable() { - @Override - public void run() { - synchronized (tcpClients) { - tcpClients.remove(tcpClient); - } - } - }; - } - - Stream> stream = tcpClient.start( - new MessageChannelStreamHandler<>(connectionHandler, cleanupTask), - new ReactorReconnectAdapter(strategy)); - - return new PassThroughPromiseToListenableFutureAdapter<>(stream.next().after()); - } - - @Override - public ListenableFuture shutdown() { - synchronized (this.tcpClients) { - this.stopping = true; - } - - Promise promise = Streams.from(this.tcpClients) - .flatMap(new Function, Message

>, Promise>() { - @Override - public Promise apply(final TcpClient, Message

> client) { - return client.shutdown().onComplete(new Consumer>() { - @Override - public void accept(Promise voidPromise) { - tcpClients.remove(client); - } - }); - } - }) - .next(); - - if (this.eventLoopGroup != null) { - final Promise eventLoopPromise = Promises.prepare(); - promise.onComplete(new Consumer>() { - @Override - public void accept(Promise voidPromise) { - eventLoopGroup.shutdownGracefully().addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - eventLoopPromise.onComplete(); - } - else { - eventLoopPromise.onError(future.cause()); - } - } - }); - } - }); - promise = eventLoopPromise; - } - - if (this.environment != null) { - promise.onComplete(new Consumer>() { - @Override - public void accept(Promise voidPromise) { - environment.shutdown(); - } - }); - } - - return new PassThroughPromiseToListenableFutureAdapter<>(promise); - } - - - private static Method initEventLoopGroupMethod() { - for (Method method : NettyClientSocketOptions.class.getMethods()) { - if (method.getName().equals("eventLoopGroup") && method.getParameterCount() == 1) { - return method; - } - } - throw new IllegalStateException("No compatible Reactor version found"); - } - - - private static class SynchronousDispatcherConfigReader implements ConfigurationReader { - - @Override - public ReactorConfiguration read() { - return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties()); - } - } - - - private static class MessageChannelStreamHandler

- implements ReactorChannelHandler, Message

, ChannelStream, Message

>> { - - private final TcpConnectionHandler

connectionHandler; - - private final Runnable cleanupTask; - - public MessageChannelStreamHandler(TcpConnectionHandler

connectionHandler, Runnable cleanupTask) { - this.connectionHandler = connectionHandler; - this.cleanupTask = cleanupTask; - } - - @Override - public Publisher apply(ChannelStream, Message

> channelStream) { - Promise closePromise = Promises.prepare(); - this.connectionHandler.afterConnected(new Reactor2TcpConnection<>(channelStream, closePromise)); - channelStream - .finallyDo(new Consumer>>() { - @Override - public void accept(Signal> signal) { - cleanupTask.run(); - if (signal.isOnError()) { - connectionHandler.handleFailure(signal.getThrowable()); - } - else if (signal.isOnComplete()) { - connectionHandler.afterConnectionClosed(); - } - } - }) - .consume(new Consumer>() { - @Override - public void accept(Message

message) { - connectionHandler.handleMessage(message); - } - }); - - return closePromise; - } - } - - - private static class ReactorReconnectAdapter implements Reconnect { - - private final ReconnectStrategy strategy; - - public ReactorReconnectAdapter(ReconnectStrategy strategy) { - this.strategy = strategy; - } - - @Override - public Tuple2 reconnect(InetSocketAddress address, int attempt) { - return Tuple.of(address, this.strategy.getTimeToNextAttempt(attempt)); - } - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java new file mode 100644 index 00000000000..0b93368e8ce --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -0,0 +1,264 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.tcp.reactor; + +import java.util.Collection; +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.ImmediateEventExecutor; +import org.reactivestreams.Publisher; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Scheduler; +import reactor.ipc.netty.ChannelFutureMono; +import reactor.ipc.netty.NettyContext; +import reactor.ipc.netty.NettyInbound; +import reactor.ipc.netty.NettyOutbound; +import reactor.ipc.netty.options.ClientOptions; +import reactor.ipc.netty.tcp.TcpClient; +import reactor.util.concurrent.QueueSupplier; + +import org.springframework.messaging.Message; +import org.springframework.messaging.tcp.ReconnectStrategy; +import org.springframework.messaging.tcp.TcpConnection; +import org.springframework.messaging.tcp.TcpConnectionHandler; +import org.springframework.messaging.tcp.TcpOperations; +import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; + +/** + * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} + * based on the TCP client support of the Reactor project. + *

+ *

This implementation wraps N (Reactor) clients for N {@link #connect} calls, + * i.e. a separate (Reactor) client instance for each connection. + * + * @author Rossen Stoyanchev + * @author Stephane Maldini + * @since 4.2 + */ +public class ReactorNettyTcpClient

implements TcpOperations

{ + + private final TcpClient tcpClient; + + private final MessageHandlerConfiguration

configuration; + private final ChannelGroup group; + private volatile boolean stopping; + + /** + * A constructor that creates a {@link TcpClient TcpClient} factory relying on + * Reactor Netty TCP threads. The number of Netty threads can be tweaked with + * the {@code reactor.tcp.ioThreadCount} System property. The network I/O + * threads will be shared amongst the active clients. + *

Also see the constructor accepting a {@link Consumer} of + * {@link ClientOptions} for advanced tuning. + * + * @param host the host to connect to + * @param port the port to connect to + * @param configuration the client configuration + */ + public ReactorNettyTcpClient(String host, + int port, + MessageHandlerConfiguration

configuration) { + this.configuration = Objects.requireNonNull(configuration, "configuration"); + this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); + this.tcpClient = TcpClient.create(options -> options.connect(host, port) + .channelGroup(group)); + } + + /** + * A constructor with a configurator {@link Consumer} that will receive default {@link + * ClientOptions} from {@link TcpClient}. This might be used to add SSL or specific + * network parameters to the generated client configuration. + * + * @param tcpOptions the {@link Consumer} of {@link ClientOptions} shared to use by + * connected handlers. + * @param configuration the client configuration + */ + public ReactorNettyTcpClient(Consumer tcpOptions, + MessageHandlerConfiguration

configuration) { + this.configuration = Objects.requireNonNull(configuration, "configuration"); + this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); + this.tcpClient = + TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); + } + + @Override + public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { + Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); + if (stopping) { + IllegalStateException ex = new IllegalStateException("Shutting down."); + connectionHandler.afterConnectFailure(ex); + return new MonoToListenableFutureAdapter<>(Mono.error(ex)); + } + + MessageHandler

handler = + new MessageHandler<>(connectionHandler, configuration); + + Mono promise = tcpClient.newHandler(handler) + .doOnError(connectionHandler::afterConnectFailure) + .then(); + + return new MonoToListenableFutureAdapter<>(promise); + } + + @Override + public ListenableFuture connect(TcpConnectionHandler

connectionHandler, + ReconnectStrategy strategy) { + Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); + Assert.notNull(strategy, "ReconnectStrategy must not be null"); + + if (stopping) { + IllegalStateException ex = new IllegalStateException("Shutting down."); + connectionHandler.afterConnectFailure(ex); + return new MonoToListenableFutureAdapter<>(Mono.error(ex)); + } + + MessageHandler

handler = + new MessageHandler<>(connectionHandler, configuration); + + MonoProcessor promise = MonoProcessor.create(); + + tcpClient.newHandler(handler) + .doOnNext(e -> { + if (!promise.isTerminated()) { + promise.onComplete(); + } + }) + .doOnError(e -> { + if (!promise.isTerminated()) { + promise.onError(e); + } + }) + .then(NettyContext::onClose) + .retryWhen(new Reconnector<>(strategy)) + .repeatWhen(new Reconnector<>(strategy)) + .subscribe(); + + return new MonoToListenableFutureAdapter<>(promise); + } + + @Override + public ListenableFuture shutdown() { + if (stopping) { + return new MonoToListenableFutureAdapter<>(Mono.empty()); + } + + stopping = true; + + Mono closing = ChannelFutureMono.from(group.close()); + + if (configuration.scheduler != null) { + closing = + closing.doAfterTerminate((x, e) -> configuration.scheduler.shutdown()); + } + + return new MonoToListenableFutureAdapter<>(closing); + } + + /** + * A configuration holder + */ + public static final class MessageHandlerConfiguration

{ + + private final Function>> decoder; + private final BiConsumer> encoder; + private final int backlog; + private final Scheduler + scheduler; + + public MessageHandlerConfiguration(Function>> decoder, + BiConsumer> encoder, + int backlog, + Scheduler scheduler) { + this.decoder = decoder; + this.encoder = encoder; + this.backlog = backlog > 0 ? backlog : QueueSupplier.SMALL_BUFFER_SIZE; + this.scheduler = scheduler; + } + } + + private static final class MessageHandler

implements BiFunction> { + + private final TcpConnectionHandler

connectionHandler; + + private final MessageHandlerConfiguration

configuration; + + MessageHandler(TcpConnectionHandler

connectionHandler, + MessageHandlerConfiguration

configuration) { + this.connectionHandler = connectionHandler; + this.configuration = configuration; + } + + @Override + public Publisher apply(NettyInbound in, NettyOutbound out) { + Flux>> inbound = in.receive() + .map(configuration.decoder); + + DirectProcessor promise = DirectProcessor.create(); + TcpConnection

tcpConnection = new ReactorNettyTcpConnection<>(in, + out, + configuration.encoder, + promise); + + if (configuration.scheduler != null) { + configuration.scheduler.schedule(() -> connectionHandler.afterConnected( + tcpConnection)); + inbound = + inbound.publishOn(configuration.scheduler, configuration.backlog); + } + else { + connectionHandler.afterConnected(tcpConnection); + } + + inbound.flatMapIterable(Function.identity()) + .subscribe(connectionHandler::handleMessage, + connectionHandler::handleFailure, + connectionHandler::afterConnectionClosed); + + return promise; + } + + } + + static final class Reconnector implements Function, Publisher> { + + private final ReconnectStrategy strategy; + + Reconnector(ReconnectStrategy strategy) { + this.strategy = strategy; + } + + @Override + public Publisher apply(Flux flux) { + return flux.scan(1, (p, e) -> p++) + .doOnCancel(() -> new Exception().printStackTrace()) + .flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt( + attempt))); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java similarity index 57% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 3ef2de416d3..7fe758e842c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -16,10 +16,13 @@ package org.springframework.messaging.tcp.reactor; -import reactor.io.net.ChannelStream; -import reactor.rx.Promise; -import reactor.rx.Promises; -import reactor.rx.Streams; +import java.util.function.BiConsumer; + +import io.netty.buffer.ByteBuf; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Mono; +import reactor.ipc.netty.NettyInbound; +import reactor.ipc.netty.NettyOutbound; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpConnection; @@ -29,45 +32,50 @@ import org.springframework.util.concurrent.ListenableFuture; * An implementation of {@link org.springframework.messaging.tcp.TcpConnection * TcpConnection} based on the TCP client support of the Reactor project. * + * @param

the payload type of messages read or written to the TCP stream. + * * @author Rossen Stoyanchev * @since 4.2 - * @param

the payload type of messages read or written to the TCP stream. */ -public class Reactor2TcpConnection

implements TcpConnection

{ +public class ReactorNettyTcpConnection

implements TcpConnection

{ - private final ChannelStream, Message

> channelStream; + private final NettyInbound in; + private final NettyOutbound out; + private final DirectProcessor close; + private final BiConsumer> encoder; - private final Promise closePromise; - - - public Reactor2TcpConnection(ChannelStream, Message

> channelStream, Promise closePromise) { - this.channelStream = channelStream; - this.closePromise = closePromise; + public ReactorNettyTcpConnection(NettyInbound in, + NettyOutbound out, + BiConsumer> encoder, + DirectProcessor close) { + this.out = out; + this.in = in; + this.encoder = encoder; + this.close = close; } - @Override public ListenableFuture send(Message

message) { - Promise afterWrite = Promises.prepare(); - this.channelStream.writeWith(Streams.just(message)).subscribe(afterWrite); - return new PassThroughPromiseToListenableFutureAdapter<>(afterWrite); + ByteBuf byteBuf = in.channel().alloc().buffer(); + encoder.accept(byteBuf, message); + return new MonoToListenableFutureAdapter<>(out.send(Mono.just(byteBuf))); } @Override @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { - this.channelStream.on().readIdle(inactivityDuration, reactor.fn.Functions.consumer(runnable)); + in.onReadIdle(inactivityDuration, runnable); } @Override @SuppressWarnings("deprecation") public void onWriteInactivity(Runnable runnable, long inactivityDuration) { - this.channelStream.on().writeIdle(inactivityDuration, reactor.fn.Functions.consumer(runnable)); + out.onWriteIdle(inactivityDuration, runnable); } @Override public void close() { - this.closePromise.onComplete(); + close.onComplete(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java similarity index 95% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java index 94bfce4be33..bdfe762af13 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java @@ -43,20 +43,20 @@ import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; /** - * Integration tests for {@link Reactor2TcpStompClient}. + * Integration tests for {@link ReactorNettyTcpStompClient}. * * @author Rossen Stoyanchev */ -public class Reactor2TcpStompClientTests { +public class ReactorNettyTcpStompClientTests { - private static final Log logger = LogFactory.getLog(Reactor2TcpStompClientTests.class); + private static final Log logger = LogFactory.getLog(ReactorNettyTcpStompClientTests.class); @Rule public final TestName testName = new TestName(); private BrokerService activeMQBroker; - private Reactor2TcpStompClient client; + private ReactorNettyTcpStompClient client; @Before @@ -78,7 +78,7 @@ public class Reactor2TcpStompClientTests { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.afterPropertiesSet(); - this.client = new Reactor2TcpStompClient("127.0.0.1", port); + this.client = new ReactorNettyTcpStompClient("127.0.0.1", port); this.client.setMessageConverter(new StringMessageConverter()); this.client.setTaskScheduler(taskScheduler); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java similarity index 70% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java index 78d34d40847..98816ef77fc 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java @@ -17,7 +17,7 @@ package org.springframework.messaging.simp.stomp; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; +import java.nio.ByteBuffer; import java.util.List; import org.junit.Test; @@ -26,22 +26,18 @@ import org.springframework.messaging.Message; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.InvalidMimeTypeException; -import reactor.fn.Consumer; -import reactor.fn.Function; -import reactor.io.buffer.Buffer; import static org.junit.Assert.*; /** - * Test fixture for {@link Reactor2StompCodec}. + * Test fixture for {@link StompDecoder}. * * @author Andy Wilkinson + * @author Stephane Maldini */ -public class StompCodecTests { +public class StompDecoderTests { - private final ArgumentCapturingConsumer> consumer = new ArgumentCapturingConsumer<>(); - - private final Function> decoder = new Reactor2StompCodec().decoder(consumer); + private final StompDecoder decoder = new StompDecoder(); @Test public void decodeFrameWithCrLfEols() { @@ -172,11 +168,9 @@ public class StompCodecTests { public void decodeMultipleFramesFromSameBuffer() { String frame1 = "SEND\ndestination:test\n\nThe body of the message\0"; String frame2 = "DISCONNECT\n\n\0"; + ByteBuffer buffer = ByteBuffer.wrap((frame1 + frame2).getBytes()); - Buffer buffer = Buffer.wrap(frame1 + frame2); - - final List> messages = new ArrayList<>(); - new Reactor2StompCodec().decoder(messages::add).apply(buffer); + final List> messages = decoder.decode(buffer); assertEquals(2, messages.size()); assertEquals(StompCommand.SEND, StompHeaderAccessor.wrap(messages.get(0)).getCommand()); @@ -245,102 +239,33 @@ public class StompCodecTests { public void decodeHeartbeat() { String frame = "\n"; - Buffer buffer = Buffer.wrap(frame); + ByteBuffer buffer = ByteBuffer.wrap(frame.getBytes()); - final List> messages = new ArrayList<>(); - new Reactor2StompCodec().decoder(messages::add).apply(buffer); + final List> messages = decoder.decode(buffer); assertEquals(1, messages.size()); assertEquals(SimpMessageType.HEARTBEAT, StompHeaderAccessor.wrap(messages.get(0)).getMessageType()); } - @Test - public void encodeFrameWithNoHeadersAndNoBody() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - - assertEquals("DISCONNECT\n\n\0", new Reactor2StompCodec().encoder().apply(frame).asString()); - } - - @Test - public void encodeFrameWithHeaders() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); - headers.setAcceptVersion("1.2"); - headers.setHost("github.org"); - - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - - String frameString = new Reactor2StompCodec().encoder().apply(frame).asString(); - - assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") || - frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0")); - } - - @Test - public void encodeFrameWithHeadersThatShouldBeEscaped() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); - - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - - assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0", - new Reactor2StompCodec().encoder().apply(frame).asString()); - } - - @Test - public void encodeFrameWithHeadersBody() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); - headers.addNativeHeader("a", "alpha"); - - Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); - - assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", - new Reactor2StompCodec().encoder().apply(frame).asString()); - } - - @Test - public void encodeFrameWithContentLengthPresent() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); - headers.setContentLength(12); - - Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); - - assertEquals("SEND\ncontent-length:12\n\nMessage body\0", - new Reactor2StompCodec().encoder().apply(frame).asString()); - } - private void assertIncompleteDecode(String partialFrame) { - Buffer buffer = Buffer.wrap(partialFrame); + ByteBuffer buffer = ByteBuffer.wrap(partialFrame.getBytes()); assertNull(decode(buffer)); assertEquals(0, buffer.position()); } private Message decode(String stompFrame) { - Buffer buffer = Buffer.wrap(stompFrame); + ByteBuffer buffer = ByteBuffer.wrap(stompFrame.getBytes()); return decode(buffer); } - private Message decode(Buffer buffer) { - this.decoder.apply(buffer); - if (consumer.arguments.isEmpty()) { + private Message decode(ByteBuffer buffer) { + List> messages = this.decoder.decode(buffer); + if (messages.isEmpty()) { return null; } else { - return consumer.arguments.get(0); + return messages.get(0); } } - - - private static final class ArgumentCapturingConsumer implements Consumer { - - private final List arguments = new ArrayList<>(); - - @Override - public void accept(T t) { - arguments.add(t); - } - - } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java new file mode 100644 index 00000000000..ef6565cdd3e --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java @@ -0,0 +1,93 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.stomp; + +import org.junit.Test; + +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test fixture for {@link StompEncoder}. + * + * @author Andy Wilkinson + * @author Stephane Maldini + */ +public class StompEncoderTests { + + private final StompEncoder encoder = new StompEncoder(); + + @Test + public void encodeFrameWithNoHeadersAndNoBody() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); + + Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + + assertEquals("DISCONNECT\n\n\0", new String(encoder.encode(frame))); + } + + @Test + public void encodeFrameWithHeaders() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setAcceptVersion("1.2"); + headers.setHost("github.org"); + + Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + + String frameString = new String(encoder.encode(frame)); + + assertTrue("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) || "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals( + frameString)); + } + + @Test + public void encodeFrameWithHeadersThatShouldBeEscaped() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); + headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); + + Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + + assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0", + new String(encoder.encode(frame))); + } + + @Test + public void encodeFrameWithHeadersBody() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.addNativeHeader("a", "alpha"); + + Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); + + assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", + new String(encoder.encode(frame))); + } + + @Test + public void encodeFrameWithContentLengthPresent() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.setContentLength(12); + + Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); + + assertEquals("SEND\ncontent-length:12\n\nMessage body\0", + new String(encoder.encode(frame))); + } + +}