Add STOMP broker relay to configure "host" header

Issue: SPR-10955
This commit is contained in:
Rossen Stoyanchev 2013-10-14 16:35:16 -04:00
parent cf7889e226
commit 7c3749769a
2 changed files with 60 additions and 7 deletions

View File

@ -47,9 +47,24 @@ import reactor.tuple.Tuple2;
/** /**
* A {@link MessageHandler} that handles messages by forwarding them to a STOMP broker and * A {@link MessageHandler} that handles messages by forwarding them to a STOMP broker.
* reversely sends any returned messages from the broker to the provided * For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
* {@link MessageChannel}. * connection to the broker is opened and used exclusively for all messages from the
* client that originated the CONNECT message. Messages from the same client are
* identified through the session id message header. Reversely, when the STOMP broker
* sends messages back on the TCP connection, those messages are enriched with the session
* id of the client and sent back downstream through the {@link MessageChannel} provided
* to the constructor.
* <p>
* This class also automatically opens a default "system" TCP connection to the message
* broker that is used for sending messages that originate from the server application (as
* opposed to from a client). Such messages are recognized because they are not associated
* with any client and therefore do not have a session id header. The "system" connection
* is effectively shared and cannot be used to receive messages. Several properties are
* provided to configure the "system" session including the the
* {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode},
* heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and
* {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @author Andy Wilkinson * @author Andy Wilkinson
@ -71,6 +86,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private long systemHeartbeatReceiveInterval = 10000; private long systemHeartbeatReceiveInterval = 10000;
private String virtualHost;
private Environment environment; private Environment environment;
private TcpClient<Message<byte[]>, Message<byte[]>> tcpClient; private TcpClient<Message<byte[]>, Message<byte[]>> tcpClient;
@ -120,11 +137,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
/** /**
* Set the interval, in milliseconds, at which the "system" relay session will, * Set the interval, in milliseconds, at which the "system" relay session will, in the
* in the absence of any other data being sent, send a heartbeat to the STOMP broker. * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
* A value of zero will prevent heartbeats from being sent to the broker. * of zero will prevent heartbeats from being sent to the broker.
* <p> * <p>
* The default value is 10000. * The default value is 10000.
* <p>
* See class-level documentation for more information on the "system" session.
*/ */
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
@ -145,6 +164,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* heartbeats from the broker. * heartbeats from the broker.
* <p> * <p>
* The default value is 10000. * The default value is 10000.
* <p>
* See class-level documentation for more information on the "system" session.
*/ */
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
@ -161,6 +182,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/** /**
* Set the login for the "system" relay session used to send messages to the STOMP * Set the login for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method). * broker without having a client session (e.g. REST/HTTP request handling method).
* <p>
* See class-level documentation for more information on the "system" session.
*/ */
public void setSystemLogin(String systemLogin) { public void setSystemLogin(String systemLogin) {
Assert.hasText(systemLogin, "systemLogin must not be empty"); Assert.hasText(systemLogin, "systemLogin must not be empty");
@ -177,6 +200,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/** /**
* Set the passcode for the "system" relay session used to send messages to the STOMP * Set the passcode for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method). * broker without having a client session (e.g. REST/HTTP request handling method).
* <p>
* See class-level documentation for more information on the "system" session.
*/ */
public void setSystemPasscode(String systemPasscode) { public void setSystemPasscode(String systemPasscode) {
this.systemPasscode = systemPasscode; this.systemPasscode = systemPasscode;
@ -189,6 +214,26 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.systemPasscode; return this.systemPasscode;
} }
/**
* Set the value of the "host" header to use in STOMP CONNECT frames. When this
* property is configured, a "host" header will be added to every STOMP frame sent to
* the STOMP broker. This may be useful for example in a cloud environment where the
* actual host to which the TCP connection is established is different from the host
* providing the cloud-based STOMP service.
* <p>
* By default this property is not set.
*/
public void setVirtualHost(String virtualHost) {
this.virtualHost = virtualHost;
}
/**
* @return the configured virtual host value.
*/
public String getVirtualHost() {
return this.virtualHost;
}
@Override @Override
protected void startInternal() { protected void startInternal() {
@ -252,7 +297,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
if (SimpMessageType.CONNECT.equals(messageType)) { if (SimpMessageType.CONNECT.equals(messageType)) {
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); if (getVirtualHost() != null) {
headers.setHost(getVirtualHost());
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
}
StompRelaySession session = new StompRelaySession(sessionId); StompRelaySession session = new StompRelaySession(sessionId);
this.relaySessions.put(sessionId, session); this.relaySessions.put(sessionId, session);
session.connect(message); session.connect(message);
@ -516,6 +564,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
headers.setLogin(systemLogin); headers.setLogin(systemLogin);
headers.setPasscode(systemPasscode); headers.setPasscode(systemPasscode);
headers.setHeartbeat(systemHeartbeatSendInterval, systemHeartbeatReceiveInterval); headers.setHeartbeat(systemHeartbeatSendInterval, systemHeartbeatReceiveInterval);
if (getVirtualHost() != null) {
headers.setHost(getVirtualHost());
}
Message<?> connectMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); Message<?> connectMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
super.connect(connectMessage); super.connect(connectMessage);
} }

View File

@ -113,6 +113,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
} }
} }
// test "host" header (virtualHost property) when TCP client is behind interface and configurable
@Test @Test
public void publishSubscribe() throws Exception { public void publishSubscribe() throws Exception {