Polish ExecutorSubscribableChannel

Consolidate inner classes used to invoke a subscriber with
interceptors.
This commit is contained in:
Rossen Stoyanchev 2014-09-29 22:23:45 -04:00
parent 803fc20019
commit 84137ab986
1 changed files with 46 additions and 65 deletions

View File

@ -21,7 +21,6 @@ import java.util.List;
import java.util.concurrent.Executor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@ -85,10 +84,9 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
@Override
public boolean sendInternal(final Message<?> message, long timeout) {
for (MessageHandler subscriber : getSubscribers()) {
ExecutorChannelInterceptorChain chain = new ExecutorChannelInterceptorChain();
SendTask sendTask = new SendTask(message, this, subscriber, chain);
public boolean sendInternal(Message<?> message, long timeout) {
for (MessageHandler handler : getSubscribers()) {
SendTask sendTask = new SendTask(message, handler);
if (this.executor == null) {
sendTask.run();
}
@ -101,21 +99,57 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
/**
* Helps with the invocation of configured executor channel interceptors.
* Invoke a MessageHandler with ExecutorChannelInterceptor's.
*/
private class ExecutorChannelInterceptorChain {
private class SendTask implements Runnable {
private final Message<?> inputMessage;
private final MessageHandler handler;
private int interceptorIndex = -1;
public Message<?> applyBeforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
public SendTask(Message<?> message, MessageHandler handler) {
this.inputMessage = message;
this.handler = handler;
}
@Override
public void run() {
Message<?> message = this.inputMessage;
try {
message = applyBeforeHandle(message);
if (message == null) {
return;
}
this.handler.handleMessage(message);
triggerAfterMessageHandled(message, null);
}
catch (Exception ex) {
triggerAfterMessageHandled(message, ex);
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
String description = "Failed to handle " + message + " to " + this + " in " + this.handler;
throw new MessageDeliveryException(message, description, ex);
}
catch (Error ex) {
String description = "Failed to handle " + message + " to " + this + " in " + this.handler;
triggerAfterMessageHandled(message, new MessageDeliveryException(message, description, ex));
throw ex;
}
}
private Message<?> applyBeforeHandle(Message<?> message) {
for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
message = interceptor.beforeHandle(message, channel, handler);
message = interceptor.beforeHandle(message, ExecutorSubscribableChannel.this, this.handler);
if (message == null) {
String name = interceptor.getClass().getSimpleName();
if (logger.isDebugEnabled()) {
logger.debug(name + " returned null from beforeHandle, i.e. precluding the send.");
}
triggerAfterMessageHandled(message, channel, handler, null);
triggerAfterMessageHandled(message, null);
return null;
}
this.interceptorIndex++;
@ -123,13 +157,11 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
return message;
}
public void triggerAfterMessageHandled(Message<?> message, MessageChannel channel,
MessageHandler handler, Exception ex) {
private void triggerAfterMessageHandled(Message<?> message, Exception ex) {
for (int i = this.interceptorIndex; i >= 0; i--) {
ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
try {
interceptor.afterMessageHandled(message, channel, handler, ex);
interceptor.afterMessageHandled(message, ExecutorSubscribableChannel.this, this.handler, ex);
}
catch (Throwable ex2) {
logger.error("Exception from afterMessageHandled in " + interceptor, ex2);
@ -138,55 +170,4 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
}
}
/**
* Helps with the invocation of the target MessageHandler and interceptors.
*/
private static class SendTask implements Runnable {
private final Message<?> inputMessage;
private final MessageChannel channel;
private final MessageHandler handler;
private final ExecutorChannelInterceptorChain chain;
public SendTask(Message<?> message, MessageChannel channel, MessageHandler handler,
ExecutorChannelInterceptorChain chain) {
this.inputMessage = message;
this.channel = channel;
this.handler = handler;
this.chain = chain;
}
@Override
public void run() {
Message<?> message = this.inputMessage;
try {
message = chain.applyBeforeHandle(message, this.channel, this.handler);
if (message == null) {
return;
}
this.handler.handleMessage(message);
this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, null);
}
catch (Exception ex) {
this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, ex);
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
throw new MessageDeliveryException(message,
"Failed to handle message to " + this.channel + " in " + this.handler, ex);
}
catch (Error ex) {
this.chain.triggerAfterMessageHandled(message, this.channel, this.handler,
new MessageDeliveryException(message,
"Failed to handle message to " + this.channel + " in " + this.handler, ex));
throw ex;
}
}
}
}