Use of TcpClient extensible per connection

Closes gh-25889
This commit is contained in:
Rossen Stoyanchev 2020-10-19 21:30:52 +01:00
parent 43f595f80e
commit 76eb5e6e2c
5 changed files with 78 additions and 17 deletions

View File

@ -16,7 +16,6 @@
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.util.concurrent.ListenableFuture;
/**
@ -30,7 +29,7 @@ import org.springframework.util.concurrent.ListenableFuture;
* @author Rossen Stoyanchev
* @since 4.2
*/
public interface ConnectionHandlingStompSession extends StompSession, TcpConnectionHandler<byte[]> {
public interface ConnectionHandlingStompSession extends StompSession, StompTcpConnectionHandler<byte[]> {
/**
* Return a future that will complete when the session is ready for use.

View File

@ -133,6 +133,13 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
return this.sessionId;
}
@Override
public StompHeaderAccessor getConnectHeaders() {
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
accessor.addNativeHeaders(this.connectHeaders);
return accessor;
}
/**
* Return the configured session handler.
*/

View File

@ -41,7 +41,6 @@ import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
@ -141,7 +140,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private final DefaultStats stats = new DefaultStats();
private final Map<String, StompConnectionHandler> connectionHandlers = new ConcurrentHashMap<>();
private final Map<String, RelayConnectionHandler> connectionHandlers = new ConcurrentHashMap<>();
@Nullable
private TaskScheduler taskScheduler;
@ -451,7 +450,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
}
SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor);
SystemSessionConnectionHandler handler = new SystemSessionConnectionHandler(accessor);
this.connectionHandlers.put(handler.getSessionId(), handler);
this.stats.incrementConnectCount();
@ -495,7 +494,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
"receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
}
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
RelayConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler != null) {
handler.sendStompErrorFrameToClient("Broker not available.");
handler.clearConnection();
@ -562,14 +561,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (getVirtualHost() != null) {
stompAccessor.setHost(getVirtualHost());
}
StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
RelayConnectionHandler handler = new RelayConnectionHandler(sessionId, stompAccessor);
this.connectionHandlers.put(sessionId, handler);
this.stats.incrementConnectCount();
Assert.state(this.tcpClient != null, "No TCP client available");
this.tcpClient.connect(handler);
}
else if (StompCommand.DISCONNECT.equals(command)) {
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
RelayConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
@ -580,7 +579,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
handler.forward(message, stompAccessor);
}
else {
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
RelayConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isDebugEnabled()) {
logger.debug("No TCP connection for session " + sessionId + " in " + message);
@ -611,7 +610,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
private class RelayConnectionHandler implements StompTcpConnectionHandler<byte[]> {
private final String sessionId;
@ -634,11 +633,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private long clientSendMessageTimestamp;
protected StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
protected RelayConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
this(sessionId, connectHeaders, true);
}
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
private RelayConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
Assert.notNull(sessionId, "'sessionId' must not be null");
Assert.notNull(connectHeaders, "'connectHeaders' must not be null");
this.sessionId = sessionId;
@ -662,6 +661,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.sessionId;
}
@Override
public StompHeaderAccessor getConnectHeaders() {
return this.connectHeaders;
}
@ -968,9 +968,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
private class SystemStompConnectionHandler extends StompConnectionHandler {
private class SystemSessionConnectionHandler extends RelayConnectionHandler {
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
public SystemSessionConnectionHandler(StompHeaderAccessor connectHeaders) {
super(SYSTEM_SESSION_ID, connectHeaders, false);
}
@ -1099,7 +1099,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void run() {
long now = System.currentTimeMillis();
for (StompConnectionHandler handler : connectionHandlers.values()) {
for (RelayConnectionHandler handler : connectionHandlers.values()) {
handler.updateClientSendMessageCount(now);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.tcp.TcpConnectionHandler;
/**
* A {@link TcpConnectionHandler} for use with STOMP connections, exposing
* further information about the connection.
*
* @author Rossen Stoyanchev
* @since 5.3
* @param <P> the type of payload for inbound and outbound messages
*/
public interface StompTcpConnectionHandler<P> extends TcpConnectionHandler<P> {
/**
* Return the {@link SimpMessageHeaderAccessor#getSessionId() sessionId}
* associated with the STOMP connection.
*/
String getSessionId();
/**
* Return the headers that will be sent in the STOMP CONNECT frame.
*/
StompHeaderAccessor getConnectHeaders();
}

View File

@ -186,7 +186,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
return handleShuttingDownConnectFailure(handler);
}
Mono<Void> connectMono = this.tcpClient
Mono<Void> connectMono = extendTcpClient(this.tcpClient, handler)
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnError(handler::afterConnectFailure)
@ -195,6 +195,19 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
return new MonoToListenableFutureAdapter<>(connectMono);
}
/**
* Provides an opportunity to initialize the {@link TcpClient} for the given
* {@link TcpConnectionHandler} which may implement sub-interfaces such as
* {@link org.springframework.messaging.simp.stomp.StompTcpConnectionHandler}
* that expose further information.
* @param tcpClient the candidate TcpClient
* @param handler the handler for the TCP connection
* @return the same handler or an updated instance
*/
protected TcpClient extendTcpClient(TcpClient tcpClient, TcpConnectionHandler<P> handler) {
return tcpClient;
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
@ -207,7 +220,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
// Report first connect to the ListenableFuture
CompletableFuture<Void> connectFuture = new CompletableFuture<>();
this.tcpClient
extendTcpClient(this.tcpClient, handler)
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(conn -> connectFuture.complete(null))