From f98719902b37144d0cf6348b2c66f8b72d293de8 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 28 Aug 2020 09:43:01 +0100 Subject: [PATCH] Revert "Polishing OrderedMessageSender" This reverts commit d014d88937356d21db3b10d3c2dc0650c7f168b5. See gh-25581 --- .../simp/broker/OrderedMessageSender.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java index e9586327caf..c7652d4eb43 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java @@ -52,7 +52,7 @@ import org.springframework.util.Assert; */ class OrderedMessageSender implements MessageChannel { - private static final CompletionTaskInterceptor COMPLETION_INTERCEPTOR = new CompletionTaskInterceptor(); + static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask"; private final MessageChannel channel; @@ -101,7 +101,7 @@ class OrderedMessageSender implements MessageChannel { } this.control.removeMessage(message); try { - CompletionTaskInterceptor.instrumentMessage(message, () -> { + getMutableAccessor(message).setHeader(COMPLETION_TASK_HEADER, (Runnable) () -> { this.control.releaseSessionLock(sessionId); if (this.control.hasRemainingWork()) { trySend(); @@ -130,6 +130,13 @@ class OrderedMessageSender implements MessageChannel { } } + private SimpMessageHeaderAccessor getMutableAccessor(Message message) { + SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); + Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor"); + return accessor; + } + + /** * Install or remove an {@link ExecutorChannelInterceptor} that invokes a * completion task once the message is handled. @@ -141,14 +148,14 @@ class OrderedMessageSender implements MessageChannel { if (preservePublishOrder) { Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel, "An ExecutorSubscribableChannel is required for `preservePublishOrder`"); - ExecutorSubscribableChannel executorChannel = (ExecutorSubscribableChannel) channel; - if (executorChannel.getInterceptors().stream().noneMatch(i -> i == COMPLETION_INTERCEPTOR)) { - executorChannel.addInterceptor(0, COMPLETION_INTERCEPTOR); + ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel; + if (execChannel.getInterceptors().stream().noneMatch(i -> i instanceof CompletionTaskInterceptor)) { + execChannel.addInterceptor(0, new CompletionTaskInterceptor()); } } else if (channel instanceof ExecutorSubscribableChannel) { ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel; - execChannel.getInterceptors().stream().filter(i -> i == COMPLETION_INTERCEPTOR) + execChannel.getInterceptors().stream().filter(i -> i instanceof CompletionTaskInterceptor) .findFirst() .map(execChannel::removeInterceptor); @@ -193,7 +200,10 @@ class OrderedMessageSender implements MessageChannel { } public boolean acquireSessionLock(String sessionId) { - return (this.sessionsInProgress.put(sessionId, Boolean.TRUE) == null); + if (this.sessionsInProgress.put(sessionId, Boolean.TRUE) != null) { + return false; + } + return true; } public void releaseSessionLock(String sessionId) { @@ -213,23 +223,15 @@ class OrderedMessageSender implements MessageChannel { private static class CompletionTaskInterceptor implements ExecutorChannelInterceptor { - private static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask"; - @Override public void afterMessageHandled( Message message, MessageChannel ch, MessageHandler handler, @Nullable Exception ex) { - Runnable task = (Runnable) message.getHeaders().get(COMPLETION_TASK_HEADER); + Runnable task = (Runnable) message.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER); if (task != null) { task.run(); } } - - public static void instrumentMessage(Message message, Runnable task) { - SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); - Assert.isTrue(accessor != null && accessor.isMutable(), "Expected a mutable SimpMessageHeaderAccessor"); - accessor.setHeader(COMPLETION_TASK_HEADER, task); - } } }