From 2f371e5aeb8ece6c6b3939f1ad8faba5539b38fd Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 10 Jul 2014 09:26:39 -0400 Subject: [PATCH] Add ExecutorChannelInterceptor Issue: SPR-11968 --- .../support/ExecutorChannelInterceptor.java | 47 +++++++ .../support/ExecutorSubscribableChannel.java | 131 ++++++++++++++++-- .../ExecutorSubscribableChannelTests.java | 127 ++++++++++++++++- 3 files changed, 294 insertions(+), 11 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java new file mode 100644 index 0000000000..7942855e6f --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java @@ -0,0 +1,47 @@ +package org.springframework.messaging.support; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * An extension of {@link ChannelInterceptor} with callbacks to intercept the + * asynchronous sending of a {@link org.springframework.messaging.Message} to a + * specific subscriber through an {@link java.util.concurrent.Executor}. + * Supported on {@link org.springframework.messaging.MessageChannel} + * implementations that can be configured with an Executor. + * + * @author Rossen Stoyanchev + * @since 4.1 + */ +public interface ExecutorChannelInterceptor extends ChannelInterceptor { + + /** + * Invoked inside the {@link Runnable} submitted to the Executor just before + * calling the target MessageHandler to handle the message. Allows for + * modification of the Message if necessary or when {@code null} is returned + * the MessageHandler is not invoked. + * + * @param message the message to be handled + * @param channel the channel on which the message was sent to + * @param handler the target handler to handle the message + * @return the input message, or a new instance, or {@code null} + */ + Message beforeHandle(Message message, MessageChannel channel, MessageHandler handler); + + /** + * Invoked inside the {@link Runnable} submitted to the Executor after calling + * the target MessageHandler regardless of the outcome (i.e. Exception raised + * or not) thus allowing for proper resource cleanup. + * + *

Note that this will be invoked only if beforeHandle successfully completed + * and returned a Message, i.e. it did not return {@code null}. + * + * @param message the message handled + * @param channel the channel on which the message was sent to + * @param handler the target handler that handled the message + * @param ex any exception that may been raised by the handler + */ + void afterMessageHandled(Message message, MessageChannel channel, MessageHandler handler, Exception ex); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java index feea43d07a..42758c72b6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java @@ -16,10 +16,15 @@ package org.springframework.messaging.support; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Executor; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; import org.springframework.messaging.SubscribableChannel; /** @@ -33,6 +38,8 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { private final Executor executor; + private final List executorInterceptors = new ArrayList(4); + /** * Create a new {@link ExecutorSubscribableChannel} instance where messages will be sent @@ -57,22 +64,130 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { return this.executor; } + @Override + public void setInterceptors(List interceptors) { + super.setInterceptors(interceptors); + this.executorInterceptors.clear(); + for (ChannelInterceptor interceptor : interceptors) { + if (interceptor instanceof ExecutorChannelInterceptor) { + this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor); + } + } + } + + @Override + public void addInterceptor(ChannelInterceptor interceptor) { + super.addInterceptor(interceptor); + if (interceptor instanceof ExecutorChannelInterceptor) { + this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor); + } + } + + @Override public boolean sendInternal(final Message message, long timeout) { - for (final MessageHandler handler : getSubscribers()) { + for (MessageHandler subscriber : getSubscribers()) { + ExecutorChannelInterceptorChain chain = new ExecutorChannelInterceptorChain(); + SendTask sendTask = new SendTask(message, this, subscriber, chain); if (this.executor == null) { - handler.handleMessage(message); + sendTask.run(); } else { - this.executor.execute(new Runnable() { - @Override - public void run() { - handler.handleMessage(message); - } - }); + this.executor.execute(sendTask); } } return true; } + + /** + * Helps with the invocation of the target MessageHandler and interceptors. + */ + private static class SendTask implements Runnable { + + private final Message inputMessage; + + private final MessageChannel channel; + + private final MessageHandler handler; + + private final ExecutorChannelInterceptorChain chain; + + + public SendTask(Message message, MessageChannel channel, MessageHandler handler, + ExecutorChannelInterceptorChain chain) { + + this.inputMessage = message; + this.channel = channel; + this.handler = handler; + this.chain = chain; + } + + @Override + public void run() { + Message message = this.inputMessage; + try { + message = chain.applyBeforeHandle(message, this.channel, this.handler); + if (message == null) { + return; + } + this.handler.handleMessage(message); + this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, null); + } + catch (Exception ex) { + this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, ex); + if (ex instanceof MessagingException) { + throw (MessagingException) ex; + } + throw new MessageDeliveryException(message, + "Failed to handle message to " + this.channel + " in " + this.handler, ex); + } + catch (Error ex) { + this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, + new MessageDeliveryException(message, + "Failed to handle message to " + this.channel + " in " + this.handler, ex)); + throw ex; + } + } + } + + /** + * Helps with the invocation of configured executor channel interceptors. + */ + private class ExecutorChannelInterceptorChain { + + private int interceptorIndex = -1; + + + public Message applyBeforeHandle(Message message, MessageChannel channel, MessageHandler handler) { + for (ExecutorChannelInterceptor interceptor : executorInterceptors) { + message = interceptor.beforeHandle(message, channel, handler); + if (message == null) { + String name = interceptor.getClass().getSimpleName(); + if (logger.isDebugEnabled()) { + logger.debug(name + " returned null from beforeHandle, i.e. precluding the send."); + } + triggerAfterMessageHandled(message, channel, handler, null); + return null; + } + this.interceptorIndex++; + } + return message; + } + + public void triggerAfterMessageHandled(Message message, MessageChannel channel, + MessageHandler handler, Exception ex) { + + for (int i = this.interceptorIndex; i >= 0; i--) { + ExecutorChannelInterceptor interceptor = executorInterceptors.get(i); + try { + interceptor.afterMessageHandled(message, channel, handler, ex); + } + catch (Throwable ex2) { + logger.error("Exception from afterMessageHandled in " + interceptor, ex2); + } + } + } + } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java b/spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java index 4e92934dac..173829aa5d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java @@ -26,11 +26,15 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.springframework.core.task.TaskExecutor; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.MessageHandler; +import java.util.concurrent.atomic.AtomicInteger; + import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import static org.mockito.BDDMockito.*; /** @@ -70,21 +74,29 @@ public class ExecutorSubscribableChannelTests { @Test public void sendWithoutExecutor() { + BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor(); + this.channel.addInterceptor(interceptor); this.channel.subscribe(this.handler); this.channel.send(this.message); verify(this.handler).handleMessage(this.message); + assertEquals(1, interceptor.getCounter().get()); + assertTrue(interceptor.wasAfterHandledInvoked()); } @Test public void sendWithExecutor() throws Exception { + BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor(); TaskExecutor executor = mock(TaskExecutor.class); - this.channel = new ExecutorSubscribableChannel(executor); - this.channel.subscribe(this.handler); - this.channel.send(this.message); + ExecutorSubscribableChannel testChannel = new ExecutorSubscribableChannel(executor); + testChannel.addInterceptor(interceptor); + testChannel.subscribe(this.handler); + testChannel.send(this.message); verify(executor).execute(this.runnableCaptor.capture()); verify(this.handler, never()).handleMessage(this.message); this.runnableCaptor.getValue().run(); verify(this.handler).handleMessage(this.message); + assertEquals(1, interceptor.getCounter().get()); + assertTrue(interceptor.wasAfterHandledInvoked()); } @Test @@ -128,4 +140,113 @@ public class ExecutorSubscribableChannelTests { verify(this.handler).handleMessage(this.message); } + @Test + public void interceptorWithModifiedMessage() { + Message expected = mock(Message.class); + BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor(); + interceptor.setMessageToReturn(expected); + this.channel.addInterceptor(interceptor); + this.channel.subscribe(this.handler); + this.channel.send(this.message); + verify(this.handler).handleMessage(expected); + assertEquals(1, interceptor.getCounter().get()); + assertTrue(interceptor.wasAfterHandledInvoked()); + } + + @Test + public void interceptorWithNull() { + BeforeHandleInterceptor interceptor1 = new BeforeHandleInterceptor(); + NullReturningBeforeHandleInterceptor interceptor2 = new NullReturningBeforeHandleInterceptor(); + this.channel.addInterceptor(interceptor1); + this.channel.addInterceptor(interceptor2); + this.channel.subscribe(this.handler); + this.channel.send(this.message); + verifyNoMoreInteractions(this.handler); + assertEquals(1, interceptor1.getCounter().get()); + assertEquals(1, interceptor2.getCounter().get()); + assertTrue(interceptor1.wasAfterHandledInvoked()); + } + + @Test + public void interceptorWithException() { + IllegalStateException expected = new IllegalStateException("Fake exception"); + doThrow(expected).when(this.handler).handleMessage(this.message); + BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor(); + this.channel.addInterceptor(interceptor); + this.channel.subscribe(this.handler); + try { + this.channel.send(this.message); + } + catch (MessageDeliveryException actual) { + assertSame(expected, actual.getCause()); + } + verify(this.handler).handleMessage(this.message); + assertEquals(1, interceptor.getCounter().get()); + assertTrue(interceptor.wasAfterHandledInvoked()); + } + + + private abstract static class AbstractTestInterceptor extends ChannelInterceptorAdapter + implements ExecutorChannelInterceptor { + + private AtomicInteger counter = new AtomicInteger(); + + private volatile boolean afterHandledInvoked; + + + public AtomicInteger getCounter() { + return this.counter; + } + + public boolean wasAfterHandledInvoked() { + return this.afterHandledInvoked; + } + + @Override + public Message beforeHandle(Message message, MessageChannel channel, MessageHandler handler) { + assertNotNull(message); + counter.incrementAndGet(); + return message; + } + + @Override + public void afterMessageHandled(Message message, MessageChannel channel, MessageHandler handler, Exception ex) { + this.afterHandledInvoked = true; + } + } + + private static class BeforeHandleInterceptor extends AbstractTestInterceptor { + + private Message messageToReturn; + + private RuntimeException exceptionToRaise; + + + public void setMessageToReturn(Message messageToReturn) { + this.messageToReturn = messageToReturn; + } + + public void setExceptionToRaise(RuntimeException exception) { + this.exceptionToRaise = exception; + } + + @Override + public Message beforeHandle(Message message, MessageChannel channel, MessageHandler handler) { + super.beforeHandle(message, channel, handler); + if (this.exceptionToRaise != null) { + throw this.exceptionToRaise; + } + return (this.messageToReturn != null ? this.messageToReturn : message); + } + } + + private static class NullReturningBeforeHandleInterceptor extends AbstractTestInterceptor { + + @Override + public Message beforeHandle(Message message, MessageChannel channel, MessageHandler handler) { + super.beforeHandle(message, channel, handler); + return null; + } + } + }