Make the broker relay heartbeat intervals configurable

Prior to this commit, the intervals at which the broker relay's system
session would send heartbeats to the STOMP broker and expect to
receive heartbeats from the STOMP broker were hard-coded at 10
seconds.

This commit makes the intervals configurable, with 10 seconds being
the default value.
This commit is contained in:
Andy Wilkinson 2013-10-02 15:13:27 +01:00 committed by Rossen Stoyanchev
parent ba11af7f11
commit a7f735b50a
3 changed files with 93 additions and 16 deletions

View File

@ -37,6 +37,10 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
private String applicationPasscode = "guest";
private Long systemHeartbeatSendInterval;
private Long systemHeartbeatReceiveInterval;
private boolean autoStartup = true;
@ -63,7 +67,7 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
}
/**
* Set the login for a "system" TCP connection used to send messages to the STOMP
* Set the login for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
*/
public StompBrokerRelayRegistration setApplicationLogin(String login) {
@ -73,7 +77,7 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
}
/**
* Set the passcode for a "system" TCP connection used to send messages to the STOMP
* Set the passcode for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
*/
public StompBrokerRelayRegistration setApplicationPasscode(String passcode) {
@ -82,6 +86,31 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
return this;
}
/**
* Set the interval, in milliseconds, at which the "system" relay session will,
* in the absence of any other data being sent, send a heartbeat to the STOMP broker.
* A value of zero will prevent heartbeats from being sent to the broker.
* <p>
* The default value is 10000.
*/
public StompBrokerRelayRegistration setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
return this;
}
/**
* Set the maximum interval, in milliseconds, at which the "system" relay session
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the relay session to expect not to receive
* heartbeats from the broker.
* <p>
* The default value is 10000.
*/
public StompBrokerRelayRegistration setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
return this;
}
/**
* Configure whether the {@link StompBrokerRelayMessageHandler} should start
* automatically when the Spring ApplicationContext is refreshed.
@ -101,6 +130,12 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
handler.setRelayPort(this.relayPort);
handler.setSystemLogin(this.applicationLogin);
handler.setSystemPasscode(this.applicationPasscode);
if (this.systemHeartbeatSendInterval != null) {
handler.setSystemHeartbeatSendInterval(this.systemHeartbeatSendInterval);
}
if (this.systemHeartbeatReceiveInterval != null) {
handler.setSystemHeartbeatReceiveInterval(this.systemHeartbeatReceiveInterval);
}
handler.setAutoStartup(this.autoStartup);
return handler;
}

View File

@ -67,6 +67,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private String systemPasscode = "guest";
private long systemHeartbeatSendInterval = 10000;
private long systemHeartbeatReceiveInterval = 10000;
private Environment environment;
private TcpClient<Message<byte[]>, Message<byte[]>> tcpClient;
@ -116,7 +120,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* Set the login for a "system" TCP connection used to send messages to the STOMP
* Set the interval, in milliseconds, at which the "system" relay session will,
* in the absence of any other data being sent, send a heartbeat to the STOMP broker.
* A value of zero will prevent heartbeats from being sent to the broker.
* <p>
* The default value is 10000.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
}
/**
* @return The interval, in milliseconds, at which the "system" relay session will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
return this.systemHeartbeatSendInterval;
}
/**
* Set the maximum interval, in milliseconds, at which the "system" relay session
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the relay session to expect not to receive
* heartbeats from the broker.
* <p>
* The default value is 10000.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
}
/**
* @return The interval, in milliseconds, at which the "system" relay session expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
return this.systemHeartbeatReceiveInterval;
}
/**
* Set the login for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
*/
public void setSystemLogin(String systemLogin) {
@ -125,14 +168,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the login for a shared, "system" connection to the STOMP message broker.
* @return the login used by the "system" relay session to connect to the STOMP broker
*/
public String getSystemLogin() {
return this.systemLogin;
}
/**
* Set the passcode for a "system" TCP connection used to send messages to the STOMP
* Set the passcode for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
*/
public void setSystemPasscode(String systemPasscode) {
@ -140,7 +183,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the passcode for a shared, "system" connection to the STOMP message broker.
* @return the passcode used by the "system" relay session to connect to the STOMP broker
*/
public String getSystemPasscode() {
return this.systemPasscode;
@ -458,10 +501,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static final long HEARTBEAT_RECEIVE_MULTIPLIER = 3;
private static final long HEARTBEAT_SEND_INTERVAL = 10000;
private static final long HEARTBEAT_RECEIVE_INTERVAL = 10000;
public static final String ID = "stompRelaySystemSessionId";
private final byte[] heartbeatPayload = new byte[] {'\n'};
@ -476,7 +515,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
headers.setAcceptVersion("1.1,1.2");
headers.setLogin(systemLogin);
headers.setPasscode(systemPasscode);
headers.setHeartbeat(HEARTBEAT_SEND_INTERVAL, HEARTBEAT_RECEIVE_INTERVAL);
headers.setHeartbeat(systemHeartbeatSendInterval, systemHeartbeatReceiveInterval);
Message<?> connectMessage = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
super.connect(connectMessage);
}
@ -500,8 +539,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
protected void connected(StompHeaderAccessor headers, final StompConnection stompConnection) {
long brokerReceiveInterval = headers.getHeartbeat()[1];
if ((HEARTBEAT_SEND_INTERVAL > 0) && (brokerReceiveInterval > 0)) {
long interval = Math.max(HEARTBEAT_SEND_INTERVAL, brokerReceiveInterval);
if ((systemHeartbeatSendInterval > 0) && (brokerReceiveInterval > 0)) {
long interval = Math.max(systemHeartbeatSendInterval, brokerReceiveInterval);
stompConnection.connection.on().writeIdle(interval, new Runnable() {
@Override
@ -523,8 +562,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
long brokerSendInterval = headers.getHeartbeat()[0];
if (HEARTBEAT_RECEIVE_INTERVAL > 0 && brokerSendInterval > 0) {
final long interval = Math.max(HEARTBEAT_RECEIVE_INTERVAL, brokerSendInterval)
if (systemHeartbeatReceiveInterval > 0 && brokerSendInterval > 0) {
final long interval = Math.max(systemHeartbeatReceiveInterval, brokerSendInterval)
* HEARTBEAT_RECEIVE_MULTIPLIER;
stompConnection.connection.on().readIdle(interval, new Runnable() {

View File

@ -90,9 +90,12 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
private void createAndStartRelay() throws InterruptedException {
this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/"));
this.relay = new StompBrokerRelayMessageHandler(
this.responseChannel, Arrays.asList("/queue/", "/topic/"));
this.relay.setRelayPort(port);
this.relay.setApplicationEventPublisher(this.eventPublisher);
this.relay.setSystemHeartbeatReceiveInterval(0);
this.relay.setSystemHeartbeatSendInterval(0);
this.eventPublisher.expect(true);
this.relay.start();