Polishing

See gh-21798
This commit is contained in:
rstoyanchev 2023-10-09 12:01:48 +01:00
parent e76b453685
commit d62d7f5ff1
5 changed files with 60 additions and 46 deletions

View File

@ -161,8 +161,7 @@ public abstract class AbstractBrokerMessageHandler
* ThreadPoolExecutor that in turn does not guarantee processing in order. * ThreadPoolExecutor that in turn does not guarantee processing in order.
* <p>When this flag is set to {@code true} messages within the same session * <p>When this flag is set to {@code true} messages within the same session
* will be sent to the {@code "clientOutboundChannel"} one at a time in * will be sent to the {@code "clientOutboundChannel"} one at a time in
* order to preserve the order of publication. Enable this only if needed * order to preserve the order of publication.
* since there is some performance overhead to keep messages in order.
* @param preservePublishOrder whether to publish in order * @param preservePublishOrder whether to publish in order
* @since 5.1 * @since 5.1
*/ */

View File

@ -89,11 +89,7 @@ public class OrderedMessageChannelDecorator implements MessageChannel {
Message<?> message = this.messages.peek(); Message<?> message = this.messages.peek();
if (message != null) { if (message != null) {
try { try {
addNextMessageTaskHeader(message, () -> { setTaskHeader(message, new PostHandleTask(message));
if (removeMessage(message)) {
sendNextMessage();
}
});
if (this.channel.send(message)) { if (this.channel.send(message)) {
return; return;
} }
@ -124,19 +120,12 @@ public class OrderedMessageChannelDecorator implements MessageChannel {
} }
} }
private static void addNextMessageTaskHeader(Message<?> message, Runnable task) { private static void setTaskHeader(Message<?> message, Runnable task) {
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor"); Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor");
accessor.setHeader(NEXT_MESSAGE_TASK_HEADER, task); accessor.setHeader(NEXT_MESSAGE_TASK_HEADER, task);
} }
/**
* Obtain the task to release the next message, if found.
*/
@Nullable
public static Runnable getNextMessageTask(Message<?> message) {
return (Runnable) message.getHeaders().get(OrderedMessageChannelDecorator.NEXT_MESSAGE_TASK_HEADER);
}
/** /**
* Install or remove an {@link ExecutorChannelInterceptor} that invokes a * Install or remove an {@link ExecutorChannelInterceptor} that invokes a
@ -150,19 +139,47 @@ public class OrderedMessageChannelDecorator implements MessageChannel {
Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel, Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel,
"An ExecutorSubscribableChannel is required for 'preservePublishOrder'"); "An ExecutorSubscribableChannel is required for 'preservePublishOrder'");
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel; ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel;
if (execChannel.getInterceptors().stream().noneMatch(CallbackInterceptor.class::isInstance)) { if (execChannel.getInterceptors().stream().noneMatch(CallbackTaskInterceptor.class::isInstance)) {
execChannel.addInterceptor(0, new CallbackInterceptor()); execChannel.addInterceptor(0, new CallbackTaskInterceptor());
} }
} }
else if (channel instanceof ExecutorSubscribableChannel execChannel) { else if (channel instanceof ExecutorSubscribableChannel execChannel) {
execChannel.getInterceptors().stream().filter(CallbackInterceptor.class::isInstance) execChannel.getInterceptors().stream().filter(CallbackTaskInterceptor.class::isInstance)
.findFirst().map(execChannel::removeInterceptor); .findFirst().map(execChannel::removeInterceptor);
} }
} }
/**
* Obtain the task to release the next message, if found.
*/
@Nullable
public static Runnable getNextMessageTask(Message<?> message) {
return (Runnable) message.getHeaders().get(OrderedMessageChannelDecorator.NEXT_MESSAGE_TASK_HEADER);
}
private static class CallbackInterceptor implements ExecutorChannelInterceptor {
/**
* Remove handled message from queue, and send next message.
*/
private class PostHandleTask implements Runnable {
private final Message<?> message;
private PostHandleTask(Message<?> message) {
this.message = message;
}
@Override
public void run() {
if (OrderedMessageChannelDecorator.this.removeMessage(message)) {
sendNextMessage();
}
}
}
private static class CallbackTaskInterceptor implements ExecutorChannelInterceptor {
@Override @Override
public void afterMessageHandled( public void afterMessageHandled(

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -228,8 +228,7 @@ public class MessageBrokerRegistry {
* ThreadPoolExecutor that in turn does not guarantee processing in order. * ThreadPoolExecutor that in turn does not guarantee processing in order.
* <p>When this flag is set to {@code true} messages within the same session * <p>When this flag is set to {@code true} messages within the same session
* will be sent to the {@code "clientOutboundChannel"} one at a time in * will be sent to the {@code "clientOutboundChannel"} one at a time in
* order to preserve the order of publication. Enable this only if needed * order to preserve the order of publication.
* since there is some performance overhead to keep messages in order.
* @since 5.1 * @since 5.1
*/ */
public MessageBrokerRegistry setPreservePublishOrder(boolean preservePublishOrder) { public MessageBrokerRegistry setPreservePublishOrder(boolean preservePublishOrder) {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -35,7 +35,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
public Set<MessageHandler> getSubscribers() { public Set<MessageHandler> getSubscribers() {
return Collections.<MessageHandler>unmodifiableSet(this.handlers); return Collections.unmodifiableSet(this.handlers);
} }
public boolean hasSubscription(MessageHandler handler) { public boolean hasSubscription(MessageHandler handler) {

View File

@ -79,9 +79,9 @@ public class OrderedMessageSendingIntegrationTests {
private BlockingWebSocketSession blockingSession; private BlockingWebSocketSession blockingSession;
private ExecutorSubscribableChannel subscribableChannel; private ExecutorSubscribableChannel clientOutChannel;
private OrderedMessageChannelDecorator orderedMessageChannel; private OrderedMessageChannelDecorator orderedClientOutChannel;
private ThreadPoolTaskExecutor executor; private ThreadPoolTaskExecutor executor;
@ -98,10 +98,10 @@ public class OrderedMessageSendingIntegrationTests {
this.executor.setAllowCoreThreadTimeOut(true); this.executor.setAllowCoreThreadTimeOut(true);
this.executor.afterPropertiesSet(); this.executor.afterPropertiesSet();
this.subscribableChannel = new ExecutorSubscribableChannel(this.executor); this.clientOutChannel = new ExecutorSubscribableChannel(this.executor);
OrderedMessageChannelDecorator.configureInterceptor(this.subscribableChannel, true); OrderedMessageChannelDecorator.configureInterceptor(this.clientOutChannel, true);
this.orderedMessageChannel = new OrderedMessageChannelDecorator(this.subscribableChannel, logger); this.orderedClientOutChannel = new OrderedMessageChannelDecorator(this.clientOutChannel, logger);
} }
@AfterEach @AfterEach
@ -119,14 +119,14 @@ public class OrderedMessageSendingIntegrationTests {
this.blockingSession, 60 * 1000, messageCount * MESSAGE_SIZE); this.blockingSession, 60 * 1000, messageCount * MESSAGE_SIZE);
TestMessageHandler handler = new TestMessageHandler(concurrentSessionDecorator); TestMessageHandler handler = new TestMessageHandler(concurrentSessionDecorator);
subscribableChannel.subscribe(handler); this.clientOutChannel.subscribe(handler);
List<Message<?>> expectedMessages = new ArrayList<>(messageCount); List<Message<?>> expectedMessages = new ArrayList<>(messageCount);
// Send one to block // Send one to block
Message<byte[]> message = createMessage(0); Message<byte[]> message = createMessage(0);
expectedMessages.add(message); expectedMessages.add(message);
this.orderedMessageChannel.send(message); this.orderedClientOutChannel.send(message);
CountDownLatch latch = new CountDownLatch(messageCount); CountDownLatch latch = new CountDownLatch(messageCount);
handler.setMessageLatch(latch); handler.setMessageLatch(latch);
@ -134,10 +134,10 @@ public class OrderedMessageSendingIntegrationTests {
for (int i = 1; i <= messageCount; i++) { for (int i = 1; i <= messageCount; i++) {
message = createMessage(i); message = createMessage(i);
expectedMessages.add(message); expectedMessages.add(message);
this.orderedMessageChannel.send(message); this.orderedClientOutChannel.send(message);
} }
latch.await(5, TimeUnit.SECONDS); assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(concurrentSessionDecorator.getTimeSinceSendStarted()).isGreaterThan(0); assertThat(concurrentSessionDecorator.getTimeSinceSendStarted()).isGreaterThan(0);
assertThat(concurrentSessionDecorator.getBufferSize()).isEqualTo((messageCount * MESSAGE_SIZE)); assertThat(concurrentSessionDecorator.getBufferSize()).isEqualTo((messageCount * MESSAGE_SIZE));
@ -152,10 +152,10 @@ public class OrderedMessageSendingIntegrationTests {
new ConcurrentWebSocketSessionDecorator(this.blockingSession, 100, 1024); new ConcurrentWebSocketSessionDecorator(this.blockingSession, 100, 1024);
TestMessageHandler messageHandler = new TestMessageHandler(concurrentSessionDecorator); TestMessageHandler messageHandler = new TestMessageHandler(concurrentSessionDecorator);
subscribableChannel.subscribe(messageHandler); this.clientOutChannel.subscribe(messageHandler);
// Send one to block // Send one to block
this.orderedMessageChannel.send(createMessage(0)); this.orderedClientOutChannel.send(createMessage(0));
// Exceed send time.. // Exceed send time..
Thread.sleep(200); Thread.sleep(200);
@ -164,10 +164,9 @@ public class OrderedMessageSendingIntegrationTests {
messageHandler.setMessageLatch(messageLatch); messageHandler.setMessageLatch(messageLatch);
// Send one more // Send one more
this.orderedMessageChannel.send(createMessage(1)); this.orderedClientOutChannel.send(createMessage(1));
messageLatch.await(5, TimeUnit.SECONDS);
assertThat(messageLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(messageHandler.getSavedException()).hasMessageMatching( assertThat(messageHandler.getSavedException()).hasMessageMatching(
"Send time [\\d]+ \\(ms\\) for session '1' exceeded the allowed limit 100"); "Send time [\\d]+ \\(ms\\) for session '1' exceeded the allowed limit 100");
} }
@ -179,23 +178,23 @@ public class OrderedMessageSendingIntegrationTests {
new ConcurrentWebSocketSessionDecorator(this.blockingSession, 60 * 1000, 2 * MESSAGE_SIZE); new ConcurrentWebSocketSessionDecorator(this.blockingSession, 60 * 1000, 2 * MESSAGE_SIZE);
TestMessageHandler messageHandler = new TestMessageHandler(concurrentSessionDecorator); TestMessageHandler messageHandler = new TestMessageHandler(concurrentSessionDecorator);
subscribableChannel.subscribe(messageHandler); this.clientOutChannel.subscribe(messageHandler);
// Send one to block // Send one to block
this.orderedMessageChannel.send(createMessage(0)); this.orderedClientOutChannel.send(createMessage(0));
int messageCount = 3; int messageCount = 3;
CountDownLatch messageLatch = new CountDownLatch(messageCount); CountDownLatch messageLatch = new CountDownLatch(messageCount);
messageHandler.setMessageLatch(messageLatch); messageHandler.setMessageLatch(messageLatch);
for (int i = 1; i <= messageCount; i++) { for (int i = 1; i <= messageCount; i++) {
this.orderedMessageChannel.send(createMessage(i)); this.orderedClientOutChannel.send(createMessage(i));
} }
messageLatch.await(5, TimeUnit.SECONDS); assertThat(messageLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(messageHandler.getSavedException()).hasMessage( assertThat(messageHandler.getSavedException()).hasMessage(
"Buffer size " + 3 * MESSAGE_SIZE + " bytes for session '1' exceeds the allowed limit " + 2 * MESSAGE_SIZE); "Buffer size " + 3 * MESSAGE_SIZE + " bytes for session '1' " +
"exceeds the allowed limit " + 2 * MESSAGE_SIZE);
} }
private static Message<byte[]> createMessage(int index) { private static Message<byte[]> createMessage(int index) {
@ -218,9 +217,9 @@ public class OrderedMessageSendingIntegrationTests {
@Nullable @Nullable
private CountDownLatch messageLatch; private CountDownLatch messageLatch;
private Queue<Message<?>> messages = new LinkedBlockingQueue<>(); private final Queue<Message<?>> messages = new LinkedBlockingQueue<>();
private AtomicReference<Exception> exception = new AtomicReference<>(); private final AtomicReference<Exception> exception = new AtomicReference<>();
public TestMessageHandler(WebSocketSession session) { public TestMessageHandler(WebSocketSession session) {