Add TCP abstractions to spring-messaging

This change adds abstractions for opening and managing TCP connections
primarily for use with the STOMP broker support. As one immediate
benefit the change makes the  StompBrokerRelayMessageHandler more
easy to test.
This commit is contained in:
Rossen Stoyanchev 2013-10-17 22:12:06 -04:00
parent a172b32d4c
commit 29934d7c02
12 changed files with 858 additions and 319 deletions

View File

@ -37,6 +37,10 @@ public class MessageDeliveryException extends MessagingException {
super(undeliveredMessage, description);
}
public MessageDeliveryException(Message<?> message, Throwable cause) {
super(message, cause);
}
public MessageDeliveryException(Message<?> undeliveredMessage, String description, Throwable cause) {
super(undeliveredMessage, description, cause);
}

View File

@ -16,11 +16,10 @@
package org.springframework.messaging.simp.stomp;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@ -29,21 +28,15 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.support.tcp.ReactorNettyTcpClient;
import org.springframework.messaging.support.tcp.TcpConnection;
import org.springframework.messaging.support.tcp.TcpConnectionHandler;
import org.springframework.messaging.support.tcp.TcpOperations;
import org.springframework.util.Assert;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.DeferredPromiseSpec;
import reactor.function.Consumer;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;
/**
@ -61,7 +54,7 @@ import reactor.tuple.Tuple2;
* opposed to from a client). Such messages are recognized because they are not associated
* with any client and therefore do not have a session id header. The "system" connection
* is effectively shared and cannot be used to receive messages. Several properties are
* provided to configure the "system" session including the the
* provided to configure the "system" connection including the the
* {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode},
* heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and
* {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals.
@ -72,6 +65,13 @@ import reactor.tuple.Tuple2;
*/
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final Message<byte[]> HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).build();
private static final long HEARTBEAT_MULTIPLIER = 3;
private final MessageChannel messageChannel;
private String relayHost = "127.0.0.1";
@ -88,11 +88,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private String virtualHost;
private Environment environment;
private TcpOperations<byte[]> tcpClient;
private TcpClient<Message<byte[]>, Message<byte[]>> tcpClient;
private final Map<String, StompRelaySession> relaySessions = new ConcurrentHashMap<String, StompRelaySession>();
private final Map<String, StompConnectionHandler> connectionHandlers =
new ConcurrentHashMap<String, StompConnectionHandler>();
/**
@ -137,20 +136,31 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* Set the interval, in milliseconds, at which the "system" relay session will, in the
* Configure the TCP client to for managing STOMP over TCP connections to the message
* broker. This is an optional property that can be used to replace the default
* implementation used for example for testing purposes.
* <p>
* By default an instance of {@link ReactorNettyTcpClient} is used.
*/
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
}
/**
* Set the interval, in milliseconds, at which the "system" connection will, in the
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value
* of zero will prevent heartbeats from being sent to the broker.
* <p>
* The default value is 10000.
* <p>
* See class-level documentation for more information on the "system" session.
* See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
}
/**
* @return The interval, in milliseconds, at which the "system" relay session will
* @return The interval, in milliseconds, at which the "system" connection will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
@ -158,21 +168,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* Set the maximum interval, in milliseconds, at which the "system" relay session
* Set the maximum interval, in milliseconds, at which the "system" connection
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the relay session to expect not to receive
* broker. A value of zero will configure the connection to expect not to receive
* heartbeats from the broker.
* <p>
* The default value is 10000.
* <p>
* See class-level documentation for more information on the "system" session.
* See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
}
/**
* @return The interval, in milliseconds, at which the "system" relay session expects
* @return The interval, in milliseconds, at which the "system" connection expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
@ -180,10 +190,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* Set the login for the "system" relay session used to send messages to the STOMP
* Set the login for the "system" connection used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
* <p>
* See class-level documentation for more information on the "system" session.
* See class-level documentation for more information on the "system" connection.
*/
public void setSystemLogin(String systemLogin) {
Assert.hasText(systemLogin, "systemLogin must not be empty");
@ -191,24 +201,24 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the login used by the "system" relay session to connect to the STOMP broker
* @return the login used by the "system" connection to connect to the STOMP broker
*/
public String getSystemLogin() {
return this.systemLogin;
}
/**
* Set the passcode for the "system" relay session used to send messages to the STOMP
* Set the passcode for the "system" connection used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
* <p>
* See class-level documentation for more information on the "system" session.
* See class-level documentation for more information on the "system" connection.
*/
public void setSystemPasscode(String systemPasscode) {
this.systemPasscode = systemPasscode;
}
/**
* @return the passcode used by the "system" relay session to connect to the STOMP broker
* @return the passcode used by the "system" connection to connect to the STOMP broker
*/
public String getSystemPasscode() {
return this.systemPasscode;
@ -237,12 +247,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
protected void startInternal() {
this.environment = new Environment();
this.tcpClient = new TcpClientSpec<Message<byte[]>, Message<byte[]>>(NettyTcpClient.class)
.env(this.environment)
.codec(new StompCodec())
.connect(this.relayHost, this.relayPort)
.get();
this.tcpClient = new ReactorNettyTcpClient<byte[]>(this.relayHost, this.relayPort, new StompCodec());
if (logger.isDebugEnabled()) {
logger.debug("Initializing \"system\" TCP connection");
@ -254,30 +260,28 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
headers.setPasscode(this.systemPasscode);
headers.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
headers.setHost(getVirtualHost());
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
SystemStompRelaySession session = new SystemStompRelaySession();
session.connect(message);
SystemStompConnectionHandler handler = new SystemStompConnectionHandler(headers);
this.connectionHandlers.put(handler.getSessionId(), handler);
this.relaySessions.put(session.getId(), session);
this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
}
@Override
protected void stopInternal() {
for (StompRelaySession session: this.relaySessions.values()) {
session.disconnect();
}
for (StompConnectionHandler handler : this.connectionHandlers.values()) {
try {
this.tcpClient.close().await();
handler.resetTcpConnection();
}
catch (Throwable t) {
logger.error("Failed to close reactor TCP client", t);
logger.error("Failed to close STOMP connection " + t.getMessage());
}
}
try {
this.environment.shutdown();
this.tcpClient.shutdown();
}
catch (Throwable t) {
logger.error("Failed to shut down reactor Environment", t);
logger.error("Error while shutting down TCP client", t);
}
}
@ -291,7 +295,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
SimpMessageType messageType = headers.getMessageType();
if (SimpMessageType.MESSAGE.equals(messageType)) {
sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId;
sessionId = (sessionId == null) ? SystemStompConnectionHandler.SESSION_ID : sessionId;
headers.setSessionId(sessionId);
command = headers.updateStompCommandAsClientMessage();
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
@ -309,138 +313,120 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (SimpMessageType.CONNECT.equals(messageType)) {
if (getVirtualHost() != null) {
headers.setHost(getVirtualHost());
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
}
StompRelaySession session = new StompRelaySession(sessionId);
session.connect(message);
this.relaySessions.put(sessionId, session);
StompConnectionHandler handler = new StompConnectionHandler(sessionId, headers);
this.connectionHandlers.put(sessionId, handler);
this.tcpClient.connect(handler);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
StompRelaySession session = this.relaySessions.remove(sessionId);
if (session == null) {
StompConnectionHandler handler = removeConnectionHandler(sessionId);
if (handler == null) {
if (logger.isTraceEnabled()) {
logger.trace("Session already removed, sessionId=" + sessionId);
logger.trace("Connection already removed for sessionId=" + sessionId);
}
return;
}
session.forward(message);
handler.forward(message);
}
else {
StompRelaySession session = this.relaySessions.get(sessionId);
if (session == null) {
logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message: " + message);
return;
}
session.forward(message);
handler.forward(message);
}
}
private StompConnectionHandler removeConnectionHandler(String sessionId) {
return SystemStompConnectionHandler.SESSION_ID.equals(sessionId)
? null : this.connectionHandlers.remove(sessionId);
}
private class StompRelaySession {
private static final long HEARTBEAT_MULTIPLIER = 3;
private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
private final String sessionId;
private final boolean isRemoteClientSession;
private final long reconnectInterval;
private final StompHeaderAccessor connectHeaders;
private volatile StompConnection stompConnection = new StompConnection();
private volatile TcpConnection<byte[]> tcpConnection;
private volatile StompHeaderAccessor connectHeaders;
private volatile StompHeaderAccessor connectedHeaders;
private volatile boolean isStompConnected;
private StompRelaySession(String sessionId) {
this(sessionId, true, 0);
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
this(sessionId, connectHeaders, true);
}
private StompRelaySession(String sessionId, boolean isRemoteClientSession, long reconnectInterval) {
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders,
boolean isRemoteClientSession) {
Assert.notNull(sessionId, "sessionId is required");
Assert.notNull(connectHeaders, "connectHeaders is required");
this.sessionId = sessionId;
this.connectHeaders = connectHeaders;
this.isRemoteClientSession = isRemoteClientSession;
this.reconnectInterval = reconnectInterval;
}
public String getId() {
public String getSessionId() {
return this.sessionId;
}
public void connect(final Message<?> connectMessage) {
Assert.notNull(connectMessage, "connectMessage is required");
this.connectHeaders = StompHeaderAccessor.wrap(connectMessage);
Composable<TcpConnection<Message<byte[]>, Message<byte[]>>> promise;
if (this.reconnectInterval > 0) {
promise = tcpClient.open(new Reconnect() {
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, 5000L);
}
});
}
else {
promise = tcpClient.open();
public void afterConnected(TcpConnection<byte[]> connection) {
this.tcpConnection = connection;
connection.send(MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(this.connectHeaders).build());
}
promise.consume(new Consumer<TcpConnection<Message<byte[]>, Message<byte[]>>>() {
@Override
public void accept(TcpConnection<Message<byte[]>, Message<byte[]>> connection) {
handleConnectionReady(connection, connectMessage);
public void afterConnectFailure(Throwable ex) {
handleTcpConnectionFailure("Failed to connect to message broker", ex);
}
});
promise.when(Throwable.class, new Consumer<Throwable>() {
/**
* Invoked when any TCP connectivity issue is detected, i.e. failure to establish
* the TCP connection, failure to send a message, missed heartbeat.
*/
protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error(errorMessage + ", sessionId=" + this.sessionId, ex);
}
resetTcpConnection();
sendStompErrorToClient(errorMessage);
}
private void sendStompErrorToClient(String errorText) {
if (this.isRemoteClientSession) {
StompConnectionHandler removed = removeConnectionHandler(this.sessionId);
if (removed != null) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setSessionId(this.sessionId);
headers.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build();
sendMessageToClient(errorMessage);
}
}
}
protected void sendMessageToClient(Message<?> message) {
if (this.isRemoteClientSession) {
StompBrokerRelayMessageHandler.this.messageChannel.send(message);
}
}
@Override
public void accept(Throwable ex) {
relaySessions.remove(sessionId);
handleTcpClientFailure("Failed to connect to message broker", ex);
}
});
}
public void disconnect() {
this.stompConnection.setDisconnected();
}
protected void handleConnectionReady(
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConn, final Message<?> connectMessage) {
this.stompConnection.setTcpConnection(tcpConn);
tcpConn.on().close(new Runnable() {
@Override
public void run() {
connectionClosed();
}
});
tcpConn.in().consume(new Consumer<Message<byte[]>>() {
@Override
public void accept(Message<byte[]> message) {
readStompFrame(message);
}
});
forwardInternal(connectMessage, tcpConn);
}
protected void connectionClosed() {
relaySessions.remove(this.sessionId);
if (this.stompConnection.isReady()) {
sendError("Lost connection to the broker");
}
}
private void readStompFrame(Message<byte[]> message) {
public void handleMessage(Message<byte[]> message) {
if (logger.isTraceEnabled()) {
logger.trace("Reading message for sessionId=" + sessionId + ", " + message);
logger.trace("Reading message for sessionId=" + this.sessionId + ", " + message);
}
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (StompCommand.CONNECTED == headers.getCommand()) {
this.connectedHeaders = headers;
connected();
afterStompConnected(headers);
}
headers.setSessionId(this.sessionId);
@ -448,231 +434,158 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
sendMessageToClient(message);
}
private void initHeartbeats() {
/**
* Invoked after the STOMP CONNECTED frame is received. At this point the
* connection is ready for sending STOMP messages to the broker.
*/
protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
this.isStompConnected = true;
initHeartbeats(connectedHeaders);
}
private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
// Remote clients do their own heartbeat management
if (this.isRemoteClientSession) {
return;
}
long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
long serverSendInterval = this.connectedHeaders.getHeartbeat()[0];
long serverReceiveInterval = this.connectedHeaders.getHeartbeat()[1];
long serverSendInterval = connectedHeaders.getHeartbeat()[0];
long serverReceiveInterval = connectedHeaders.getHeartbeat()[1];
if ((clientSendInterval > 0) && (serverReceiveInterval > 0)) {
long interval = Math.max(clientSendInterval, serverReceiveInterval);
stompConnection.connection.on().writeIdle(interval, new Runnable() {
this.tcpConnection.onWriteInactivity(new Runnable() {
@Override
public void run() {
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConn = stompConnection.connection;
if (tcpConn != null) {
tcpConn.send(MessageBuilder.withPayload(new byte[] {'\n'}).build(),
new Consumer<Boolean>() {
@Override
public void accept(Boolean result) {
if (!result) {
handleTcpClientFailure("Failed to send heartbeat to the broker", null);
}
TcpConnection<byte[]> conn = tcpConnection;
if (conn != null) {
conn.send(HEARTBEAT_MESSAGE).addCallback(
new ListenableFutureCallback<Boolean>() {
public void onFailure(Throwable t) {
handleTcpConnectionFailure("Failed to send heartbeat", null);
}
public void onSuccess(Boolean result) {}
});
}
}
});
}, interval);
}
if (clientReceiveInterval > 0 && serverSendInterval > 0) {
final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
stompConnection.connection.on().readIdle(interval, new Runnable() {
this.tcpConnection.onReadInactivity(new Runnable() {
@Override
public void run() {
String message = "Broker hearbeat missed: connection idle for more than " + interval + "ms";
if (logger.isWarnEnabled()) {
logger.warn(message);
handleTcpConnectionFailure("No hearbeat from broker for more than " +
interval + "ms, closing connection", null);
}
disconnected(message);
}, interval);
}
}
@Override
public void afterConnectionClosed() {
sendStompErrorToClient("Connection to broker closed");
}
public ListenableFuture<Boolean> forward(final Message<?> message) {
if (!this.isStompConnected) {
if (logger.isWarnEnabled()) {
logger.warn("Connection to broker inactive or not ready, ignoring message=" + message);
}
return new ListenableFutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return Boolean.FALSE;
}
});
}
}
protected void connected() {
if (!this.isRemoteClientSession) {
initHeartbeats();
if (logger.isTraceEnabled()) {
logger.trace("Forwarding message to broker: " + message);
}
this.stompConnection.setReady();
}
protected void handleTcpClientFailure(String message, Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error(message + ", sessionId=" + this.sessionId, ex);
}
disconnected(message);
}
protected void disconnected(String errorMessage) {
this.stompConnection.setDisconnected();
sendError(errorMessage);
}
private void sendError(String errorText) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setSessionId(this.sessionId);
headers.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
sendMessageToClient(errorMessage);
}
protected void sendMessageToClient(Message<?> message) {
if (this.isRemoteClientSession) {
messageChannel.send(message);
}
else {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (StompCommand.ERROR.equals(headers.getCommand())) {
if (logger.isErrorEnabled()) {
logger.error("STOMP ERROR on sessionId=" + this.sessionId + ": " + message);
}
}
// ignore otherwise
}
}
private void forward(Message<?> message) {
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection = this.stompConnection.getReadyConnection();
if (tcpConnection == null) {
logger.warn("Connection to STOMP broker is not active");
handleForwardFailure(message);
}
else if (!forwardInternal(message, tcpConnection)) {
handleForwardFailure(message);
}
}
protected void handleForwardFailure(Message<?> message) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to forward message to the broker. message=" + message);
}
}
private boolean forwardInternal(
Message<?> message, TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection) {
Assert.isInstanceOf(byte[].class, message.getPayload(), "Message's payload must be a byte[]");
@SuppressWarnings("unchecked")
Message<byte[]> byteMessage = (Message<byte[]>) message;
if (logger.isTraceEnabled()) {
logger.trace("Forwarding to STOMP broker, message: " + message);
}
ListenableFuture<Boolean> future = this.tcpConnection.send((Message<byte[]>) message);
StompCommand command = StompHeaderAccessor.wrap(message).getCommand();
final Deferred<Boolean, Promise<Boolean>> deferred = new DeferredPromiseSpec<Boolean>().get();
tcpConnection.send(byteMessage, new Consumer<Boolean>() {
future.addCallback(new ListenableFutureCallback<Boolean>() {
@Override
public void accept(Boolean success) {
deferred.accept(success);
public void onSuccess(Boolean result) {
StompCommand command = StompHeaderAccessor.wrap(message).getCommand();
if (command == StompCommand.DISCONNECT) {
resetTcpConnection();
}
}
@Override
public void onFailure(Throwable t) {
handleTcpConnectionFailure("Failed to send message " + message, t);
}
});
Boolean success = null;
return future;
}
public void resetTcpConnection() {
TcpConnection<byte[]> conn = this.tcpConnection;
this.isStompConnected = false;
this.tcpConnection = null;
if (conn != null) {
try {
success = deferred.compose().await();
if (success == null) {
handleTcpClientFailure("Timed out waiting for message to be forwarded to the broker", null);
this.tcpConnection.close();
}
else if (!success) {
handleTcpClientFailure("Failed to forward message to the broker", null);
catch (Throwable t) {
// ignore
}
else {
if (command == StompCommand.DISCONNECT) {
this.stompConnection.setDisconnected();
}
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleTcpClientFailure("Interrupted while forwarding message to the broker", ex);
}
return (success != null) ? success : false;
}
}
private static class StompConnection {
private volatile TcpConnection<Message<byte[]>, Message<byte[]>> connection;
private AtomicReference<TcpConnection<Message<byte[]>, Message<byte[]>>> readyConnection =
new AtomicReference<TcpConnection<Message<byte[]>, Message<byte[]>>>();
public void setTcpConnection(TcpConnection<Message<byte[]>, Message<byte[]>> connection) {
Assert.notNull(connection, "connection must not be null");
this.connection = connection;
}
/**
* Return the underlying {@link TcpConnection} but only after the CONNECTED STOMP
* frame is received.
*/
public TcpConnection<Message<byte[]>, Message<byte[]>> getReadyConnection() {
return this.readyConnection.get();
}
private class SystemStompConnectionHandler extends StompConnectionHandler {
public void setReady() {
this.readyConnection.set(this.connection);
}
public static final String SESSION_ID = "stompRelaySystemSessionId";
public boolean isReady() {
return (this.readyConnection.get() != null);
}
public void setDisconnected() {
this.readyConnection.set(null);
TcpConnection<Message<byte[]>, Message<byte[]>> localConnection = this.connection;
if (localConnection != null) {
localConnection.close();
this.connection = null;
}
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
super(SESSION_ID, connectHeaders, false);
}
@Override
public String toString() {
return "StompConnection [ready=" + isReady() + "]";
}
}
private class SystemStompRelaySession extends StompRelaySession {
public static final String ID = "stompRelaySystemSessionId";
public SystemStompRelaySession() {
super(ID, false, 5000);
}
@Override
protected void connected() {
super.connected();
protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
super.afterStompConnected(connectedHeaders);
publishBrokerAvailableEvent();
}
@Override
protected void disconnected(String errorMessage) {
super.disconnected(errorMessage);
protected void handleTcpConnectionFailure(String errorMessage, Throwable t) {
super.handleTcpConnectionFailure(errorMessage, t);
publishBrokerUnavailableEvent();
}
@Override
protected void connectionClosed() {
public void afterConnectionClosed() {
super.afterConnectionClosed();
publishBrokerUnavailableEvent();
}
@Override
protected void handleForwardFailure(Message<?> message) {
super.handleForwardFailure(message);
public ListenableFuture<Boolean> forward(Message<?> message) {
try {
ListenableFuture<Boolean> future = super.forward(message);
if (!future.get()) {
throw new MessageDeliveryException(message);
}
return future;
}
catch (Throwable t) {
throw new MessageDeliveryException(message, t);
}
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support.tcp;
/**
* A simple strategy for making reconnect attempts at a fixed interval.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class FixedIntervalReconnectStrategy implements ReconnectStrategy {
private final long interval;
/**
* @param interval the frequency, in millisecond, at which to try to reconnect
*/
public FixedIntervalReconnectStrategy(long interval) {
this.interval = interval;
}
@Override
public Long getTimeToNextAttempt(int attemptCount) {
return this.interval;
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support.tcp;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureCallbackRegistry;
import reactor.core.composable.Promise;
import reactor.function.Consumer;
/**
* Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting
* the result Object type {@code <S>} to the expected target type {@code <T>}.
*
* @param <S> the type of object expected from the {@link Promise}
* @param <T> the type of object expected from the {@link ListenableFuture}
*
* @author Rossen Stoyanchev
* @since 4.0
*/
abstract class PromiseToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
private final Promise<S> promise;
private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<T>();
protected PromiseToListenableFutureAdapter(Promise<S> promise) {
Assert.notNull(promise, "promise is required");
this.promise = promise;
this.promise.onSuccess(new Consumer<S>() {
@Override
public void accept(S result) {
try {
registry.success(adapt(result));
}
catch (Throwable t) {
registry.failure(t);
}
}
});
this.promise.onError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
registry.failure(t);
}
});
}
protected abstract T adapt(S adapteeResult);
@Override
public T get() {
S result = this.promise.get();
return adapt(result);
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
S result = this.promise.await(timeout, unit);
if (result == null) {
throw new TimeoutException();
}
return adapt(result);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return this.promise.isComplete();
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.registry.addCallback(callback);
}
}

View File

@ -0,0 +1,127 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support.tcp;
import java.net.InetSocketAddress;
import org.springframework.messaging.Message;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Promise;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.Codec;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;
/**
* A Reactor/Netty implementation of {@link TcpOperations}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private Environment environment;
private TcpClient<Message<P>, Message<P>> tcpClient;
public ReactorNettyTcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) {
this.environment = new Environment();
this.tcpClient = new TcpClientSpec<Message<P>, Message<P>>(NettyTcpClient.class)
.env(this.environment)
.codec(codec)
.connect(host, port)
.get();
}
@Override
public void connect(TcpConnectionHandler<P> connectionHandler) {
this.connect(connectionHandler, null);
}
@Override
public void connect(final TcpConnectionHandler<P> connectionHandler,
final ReconnectStrategy reconnectStrategy) {
Composable<TcpConnection<Message<P>, Message<P>>> composable;
if (reconnectStrategy != null) {
composable = this.tcpClient.open(new Reconnect() {
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt));
}
});
}
else {
composable = this.tcpClient.open();
}
composable.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void accept(Throwable ex) {
connectionHandler.afterConnectFailure(ex);
}
});
composable.consume(new Consumer<TcpConnection<Message<P>, Message<P>>>() {
@Override
public void accept(TcpConnection<Message<P>, Message<P>> connection) {
connection.on().close(new Runnable() {
@Override
public void run() {
connectionHandler.afterConnectionClosed();
}
});
connection.in().consume(new Consumer<Message<P>>() {
@Override
public void accept(Message<P> message) {
connectionHandler.handleMessage(message);
}
});
connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));
}
});
}
@Override
public ListenableFuture<Void> shutdown() {
try {
Promise<Void> promise = this.tcpClient.close();
return new PromiseToListenableFutureAdapter<Void, Void>(promise) {
@Override
protected Void adapt(Void result) {
return result;
}
};
}
finally {
this.environment.shutdown();
}
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support.tcp;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.messaging.Message;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureCallbackRegistry;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.DeferredPromiseSpec;
import reactor.function.Consumer;
public class ReactorTcpConnection<P> implements TcpConnection<P> {
private final reactor.tcp.TcpConnection<Message<P>, Message<P>> reactorTcpConnection;
public ReactorTcpConnection(reactor.tcp.TcpConnection<Message<P>, Message<P>> connection) {
this.reactorTcpConnection = connection;
}
@Override
public ListenableFuture<Boolean> send(Message<P> message) {
ConsumerListenableFuture future = new ConsumerListenableFuture();
this.reactorTcpConnection.send(message, future);
return future;
}
@Override
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
this.reactorTcpConnection.on().readIdle(inactivityDuration, runnable);
}
@Override
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.reactorTcpConnection.on().writeIdle(inactivityDuration, runnable);
}
@Override
public void close() {
this.reactorTcpConnection.close();
}
// Use this temporarily until reactor provides a send method returning a Promise
private static class ConsumerListenableFuture implements ListenableFuture<Boolean>, Consumer<Boolean> {
final Deferred<Boolean, Promise<Boolean>> deferred = new DeferredPromiseSpec<Boolean>().get();
private final ListenableFutureCallbackRegistry<Boolean> registry =
new ListenableFutureCallbackRegistry<Boolean>();
@Override
public void accept(Boolean result) {
this.deferred.accept(result);
if (result == null) {
this.registry.failure(new TimeoutException());
}
else if (result) {
this.registry.success(result);
}
else {
this.registry.failure(new Exception("Failed send message"));
}
}
@Override
public Boolean get() {
try {
return this.deferred.compose().await();
}
catch (InterruptedException e) {
return Boolean.FALSE;
}
}
@Override
public Boolean get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Boolean result = this.deferred.compose().await(timeout, unit);
if (result == null) {
throw new TimeoutException();
}
return result;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return this.deferred.compose().isComplete();
}
@Override
public void addCallback(ListenableFutureCallback<? super Boolean> callback) {
this.registry.addCallback(callback);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support.tcp;
/**
* A contract to determine the frequency of reconnect attempts after connection failure.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface ReconnectStrategy {
/**
* Return the time to the next attempt to reconnect.
*
* @param attemptCount how many reconnect attempts have been made already
* @return the amount of time in milliseconds or {@code null} to stop
*/
Long getTimeToNextAttempt(int attemptCount);
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support.tcp;
import org.springframework.messaging.Message;
import org.springframework.util.concurrent.ListenableFuture;
/**
* A contract for sending messages and managing a TCP connection.
*
* @param <P> the type of payload for outbound {@link Message}s
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface TcpConnection<P> {
/**
* Send the given message.
* @param message the message
* @return whether the send succeeded or not
*/
ListenableFuture<Boolean> send(Message<P> message);
/**
* Register a task to invoke after a period of of read inactivity.
* @param runnable the task to invoke
* @param duration the amount of inactive time in milliseconds
*/
void onReadInactivity(Runnable runnable, long duration);
/**
* Register a task to invoke after a period of of write inactivity.
* @param runnable the task to invoke
* @param duration the amount of inactive time in milliseconds
*/
void onWriteInactivity(Runnable runnable, long duration);
/**
* Close the connection.
*/
void close();
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support.tcp;
import org.springframework.messaging.Message;
/**
* A contract for managing lifecycle events for a TCP connection including
* the handling of incoming messages.
*
* @param <P> the type of payload for in and outbound messages
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface TcpConnectionHandler<P> {
/**
* Invoked after a connection is successfully established.
* @param connection the connection
*/
void afterConnected(TcpConnection<P> connection);
/**
* Invoked after a connection failure.
* @param ex the exception
*/
void afterConnectFailure(Throwable ex);
/**
* Handle a message received from the remote host.
* @param message the message
*/
void handleMessage(Message<P> message);
/**
* Invoked after the connection is closed.
*/
void afterConnectionClosed();
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support.tcp;
import org.springframework.util.concurrent.ListenableFuture;
/**
* A contract for establishing TCP connections.
*
* @param <P> the type of payload for in and outbound messages
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface TcpOperations<P> {
/**
* Open a new connection.
*
* @param connectionHandler a handler to manage the connection
*/
void connect(TcpConnectionHandler<P> connectionHandler);
/**
* Open a new connection and a strategy for reconnecting if the connection fails.
*
* @param connectionHandler a handler to manage the connection
* @param reconnectStrategy a strategy for reconnecting
*/
void connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
/**
* Shut down and close any open connections.
*/
ListenableFuture<Void> shutdown();
}

View File

@ -0,0 +1,9 @@
/**
* Contains abstractions and implementation classes for establishing TCP connections via
* {@link org.springframework.messaging.support.tcp.TcpOperations TcpOperations},
* handling messages via
* {@link org.springframework.messaging.support.tcp.TcpConnectionHandler TcpConnectionHandler},
* as well as sending messages via
* {@link org.springframework.messaging.support.tcp.TcpConnection TcpConnection}.
*/
package org.springframework.messaging.support.tcp;

View File

@ -231,14 +231,13 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test
public void disconnectClosesRelaySessionCleanly() throws Exception {
String sess1 = "sess1";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
this.responseHandler.expect(conn1);
this.relay.handleMessage(conn1.message);
MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
this.responseHandler.expect(connect);
this.relay.handleMessage(connect.message);
this.responseHandler.awaitAndAssert();
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.setSessionId(sess1);
headers.setSessionId("sess1");
this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build());
@ -330,9 +329,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
StringBuilder sb = new StringBuilder("\n");
synchronized (this.monitor) {
sb.append("INCOMPLETE:\n").append(this.expected).append("\n");
sb.append("COMPLETE:\n").append(this.actual).append("\n");
sb.append("UNMATCHED MESSAGES:\n").append(this.unexpected).append("\n");
sb.append("UNMATCHED EXPECTATIONS:\n").append(this.expected).append("\n");
sb.append("MATCHED EXPECTATIONS:\n").append(this.actual).append("\n");
sb.append("UNEXPECTED MESSAGES:\n").append(this.unexpected).append("\n");
}
return sb.toString();