diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java
new file mode 100644
index 0000000000..7942855e6f
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java
@@ -0,0 +1,47 @@
+package org.springframework.messaging.support;
+
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * An extension of {@link ChannelInterceptor} with callbacks to intercept the
+ * asynchronous sending of a {@link org.springframework.messaging.Message} to a
+ * specific subscriber through an {@link java.util.concurrent.Executor}.
+ * Supported on {@link org.springframework.messaging.MessageChannel}
+ * implementations that can be configured with an Executor.
+ *
+ * @author Rossen Stoyanchev
+ * @since 4.1
+ */
+public interface ExecutorChannelInterceptor extends ChannelInterceptor {
+
+ /**
+ * Invoked inside the {@link Runnable} submitted to the Executor just before
+ * calling the target MessageHandler to handle the message. Allows for
+ * modification of the Message if necessary or when {@code null} is returned
+ * the MessageHandler is not invoked.
+ *
+ * @param message the message to be handled
+ * @param channel the channel on which the message was sent to
+ * @param handler the target handler to handle the message
+ * @return the input message, or a new instance, or {@code null}
+ */
+ Message> beforeHandle(Message> message, MessageChannel channel, MessageHandler handler);
+
+ /**
+ * Invoked inside the {@link Runnable} submitted to the Executor after calling
+ * the target MessageHandler regardless of the outcome (i.e. Exception raised
+ * or not) thus allowing for proper resource cleanup.
+ *
+ *
Note that this will be invoked only if beforeHandle successfully completed
+ * and returned a Message, i.e. it did not return {@code null}.
+ *
+ * @param message the message handled
+ * @param channel the channel on which the message was sent to
+ * @param handler the target handler that handled the message
+ * @param ex any exception that may been raised by the handler
+ */
+ void afterMessageHandled(Message> message, MessageChannel channel, MessageHandler handler, Exception ex);
+
+}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java
index feea43d07a..42758c72b6 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java
@@ -16,10 +16,15 @@
package org.springframework.messaging.support;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Executor;
import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
/**
@@ -33,6 +38,8 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
private final Executor executor;
+ private final List executorInterceptors = new ArrayList(4);
+
/**
* Create a new {@link ExecutorSubscribableChannel} instance where messages will be sent
@@ -57,22 +64,130 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
return this.executor;
}
+ @Override
+ public void setInterceptors(List interceptors) {
+ super.setInterceptors(interceptors);
+ this.executorInterceptors.clear();
+ for (ChannelInterceptor interceptor : interceptors) {
+ if (interceptor instanceof ExecutorChannelInterceptor) {
+ this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
+ }
+ }
+ }
+
+ @Override
+ public void addInterceptor(ChannelInterceptor interceptor) {
+ super.addInterceptor(interceptor);
+ if (interceptor instanceof ExecutorChannelInterceptor) {
+ this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
+ }
+ }
+
+
@Override
public boolean sendInternal(final Message> message, long timeout) {
- for (final MessageHandler handler : getSubscribers()) {
+ for (MessageHandler subscriber : getSubscribers()) {
+ ExecutorChannelInterceptorChain chain = new ExecutorChannelInterceptorChain();
+ SendTask sendTask = new SendTask(message, this, subscriber, chain);
if (this.executor == null) {
- handler.handleMessage(message);
+ sendTask.run();
}
else {
- this.executor.execute(new Runnable() {
- @Override
- public void run() {
- handler.handleMessage(message);
- }
- });
+ this.executor.execute(sendTask);
}
}
return true;
}
+
+ /**
+ * Helps with the invocation of the target MessageHandler and interceptors.
+ */
+ private static class SendTask implements Runnable {
+
+ private final Message> inputMessage;
+
+ private final MessageChannel channel;
+
+ private final MessageHandler handler;
+
+ private final ExecutorChannelInterceptorChain chain;
+
+
+ public SendTask(Message> message, MessageChannel channel, MessageHandler handler,
+ ExecutorChannelInterceptorChain chain) {
+
+ this.inputMessage = message;
+ this.channel = channel;
+ this.handler = handler;
+ this.chain = chain;
+ }
+
+ @Override
+ public void run() {
+ Message> message = this.inputMessage;
+ try {
+ message = chain.applyBeforeHandle(message, this.channel, this.handler);
+ if (message == null) {
+ return;
+ }
+ this.handler.handleMessage(message);
+ this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, null);
+ }
+ catch (Exception ex) {
+ this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, ex);
+ if (ex instanceof MessagingException) {
+ throw (MessagingException) ex;
+ }
+ throw new MessageDeliveryException(message,
+ "Failed to handle message to " + this.channel + " in " + this.handler, ex);
+ }
+ catch (Error ex) {
+ this.chain.triggerAfterMessageHandled(message, this.channel, this.handler,
+ new MessageDeliveryException(message,
+ "Failed to handle message to " + this.channel + " in " + this.handler, ex));
+ throw ex;
+ }
+ }
+ }
+
+ /**
+ * Helps with the invocation of configured executor channel interceptors.
+ */
+ private class ExecutorChannelInterceptorChain {
+
+ private int interceptorIndex = -1;
+
+
+ public Message> applyBeforeHandle(Message> message, MessageChannel channel, MessageHandler handler) {
+ for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
+ message = interceptor.beforeHandle(message, channel, handler);
+ if (message == null) {
+ String name = interceptor.getClass().getSimpleName();
+ if (logger.isDebugEnabled()) {
+ logger.debug(name + " returned null from beforeHandle, i.e. precluding the send.");
+ }
+ triggerAfterMessageHandled(message, channel, handler, null);
+ return null;
+ }
+ this.interceptorIndex++;
+ }
+ return message;
+ }
+
+ public void triggerAfterMessageHandled(Message> message, MessageChannel channel,
+ MessageHandler handler, Exception ex) {
+
+ for (int i = this.interceptorIndex; i >= 0; i--) {
+ ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
+ try {
+ interceptor.afterMessageHandled(message, channel, handler, ex);
+ }
+ catch (Throwable ex2) {
+ logger.error("Exception from afterMessageHandled in " + interceptor, ex2);
+ }
+ }
+ }
+ }
+
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java b/spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java
index 4e92934dac..173829aa5d 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java
@@ -26,11 +26,15 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import static org.mockito.BDDMockito.*;
/**
@@ -70,21 +74,29 @@ public class ExecutorSubscribableChannelTests {
@Test
public void sendWithoutExecutor() {
+ BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
+ this.channel.addInterceptor(interceptor);
this.channel.subscribe(this.handler);
this.channel.send(this.message);
verify(this.handler).handleMessage(this.message);
+ assertEquals(1, interceptor.getCounter().get());
+ assertTrue(interceptor.wasAfterHandledInvoked());
}
@Test
public void sendWithExecutor() throws Exception {
+ BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
TaskExecutor executor = mock(TaskExecutor.class);
- this.channel = new ExecutorSubscribableChannel(executor);
- this.channel.subscribe(this.handler);
- this.channel.send(this.message);
+ ExecutorSubscribableChannel testChannel = new ExecutorSubscribableChannel(executor);
+ testChannel.addInterceptor(interceptor);
+ testChannel.subscribe(this.handler);
+ testChannel.send(this.message);
verify(executor).execute(this.runnableCaptor.capture());
verify(this.handler, never()).handleMessage(this.message);
this.runnableCaptor.getValue().run();
verify(this.handler).handleMessage(this.message);
+ assertEquals(1, interceptor.getCounter().get());
+ assertTrue(interceptor.wasAfterHandledInvoked());
}
@Test
@@ -128,4 +140,113 @@ public class ExecutorSubscribableChannelTests {
verify(this.handler).handleMessage(this.message);
}
+ @Test
+ public void interceptorWithModifiedMessage() {
+ Message> expected = mock(Message.class);
+ BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
+ interceptor.setMessageToReturn(expected);
+ this.channel.addInterceptor(interceptor);
+ this.channel.subscribe(this.handler);
+ this.channel.send(this.message);
+ verify(this.handler).handleMessage(expected);
+ assertEquals(1, interceptor.getCounter().get());
+ assertTrue(interceptor.wasAfterHandledInvoked());
+ }
+
+ @Test
+ public void interceptorWithNull() {
+ BeforeHandleInterceptor interceptor1 = new BeforeHandleInterceptor();
+ NullReturningBeforeHandleInterceptor interceptor2 = new NullReturningBeforeHandleInterceptor();
+ this.channel.addInterceptor(interceptor1);
+ this.channel.addInterceptor(interceptor2);
+ this.channel.subscribe(this.handler);
+ this.channel.send(this.message);
+ verifyNoMoreInteractions(this.handler);
+ assertEquals(1, interceptor1.getCounter().get());
+ assertEquals(1, interceptor2.getCounter().get());
+ assertTrue(interceptor1.wasAfterHandledInvoked());
+ }
+
+ @Test
+ public void interceptorWithException() {
+ IllegalStateException expected = new IllegalStateException("Fake exception");
+ doThrow(expected).when(this.handler).handleMessage(this.message);
+ BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
+ this.channel.addInterceptor(interceptor);
+ this.channel.subscribe(this.handler);
+ try {
+ this.channel.send(this.message);
+ }
+ catch (MessageDeliveryException actual) {
+ assertSame(expected, actual.getCause());
+ }
+ verify(this.handler).handleMessage(this.message);
+ assertEquals(1, interceptor.getCounter().get());
+ assertTrue(interceptor.wasAfterHandledInvoked());
+ }
+
+
+ private abstract static class AbstractTestInterceptor extends ChannelInterceptorAdapter
+ implements ExecutorChannelInterceptor {
+
+ private AtomicInteger counter = new AtomicInteger();
+
+ private volatile boolean afterHandledInvoked;
+
+
+ public AtomicInteger getCounter() {
+ return this.counter;
+ }
+
+ public boolean wasAfterHandledInvoked() {
+ return this.afterHandledInvoked;
+ }
+
+ @Override
+ public Message> beforeHandle(Message> message, MessageChannel channel, MessageHandler handler) {
+ assertNotNull(message);
+ counter.incrementAndGet();
+ return message;
+ }
+
+ @Override
+ public void afterMessageHandled(Message> message, MessageChannel channel, MessageHandler handler, Exception ex) {
+ this.afterHandledInvoked = true;
+ }
+ }
+
+ private static class BeforeHandleInterceptor extends AbstractTestInterceptor {
+
+ private Message> messageToReturn;
+
+ private RuntimeException exceptionToRaise;
+
+
+ public void setMessageToReturn(Message> messageToReturn) {
+ this.messageToReturn = messageToReturn;
+ }
+
+ public void setExceptionToRaise(RuntimeException exception) {
+ this.exceptionToRaise = exception;
+ }
+
+ @Override
+ public Message> beforeHandle(Message> message, MessageChannel channel, MessageHandler handler) {
+ super.beforeHandle(message, channel, handler);
+ if (this.exceptionToRaise != null) {
+ throw this.exceptionToRaise;
+ }
+ return (this.messageToReturn != null ? this.messageToReturn : message);
+ }
+ }
+
+ private static class NullReturningBeforeHandleInterceptor extends AbstractTestInterceptor {
+
+ @Override
+ public Message> beforeHandle(Message> message, MessageChannel channel, MessageHandler handler) {
+ super.beforeHandle(message, channel, handler);
+ return null;
+ }
+ }
+
}