Support Reactor Netty 2 / Netty 5 in spring-messaging

Closes gh-28847
This commit is contained in:
rstoyanchev 2022-09-12 09:36:22 +01:00
parent 2f4c39ba2a
commit 3a5f550a30
11 changed files with 706 additions and 20 deletions

View File

@ -9,6 +9,7 @@ dependencies {
optional(project(":spring-context"))
optional(project(":spring-oxm"))
optional("io.projectreactor.netty:reactor-netty-http")
optional("io.projectreactor.netty:reactor-netty5-http:2.0.0-M1")
optional("io.rsocket:rsocket-core")
optional("io.rsocket:rsocket-transport-netty")
optional("com.fasterxml.jackson.core:jackson-databind")

View File

@ -21,17 +21,31 @@ import java.util.concurrent.CompletableFuture;
import org.springframework.lang.Nullable;
import org.springframework.messaging.simp.SimpLogging;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNetty2TcpClient;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* A STOMP over TCP client that uses {@link ReactorNettyTcpClient}.
* A STOMP over TCP client, configurable with either
* {@link ReactorNettyTcpClient} or {@link ReactorNetty2TcpClient}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ReactorNettyTcpStompClient extends StompClientSupport {
private static final boolean reactorNettyClientPresent;
private static final boolean reactorNetty2ClientPresent;
static {
ClassLoader classLoader = StompBrokerRelayMessageHandler.class.getClassLoader();
reactorNettyClientPresent = ClassUtils.isPresent("reactor.netty.http.client.HttpClient", classLoader);
reactorNetty2ClientPresent = ClassUtils.isPresent("reactor.netty5.http.client.HttpClient", classLoader);
}
private final TcpOperations<byte[]> tcpClient;
@ -60,10 +74,18 @@ public class ReactorNettyTcpStompClient extends StompClientSupport {
this.tcpClient = tcpClient;
}
private static ReactorNettyTcpClient<byte[]> initTcpClient(String host, int port) {
ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(host, port, new StompReactorNettyCodec());
client.setLogger(SimpLogging.forLog(client.getLogger()));
return client;
private static TcpOperations<byte[]> initTcpClient(String host, int port) {
if (reactorNettyClientPresent) {
ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(host, port, new StompReactorNettyCodec());
client.setLogger(SimpLogging.forLog(client.getLogger()));
return client;
}
else if (reactorNetty2ClientPresent) {
ReactorNetty2TcpClient<byte[]> client = new ReactorNetty2TcpClient<>(host, port, new StompTcpMessageCodec());
client.setLogger(SimpLogging.forLog(client.getLogger()));
return client;
}
throw new IllegalStateException("No compatible version of Reactor Netty");
}

View File

@ -43,10 +43,13 @@ import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNetty2TcpClient;
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.messaging.tcp.reactor.TcpMessageCodec;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* A {@link org.springframework.messaging.MessageHandler} that handles messages by
@ -101,10 +104,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static final Message<byte[]> HEARTBEAT_MESSAGE;
private static final boolean reactorNettyClientPresent;
private static final boolean reactorNetty2ClientPresent;
static {
HEART_BEAT_ACCESSOR = StompHeaderAccessor.createForHeartbeat();
HEARTBEAT_MESSAGE = MessageBuilder.createMessage(
StompDecoder.HEARTBEAT_PAYLOAD, HEART_BEAT_ACCESSOR.getMessageHeaders());
ClassLoader classLoader = StompBrokerRelayMessageHandler.class.getClassLoader();
reactorNettyClientPresent = ClassUtils.isPresent("reactor.netty.http.client.HttpClient", classLoader);
reactorNetty2ClientPresent = ClassUtils.isPresent("reactor.netty5.http.client.HttpClient", classLoader);
}
@ -345,7 +356,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Configure a TCP client for managing TCP connections to the STOMP broker.
* <p>By default {@link ReactorNettyTcpClient} is used.
* <p>By default {@link ReactorNettyTcpClient} or
* {@link ReactorNetty2TcpClient} is used.
* <p><strong>Note:</strong> when this property is used, any
* {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
* specified are effectively ignored.
@ -458,15 +470,24 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private ReactorNettyTcpClient<byte[]> initTcpClient() {
private TcpOperations<byte[]> initTcpClient() {
StompDecoder decoder = new StompDecoder();
if (this.headerInitializer != null) {
decoder.setHeaderInitializer(this.headerInitializer);
}
ReactorNettyCodec<byte[]> codec = new StompReactorNettyCodec(decoder);
ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec);
client.setLogger(SimpLogging.forLog(client.getLogger()));
return client;
if (reactorNettyClientPresent) {
ReactorNettyCodec<byte[]> codec = new StompReactorNettyCodec(decoder);
ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec);
client.setLogger(SimpLogging.forLog(client.getLogger()));
return client;
}
else if (reactorNetty2ClientPresent) {
TcpMessageCodec<byte[]> codec = new StompTcpMessageCodec(decoder);
ReactorNetty2TcpClient<byte[]> client = new ReactorNetty2TcpClient<>(this.relayHost, this.relayPort, codec);
client.setLogger(SimpLogging.forLog(client.getLogger()));
return client;
}
throw new IllegalStateException("No compatible version of Reactor Netty");
}
@Override

View File

@ -0,0 +1,63 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.nio.ByteBuffer;
import java.util.List;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.reactor.TcpMessageCodec;
/**
* {@link TcpMessageCodec} for STOMP, delegating to {@link StompDecoder} and
* {@link StompEncoder}.
*
* @author Rossen Stoyanchev
* @since 6.0
*/
public class StompTcpMessageCodec implements TcpMessageCodec<byte[]> {
private final StompDecoder decoder;
private final StompEncoder encoder;
public StompTcpMessageCodec() {
this(new StompDecoder());
}
public StompTcpMessageCodec(StompDecoder decoder) {
this(decoder, new StompEncoder());
}
public StompTcpMessageCodec(StompDecoder decoder, StompEncoder encoder) {
this.decoder = decoder;
this.encoder = encoder;
}
@Override
public List<Message<byte[]>> decode(ByteBuffer nioBuffer) {
return this.decoder.decode(nioBuffer);
}
@Override
public ByteBuffer encode(Message<byte[]> message) {
return ByteBuffer.wrap(this.encoder.encode(message));
}
}

View File

@ -0,0 +1,352 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import io.netty5.buffer.api.Buffer;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.group.ChannelGroup;
import io.netty5.channel.group.DefaultChannelGroup;
import io.netty5.handler.codec.ByteToMessageDecoder;
import io.netty5.util.concurrent.ImmediateEventExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty5.Connection;
import reactor.netty5.NettyInbound;
import reactor.netty5.NettyOutbound;
import reactor.netty5.resources.ConnectionProvider;
import reactor.netty5.resources.LoopResources;
import reactor.netty5.tcp.TcpClient;
import reactor.util.retry.Retry;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
/**
* Reactor Netty based implementation of {@link TcpOperations}.
*
* <p>This class is based on {@link ReactorNettyTcpClient}.
*
* @author Rossen Stoyanchev
* @since 6.0
* @param <P> the type of payload for in and outbound messages
*/
public class ReactorNetty2TcpClient<P> implements TcpOperations<P> {
private static final int PUBLISH_ON_BUFFER_SIZE = 16;
private final TcpClient tcpClient;
private final TcpMessageCodec<P> codec;
@Nullable
private final ChannelGroup channelGroup;
@Nullable
private final LoopResources loopResources;
@Nullable
private final ConnectionProvider poolResources;
private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler");
private Log logger = LogFactory.getLog(ReactorNetty2TcpClient.class);
private volatile boolean stopping;
/**
* Simple constructor with the host and port to use to connect to.
* <p>This constructor manages the lifecycle of the {@link TcpClient} and
* underlying resources such as {@link ConnectionProvider},
* {@link LoopResources}, and {@link ChannelGroup}.
* <p>For full control over the initialization and lifecycle of the
* TcpClient, use {@link #ReactorNetty2TcpClient(TcpClient, TcpMessageCodec)}.
* @param host the host to connect to
* @param port the port to connect to
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNetty2TcpClient(String host, int port, TcpMessageCodec<P> codec) {
Assert.notNull(host, "host is required");
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000);
this.codec = codec;
this.tcpClient = TcpClient.create(this.poolResources)
.host(host).port(port)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
}
/**
* A variant of {@link #ReactorNetty2TcpClient(String, int, TcpMessageCodec)}
* that still manages the lifecycle of the {@link TcpClient} and underlying
* resources, but allows for direct configuration of other properties of the
* client through a {@code Function<TcpClient, TcpClient>}.
* @param clientConfigurer the configurer function
* @param codec for encoding and decoding the input/output byte streams
* @since 5.1.3
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNetty2TcpClient(Function<TcpClient, TcpClient> clientConfigurer, TcpMessageCodec<P> codec) {
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000);
this.codec = codec;
this.tcpClient = clientConfigurer.apply(TcpClient
.create(this.poolResources)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel())));
}
/**
* Constructor with an externally created {@link TcpClient} instance whose
* lifecycle is expected to be managed externally.
* @param tcpClient the TcpClient instance to use
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNetty2TcpClient(TcpClient tcpClient, TcpMessageCodec<P> codec) {
Assert.notNull(tcpClient, "TcpClient is required");
Assert.notNull(codec, "ReactorNettyCodec is required");
this.tcpClient = tcpClient;
this.codec = codec;
this.channelGroup = null;
this.loopResources = null;
this.poolResources = null;
}
/**
* Set an alternative logger to use than the one based on the class name.
* @param logger the logger to use
* @since 5.1
*/
public void setLogger(Log logger) {
this.logger = logger;
}
/**
* Return the currently configured Logger.
* @since 5.1
*/
public Log getLogger() {
return logger;
}
@Override
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler) {
Assert.notNull(handler, "TcpConnectionHandler is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
return extendTcpClient(this.tcpClient, handler)
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnError(handler::afterConnectFailure)
.then().toFuture();
}
/**
* Provides an opportunity to initialize the {@link TcpClient} for the given
* {@link TcpConnectionHandler} which may implement sub-interfaces such as
* {@link org.springframework.messaging.simp.stomp.StompTcpConnectionHandler}
* that expose further information.
* @param tcpClient the candidate TcpClient
* @param handler the handler for the TCP connection
* @return the same handler or an updated instance
*/
protected TcpClient extendTcpClient(TcpClient tcpClient, TcpConnectionHandler<P> handler) {
return tcpClient;
}
@Override
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
// Report first connect to the ListenableFuture
CompletableFuture<Void> connectFuture = new CompletableFuture<>();
extendTcpClient(this.tcpClient, handler)
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(conn -> connectFuture.complete(null))
.doOnError(connectFuture::completeExceptionally)
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(Connection::onDispose) // post-connect issues
.retryWhen(Retry.from(signals -> signals
.map(retrySignal -> (int) retrySignal.totalRetriesInARow())
.flatMap(attempt -> reconnect(attempt, strategy))))
.repeatWhen(flux -> flux
.scan(1, (count, element) -> count++)
.flatMap(attempt -> reconnect(attempt, strategy)))
.subscribe();
return connectFuture;
}
private CompletableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
handler.afterConnectFailure(ex);
return Mono.<Void>error(ex).toFuture();
}
private Publisher<? extends Long> reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) {
Long time = reconnectStrategy.getTimeToNextAttempt(attempt);
return (time != null ? Mono.delay(Duration.ofMillis(time), this.scheduler) : Mono.empty());
}
@Override
public CompletableFuture<Void> shutdownAsync() {
if (this.stopping) {
return CompletableFuture.completedFuture(null);
}
this.stopping = true;
Mono<Void> result;
if (this.channelGroup != null) {
Sinks.Empty<Void> channnelGroupCloseSink = Sinks.empty();
this.channelGroup.close().addListener(future -> channnelGroupCloseSink.tryEmitEmpty());
result = channnelGroupCloseSink.asMono();
if (this.loopResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
}
if (this.poolResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
}
result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
}
else {
result = stopScheduler();
}
return result.toFuture();
}
private Mono<Void> stopScheduler() {
return Mono.fromRunnable(() -> {
this.scheduler.dispose();
for (int i = 0; i < 20; i++) {
if (this.scheduler.isDisposed()) {
break;
}
try {
Thread.sleep(100);
}
catch (Throwable ex) {
break;
}
}
});
}
@Override
public String toString() {
return "ReactorNetty2TcpClient[" + this.tcpClient + "]";
}
private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
private final TcpConnectionHandler<P> connectionHandler;
ReactorNettyHandler(TcpConnectionHandler<P> handler) {
this.connectionHandler = handler;
}
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
inbound.withConnection(conn -> {
if (logger.isDebugEnabled()) {
logger.debug("Connected to " + conn.address());
}
});
Sinks.Empty<Void> completionSink = Sinks.empty();
TcpConnection<P> connection = new ReactorNetty2TcpConnection<>(inbound, outbound, codec, completionSink);
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
inbound.withConnection(conn -> conn.addHandlerFirst(new StompMessageDecoder<>(codec)));
inbound.receiveObject()
.cast(Message.class)
.publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE)
.subscribe(
this.connectionHandler::handleMessage,
this.connectionHandler::handleFailure,
this.connectionHandler::afterConnectionClosed);
return completionSink.asMono();
}
}
private static class StompMessageDecoder<P> extends ByteToMessageDecoder {
private final TcpMessageCodec<P> codec;
StompMessageDecoder(TcpMessageCodec<P> codec) {
this.codec = codec;
}
@Override
protected void decode(ChannelHandlerContext ctx, Buffer buffer) throws Exception {
ByteBuffer byteBuffer = ByteBuffer.allocate(buffer.readableBytes());
buffer.readBytes(byteBuffer);
byteBuffer.position(0);
List<Message<P>> messages = this.codec.decode(byteBuffer);
for (Message<P> message : messages) {
ctx.fireChannelRead(message);
}
}
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import io.netty5.buffer.api.Buffer;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty5.NettyInbound;
import reactor.netty5.NettyOutbound;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
/**
* Reactor Netty based implementation of {@link TcpConnection}.
*
* <p>This class is based on {@link ReactorNettyTcpConnection}.
*
* @author Rossen Stoyanchev
* @since 6.0
* @param <P> the type of payload for outbound messages
*/
public class ReactorNetty2TcpConnection<P> implements TcpConnection<P> {
private final NettyInbound inbound;
private final NettyOutbound outbound;
private final TcpMessageCodec<P> codec;
private final Sinks.Empty<Void> completionSink;
public ReactorNetty2TcpConnection(NettyInbound inbound, NettyOutbound outbound,
TcpMessageCodec<P> codec, Sinks.Empty<Void> completionSink) {
this.inbound = inbound;
this.outbound = outbound;
this.codec = codec;
this.completionSink = completionSink;
}
@Override
public CompletableFuture<Void> sendAsync(Message<P> message) {
ByteBuffer byteBuffer = this.codec.encode(message);
Buffer buffer = this.outbound.alloc().copyOf(byteBuffer);
return this.outbound.send(Mono.just(buffer)).then().toFuture();
}
@Override
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable));
}
@Override
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable));
}
@Override
public void close() {
// Ignore result: concurrent attempts to complete are ok
this.completionSink.tryEmitEmpty();
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.nio.ByteBuffer;
import java.util.List;
import org.springframework.messaging.Message;
/**
* Contract to encode and decode a {@link Message} to and from a {@link ByteBuffer}
* allowing a higher-level protocol (e.g. STOMP over TCP) to plug in.
*
* @author Rossen Stoyanchev
* @since 6.0
* @param <P> the message payload type
*/
public interface TcpMessageCodec<P> {
/**
* Decode the input {@link ByteBuffer} into one or more {@link Message Messages}.
* @param buffer the input buffer to decode from
* @return 0 or more decoded messages
*/
List<Message<P>> decode(ByteBuffer buffer);
/**
* Encode the given {@link Message} to the output {@link ByteBuffer}.
* @param message the message to encode
* @return the encoded buffer
*/
ByteBuffer encode(Message<P> message);
}

View File

@ -47,6 +47,7 @@ import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import static org.assertj.core.api.Assertions.assertThat;
@ -56,11 +57,10 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
* Integration tests for {@link StompBrokerRelayMessageHandler} running against ActiveMQ.
*
* @author Rossen Stoyanchev
* @author Sam Brannen
*/
public class StompBrokerRelayMessageHandlerIntegrationTests {
public abstract class AbstractStompBrokerRelayIntegrationTests {
private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandlerIntegrationTests.class);
private static final Log logger = LogFactory.getLog(AbstractStompBrokerRelayIntegrationTests.class);
private StompBrokerRelayMessageHandler relay;
@ -77,7 +77,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@BeforeEach
@SuppressWarnings("deprecation")
public void setup(TestInfo testInfo) throws Exception {
logger.debug("Setting up before '" + testInfo.getTestMethod().get().getName() + "'");
@ -121,10 +120,15 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.relay.setSystemHeartbeatSendInterval(0);
this.relay.setPreservePublishOrder(true);
TcpOperations<byte[]> tcpClient = initTcpClient(this.port);
this.relay.setTcpClient(tcpClient);
this.relay.start();
this.eventPublisher.expectBrokerAvailabilityEvent(true);
}
protected abstract TcpOperations<byte[]> initTcpClient(int port);
@AfterEach
public void stop() throws Exception {
try {

View File

@ -0,0 +1,35 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNetty2TcpClient;
/**
* Integration tests for {@link StompBrokerRelayMessageHandler} running against
* ActiveMQ with {@link ReactorNetty2TcpClient}.
*
* @author Rossen Stoyanchev
*/
public class ReactorNetty2StompBrokerRelayIntegrationTests extends AbstractStompBrokerRelayIntegrationTests {
@Override
protected TcpOperations<byte[]> initTcpClient(int port) {
return new ReactorNetty2TcpClient<>("127.0.0.1", port, new StompTcpMessageCodec());
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
/**
* Integration tests for {@link StompBrokerRelayMessageHandler} running against
* ActiveMQ with {@link ReactorNettyTcpClient}.
*
* @author Rossen Stoyanchev
*/
public class ReactorNettyStompBrokerRelayIntegrationTests extends AbstractStompBrokerRelayIntegrationTests {
@Override
protected TcpOperations<byte[]> initTcpClient(int port) {
return new ReactorNettyTcpClient<>("127.0.0.1", port, new StompReactorNettyCodec());
}
}

View File

@ -37,6 +37,7 @@ import org.junit.jupiter.api.TestInfo;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.StompSession.Subscription;
import org.springframework.messaging.tcp.reactor.ReactorNetty2TcpClient;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
@ -57,6 +58,8 @@ public class ReactorNettyTcpStompClientTests {
private ReactorNettyTcpStompClient client;
private ReactorNettyTcpStompClient client2;
@BeforeEach
public void setup(TestInfo testInfo) throws Exception {
@ -75,9 +78,16 @@ public class ReactorNettyTcpStompClientTests {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
this.client = new ReactorNettyTcpStompClient("127.0.0.1", stompConnector.getServer().getSocketAddress().getPort());
int port = stompConnector.getServer().getSocketAddress().getPort();
String host = "127.0.0.1";
this.client = new ReactorNettyTcpStompClient(host, port);
this.client.setMessageConverter(new StringMessageConverter());
this.client.setTaskScheduler(taskScheduler);
this.client2 = new ReactorNettyTcpStompClient(new ReactorNetty2TcpClient<>(host, port, new StompTcpMessageCodec()));
this.client2.setMessageConverter(new StringMessageConverter());
this.client2.setTaskScheduler(taskScheduler);
}
private TransportConnector createStompConnector() throws Exception {
@ -90,6 +100,7 @@ public class ReactorNettyTcpStompClientTests {
public void shutdown() throws Exception {
try {
this.client.shutdown();
this.client2.shutdown();
}
catch (Throwable ex) {
logger.error("Failed to shut client", ex);
@ -103,14 +114,24 @@ public class ReactorNettyTcpStompClientTests {
}
}
@Test
public void publishSubscribe() throws Exception {
public void publishSubscribeOnReactorNetty() throws Exception {
testPublishSubscribe(this.client);
}
@Test
public void publishSubscribeOnReactorNetty2() throws Exception {
testPublishSubscribe(this.client2);
}
private void testPublishSubscribe(ReactorNettyTcpStompClient clientToUse) throws Exception {
String destination = "/topic/foo";
ConsumingHandler consumingHandler1 = new ConsumingHandler(destination);
CompletableFuture<StompSession> consumerFuture1 = this.client.connectAsync(consumingHandler1);
CompletableFuture<StompSession> consumerFuture1 = clientToUse.connectAsync(consumingHandler1);
ConsumingHandler consumingHandler2 = new ConsumingHandler(destination);
CompletableFuture<StompSession> consumerFuture2 = this.client.connectAsync(consumingHandler2);
CompletableFuture<StompSession> consumerFuture2 = clientToUse.connectAsync(consumingHandler2);
assertThat(consumingHandler1.awaitForSubscriptions(5000)).isTrue();
assertThat(consumingHandler2.awaitForSubscriptions(5000)).isTrue();
@ -118,7 +139,7 @@ public class ReactorNettyTcpStompClientTests {
ProducingHandler producingHandler = new ProducingHandler();
producingHandler.addToSend(destination, "foo1");
producingHandler.addToSend(destination, "foo2");
CompletableFuture<StompSession> producerFuture = this.client.connectAsync(producingHandler);
CompletableFuture<StompSession> producerFuture = clientToUse.connectAsync(producingHandler);
assertThat(consumingHandler1.awaitForMessageCount(2, 5000)).isTrue();
assertThat(consumingHandler1.getReceived()).containsExactly("foo1", "foo2");