Heartbeat in lieu of non-broker destination message

Closes gh-22822
This commit is contained in:
Rossen Stoyanchev 2020-10-19 13:06:17 +01:00
parent a4d0af802a
commit d18fbab7a8
7 changed files with 214 additions and 31 deletions

View File

@ -125,6 +125,12 @@ public abstract class AbstractBrokerMessageHandler
return this.brokerChannel;
}
/**
* Return destination prefixes prefixes to use to filter messages to forward
* to the broker. Messages that have a destination and where the destination
* doesn't match are ignored.
* <p>By default this is not set.
*/
public Collection<String> getDestinationPrefixes() {
return this.destinationPrefixes;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,6 +21,7 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
/**
@ -63,6 +64,8 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
@Nullable
private String userRegistryBroadcast;
@Nullable
private TaskScheduler taskScheduler;
public StompBrokerRelayRegistration(SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
@ -225,6 +228,26 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
return this;
}
/**
* Some STOMP clients (e.g. stomp-js) always send heartbeats at a fixed rate
* but others (Spring STOMP client) do so only when no other messages are
* sent. However messages with a non-broker {@link #getDestinationPrefixes()
* destination prefix} aren't forwarded and as a result the broker may deem
* the connection inactive.
*
* <p>When this {@link TaskScheduler} is set, it is used to reset a count of
* the number of messages sent from client to broker since the beginning of
* the current heartbeat period. This is then used to decide whether to send
* a heartbeat to the broker when ignoring a message with a non-broker
* destination prefix.
*
* @param taskScheduler the scheduler to use
* @since 5.3
*/
public void setTaskScheduler(@Nullable TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}
@Nullable
protected String getUserRegistryBroadcast() {
return this.userRegistryBroadcast;
@ -259,6 +282,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
if (this.tcpClient != null) {
handler.setTcpClient(this.tcpClient);
}
if (this.taskScheduler != null) {
handler.setTaskScheduler(this.taskScheduler);
}
handler.setAutoStartup(this.autoStartup);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -45,6 +45,7 @@ import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@ -99,13 +100,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable());
private static final Message<byte[]> HEARTBEAT_MESSAGE;
private static final StompHeaderAccessor HEART_BEAT_ACCESSOR;
private static final Message<byte[]> HEARTBEAT_MESSAGE;
static {
EMPTY_TASK.run();
StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
HEARTBEAT_MESSAGE = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
HEART_BEAT_ACCESSOR = StompHeaderAccessor.createForHeartbeat();
HEARTBEAT_MESSAGE = MessageBuilder.createMessage(
StompDecoder.HEARTBEAT_PAYLOAD, HEART_BEAT_ACCESSOR.getMessageHeaders());
}
@ -140,6 +143,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private final Map<String, StompConnectionHandler> connectionHandlers = new ConcurrentHashMap<>();
@Nullable
private TaskScheduler taskScheduler;
/**
* Create a StompBrokerRelayMessageHandler instance with the given message channels
@ -404,6 +410,22 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.connectionHandlers.size();
}
/**
* Configure the {@link TaskScheduler} to use to reset client-to-broker
* message count in the current heartbeat period. For more details, see
* {@link org.springframework.messaging.simp.config.StompBrokerRelayRegistration#setTaskScheduler(TaskScheduler)}.
* @param taskScheduler the scheduler to use
* @since 5.3
*/
public void setTaskScheduler(@Nullable TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}
@Nullable
public TaskScheduler getTaskScheduler() {
return this.taskScheduler;
}
@Override
protected void startInternal() {
@ -434,6 +456,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.stats.incrementConnectCount();
this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
if (this.taskScheduler != null) {
this.taskScheduler.scheduleWithFixedDelay(new ClientSendMessageCountTask(), 5000);
}
}
private ReactorNettyTcpClient<byte[]> initTcpClient() {
@ -526,11 +552,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
stompAccessor.setSessionId(sessionId);
}
String destination = stompAccessor.getDestination();
if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
return;
}
if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
if (logger.isDebugEnabled()) {
logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
@ -566,6 +587,16 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
return;
}
String destination = stompAccessor.getDestination();
if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
// Not a broker destination but send a heartbeat to keep the connection
if (handler.shouldSendHeartbeatForIgnoredMessage()) {
handler.forward(HEARTBEAT_MESSAGE, HEART_BEAT_ACCESSOR);
}
return;
}
handler.forward(message, stompAccessor);
}
}
@ -584,10 +615,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private final String sessionId;
private final boolean isRemoteClientSession;
private final StompHeaderAccessor connectHeaders;
private final boolean isRemoteClientSession;
private final MessageChannel outboundChannel;
@Nullable
@ -595,6 +626,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private volatile boolean isStompConnected;
private long clientSendInterval;
@Nullable
private final AtomicInteger clientSendMessageCount;
private long clientSendMessageTimestamp;
protected StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
this(sessionId, connectHeaders, true);
@ -607,8 +645,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.connectHeaders = connectHeaders;
this.isRemoteClientSession = isClientSession;
this.outboundChannel = getClientOutboundChannelForSession(sessionId);
if (isClientSession && taskScheduler != null) {
this.clientSendInterval = connectHeaders.getHeartbeat()[0];
}
if (this.clientSendInterval > 0) {
this.clientSendMessageCount = new AtomicInteger();
this.clientSendMessageTimestamp = System.currentTimeMillis();
}
else {
this.clientSendMessageCount = null;
}
}
public String getSessionId() {
return this.sessionId;
}
@ -719,14 +768,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
this.isStompConnected = true;
stats.incrementConnectedCount();
initHeartbeats(connectedHeaders);
if (this.isRemoteClientSession) {
if (taskScheduler != null) {
long interval = connectedHeaders.getHeartbeat()[1];
this.clientSendInterval = Math.max(interval, this.clientSendInterval);
}
}
else {
// system session
initHeartbeats(connectedHeaders);
}
}
private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
if (this.isRemoteClientSession) {
return;
}
TcpConnection<byte[]> con = this.tcpConnection;
Assert.state(con != null, "No TcpConnection available");
@ -750,6 +804,27 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
/**
* Whether to forward a heartbeat message in lieu of a message with a non-broker
* destination. This is done if client-side heartbeats are expected and if there
* haven't been any other messages in the current heartbeat period.
* @since 5.3
*/
protected boolean shouldSendHeartbeatForIgnoredMessage() {
return (this.clientSendMessageCount != null && this.clientSendMessageCount.get() == 0);
}
/**
* Reset the clientSendMessageCount if the current heartbeat period has expired.
* @since 5.3
*/
void updateClientSendMessageCount(long now) {
if (this.clientSendMessageCount != null && this.clientSendInterval > (now - clientSendMessageTimestamp)) {
this.clientSendMessageCount.set(0);
this.clientSendMessageTimestamp = now;
}
}
@Override
public void handleFailure(Throwable ex) {
if (this.tcpConnection != null) {
@ -824,6 +899,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
if (this.clientSendMessageCount != null) {
this.clientSendMessageCount.incrementAndGet();
}
final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
@ -1005,9 +1084,27 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
throw new MessageDeliveryException(message, ex);
}
}
@Override
protected boolean shouldSendHeartbeatForIgnoredMessage() {
return false;
}
}
private class ClientSendMessageCountTask implements Runnable {
@Override
public void run() {
long now = System.currentTimeMillis();
for (StompConnectionHandler handler : connectionHandlers.values()) {
handler.updateClientSendMessageCount(now);
}
}
}
private static class VoidCallable implements Callable<Void> {
@Override

View File

@ -98,6 +98,14 @@ public abstract class StompClientSupport {
* <p>By default this is set to "10000,10000" but subclasses may override
* that default and for example set it to "0,0" if they require a
* TaskScheduler to be configured first.
* <p><strong>Note:</strong> that a heartbeat is sent only in case of
* inactivity, i.e. when no other messages are sent. This can present a
* challenge when using an external broker since messages with a non-broker
* destination represent activity but aren't actually forwarded to the broker.
* In that case you can configure a `TaskScheduler` through the
* {@link org.springframework.messaging.simp.config.StompBrokerRelayRegistration}
* which ensures a heartbeat is forwarded to the broker also when only
* messages with a non-broker destination are sent.
* @param heartbeat the value for the CONNECT "heart-beat" header
* @see <a href="https://stomp.github.io/stomp-specification-1.2.html#Heart-beating">
* https://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a>

View File

@ -274,8 +274,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException {
List<MessageExchange> expectedMessages =
new ArrayList<>(Arrays.<MessageExchange>asList(messageExchanges));
List<MessageExchange> expectedMessages = new ArrayList<>(Arrays.asList(messageExchanges));
while (expectedMessages.size() > 0) {
Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS);
assertThat(message).as("Timed out waiting for messages, expected [" + expectedMessages + "]").isNotNull();
@ -451,7 +450,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Override
public final boolean match(Message<?> message) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (!this.command.equals(headers.getCommand()) || (this.sessionId != headers.getSessionId())) {
if (!this.command.equals(headers.getCommand()) || !this.sessionId.equals(headers.getSessionId())) {
return false;
}
return matchInternal(headers, message.getPayload());

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package org.springframework.messaging.simp.stomp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
@ -39,10 +38,12 @@ import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@ -59,6 +60,8 @@ class StompBrokerRelayMessageHandlerTests {
private StubTcpOperations tcpClient;
ArgumentCaptor<Runnable> messageCountTaskCaptor = ArgumentCaptor.forClass(Runnable.class);
@BeforeEach
void setup() {
@ -66,7 +69,7 @@ class StompBrokerRelayMessageHandlerTests {
this.outboundChannel = new StubMessageChannel();
this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(),
this.outboundChannel, new StubMessageChannel(), Arrays.asList("/topic")) {
this.outboundChannel, new StubMessageChannel(), Collections.singletonList("/topic")) {
@Override
protected void startInternal() {
@ -77,6 +80,8 @@ class StompBrokerRelayMessageHandlerTests {
this.tcpClient = new StubTcpOperations();
this.brokerRelay.setTcpClient(this.tcpClient);
this.brokerRelay.setTaskScheduler(mock(TaskScheduler.class));
}
@ -127,18 +132,51 @@ class StompBrokerRelayMessageHandlerTests {
@Test
void destinationExcluded() {
this.brokerRelay.start();
this.brokerRelay.handleMessage(connectMessage("sess1", "joe"));
SimpMessageHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECTED);
accessor.setLeaveMutable(true);
this.tcpClient.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders()));
accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setSessionId("sess1");
accessor.setDestination("/user/daisy/foo");
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders()));
assertThat(this.tcpClient.getSentMessages().size()).isEqualTo(2);
StompHeaderAccessor headers = this.tcpClient.getSentHeaders(0);
assertThat(headers.getCommand()).isEqualTo(StompCommand.CONNECT);
assertThat(headers.getSessionId()).isEqualTo(StompBrokerRelayMessageHandler.SYSTEM_SESSION_ID);
headers = this.tcpClient.getSentHeaders(1);
assertThat(headers.getCommand()).isEqualTo(StompCommand.CONNECT);
assertThat(headers.getSessionId()).isEqualTo("sess1");
}
@Test // gh-22822
void destinationExcludedWithHeartbeat() {
Message<byte[]> connectMessage = connectMessage("sess1", "joe");
MessageHeaderAccessor.getAccessor(connectMessage, StompHeaderAccessor.class).setHeartbeat(10000, 10000);
this.brokerRelay.start();
this.brokerRelay.handleMessage(connectMessage);
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headers.setSessionId("sess1");
headers.setDestination("/user/daisy/foo");
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
SimpMessageHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECTED);
accessor.setLeaveMutable(true);
this.tcpClient.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders()));
assertThat(this.tcpClient.getSentMessages().size()).isEqualTo(1);
StompHeaderAccessor headers1 = this.tcpClient.getSentHeaders(0);
assertThat(headers1.getCommand()).isEqualTo(StompCommand.CONNECT);
assertThat(headers1.getSessionId()).isEqualTo(StompBrokerRelayMessageHandler.SYSTEM_SESSION_ID);
// Run the messageCountTask to clear the message count
verify(this.brokerRelay.getTaskScheduler()).scheduleWithFixedDelay(this.messageCountTaskCaptor.capture(), eq(5000L));
this.messageCountTaskCaptor.getValue().run();
accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setSessionId("sess1");
accessor.setDestination("/user/daisy/foo");
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders()));
assertThat(this.tcpClient.getSentMessages().size()).isEqualTo(3);
assertThat(this.tcpClient.getSentHeaders(2).getMessageType()).isEqualTo(SimpMessageType.HEARTBEAT);
}
@Test
@ -227,6 +265,7 @@ class StompBrokerRelayMessageHandlerTests {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
headers.setUser(new TestPrincipal(user));
headers.setLeaveMutable(true);
return MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
}

View File

@ -2194,6 +2194,14 @@ and optionally customize the heartbeat intervals (10 seconds for write inactivit
which causes a heartbeat to be sent, and 10 seconds for read inactivity, which
closes the connection).
`WebSocketStompClient` sends a heartbeat only in case of inactivity, i.e. when no
other messages are sent. This can present a challenge when using an external broker
since messages with a non-broker destination represent activity but aren't actually
forwarded to the broker. In that case you can configure a `TaskScheduler`
when initializing the <<websocket-stomp-handle-broker-relay>> which ensures a
heartbeat is forwarded to the broker also when only messages with a non-broker
destination are sent.
NOTE: When you use `WebSocketStompClient` for performance tests to simulate thousands
of clients from the same machine, consider turning off heartbeats, since each
connection schedules its own heartbeat tasks and that is not optimized for