From 76eb5e6e2c7ae668646b5c6b73ca8cacc8247643 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 19 Oct 2020 21:30:52 +0100 Subject: [PATCH] Use of TcpClient extensible per connection Closes gh-25889 --- .../stomp/ConnectionHandlingStompSession.java | 3 +- .../simp/stomp/DefaultStompSession.java | 7 ++++ .../stomp/StompBrokerRelayMessageHandler.java | 26 ++++++------ .../simp/stomp/StompTcpConnectionHandler.java | 42 +++++++++++++++++++ .../tcp/reactor/ReactorNettyTcpClient.java | 17 +++++++- 5 files changed, 78 insertions(+), 17 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompTcpConnectionHandler.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ConnectionHandlingStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ConnectionHandlingStompSession.java index d867e51595f..55fe372dd8c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ConnectionHandlingStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ConnectionHandlingStompSession.java @@ -16,7 +16,6 @@ package org.springframework.messaging.simp.stomp; -import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.util.concurrent.ListenableFuture; /** @@ -30,7 +29,7 @@ import org.springframework.util.concurrent.ListenableFuture; * @author Rossen Stoyanchev * @since 4.2 */ -public interface ConnectionHandlingStompSession extends StompSession, TcpConnectionHandler { +public interface ConnectionHandlingStompSession extends StompSession, StompTcpConnectionHandler { /** * Return a future that will complete when the session is ready for use. diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java index 45f7061e0ee..f54dc05e208 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java @@ -133,6 +133,13 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { return this.sessionId; } + @Override + public StompHeaderAccessor getConnectHeaders() { + StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT); + accessor.addNativeHeaders(this.connectHeaders); + return accessor; + } + /** * Return the configured session handler. */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index e4081df3bfd..c95f208b338 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -41,7 +41,6 @@ import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.messaging.support.MessageHeaderInitializer; import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; -import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.messaging.tcp.reactor.ReactorNettyCodec; import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; @@ -141,7 +140,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private final DefaultStats stats = new DefaultStats(); - private final Map connectionHandlers = new ConcurrentHashMap<>(); + private final Map connectionHandlers = new ConcurrentHashMap<>(); @Nullable private TaskScheduler taskScheduler; @@ -451,7 +450,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD)); } - SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor); + SystemSessionConnectionHandler handler = new SystemSessionConnectionHandler(accessor); this.connectionHandlers.put(handler.getSessionId(), handler); this.stats.incrementConnectCount(); @@ -495,7 +494,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler throw new MessageDeliveryException("Message broker not active. Consider subscribing to " + "receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean."); } - StompConnectionHandler handler = this.connectionHandlers.get(sessionId); + RelayConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler != null) { handler.sendStompErrorFrameToClient("Broker not available."); handler.clearConnection(); @@ -562,14 +561,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (getVirtualHost() != null) { stompAccessor.setHost(getVirtualHost()); } - StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor); + RelayConnectionHandler handler = new RelayConnectionHandler(sessionId, stompAccessor); this.connectionHandlers.put(sessionId, handler); this.stats.incrementConnectCount(); Assert.state(this.tcpClient != null, "No TCP client available"); this.tcpClient.connect(handler); } else if (StompCommand.DISCONNECT.equals(command)) { - StompConnectionHandler handler = this.connectionHandlers.get(sessionId); + RelayConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { if (logger.isDebugEnabled()) { logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up."); @@ -580,7 +579,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler handler.forward(message, stompAccessor); } else { - StompConnectionHandler handler = this.connectionHandlers.get(sessionId); + RelayConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { if (logger.isDebugEnabled()) { logger.debug("No TCP connection for session " + sessionId + " in " + message); @@ -611,7 +610,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } - private class StompConnectionHandler implements TcpConnectionHandler { + private class RelayConnectionHandler implements StompTcpConnectionHandler { private final String sessionId; @@ -634,11 +633,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private long clientSendMessageTimestamp; - protected StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) { + protected RelayConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) { this(sessionId, connectHeaders, true); } - private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) { + private RelayConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) { Assert.notNull(sessionId, "'sessionId' must not be null"); Assert.notNull(connectHeaders, "'connectHeaders' must not be null"); this.sessionId = sessionId; @@ -662,6 +661,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.sessionId; } + @Override public StompHeaderAccessor getConnectHeaders() { return this.connectHeaders; } @@ -968,9 +968,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } - private class SystemStompConnectionHandler extends StompConnectionHandler { + private class SystemSessionConnectionHandler extends RelayConnectionHandler { - public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { + public SystemSessionConnectionHandler(StompHeaderAccessor connectHeaders) { super(SYSTEM_SESSION_ID, connectHeaders, false); } @@ -1099,7 +1099,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void run() { long now = System.currentTimeMillis(); - for (StompConnectionHandler handler : connectionHandlers.values()) { + for (RelayConnectionHandler handler : connectionHandlers.values()) { handler.updateClientSendMessageCount(now); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompTcpConnectionHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompTcpConnectionHandler.java new file mode 100644 index 00000000000..098f985cb30 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompTcpConnectionHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.simp.stomp; + +import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.tcp.TcpConnectionHandler; + +/** + * A {@link TcpConnectionHandler} for use with STOMP connections, exposing + * further information about the connection. + * + * @author Rossen Stoyanchev + * @since 5.3 + * @param

the type of payload for inbound and outbound messages + */ +public interface StompTcpConnectionHandler

extends TcpConnectionHandler

{ + + /** + * Return the {@link SimpMessageHeaderAccessor#getSessionId() sessionId} + * associated with the STOMP connection. + */ + String getSessionId(); + + /** + * Return the headers that will be sent in the STOMP CONNECT frame. + */ + StompHeaderAccessor getConnectHeaders(); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 0bb08d36ac9..1230998357d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -186,7 +186,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return handleShuttingDownConnectFailure(handler); } - Mono connectMono = this.tcpClient + Mono connectMono = extendTcpClient(this.tcpClient, handler) .handle(new ReactorNettyHandler(handler)) .connect() .doOnError(handler::afterConnectFailure) @@ -195,6 +195,19 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return new MonoToListenableFutureAdapter<>(connectMono); } + /** + * Provides an opportunity to initialize the {@link TcpClient} for the given + * {@link TcpConnectionHandler} which may implement sub-interfaces such as + * {@link org.springframework.messaging.simp.stomp.StompTcpConnectionHandler} + * that expose further information. + * @param tcpClient the candidate TcpClient + * @param handler the handler for the TCP connection + * @return the same handler or an updated instance + */ + protected TcpClient extendTcpClient(TcpClient tcpClient, TcpConnectionHandler

handler) { + return tcpClient; + } + @Override public ListenableFuture connect(TcpConnectionHandler

handler, ReconnectStrategy strategy) { Assert.notNull(handler, "TcpConnectionHandler is required"); @@ -207,7 +220,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ // Report first connect to the ListenableFuture CompletableFuture connectFuture = new CompletableFuture<>(); - this.tcpClient + extendTcpClient(this.tcpClient, handler) .handle(new ReactorNettyHandler(handler)) .connect() .doOnNext(conn -> connectFuture.complete(null))