Option to preserve publish order

Issue: SPR-13989
This commit is contained in:
Rossen Stoyanchev 2018-07-23 23:22:16 -04:00
parent 430250c80f
commit 7500b144ae
17 changed files with 496 additions and 95 deletions

View File

@ -58,6 +58,8 @@ public abstract class AbstractBrokerMessageHandler
private final Collection<String> destinationPrefixes;
private boolean preservePublishOrder = false;
@Nullable
private ApplicationEventPublisher eventPublisher;
@ -132,6 +134,31 @@ public abstract class AbstractBrokerMessageHandler
this.eventPublisher = publisher;
}
/**
* Whether the client must receive messages in the order of publication.
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
* not be processed in the same order because the channel is backed by a
* ThreadPoolExecutor that in turn does not guarantee processing in order.
* <p>When this flag is set to {@code true} messages within the same session
* will be sent to the {@code "clientOutboundChannel"} one at a time in
* order to preserve the order of publication. Enable this only if needed
* since there is some performance overhead to keep messages in order.
* @param preservePublishOrder whether to publish in order
* @since 5.1
*/
public void setPreservePublishOrder(boolean preservePublishOrder) {
OrderedMessageSender.configureOutboundChannel(this.clientOutboundChannel, preservePublishOrder);
this.preservePublishOrder = preservePublishOrder;
}
/**
* Whether to ensure messages are received in the order of publication.
* @since 5.1
*/
public boolean isPreservePublishOrder() {
return this.preservePublishOrder;
}
@Nullable
public ApplicationEventPublisher getApplicationEventPublisher() {
return this.eventPublisher;
@ -269,6 +296,16 @@ public abstract class AbstractBrokerMessageHandler
}
}
/**
* Get the MessageChannel to use for sending messages to clients, possibly
* a per-session wrapper when {@code preservePublishOrder=true}.
* @since 5.1
*/
protected MessageChannel getClientOutboundChannelForSession(String sessionId) {
return this.preservePublishOrder ?
new OrderedMessageSender(getClientOutboundChannel(), logger) : getClientOutboundChannel();
}
/**
* Detect unsent DISCONNECT messages and process them anyway.

View File

@ -0,0 +1,152 @@
/*
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.broker;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;
/**
* Submit messages to an ExecutorSubscribableChannel, one at a time. The channel
* must have been configured with {@link #configureOutboundChannel}.
*
* @author Rossen Stoyanchev
* @since 5.1
*/
class OrderedMessageSender implements MessageChannel {
static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask";
private final MessageChannel channel;
private final Log logger;
private final Queue<Message<?>> messages = new ConcurrentLinkedQueue<>();
private final AtomicBoolean sendInProgress = new AtomicBoolean(false);
public OrderedMessageSender(MessageChannel channel, Log logger) {
this.channel = channel;
this.logger = logger;
}
public boolean send(Message<?> message) {
return send(message, -1);
}
@Override
public boolean send(Message<?> message, long timeout) {
this.messages.add(message);
trySend();
return true;
}
private void trySend() {
// Take sendInProgress flag only if queue is not empty
if (this.messages.isEmpty()) {
return;
}
if (this.sendInProgress.compareAndSet(false, true)) {
sendNextMessage();
}
}
private void sendNextMessage() {
for (;;) {
Message<?> message = this.messages.poll();
if (message != null) {
try {
addCompletionCallback(message);
if (this.channel.send(message)) {
return;
}
}
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to send " + message, ex);
}
}
}
else {
// We ran out of messages..
this.sendInProgress.set(false);
trySend();
break;
}
}
}
private void addCompletionCallback(Message<?> msg) {
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(msg, SimpMessageHeaderAccessor.class);
Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor");
accessor.setHeader(COMPLETION_TASK_HEADER, (Runnable) this::sendNextMessage);
}
/**
* Install or remove an {@link ExecutorChannelInterceptor} that invokes a
* completion task once the message is handled.
* @param channel the channel to configure
* @param preservePublishOrder whether preserve order is on or off based on
* which an interceptor is either added or removed.
*/
static void configureOutboundChannel(MessageChannel channel, boolean preservePublishOrder) {
if (preservePublishOrder) {
Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel,
"An ExecutorSubscribableChannel is required for `preservePublishOrder`");
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel;
if (execChannel.getInterceptors().stream().noneMatch(i -> i instanceof CallbackInterceptor)) {
execChannel.addInterceptor(0, new CallbackInterceptor());
}
}
else if (channel instanceof ExecutorSubscribableChannel) {
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel;
execChannel.getInterceptors().stream().filter(i -> i instanceof CallbackInterceptor)
.findFirst()
.map(execChannel::removeInterceptor);
}
}
private static class CallbackInterceptor implements ExecutorChannelInterceptor {
@Override
public void afterMessageHandled(Message<?> msg, MessageChannel ch, MessageHandler handler, Exception ex) {
Runnable task = (Runnable) msg.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER);
if (task != null) {
task.run();
}
}
}
}

View File

@ -306,10 +306,11 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
else if (SimpMessageType.CONNECT.equals(messageType)) {
logMessage(message);
if (sessionId != null) {
long[] clientHeartbeat = SimpMessageHeaderAccessor.getHeartbeat(headers);
long[] serverHeartbeat = getHeartbeatValue();
long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
long[] heartbeatOut = getHeartbeatValue();
Principal user = SimpMessageHeaderAccessor.getUser(headers);
this.sessions.put(sessionId, new SessionInfo(sessionId, user, clientHeartbeat, serverHeartbeat));
MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);
this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));
SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
initHeaders(connectAck);
connectAck.setSessionId(sessionId);
@ -317,7 +318,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
connectAck.setUser(user);
}
connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, serverHeartbeat);
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);
Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
getClientOutboundChannel().send(messageOut);
}
@ -391,19 +392,20 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
headerAccessor.setSessionId(sessionId);
headerAccessor.setSubscriptionId(subscriptionId);
headerAccessor.copyHeadersIfAbsent(message.getHeaders());
headerAccessor.setLeaveMutable(true);
Object payload = message.getPayload();
Message<?> reply = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
try {
getClientOutboundChannel().send(reply);
}
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to send " + message, ex);
SessionInfo info = this.sessions.get(sessionId);
if (info != null) {
try {
info.getClientOutboundChannel().send(reply);
}
}
finally {
SessionInfo info = this.sessions.get(sessionId);
if (info != null) {
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to send " + message, ex);
}
}
finally {
info.setLastWriteTime(now);
}
}
@ -427,6 +429,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
@Nullable
private final Principal user;
private final MessageChannel clientOutboundChannel;
private final long readInterval;
private final long writeInterval;
@ -435,11 +439,13 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
private volatile long lastWriteTime;
public SessionInfo(String sessionId, @Nullable Principal user,
public SessionInfo(String sessionId, @Nullable Principal user, MessageChannel outboundChannel,
@Nullable long[] clientHeartbeat, @Nullable long[] serverHeartbeat) {
this.sessionId = sessionId;
this.user = user;
this.clientOutboundChannel = outboundChannel;
if (clientHeartbeat != null && serverHeartbeat != null) {
this.readInterval = (clientHeartbeat[0] > 0 && serverHeartbeat[1] > 0 ?
Math.max(clientHeartbeat[0], serverHeartbeat[1]) * HEARTBEAT_MULTIPLIER : 0);
@ -462,6 +468,10 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
return this.user;
}
public MessageChannel getClientOutboundChannel() {
return this.clientOutboundChannel;
}
public long getReadInterval() {
return this.readInterval;
}
@ -505,8 +515,9 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
accessor.setUser(user);
}
initHeaders(accessor);
accessor.setLeaveMutable(true);
MessageHeaders headers = accessor.getMessageHeaders();
getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
info.getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
}
}
}

View File

@ -54,6 +54,8 @@ public class MessageBrokerRegistry {
@Nullable
private String userDestinationPrefix;
private boolean preservePublishOrder;
@Nullable
private PathMatcher pathMatcher;
@ -160,6 +162,30 @@ public class MessageBrokerRegistry {
return this.userDestinationPrefix;
}
/**
* Whether the client must receive messages in the order of publication.
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
* not be processed in the same order because the channel is backed by a
* ThreadPoolExecutor that in turn does not guarantee processing in order.
* <p>When this flag is set to {@code true} messages within the same session
* will be sent to the {@code "clientOutboundChannel"} one at a time in
* order to preserve the order of publication. Enable this only if needed
* since there is some performance overhead to keep messages in order.
* @param preservePublishOrder whether to publish in order
* @since 5.1
*/
public void setPreservePublishOrder(boolean preservePublishOrder) {
this.preservePublishOrder = preservePublishOrder;
}
/**
* Whether to ensure messages are received in the order of publication.
* @since 5.1
*/
protected boolean isPreservePublishOrder() {
return this.preservePublishOrder;
}
/**
* Configure the PathMatcher to use to match the destinations of incoming
* messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
@ -209,6 +235,7 @@ public class MessageBrokerRegistry {
SimpleBrokerMessageHandler handler = this.simpleBrokerRegistration.getMessageHandler(brokerChannel);
handler.setPathMatcher(this.pathMatcher);
handler.setCacheLimit(this.cacheLimit);
handler.setPreservePublishOrder(this.preservePublishOrder);
return handler;
}
return null;
@ -217,7 +244,9 @@ public class MessageBrokerRegistry {
@Nullable
protected StompBrokerRelayMessageHandler getStompBrokerRelay(SubscribableChannel brokerChannel) {
if (this.brokerRelayRegistration != null) {
return this.brokerRelayRegistration.getMessageHandler(brokerChannel);
StompBrokerRelayMessageHandler relay = this.brokerRelayRegistration.getMessageHandler(brokerChannel);
relay.setPreservePublishOrder(this.preservePublishOrder);
return relay;
}
return null;
}

View File

@ -578,6 +578,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private final StompHeaderAccessor connectHeaders;
private final MessageChannel outboundChannel;
@Nullable
private volatile TcpConnection<byte[]> tcpConnection;
@ -594,6 +596,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.sessionId = sessionId;
this.connectHeaders = connectHeaders;
this.isRemoteClientSession = isClientSession;
this.outboundChannel = getClientOutboundChannelForSession(sessionId);
}
public String getSessionId() {
@ -660,6 +663,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
accessor.setUser(user);
}
accessor.setMessage(errorText);
accessor.setLeaveMutable(true);
Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
handleInboundMessage(errorMessage);
}
@ -667,11 +671,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
protected void handleInboundMessage(Message<?> message) {
if (this.isRemoteClientSession) {
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, null);
if (accessor != null) {
accessor.setImmutable();
}
StompBrokerRelayMessageHandler.this.getClientOutboundChannel().send(message);
this.outboundChannel.send(message);
}
}

View File

@ -44,7 +44,9 @@ public interface ExecutorChannelInterceptor extends ChannelInterceptor {
* @return the input message, or a new instance, or {@code null}
*/
@Nullable
Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler);
default Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
return message;
}
/**
* Invoked inside the {@link Runnable} submitted to the Executor after calling
@ -57,6 +59,8 @@ public interface ExecutorChannelInterceptor extends ChannelInterceptor {
* @param handler the target handler that handled the message
* @param ex any exception that may been raised by the handler
*/
void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, @Nullable Exception ex);
default void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler,
@Nullable Exception ex) {
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -70,16 +70,22 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
public void setInterceptors(List<ChannelInterceptor> interceptors) {
super.setInterceptors(interceptors);
this.executorInterceptors.clear();
for (ChannelInterceptor interceptor : interceptors) {
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
}
}
interceptors.forEach(this::updateExecutorInterceptorsFor);
}
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
super.addInterceptor(interceptor);
updateExecutorInterceptorsFor(interceptor);
}
@Override
public void addInterceptor(int index, ChannelInterceptor interceptor) {
super.addInterceptor(index, interceptor);
updateExecutorInterceptorsFor(interceptor);
}
private void updateExecutorInterceptorsFor(ChannelInterceptor interceptor) {
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.broker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static org.junit.Assert.*;
/**
* Unit tests for {@link OrderedMessageSender}.
* @author Rossen Stoyanchev
*/
public class OrderedMessageSenderTests {
private static final Log logger = LogFactory.getLog(OrderedMessageSenderTests.class);
private OrderedMessageSender sender;
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(this.executor);
private ThreadPoolTaskExecutor executor;
@Before
public void setup() {
this.executor = new ThreadPoolTaskExecutor();
this.executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
this.executor.setAllowCoreThreadTimeOut(true);
this.executor.afterPropertiesSet();
this.channel = new ExecutorSubscribableChannel(this.executor);
OrderedMessageSender.configureOutboundChannel(this.channel, true);
this.sender = new OrderedMessageSender(this.channel, logger);
}
@After
public void tearDown() {
this.executor.shutdown();
}
@Test
public void test() throws InterruptedException {
int start = 1;
int end = 1000;
AtomicInteger index = new AtomicInteger(start);
AtomicReference<Object> result = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
this.channel.subscribe(message -> {
int expected = index.getAndIncrement();
Integer actual = (Integer) message.getHeaders().getOrDefault("seq", -1);
if (actual != expected) {
result.set("Expected: " + expected + ", but was: " + actual);
latch.countDown();
return;
}
if (actual == 100 || actual == 200) {
try {
Thread.sleep(200);
}
catch (InterruptedException ex) {
result.set(ex.toString());
latch.countDown();
}
}
if (actual == end) {
result.set("Done");
latch.countDown();
}
});
for (int i = start; i <= end; i++) {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setHeader("seq", i);
accessor.setLeaveMutable(true);
this.sender.send(MessageBuilder.createMessage("payload", accessor.getMessageHeaders()));
}
latch.await(10, TimeUnit.SECONDS);
assertEquals("Done", result.get());
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -38,20 +38,8 @@ import org.springframework.messaging.simp.TestPrincipal;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.TaskScheduler;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Unit tests for SimpleBrokerMessageHandler.
@ -65,10 +53,10 @@ public class SimpleBrokerMessageHandlerTests {
private SimpleBrokerMessageHandler messageHandler;
@Mock
private SubscribableChannel clientInboundChannel;
private SubscribableChannel clientInChannel;
@Mock
private MessageChannel clientOutboundChannel;
private MessageChannel clientOutChannel;
@Mock
private SubscribableChannel brokerChannel;
@ -83,15 +71,16 @@ public class SimpleBrokerMessageHandlerTests {
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
this.messageHandler = new SimpleBrokerMessageHandler(this.clientInboundChannel,
this.clientOutboundChannel, this.brokerChannel, Collections.emptyList());
this.messageHandler = new SimpleBrokerMessageHandler(
this.clientInChannel, this.clientOutChannel, this.brokerChannel, Collections.emptyList());
}
@Test
public void subcribePublish() {
this.messageHandler.start();
startSession("sess1");
startSession("sess2");
this.messageHandler.handleMessage(createSubscriptionMessage("sess1", "sub1", "/foo"));
this.messageHandler.handleMessage(createSubscriptionMessage("sess1", "sub2", "/foo"));
@ -104,7 +93,7 @@ public class SimpleBrokerMessageHandlerTests {
this.messageHandler.handleMessage(createMessage("/foo", "message1"));
this.messageHandler.handleMessage(createMessage("/bar", "message2"));
verify(this.clientOutboundChannel, times(6)).send(this.messageCaptor.capture());
verify(this.clientOutChannel, times(6)).send(this.messageCaptor.capture());
assertTrue(messageCaptured("sess1", "sub1", "/foo"));
assertTrue(messageCaptured("sess1", "sub2", "/foo"));
assertTrue(messageCaptured("sess2", "sub1", "/foo"));
@ -119,7 +108,8 @@ public class SimpleBrokerMessageHandlerTests {
String sess1 = "sess1";
String sess2 = "sess2";
this.messageHandler.start();
startSession(sess1);
startSession(sess2);
this.messageHandler.handleMessage(createSubscriptionMessage(sess1, "sub1", "/foo"));
this.messageHandler.handleMessage(createSubscriptionMessage(sess1, "sub2", "/foo"));
@ -138,9 +128,9 @@ public class SimpleBrokerMessageHandlerTests {
this.messageHandler.handleMessage(createMessage("/foo", "message1"));
this.messageHandler.handleMessage(createMessage("/bar", "message2"));
verify(this.clientOutboundChannel, times(4)).send(this.messageCaptor.capture());
verify(this.clientOutChannel, times(4)).send(this.messageCaptor.capture());
Message<?> captured = this.messageCaptor.getAllValues().get(0);
Message<?> captured = this.messageCaptor.getAllValues().get(2);
assertEquals(SimpMessageType.DISCONNECT_ACK, SimpMessageHeaderAccessor.getMessageType(captured.getHeaders()));
assertSame(message, captured.getHeaders().get(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER));
assertEquals(sess1, SimpMessageHeaderAccessor.getSessionId(captured.getHeaders()));
@ -154,14 +144,9 @@ public class SimpleBrokerMessageHandlerTests {
@Test
public void connect() {
this.messageHandler.start();
String id = "sess1";
Message<String> connectMessage = createConnectMessage(id, new TestPrincipal("joe"), null);
this.messageHandler.setTaskScheduler(this.taskScheduler);
this.messageHandler.handleMessage(connectMessage);
verify(this.clientOutboundChannel, times(1)).send(this.messageCaptor.capture());
Message<String> connectMessage = startSession(id);
Message<?> connectAckMessage = this.messageCaptor.getValue();
SimpMessageHeaderAccessor connectAckHeaders = SimpMessageHeaderAccessor.wrap(connectAckMessage);
@ -173,7 +158,7 @@ public class SimpleBrokerMessageHandlerTests {
}
@Test
public void heartbeatValueWithAndWithoutTaskScheduler() throws Exception {
public void heartbeatValueWithAndWithoutTaskScheduler() {
assertNull(this.messageHandler.getHeartbeatValue());
@ -184,14 +169,14 @@ public class SimpleBrokerMessageHandlerTests {
}
@Test(expected = IllegalArgumentException.class)
public void startWithHeartbeatValueWithoutTaskScheduler() throws Exception {
public void startWithHeartbeatValueWithoutTaskScheduler() {
this.messageHandler.setHeartbeatValue(new long[] {10000, 10000});
this.messageHandler.start();
}
@SuppressWarnings("unchecked")
@Test
public void startAndStopWithHeartbeatValue() throws Exception {
public void startAndStopWithHeartbeatValue() {
ScheduledFuture future = mock(ScheduledFuture.class);
when(this.taskScheduler.scheduleWithFixedDelay(any(Runnable.class), eq(15000L))).thenReturn(future);
@ -211,7 +196,7 @@ public class SimpleBrokerMessageHandlerTests {
@SuppressWarnings("unchecked")
@Test
public void startWithOneZeroHeartbeatValue() throws Exception {
public void startWithOneZeroHeartbeatValue() {
this.messageHandler.setTaskScheduler(this.taskScheduler);
this.messageHandler.setHeartbeatValue(new long[] {0, 10000});
@ -240,7 +225,7 @@ public class SimpleBrokerMessageHandlerTests {
Thread.sleep(10);
heartbeatTask.run();
verify(this.clientOutboundChannel, atLeast(2)).send(this.messageCaptor.capture());
verify(this.clientOutChannel, atLeast(2)).send(this.messageCaptor.capture());
List<Message<?>> messages = this.messageCaptor.getAllValues();
assertEquals(2, messages.size());
@ -272,7 +257,7 @@ public class SimpleBrokerMessageHandlerTests {
Thread.sleep(10);
heartbeatTask.run();
verify(this.clientOutboundChannel, times(2)).send(this.messageCaptor.capture());
verify(this.clientOutChannel, times(2)).send(this.messageCaptor.capture());
List<Message<?>> messages = this.messageCaptor.getAllValues();
assertEquals(2, messages.size());
@ -304,13 +289,25 @@ public class SimpleBrokerMessageHandlerTests {
Thread.sleep(10);
heartbeatTask.run();
verify(this.clientOutboundChannel, times(1)).send(this.messageCaptor.capture());
verify(this.clientOutChannel, times(1)).send(this.messageCaptor.capture());
List<Message<?>> messages = this.messageCaptor.getAllValues();
assertEquals(1, messages.size());
assertEquals(SimpMessageType.CONNECT_ACK,
messages.get(0).getHeaders().get(SimpMessageHeaderAccessor.MESSAGE_TYPE_HEADER));
}
private Message<String> startSession(String id) {
this.messageHandler.start();
Message<String> connectMessage = createConnectMessage(id, new TestPrincipal("joe"), null);
this.messageHandler.setTaskScheduler(this.taskScheduler);
this.messageHandler.handleMessage(connectMessage);
verify(this.clientOutChannel, times(1)).send(this.messageCaptor.capture());
reset(this.clientOutChannel);
return connectMessage;
}
private Message<String> createSubscriptionMessage(String sessionId, String subcriptionId, String destination) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.SUBSCRIBE);
headers.setSubscriptionId(subcriptionId);

View File

@ -157,7 +157,7 @@ public class MessageBrokerConfigurationTests {
public void clientOutboundChannelUsedBySimpleBroker() {
ApplicationContext context = loadConfig(SimpleBrokerConfig.class);
TestChannel channel = context.getBean("clientOutboundChannel", TestChannel.class);
TestChannel outboundChannel = context.getBean("clientOutboundChannel", TestChannel.class);
SimpleBrokerMessageHandler broker = context.getBean(SimpleBrokerMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
@ -167,6 +167,7 @@ public class MessageBrokerConfigurationTests {
Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
// subscribe
broker.handleMessage(createConnectMessage("sess1", new long[] {0,0}));
broker.handleMessage(message);
headers = StompHeaderAccessor.create(StompCommand.SEND);
@ -177,7 +178,7 @@ public class MessageBrokerConfigurationTests {
// message
broker.handleMessage(message);
message = channel.messages.get(0);
message = outboundChannel.messages.get(1);
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
@ -192,7 +193,7 @@ public class MessageBrokerConfigurationTests {
AbstractSubscribableChannel channel = context.getBean(
"clientOutboundChannel", AbstractSubscribableChannel.class);
assertEquals(3, channel.getInterceptors().size());
assertEquals(4, channel.getInterceptors().size());
ThreadPoolTaskExecutor taskExecutor = context.getBean(
"clientOutboundChannelExecutor", ThreadPoolTaskExecutor.class);
@ -200,6 +201,10 @@ public class MessageBrokerConfigurationTests {
assertEquals(21, taskExecutor.getCorePoolSize());
assertEquals(22, taskExecutor.getMaxPoolSize());
assertEquals(23, taskExecutor.getKeepAliveSeconds());
SimpleBrokerMessageHandler broker =
context.getBean("simpleBrokerMessageHandler", SimpleBrokerMessageHandler.class);
assertTrue(broker.isPreservePublishOrder());
}
@Test
@ -479,6 +484,7 @@ public class MessageBrokerConfigurationTests {
TestChannel outChannel = context.getBean("clientOutboundChannel", TestChannel.class);
MessageChannel brokerChannel = context.getBean("brokerChannel", MessageChannel.class);
inChannel.send(createConnectMessage("sess1", new long[] {0,0}));
// 1. Subscribe to user destination
@ -497,13 +503,14 @@ public class MessageBrokerConfigurationTests {
message = MessageBuilder.createMessage("123".getBytes(), headers.getMessageHeaders());
inChannel.send(message);
assertEquals(1, outChannel.messages.size());
Message<?> outputMessage = outChannel.messages.remove(0);
assertEquals(2, outChannel.messages.size());
Message<?> outputMessage = outChannel.messages.remove(1);
headers = StompHeaderAccessor.wrap(outputMessage);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals(expectLeadingSlash ? "/queue.q1-usersess1" : "queue.q1-usersess1", headers.getDestination());
assertEquals("123", new String((byte[]) outputMessage.getPayload()));
outChannel.messages.clear();
// 3. Send message via broker channel
@ -527,6 +534,13 @@ public class MessageBrokerConfigurationTests {
return new AnnotationConfigApplicationContext(configClass);
}
private Message<String> createConnectMessage(String sessionId, long[] heartbeat) {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT);
accessor.setSessionId(sessionId);
accessor.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeat);
return MessageBuilder.createMessage("", accessor.getMessageHeaders());
}
@SuppressWarnings("unused")
@Controller
@ -635,6 +649,7 @@ public class MessageBrokerConfigurationTests {
.corePoolSize(31).maxPoolSize(32).keepAliveSeconds(33).queueCapacity(34);
registry.setPathMatcher(new AntPathMatcher(".")).enableSimpleBroker("/topic", "/queue");
registry.setCacheLimit(8192);
registry.setPreservePublishOrder(true);
}
}

View File

@ -102,12 +102,14 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
private void createAndStartRelay() throws InterruptedException {
this.relay = new StompBrokerRelayMessageHandler(new StubMessageChannel(),
this.responseChannel, new StubMessageChannel(), Arrays.asList("/queue/", "/topic/"));
StubMessageChannel channel = new StubMessageChannel();
List<String> prefixes = Arrays.asList("/queue/", "/topic/");
this.relay = new StompBrokerRelayMessageHandler(channel, this.responseChannel, channel, prefixes);
this.relay.setRelayPort(this.port);
this.relay.setApplicationEventPublisher(this.eventPublisher);
this.relay.setSystemHeartbeatReceiveInterval(0);
this.relay.setSystemHeartbeatSendInterval(0);
this.relay.setPreservePublishOrder(true);
this.relay.start();
this.eventPublisher.expectBrokerAvailabilityEvent(true);

View File

@ -163,7 +163,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
}
}
Map<String, Object> scopeMap = Collections.<String, Object>singletonMap("websocket", new SimpSessionScope());
Map<String, Object> scopeMap = Collections.singletonMap("websocket", new SimpSessionScope());
RootBeanDefinition scopeConfigurer = new RootBeanDefinition(CustomScopeConfigurer.class);
scopeConfigurer.getPropertyValues().add("scopes", scopeMap);
registerBeanDefByName("webSocketScopeConfigurer", scopeConfigurer, context, source);
@ -444,6 +444,12 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
// Should not happen
throw new IllegalStateException("Neither <simple-broker> nor <stomp-broker-relay> elements found.");
}
if (brokerElement.hasAttribute("preserve-publish-order")) {
String preservePublishOrder = brokerElement.getAttribute("preserve-publish-order");
brokerDef.getPropertyValues().add("preservePublishOrder", preservePublishOrder);
}
registerBeanDef(brokerDef, context, source);
return brokerDef;
}

View File

@ -895,6 +895,22 @@
Prefixes without a trailing slash will have one appended automatically.
By default the list of prefixes is empty in which case all destinations match.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="preserve-publish-order" type="xsd:boolean">
<xsd:annotation>
<xsd:documentation><![CDATA[
Whether the client must receive messages in the order of publication.
By default messages sent to the clientOutboundChannel may
not be processed in the same order because the channel is backed by a
ThreadPoolExecutor that in turn does not guarantee processing in order.
When this flag is set to true messages within the same session
will be sent to the clientOutboundChannel one at a time in
order to preserve the order of publication. Enable this only if needed
since there is some performance overhead to keep messages in order.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>

View File

@ -208,18 +208,19 @@ public class MessageBrokerBeanDefinitionParserTests {
assertEquals("my-selector", registry.getSelectorHeaderName());
assertNotNull(brokerMessageHandler.getTaskScheduler());
assertArrayEquals(new long[] {15000, 15000}, brokerMessageHandler.getHeartbeatValue());
assertTrue(brokerMessageHandler.isPreservePublishOrder());
List<Class<? extends MessageHandler>> subscriberTypes =
Arrays.<Class<? extends MessageHandler>>asList(SimpAnnotationMethodMessageHandler.class,
Arrays.asList(SimpAnnotationMethodMessageHandler.class,
UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class);
testChannel("clientInboundChannel", subscriberTypes, 2);
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Collections.singletonList(SubProtocolWebSocketHandler.class);
testChannel("clientOutboundChannel", subscriberTypes, 1);
testChannel("clientOutboundChannel", subscriberTypes, 2);
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(
subscriberTypes = Arrays.asList(
SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class);
testChannel("brokerChannel", subscriberTypes, 1);
try {
@ -278,6 +279,7 @@ public class MessageBrokerBeanDefinitionParserTests {
assertEquals(5000, messageBroker.getSystemHeartbeatReceiveInterval());
assertEquals(5000, messageBroker.getSystemHeartbeatSendInterval());
assertThat(messageBroker.getDestinationPrefixes(), Matchers.containsInAnyOrder("/topic","/queue"));
assertTrue(messageBroker.isPreservePublishOrder());
List<Class<? extends MessageHandler>> subscriberTypes = Arrays.asList(
SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class,
@ -286,7 +288,7 @@ public class MessageBrokerBeanDefinitionParserTests {
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Collections.singletonList(SubProtocolWebSocketHandler.class);
testChannel("clientOutboundChannel", subscriberTypes, 1);
testChannel("clientOutboundChannel", subscriberTypes, 2);
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Arrays.asList(StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class);

View File

@ -102,12 +102,13 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
@Test
public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
TextMessage message1 = create(StompCommand.SUBSCRIBE)
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
TextMessage m1 = create(StompCommand.SUBSCRIBE)
.headers("id:subs1", "destination:/topic/increment").build();
TextMessage message2 = create(StompCommand.SEND)
TextMessage m2 = create(StompCommand.SEND)
.headers("destination:/app/increment").body("5").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
@ -120,16 +121,17 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
@Test // SPR-10930
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
TextMessage m2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, m1, m2);
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(0).getPayload();
String payload = clientHandler.actual.get(1).getPayload();
assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
}
finally {
@ -139,15 +141,16 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
@Test // SPR-11648
public void sendSubscribeToControllerAndReceiveReply() throws Exception {
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
String destHeader = "destination:/app/number";
TextMessage message = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message);
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(0).getPayload();
String payload = clientHandler.actual.get(1).getPayload();
assertTrue("Expected STOMP destination=/app/number, got " + payload, payload.contains(destHeader));
assertTrue("Expected STOMP Payload=42, got " + payload, payload.contains("42"));
}
@ -159,15 +162,16 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
@Test
public void handleExceptionAndSendToUser() throws Exception {
String destHeader = "destination:/user/queue/error";
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
TextMessage m2 = create(StompCommand.SEND).headers("destination:/app/exception").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, m1, m2);
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(0).getPayload();
String payload = clientHandler.actual.get(1).getPayload();
assertTrue(payload.startsWith("MESSAGE\n"));
assertTrue(payload.contains("destination:/user/queue/error\n"));
assertTrue(payload.endsWith("Got error: Bad input\0"));
@ -179,17 +183,18 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
@Test
public void webSocketScope() throws Exception {
TextMessage message1 = create(StompCommand.SUBSCRIBE)
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
TextMessage m1 = create(StompCommand.SUBSCRIBE)
.headers("id:subs1", "destination:/topic/scopedBeanValue").build();
TextMessage message2 = create(StompCommand.SEND)
TextMessage m2 = create(StompCommand.SEND)
.headers("destination:/app/scopedBeanValue").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(0).getPayload();
String payload = clientHandler.actual.get(1).getPayload();
assertTrue(payload.startsWith("MESSAGE\n"));
assertTrue(payload.contains("destination:/topic/scopedBeanValue\n"));
assertTrue(payload.endsWith("55\0"));

View File

@ -4,7 +4,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker order="2">
<websocket:message-broker order="2" preserve-publish-order="true">
<websocket:stomp-endpoint path="/foo">
<websocket:sockjs/>
</websocket:stomp-endpoint>

View File

@ -7,7 +7,8 @@
<websocket:message-broker application-destination-prefix="/app"
user-destination-prefix="/personal"
path-matcher="pathMatcher"
path-helper="urlPathHelper">
path-helper="urlPathHelper"
preserve-publish-order="true">
<!-- message-size=128*1024, send-buffer-size=1024*1024 -->
<websocket:transport message-size="131072" send-timeout="25000" send-buffer-size="1048576" time-to-first-message="30000">