update STOMP support to reactor-netty
This commit is contained in:
parent
845dbf040d
commit
870f61fd8e
15
build.gradle
15
build.gradle
|
|
@ -78,7 +78,6 @@ configure(allprojects) { project ->
|
|||
ext.protobufVersion = "3.1.0"
|
||||
ext.quartzVersion = "2.2.3"
|
||||
ext.reactivestreamsVersion = "1.0.0"
|
||||
ext.reactorVersion = "2.0.8.RELEASE"
|
||||
ext.reactorCoreVersion = '3.0.3.RELEASE'
|
||||
ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT'
|
||||
ext.romeVersion = "1.7.0"
|
||||
|
|
@ -578,12 +577,8 @@ project("spring-messaging") {
|
|||
compile(project(":spring-core"))
|
||||
compile(project(":spring-context"))
|
||||
optional(project(":spring-oxm"))
|
||||
optional("io.projectreactor:reactor-core:${reactorVersion}") {
|
||||
force = true // enforce 2.0.x
|
||||
}
|
||||
optional("io.projectreactor:reactor-net:${reactorVersion}") {
|
||||
exclude group: "io.netty", module: "netty-all"
|
||||
}
|
||||
optional("io.projectreactor:reactor-core:${reactorCoreVersion}")
|
||||
optional("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
|
||||
optional("io.netty:netty-all:${nettyVersion}")
|
||||
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
|
||||
exclude group: "javax.servlet", module: "javax.servlet-api"
|
||||
|
|
@ -1003,10 +998,8 @@ project("spring-websocket") {
|
|||
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
|
||||
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
|
||||
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}")
|
||||
testCompile("io.projectreactor:reactor-core:${reactorVersion}") {
|
||||
force = true // enforce 2.0.x
|
||||
}
|
||||
testCompile("io.projectreactor:reactor-net:${reactorVersion}")
|
||||
testCompile("io.projectreactor:reactor-core:${reactorCoreVersion}")
|
||||
testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
|
||||
testCompile("io.netty:netty-all:${nettyVersion}")
|
||||
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
|
||||
testRuntime("org.jboss.xnio:xnio-nio:${xnioVersion}")
|
||||
|
|
|
|||
|
|
@ -1,107 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.simp.stomp;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import reactor.fn.Consumer;
|
||||
import reactor.fn.Function;
|
||||
import reactor.io.buffer.Buffer;
|
||||
import reactor.io.codec.Codec;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* A Reactor TCP {@link Codec} for sending and receiving STOMP messages.
|
||||
*
|
||||
* @author Andy Wilkinson
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
*/
|
||||
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> {
|
||||
|
||||
private final Function<Message<byte[]>, Buffer> encodingFunction;
|
||||
|
||||
private final StompDecoder stompDecoder;
|
||||
|
||||
|
||||
public Reactor2StompCodec() {
|
||||
this(new StompEncoder(), new StompDecoder());
|
||||
}
|
||||
|
||||
public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) {
|
||||
Assert.notNull(encoder, "StompEncoder is required");
|
||||
Assert.notNull(decoder, "StompDecoder is required");
|
||||
this.encodingFunction = new EncodingFunction(encoder);
|
||||
this.stompDecoder = decoder;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
|
||||
return new DecodingFunction(this.stompDecoder, messageConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<Message<byte[]>, Buffer> encoder() {
|
||||
return this.encodingFunction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer apply(Message<byte[]> message) {
|
||||
return this.encodingFunction.apply(message);
|
||||
}
|
||||
|
||||
|
||||
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
|
||||
|
||||
private final StompEncoder encoder;
|
||||
|
||||
public EncodingFunction(StompEncoder encoder) {
|
||||
this.encoder = encoder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer apply(Message<byte[]> message) {
|
||||
byte[] bytes = this.encoder.encode(message);
|
||||
return new Buffer(ByteBuffer.wrap(bytes));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
|
||||
|
||||
private final StompDecoder decoder;
|
||||
|
||||
private final Consumer<Message<byte[]>> messageConsumer;
|
||||
|
||||
public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) {
|
||||
this.decoder = decoder;
|
||||
this.messageConsumer = next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message<byte[]> apply(Buffer buffer) {
|
||||
for (Message<byte[]> message : this.decoder.decode(buffer.byteBuffer())) {
|
||||
this.messageConsumer.accept(message);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -16,63 +16,54 @@
|
|||
|
||||
package org.springframework.messaging.simp.stomp;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import reactor.Environment;
|
||||
import reactor.core.config.ConfigurationReader;
|
||||
import reactor.core.config.DispatcherConfiguration;
|
||||
import reactor.core.config.DispatcherType;
|
||||
import reactor.core.config.ReactorConfiguration;
|
||||
import reactor.io.net.NetStreams;
|
||||
import reactor.io.net.Spec.TcpClientSpec;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.tcp.TcpOperations;
|
||||
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
|
||||
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* A STOMP over TCP client that uses
|
||||
* {@link Reactor2TcpClient}.
|
||||
* {@link ReactorNettyTcpClient}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.2
|
||||
*/
|
||||
public class Reactor2TcpStompClient extends StompClientSupport {
|
||||
public class ReactorNettyTcpStompClient extends StompClientSupport {
|
||||
|
||||
private final TcpOperations<byte[]> tcpClient;
|
||||
|
||||
|
||||
/**
|
||||
* Create an instance with host "127.0.0.1" and port 61613.
|
||||
*/
|
||||
public Reactor2TcpStompClient() {
|
||||
public ReactorNettyTcpStompClient() {
|
||||
this("127.0.0.1", 61613);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create an instance with the given host and port.
|
||||
* @param host the host
|
||||
* @param port the port
|
||||
*/
|
||||
public Reactor2TcpStompClient(final String host, final int port) {
|
||||
ConfigurationReader reader = new StompClientDispatcherConfigReader();
|
||||
Environment environment = new Environment(reader).assignErrorJournal();
|
||||
StompTcpClientSpecFactory factory = new StompTcpClientSpecFactory(environment, host, port);
|
||||
this.tcpClient = new Reactor2TcpClient<>(factory);
|
||||
public ReactorNettyTcpStompClient(final String host, final int port) {
|
||||
this.tcpClient = create(host, port, new StompDecoder());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance with a pre-configured TCP client.
|
||||
* @param tcpClient the client to use
|
||||
*/
|
||||
public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) {
|
||||
public ReactorNettyTcpStompClient(TcpOperations<byte[]> tcpClient) {
|
||||
this.tcpClient = tcpClient;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Connect and notify the given {@link StompSessionHandler} when connected
|
||||
* on the STOMP level.
|
||||
|
|
@ -83,6 +74,7 @@ public class Reactor2TcpStompClient extends StompClientSupport {
|
|||
return connect(null, handler);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* An overloaded version of {@link #connect(StompSessionHandler)} that
|
||||
* accepts headers to use for the STOMP CONNECT frame.
|
||||
|
|
@ -103,48 +95,54 @@ public class Reactor2TcpStompClient extends StompClientSupport {
|
|||
this.tcpClient.shutdown();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A ConfigurationReader with a thread pool-based dispatcher.
|
||||
* Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for
|
||||
* encoding, decoding and hand-off.
|
||||
*
|
||||
* @param relayHost target host
|
||||
* @param relayPort target port
|
||||
* @param decoder {@link StompDecoder} to use
|
||||
* @return a new {@link TcpOperations}
|
||||
*/
|
||||
private static class StompClientDispatcherConfigReader implements ConfigurationReader {
|
||||
|
||||
@Override
|
||||
public ReactorConfiguration read() {
|
||||
String dispatcherName = "StompClient";
|
||||
DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP;
|
||||
DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0);
|
||||
List<DispatcherConfiguration> configList = Collections.<DispatcherConfiguration>singletonList(config);
|
||||
return new ReactorConfiguration(configList, dispatcherName, new Properties());
|
||||
}
|
||||
protected static TcpOperations<byte[]> create(String relayHost,
|
||||
int relayPort,
|
||||
StompDecoder decoder) {
|
||||
return new ReactorNettyTcpClient<>(relayHost,
|
||||
relayPort,
|
||||
new ReactorNettyTcpClient.MessageHandlerConfiguration<>(new DecodingFunction(
|
||||
decoder),
|
||||
new EncodingConsumer(new StompEncoder()),
|
||||
128,
|
||||
Schedulers.newParallel("StompClient")));
|
||||
}
|
||||
|
||||
private static final class EncodingConsumer
|
||||
implements BiConsumer<ByteBuf, Message<byte[]>> {
|
||||
|
||||
private static class StompTcpClientSpecFactory
|
||||
implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
|
||||
private final StompEncoder encoder;
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
private final String host;
|
||||
|
||||
private final int port;
|
||||
|
||||
public StompTcpClientSpecFactory(Environment environment, String host, int port) {
|
||||
this.environment = environment;
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
public EncodingConsumer(StompEncoder encoder) {
|
||||
this.encoder = encoder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
|
||||
TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
|
||||
|
||||
return tcpClientSpec
|
||||
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
|
||||
.env(this.environment)
|
||||
.dispatcher(this.environment.getCachedDispatchers("StompClient").get())
|
||||
.connect(this.host, this.port);
|
||||
public void accept(ByteBuf byteBuf, Message<byte[]> message) {
|
||||
byteBuf.writeBytes(encoder.encode(message));
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DecodingFunction
|
||||
implements Function<ByteBuf, List<Message<byte[]>>> {
|
||||
|
||||
private final StompDecoder decoder;
|
||||
|
||||
public DecodingFunction(StompDecoder decoder) {
|
||||
this.decoder = decoder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Message<byte[]>> apply(ByteBuf buffer) {
|
||||
return this.decoder.decode(buffer.nioBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -40,7 +40,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
|
|||
import org.springframework.messaging.tcp.TcpConnection;
|
||||
import org.springframework.messaging.tcp.TcpConnectionHandler;
|
||||
import org.springframework.messaging.tcp.TcpOperations;
|
||||
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
|
||||
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
|
|
@ -335,7 +335,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
|
|||
|
||||
/**
|
||||
* Configure a TCP client for managing TCP connections to the STOMP broker.
|
||||
* By default {@link Reactor2TcpClient} is used.
|
||||
* By default {@link ReactorNettyTcpClient} is used.
|
||||
*/
|
||||
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
|
||||
this.tcpClient = tcpClient;
|
||||
|
|
@ -387,8 +387,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
|
|||
if (this.tcpClient == null) {
|
||||
StompDecoder decoder = new StompDecoder();
|
||||
decoder.setHeaderInitializer(getHeaderInitializer());
|
||||
Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
|
||||
this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
|
||||
|
||||
this.tcpClient = ReactorNettyTcpStompClient.create(this.relayHost, this.relayPort, decoder);
|
||||
}
|
||||
|
||||
if (logger.isInfoEnabled()) {
|
||||
|
|
@ -970,15 +970,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static class StompTcpClientFactory {
|
||||
|
||||
public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor2StompCodec codec) {
|
||||
return new Reactor2TcpClient<>(relayHost, relayPort, codec);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class VoidCallable implements Callable<Void> {
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -16,12 +16,14 @@
|
|||
|
||||
package org.springframework.messaging.tcp.reactor;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import reactor.fn.Consumer;
|
||||
import reactor.rx.Promise;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.MonoProcessor;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.concurrent.FailureCallback;
|
||||
|
|
@ -31,28 +33,24 @@ import org.springframework.util.concurrent.ListenableFutureCallbackRegistry;
|
|||
import org.springframework.util.concurrent.SuccessCallback;
|
||||
|
||||
/**
|
||||
* Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting
|
||||
* Adapts a reactor {@link Mono} to {@link ListenableFuture} optionally converting
|
||||
* the result Object type {@code <S>} to the expected target type {@code <T>}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
* @param <S> the type of object expected from the {@link Promise}
|
||||
* @param <S> the type of object expected from the {@link Mono}
|
||||
* @param <T> the type of object expected from the {@link ListenableFuture}
|
||||
*/
|
||||
abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
|
||||
abstract class AbstractMonoToListenableFutureAdapter<S, T>
|
||||
implements ListenableFuture<T> {
|
||||
|
||||
private final Promise<S> promise;
|
||||
private final MonoProcessor<S> promise;
|
||||
|
||||
private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<>();
|
||||
|
||||
|
||||
protected AbstractPromiseToListenableFutureAdapter(Promise<S> promise) {
|
||||
Assert.notNull(promise, "Promise must not be null");
|
||||
this.promise = promise;
|
||||
|
||||
this.promise.onSuccess(new Consumer<S>() {
|
||||
@Override
|
||||
public void accept(S result) {
|
||||
protected AbstractMonoToListenableFutureAdapter(Mono<S> promise) {
|
||||
Assert.notNull(promise, "Mono must not be null");
|
||||
this.promise = promise.doOnSuccess(result -> {
|
||||
T adapted;
|
||||
try {
|
||||
adapted = adapt(result);
|
||||
|
|
@ -62,46 +60,44 @@ abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements Listena
|
|||
return;
|
||||
}
|
||||
registry.success(adapted);
|
||||
}
|
||||
});
|
||||
|
||||
this.promise.onError(new Consumer<Throwable>() {
|
||||
@Override
|
||||
public void accept(Throwable ex) {
|
||||
registry.failure(ex);
|
||||
}
|
||||
});
|
||||
})
|
||||
.doOnError(registry::failure)
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public T get() throws InterruptedException {
|
||||
S result = this.promise.await();
|
||||
S result = this.promise.block();
|
||||
return adapt(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
S result = this.promise.await(timeout, unit);
|
||||
if (!this.promise.isComplete()) {
|
||||
throw new TimeoutException();
|
||||
}
|
||||
Objects.requireNonNull(unit, "unit");
|
||||
S result = this.promise.block(Duration.ofMillis(TimeUnit.MILLISECONDS.convert(
|
||||
timeout,
|
||||
unit)));
|
||||
return adapt(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return false;
|
||||
if (isCancelled()) {
|
||||
return false;
|
||||
}
|
||||
this.promise.cancel();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return false;
|
||||
return this.promise.isCancelled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return this.promise.isComplete();
|
||||
return this.promise.isTerminated();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -16,20 +16,22 @@
|
|||
|
||||
package org.springframework.messaging.tcp.reactor;
|
||||
|
||||
import reactor.rx.Promise;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* A Promise-to-ListenableFutureAdapter where the source and the target from
|
||||
* A Mono-to-ListenableFutureAdapter where the source and the target from
|
||||
* the Promise and the ListenableFuture respectively are of the same type.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Stephane Maldini
|
||||
* @since 4.0
|
||||
*/
|
||||
class PassThroughPromiseToListenableFutureAdapter<T> extends AbstractPromiseToListenableFutureAdapter<T, T> {
|
||||
class MonoToListenableFutureAdapter<T> extends
|
||||
AbstractMonoToListenableFutureAdapter<T, T> {
|
||||
|
||||
|
||||
public PassThroughPromiseToListenableFutureAdapter(Promise<T> promise) {
|
||||
super(promise);
|
||||
public MonoToListenableFutureAdapter(Mono<T> mono) {
|
||||
super(mono);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -1,362 +0,0 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.tcp.reactor;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.Environment;
|
||||
import reactor.core.config.ConfigurationReader;
|
||||
import reactor.core.config.DispatcherConfiguration;
|
||||
import reactor.core.config.ReactorConfiguration;
|
||||
import reactor.core.support.NamedDaemonThreadFactory;
|
||||
import reactor.fn.Consumer;
|
||||
import reactor.fn.Function;
|
||||
import reactor.fn.tuple.Tuple;
|
||||
import reactor.fn.tuple.Tuple2;
|
||||
import reactor.io.buffer.Buffer;
|
||||
import reactor.io.codec.Codec;
|
||||
import reactor.io.net.ChannelStream;
|
||||
import reactor.io.net.NetStreams;
|
||||
import reactor.io.net.NetStreams.TcpClientFactory;
|
||||
import reactor.io.net.ReactorChannelHandler;
|
||||
import reactor.io.net.Reconnect;
|
||||
import reactor.io.net.Spec.TcpClientSpec;
|
||||
import reactor.io.net.config.ClientSocketOptions;
|
||||
import reactor.io.net.impl.netty.NettyClientSocketOptions;
|
||||
import reactor.io.net.impl.netty.tcp.NettyTcpClient;
|
||||
import reactor.io.net.tcp.TcpClient;
|
||||
import reactor.rx.Promise;
|
||||
import reactor.rx.Promises;
|
||||
import reactor.rx.Stream;
|
||||
import reactor.rx.Streams;
|
||||
import reactor.rx.action.Signal;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.tcp.ReconnectStrategy;
|
||||
import org.springframework.messaging.tcp.TcpConnectionHandler;
|
||||
import org.springframework.messaging.tcp.TcpOperations;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
|
||||
* based on the TCP client support of the Reactor project.
|
||||
*
|
||||
* <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
|
||||
* i.e. a separate (Reactor) client instance for each connection.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Stephane Maldini
|
||||
* @since 4.2
|
||||
*/
|
||||
public class Reactor2TcpClient<P> implements TcpOperations<P> {
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
|
||||
|
||||
private static final Method eventLoopGroupMethod = initEventLoopGroupMethod();
|
||||
|
||||
|
||||
private final EventLoopGroup eventLoopGroup;
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
|
||||
|
||||
private final List<TcpClient<Message<P>, Message<P>>> tcpClients =
|
||||
new ArrayList<>();
|
||||
|
||||
private boolean stopping;
|
||||
|
||||
|
||||
/**
|
||||
* A constructor that creates a {@link TcpClientSpec TcpClientSpec} factory
|
||||
* with a default {@link reactor.core.dispatch.SynchronousDispatcher}, i.e.
|
||||
* relying on Netty threads. The number of Netty threads can be tweaked with
|
||||
* the {@code reactor.tcp.ioThreadCount} System property. The network I/O
|
||||
* threads will be shared amongst the active clients.
|
||||
* <p>Also see the constructor accepting a ready Reactor
|
||||
* {@link TcpClientSpec} {@link Function} factory.
|
||||
* @param host the host to connect to
|
||||
* @param port the port to connect to
|
||||
* @param codec the codec to use for encoding and decoding the TCP stream
|
||||
*/
|
||||
public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) {
|
||||
// Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
|
||||
final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
|
||||
this.eventLoopGroup = nioEventLoopGroup;
|
||||
this.environment = new Environment(new SynchronousDispatcherConfigReader());
|
||||
|
||||
this.tcpClientSpecFactory = new TcpClientFactory<Message<P>, Message<P>>() {
|
||||
@Override
|
||||
public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
|
||||
return spec
|
||||
.env(environment)
|
||||
.codec(codec)
|
||||
.connect(host, port)
|
||||
.options(createClientSocketOptions());
|
||||
}
|
||||
|
||||
private ClientSocketOptions createClientSocketOptions() {
|
||||
return (ClientSocketOptions) ReflectionUtils.invokeMethod(eventLoopGroupMethod,
|
||||
new NettyClientSocketOptions(), nioEventLoopGroup);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* A constructor with a pre-configured {@link TcpClientSpec} {@link Function}
|
||||
* factory. This might be used to add SSL or specific network parameters to
|
||||
* the generated client configuration.
|
||||
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
|
||||
* dispatcher, you are responsible for cleaning them, e.g. using
|
||||
* {@link reactor.core.Dispatcher#shutdown}.
|
||||
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation
|
||||
*/
|
||||
public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory) {
|
||||
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
|
||||
this.tcpClientSpecFactory = tcpClientSpecFactory;
|
||||
this.eventLoopGroup = null;
|
||||
this.environment = null;
|
||||
}
|
||||
|
||||
|
||||
private static NioEventLoopGroup initEventLoopGroup() {
|
||||
int ioThreadCount;
|
||||
try {
|
||||
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
ioThreadCount = -1;
|
||||
}
|
||||
if (ioThreadCount <= 0) {
|
||||
ioThreadCount = Runtime.getRuntime().availableProcessors();
|
||||
}
|
||||
return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
|
||||
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
|
||||
|
||||
final TcpClient<Message<P>, Message<P>> tcpClient;
|
||||
final Runnable cleanupTask;
|
||||
synchronized (this.tcpClients) {
|
||||
if (this.stopping) {
|
||||
IllegalStateException ex = new IllegalStateException("Shutting down.");
|
||||
connectionHandler.afterConnectFailure(ex);
|
||||
return new PassThroughPromiseToListenableFutureAdapter<>(Promises.<Void>error(ex));
|
||||
}
|
||||
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
|
||||
this.tcpClients.add(tcpClient);
|
||||
cleanupTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (tcpClients) {
|
||||
tcpClients.remove(tcpClient);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Promise<Void> promise = tcpClient.start(
|
||||
new MessageChannelStreamHandler<>(connectionHandler, cleanupTask));
|
||||
|
||||
return new PassThroughPromiseToListenableFutureAdapter<>(
|
||||
promise.onError(new Consumer<Throwable>() {
|
||||
@Override
|
||||
public void accept(Throwable ex) {
|
||||
cleanupTask.run();
|
||||
connectionHandler.afterConnectFailure(ex);
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) {
|
||||
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
|
||||
Assert.notNull(strategy, "ReconnectStrategy must not be null");
|
||||
|
||||
final TcpClient<Message<P>, Message<P>> tcpClient;
|
||||
Runnable cleanupTask;
|
||||
synchronized (this.tcpClients) {
|
||||
if (this.stopping) {
|
||||
IllegalStateException ex = new IllegalStateException("Shutting down.");
|
||||
connectionHandler.afterConnectFailure(ex);
|
||||
return new PassThroughPromiseToListenableFutureAdapter<>(Promises.<Void>error(ex));
|
||||
}
|
||||
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
|
||||
this.tcpClients.add(tcpClient);
|
||||
cleanupTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (tcpClients) {
|
||||
tcpClients.remove(tcpClient);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
|
||||
new MessageChannelStreamHandler<>(connectionHandler, cleanupTask),
|
||||
new ReactorReconnectAdapter(strategy));
|
||||
|
||||
return new PassThroughPromiseToListenableFutureAdapter<>(stream.next().after());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> shutdown() {
|
||||
synchronized (this.tcpClients) {
|
||||
this.stopping = true;
|
||||
}
|
||||
|
||||
Promise<Void> promise = Streams.from(this.tcpClients)
|
||||
.flatMap(new Function<TcpClient<Message<P>, Message<P>>, Promise<Void>>() {
|
||||
@Override
|
||||
public Promise<Void> apply(final TcpClient<Message<P>, Message<P>> client) {
|
||||
return client.shutdown().onComplete(new Consumer<Promise<Void>>() {
|
||||
@Override
|
||||
public void accept(Promise<Void> voidPromise) {
|
||||
tcpClients.remove(client);
|
||||
}
|
||||
});
|
||||
}
|
||||
})
|
||||
.next();
|
||||
|
||||
if (this.eventLoopGroup != null) {
|
||||
final Promise<Void> eventLoopPromise = Promises.prepare();
|
||||
promise.onComplete(new Consumer<Promise<Void>>() {
|
||||
@Override
|
||||
public void accept(Promise<Void> voidPromise) {
|
||||
eventLoopGroup.shutdownGracefully().addListener(new FutureListener<Object>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Object> future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
eventLoopPromise.onComplete();
|
||||
}
|
||||
else {
|
||||
eventLoopPromise.onError(future.cause());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
promise = eventLoopPromise;
|
||||
}
|
||||
|
||||
if (this.environment != null) {
|
||||
promise.onComplete(new Consumer<Promise<Void>>() {
|
||||
@Override
|
||||
public void accept(Promise<Void> voidPromise) {
|
||||
environment.shutdown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return new PassThroughPromiseToListenableFutureAdapter<>(promise);
|
||||
}
|
||||
|
||||
|
||||
private static Method initEventLoopGroupMethod() {
|
||||
for (Method method : NettyClientSocketOptions.class.getMethods()) {
|
||||
if (method.getName().equals("eventLoopGroup") && method.getParameterCount() == 1) {
|
||||
return method;
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("No compatible Reactor version found");
|
||||
}
|
||||
|
||||
|
||||
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
|
||||
|
||||
@Override
|
||||
public ReactorConfiguration read() {
|
||||
return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class MessageChannelStreamHandler<P>
|
||||
implements ReactorChannelHandler<Message<P>, Message<P>, ChannelStream<Message<P>, Message<P>>> {
|
||||
|
||||
private final TcpConnectionHandler<P> connectionHandler;
|
||||
|
||||
private final Runnable cleanupTask;
|
||||
|
||||
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler, Runnable cleanupTask) {
|
||||
this.connectionHandler = connectionHandler;
|
||||
this.cleanupTask = cleanupTask;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) {
|
||||
Promise<Void> closePromise = Promises.prepare();
|
||||
this.connectionHandler.afterConnected(new Reactor2TcpConnection<>(channelStream, closePromise));
|
||||
channelStream
|
||||
.finallyDo(new Consumer<Signal<Message<P>>>() {
|
||||
@Override
|
||||
public void accept(Signal<Message<P>> signal) {
|
||||
cleanupTask.run();
|
||||
if (signal.isOnError()) {
|
||||
connectionHandler.handleFailure(signal.getThrowable());
|
||||
}
|
||||
else if (signal.isOnComplete()) {
|
||||
connectionHandler.afterConnectionClosed();
|
||||
}
|
||||
}
|
||||
})
|
||||
.consume(new Consumer<Message<P>>() {
|
||||
@Override
|
||||
public void accept(Message<P> message) {
|
||||
connectionHandler.handleMessage(message);
|
||||
}
|
||||
});
|
||||
|
||||
return closePromise;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class ReactorReconnectAdapter implements Reconnect {
|
||||
|
||||
private final ReconnectStrategy strategy;
|
||||
|
||||
public ReactorReconnectAdapter(ReconnectStrategy strategy) {
|
||||
this.strategy = strategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
|
||||
return Tuple.of(address, this.strategy.getTimeToNextAttempt(attempt));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,264 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.tcp.reactor;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.group.ChannelGroup;
|
||||
import io.netty.channel.group.DefaultChannelGroup;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.DirectProcessor;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.MonoProcessor;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.ipc.netty.ChannelFutureMono;
|
||||
import reactor.ipc.netty.NettyContext;
|
||||
import reactor.ipc.netty.NettyInbound;
|
||||
import reactor.ipc.netty.NettyOutbound;
|
||||
import reactor.ipc.netty.options.ClientOptions;
|
||||
import reactor.ipc.netty.tcp.TcpClient;
|
||||
import reactor.util.concurrent.QueueSupplier;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.tcp.ReconnectStrategy;
|
||||
import org.springframework.messaging.tcp.TcpConnection;
|
||||
import org.springframework.messaging.tcp.TcpConnectionHandler;
|
||||
import org.springframework.messaging.tcp.TcpOperations;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
|
||||
* based on the TCP client support of the Reactor project.
|
||||
* <p>
|
||||
* <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
|
||||
* i.e. a separate (Reactor) client instance for each connection.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Stephane Maldini
|
||||
* @since 4.2
|
||||
*/
|
||||
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
|
||||
|
||||
private final TcpClient tcpClient;
|
||||
|
||||
private final MessageHandlerConfiguration<P> configuration;
|
||||
private final ChannelGroup group;
|
||||
private volatile boolean stopping;
|
||||
|
||||
/**
|
||||
* A constructor that creates a {@link TcpClient TcpClient} factory relying on
|
||||
* Reactor Netty TCP threads. The number of Netty threads can be tweaked with
|
||||
* the {@code reactor.tcp.ioThreadCount} System property. The network I/O
|
||||
* threads will be shared amongst the active clients.
|
||||
* <p>Also see the constructor accepting a {@link Consumer} of
|
||||
* {@link ClientOptions} for advanced tuning.
|
||||
*
|
||||
* @param host the host to connect to
|
||||
* @param port the port to connect to
|
||||
* @param configuration the client configuration
|
||||
*/
|
||||
public ReactorNettyTcpClient(String host,
|
||||
int port,
|
||||
MessageHandlerConfiguration<P> configuration) {
|
||||
this.configuration = Objects.requireNonNull(configuration, "configuration");
|
||||
this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
|
||||
this.tcpClient = TcpClient.create(options -> options.connect(host, port)
|
||||
.channelGroup(group));
|
||||
}
|
||||
|
||||
/**
|
||||
* A constructor with a configurator {@link Consumer} that will receive default {@link
|
||||
* ClientOptions} from {@link TcpClient}. This might be used to add SSL or specific
|
||||
* network parameters to the generated client configuration.
|
||||
*
|
||||
* @param tcpOptions the {@link Consumer} of {@link ClientOptions} shared to use by
|
||||
* connected handlers.
|
||||
* @param configuration the client configuration
|
||||
*/
|
||||
public ReactorNettyTcpClient(Consumer<? super ClientOptions> tcpOptions,
|
||||
MessageHandlerConfiguration<P> configuration) {
|
||||
this.configuration = Objects.requireNonNull(configuration, "configuration");
|
||||
this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
|
||||
this.tcpClient =
|
||||
TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
|
||||
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
|
||||
if (stopping) {
|
||||
IllegalStateException ex = new IllegalStateException("Shutting down.");
|
||||
connectionHandler.afterConnectFailure(ex);
|
||||
return new MonoToListenableFutureAdapter<>(Mono.<Void>error(ex));
|
||||
}
|
||||
|
||||
MessageHandler<P> handler =
|
||||
new MessageHandler<>(connectionHandler, configuration);
|
||||
|
||||
Mono<Void> promise = tcpClient.newHandler(handler)
|
||||
.doOnError(connectionHandler::afterConnectFailure)
|
||||
.then();
|
||||
|
||||
return new MonoToListenableFutureAdapter<>(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler,
|
||||
ReconnectStrategy strategy) {
|
||||
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
|
||||
Assert.notNull(strategy, "ReconnectStrategy must not be null");
|
||||
|
||||
if (stopping) {
|
||||
IllegalStateException ex = new IllegalStateException("Shutting down.");
|
||||
connectionHandler.afterConnectFailure(ex);
|
||||
return new MonoToListenableFutureAdapter<>(Mono.<Void>error(ex));
|
||||
}
|
||||
|
||||
MessageHandler<P> handler =
|
||||
new MessageHandler<>(connectionHandler, configuration);
|
||||
|
||||
MonoProcessor<Void> promise = MonoProcessor.create();
|
||||
|
||||
tcpClient.newHandler(handler)
|
||||
.doOnNext(e -> {
|
||||
if (!promise.isTerminated()) {
|
||||
promise.onComplete();
|
||||
}
|
||||
})
|
||||
.doOnError(e -> {
|
||||
if (!promise.isTerminated()) {
|
||||
promise.onError(e);
|
||||
}
|
||||
})
|
||||
.then(NettyContext::onClose)
|
||||
.retryWhen(new Reconnector<>(strategy))
|
||||
.repeatWhen(new Reconnector<>(strategy))
|
||||
.subscribe();
|
||||
|
||||
return new MonoToListenableFutureAdapter<>(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> shutdown() {
|
||||
if (stopping) {
|
||||
return new MonoToListenableFutureAdapter<>(Mono.empty());
|
||||
}
|
||||
|
||||
stopping = true;
|
||||
|
||||
Mono<Void> closing = ChannelFutureMono.from(group.close());
|
||||
|
||||
if (configuration.scheduler != null) {
|
||||
closing =
|
||||
closing.doAfterTerminate((x, e) -> configuration.scheduler.shutdown());
|
||||
}
|
||||
|
||||
return new MonoToListenableFutureAdapter<>(closing);
|
||||
}
|
||||
|
||||
/**
|
||||
* A configuration holder
|
||||
*/
|
||||
public static final class MessageHandlerConfiguration<P> {
|
||||
|
||||
private final Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder;
|
||||
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
|
||||
private final int backlog;
|
||||
private final Scheduler
|
||||
scheduler;
|
||||
|
||||
public MessageHandlerConfiguration(Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder,
|
||||
BiConsumer<? super ByteBuf, ? super Message<P>> encoder,
|
||||
int backlog,
|
||||
Scheduler scheduler) {
|
||||
this.decoder = decoder;
|
||||
this.encoder = encoder;
|
||||
this.backlog = backlog > 0 ? backlog : QueueSupplier.SMALL_BUFFER_SIZE;
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class MessageHandler<P> implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
|
||||
|
||||
private final TcpConnectionHandler<P> connectionHandler;
|
||||
|
||||
private final MessageHandlerConfiguration<P> configuration;
|
||||
|
||||
MessageHandler(TcpConnectionHandler<P> connectionHandler,
|
||||
MessageHandlerConfiguration<P> configuration) {
|
||||
this.connectionHandler = connectionHandler;
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> apply(NettyInbound in, NettyOutbound out) {
|
||||
Flux<Collection<Message<P>>> inbound = in.receive()
|
||||
.map(configuration.decoder);
|
||||
|
||||
DirectProcessor<Void> promise = DirectProcessor.create();
|
||||
TcpConnection<P> tcpConnection = new ReactorNettyTcpConnection<>(in,
|
||||
out,
|
||||
configuration.encoder,
|
||||
promise);
|
||||
|
||||
if (configuration.scheduler != null) {
|
||||
configuration.scheduler.schedule(() -> connectionHandler.afterConnected(
|
||||
tcpConnection));
|
||||
inbound =
|
||||
inbound.publishOn(configuration.scheduler, configuration.backlog);
|
||||
}
|
||||
else {
|
||||
connectionHandler.afterConnected(tcpConnection);
|
||||
}
|
||||
|
||||
inbound.flatMapIterable(Function.identity())
|
||||
.subscribe(connectionHandler::handleMessage,
|
||||
connectionHandler::handleFailure,
|
||||
connectionHandler::afterConnectionClosed);
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static final class Reconnector<T> implements Function<Flux<T>, Publisher<?>> {
|
||||
|
||||
private final ReconnectStrategy strategy;
|
||||
|
||||
Reconnector(ReconnectStrategy strategy) {
|
||||
this.strategy = strategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<?> apply(Flux<T> flux) {
|
||||
return flux.scan(1, (p, e) -> p++)
|
||||
.doOnCancel(() -> new Exception().printStackTrace())
|
||||
.flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt(
|
||||
attempt)));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -16,10 +16,13 @@
|
|||
|
||||
package org.springframework.messaging.tcp.reactor;
|
||||
|
||||
import reactor.io.net.ChannelStream;
|
||||
import reactor.rx.Promise;
|
||||
import reactor.rx.Promises;
|
||||
import reactor.rx.Streams;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import reactor.core.publisher.DirectProcessor;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.ipc.netty.NettyInbound;
|
||||
import reactor.ipc.netty.NettyOutbound;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.tcp.TcpConnection;
|
||||
|
|
@ -29,45 +32,50 @@ import org.springframework.util.concurrent.ListenableFuture;
|
|||
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection
|
||||
* TcpConnection} based on the TCP client support of the Reactor project.
|
||||
*
|
||||
* @param <P> the payload type of messages read or written to the TCP stream.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.2
|
||||
* @param <P> the payload type of messages read or written to the TCP stream.
|
||||
*/
|
||||
public class Reactor2TcpConnection<P> implements TcpConnection<P> {
|
||||
public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
|
||||
|
||||
private final ChannelStream<Message<P>, Message<P>> channelStream;
|
||||
private final NettyInbound in;
|
||||
private final NettyOutbound out;
|
||||
private final DirectProcessor<Void> close;
|
||||
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
|
||||
|
||||
private final Promise<Void> closePromise;
|
||||
|
||||
|
||||
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream, Promise<Void> closePromise) {
|
||||
this.channelStream = channelStream;
|
||||
this.closePromise = closePromise;
|
||||
public ReactorNettyTcpConnection(NettyInbound in,
|
||||
NettyOutbound out,
|
||||
BiConsumer<? super ByteBuf, ? super Message<P>> encoder,
|
||||
DirectProcessor<Void> close) {
|
||||
this.out = out;
|
||||
this.in = in;
|
||||
this.encoder = encoder;
|
||||
this.close = close;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> send(Message<P> message) {
|
||||
Promise<Void> afterWrite = Promises.prepare();
|
||||
this.channelStream.writeWith(Streams.just(message)).subscribe(afterWrite);
|
||||
return new PassThroughPromiseToListenableFutureAdapter<>(afterWrite);
|
||||
ByteBuf byteBuf = in.channel().alloc().buffer();
|
||||
encoder.accept(byteBuf, message);
|
||||
return new MonoToListenableFutureAdapter<>(out.send(Mono.just(byteBuf)));
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
|
||||
this.channelStream.on().readIdle(inactivityDuration, reactor.fn.Functions.<Void>consumer(runnable));
|
||||
in.onReadIdle(inactivityDuration, runnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
|
||||
this.channelStream.on().writeIdle(inactivityDuration, reactor.fn.Functions.<Void>consumer(runnable));
|
||||
out.onWriteIdle(inactivityDuration, runnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.closePromise.onComplete();
|
||||
close.onComplete();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -43,20 +43,20 @@ import static org.hamcrest.Matchers.*;
|
|||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Integration tests for {@link Reactor2TcpStompClient}.
|
||||
* Integration tests for {@link ReactorNettyTcpStompClient}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
*/
|
||||
public class Reactor2TcpStompClientTests {
|
||||
public class ReactorNettyTcpStompClientTests {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(Reactor2TcpStompClientTests.class);
|
||||
private static final Log logger = LogFactory.getLog(ReactorNettyTcpStompClientTests.class);
|
||||
|
||||
@Rule
|
||||
public final TestName testName = new TestName();
|
||||
|
||||
private BrokerService activeMQBroker;
|
||||
|
||||
private Reactor2TcpStompClient client;
|
||||
private ReactorNettyTcpStompClient client;
|
||||
|
||||
|
||||
@Before
|
||||
|
|
@ -78,7 +78,7 @@ public class Reactor2TcpStompClientTests {
|
|||
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
||||
taskScheduler.afterPropertiesSet();
|
||||
|
||||
this.client = new Reactor2TcpStompClient("127.0.0.1", port);
|
||||
this.client = new ReactorNettyTcpStompClient("127.0.0.1", port);
|
||||
this.client.setMessageConverter(new StringMessageConverter());
|
||||
this.client.setTaskScheduler(taskScheduler);
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
package org.springframework.messaging.simp.stomp;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
|
|
@ -26,22 +26,18 @@ import org.springframework.messaging.Message;
|
|||
import org.springframework.messaging.simp.SimpMessageType;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.InvalidMimeTypeException;
|
||||
import reactor.fn.Consumer;
|
||||
import reactor.fn.Function;
|
||||
import reactor.io.buffer.Buffer;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Test fixture for {@link Reactor2StompCodec}.
|
||||
* Test fixture for {@link StompDecoder}.
|
||||
*
|
||||
* @author Andy Wilkinson
|
||||
* @author Stephane Maldini
|
||||
*/
|
||||
public class StompCodecTests {
|
||||
public class StompDecoderTests {
|
||||
|
||||
private final ArgumentCapturingConsumer<Message<byte[]>> consumer = new ArgumentCapturingConsumer<>();
|
||||
|
||||
private final Function<Buffer, Message<byte[]>> decoder = new Reactor2StompCodec().decoder(consumer);
|
||||
private final StompDecoder decoder = new StompDecoder();
|
||||
|
||||
@Test
|
||||
public void decodeFrameWithCrLfEols() {
|
||||
|
|
@ -172,11 +168,9 @@ public class StompCodecTests {
|
|||
public void decodeMultipleFramesFromSameBuffer() {
|
||||
String frame1 = "SEND\ndestination:test\n\nThe body of the message\0";
|
||||
String frame2 = "DISCONNECT\n\n\0";
|
||||
ByteBuffer buffer = ByteBuffer.wrap((frame1 + frame2).getBytes());
|
||||
|
||||
Buffer buffer = Buffer.wrap(frame1 + frame2);
|
||||
|
||||
final List<Message<byte[]>> messages = new ArrayList<>();
|
||||
new Reactor2StompCodec().decoder(messages::add).apply(buffer);
|
||||
final List<Message<byte[]>> messages = decoder.decode(buffer);
|
||||
|
||||
assertEquals(2, messages.size());
|
||||
assertEquals(StompCommand.SEND, StompHeaderAccessor.wrap(messages.get(0)).getCommand());
|
||||
|
|
@ -245,102 +239,33 @@ public class StompCodecTests {
|
|||
public void decodeHeartbeat() {
|
||||
String frame = "\n";
|
||||
|
||||
Buffer buffer = Buffer.wrap(frame);
|
||||
ByteBuffer buffer = ByteBuffer.wrap(frame.getBytes());
|
||||
|
||||
final List<Message<byte[]>> messages = new ArrayList<>();
|
||||
new Reactor2StompCodec().decoder(messages::add).apply(buffer);
|
||||
final List<Message<byte[]>> messages = decoder.decode(buffer);
|
||||
|
||||
assertEquals(1, messages.size());
|
||||
assertEquals(SimpMessageType.HEARTBEAT, StompHeaderAccessor.wrap(messages.get(0)).getMessageType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithNoHeadersAndNoBody() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
|
||||
|
||||
assertEquals("DISCONNECT\n\n\0", new Reactor2StompCodec().encoder().apply(frame).asString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithHeaders() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
|
||||
headers.setAcceptVersion("1.2");
|
||||
headers.setHost("github.org");
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
|
||||
|
||||
String frameString = new Reactor2StompCodec().encoder().apply(frame).asString();
|
||||
|
||||
assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") ||
|
||||
frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithHeadersThatShouldBeEscaped() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
|
||||
headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
|
||||
|
||||
assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
|
||||
new Reactor2StompCodec().encoder().apply(frame).asString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithHeadersBody() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
|
||||
headers.addNativeHeader("a", "alpha");
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
|
||||
|
||||
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
|
||||
new Reactor2StompCodec().encoder().apply(frame).asString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithContentLengthPresent() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
|
||||
headers.setContentLength(12);
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
|
||||
|
||||
assertEquals("SEND\ncontent-length:12\n\nMessage body\0",
|
||||
new Reactor2StompCodec().encoder().apply(frame).asString());
|
||||
}
|
||||
|
||||
private void assertIncompleteDecode(String partialFrame) {
|
||||
Buffer buffer = Buffer.wrap(partialFrame);
|
||||
ByteBuffer buffer = ByteBuffer.wrap(partialFrame.getBytes());
|
||||
assertNull(decode(buffer));
|
||||
assertEquals(0, buffer.position());
|
||||
}
|
||||
|
||||
private Message<byte[]> decode(String stompFrame) {
|
||||
Buffer buffer = Buffer.wrap(stompFrame);
|
||||
ByteBuffer buffer = ByteBuffer.wrap(stompFrame.getBytes());
|
||||
return decode(buffer);
|
||||
}
|
||||
|
||||
private Message<byte[]> decode(Buffer buffer) {
|
||||
this.decoder.apply(buffer);
|
||||
if (consumer.arguments.isEmpty()) {
|
||||
private Message<byte[]> decode(ByteBuffer buffer) {
|
||||
List<Message<byte[]>> messages = this.decoder.decode(buffer);
|
||||
if (messages.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
return consumer.arguments.get(0);
|
||||
return messages.get(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static final class ArgumentCapturingConsumer<T> implements Consumer<T> {
|
||||
|
||||
private final List<T> arguments = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public void accept(T t) {
|
||||
arguments.add(t);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.simp.stomp;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test fixture for {@link StompEncoder}.
|
||||
*
|
||||
* @author Andy Wilkinson
|
||||
* @author Stephane Maldini
|
||||
*/
|
||||
public class StompEncoderTests {
|
||||
|
||||
private final StompEncoder encoder = new StompEncoder();
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithNoHeadersAndNoBody() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
|
||||
|
||||
assertEquals("DISCONNECT\n\n\0", new String(encoder.encode(frame)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithHeaders() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
|
||||
headers.setAcceptVersion("1.2");
|
||||
headers.setHost("github.org");
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
|
||||
|
||||
String frameString = new String(encoder.encode(frame));
|
||||
|
||||
assertTrue("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) || "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(
|
||||
frameString));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithHeadersThatShouldBeEscaped() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
|
||||
headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
|
||||
|
||||
assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
|
||||
new String(encoder.encode(frame)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithHeadersBody() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
|
||||
headers.addNativeHeader("a", "alpha");
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
|
||||
|
||||
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
|
||||
new String(encoder.encode(frame)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void encodeFrameWithContentLengthPresent() {
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
|
||||
headers.setContentLength(12);
|
||||
|
||||
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
|
||||
|
||||
assertEquals("SEND\ncontent-length:12\n\nMessage body\0",
|
||||
new String(encoder.encode(frame)));
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue