Add @MessageExceptionHandler
Similar to @ExceptionHandler but for message processing. Such a method can send messages to both the message broker channel and the client channel provided the client is subscribed to the target destination.
This commit is contained in:
parent
01c4e458c7
commit
55a212d4a0
|
@ -56,13 +56,17 @@ public class ExceptionHandlerMethodResolver {
|
|||
* @param handlerType the type to introspect
|
||||
*/
|
||||
public ExceptionHandlerMethodResolver(Class<?> handlerType) {
|
||||
for (Method method : HandlerMethodSelector.selectMethods(handlerType, EXCEPTION_HANDLER_METHODS)) {
|
||||
for (Method method : HandlerMethodSelector.selectMethods(handlerType, getExceptionHandlerMethods())) {
|
||||
for (Class<? extends Throwable> exceptionType : detectExceptionMappings(method)) {
|
||||
addExceptionMapping(exceptionType, method);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected MethodFilter getExceptionHandlerMethods() {
|
||||
return EXCEPTION_HANDLER_METHODS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract exception mappings from the {@code @ExceptionHandler} annotation
|
||||
* first and as a fall-back from the method signature.
|
||||
|
@ -71,8 +75,7 @@ public class ExceptionHandlerMethodResolver {
|
|||
private List<Class<? extends Throwable>> detectExceptionMappings(Method method) {
|
||||
List<Class<? extends Throwable>> result = new ArrayList<Class<? extends Throwable>>();
|
||||
|
||||
ExceptionHandler annotation = AnnotationUtils.findAnnotation(method, ExceptionHandler.class);
|
||||
result.addAll(Arrays.asList(annotation.value()));
|
||||
detectAnnotationExceptionMappings(method, result);
|
||||
|
||||
if (result.isEmpty()) {
|
||||
for (Class<?> paramType : method.getParameterTypes()) {
|
||||
|
@ -87,6 +90,11 @@ public class ExceptionHandlerMethodResolver {
|
|||
return result;
|
||||
}
|
||||
|
||||
protected void detectAnnotationExceptionMappings(Method method, List<Class<? extends Throwable>> result) {
|
||||
ExceptionHandler annotation = AnnotationUtils.findAnnotation(method, ExceptionHandler.class);
|
||||
result.addAll(Arrays.asList(annotation.value()));
|
||||
}
|
||||
|
||||
private void addExceptionMapping(Class<? extends Throwable> exceptionType, Method method) {
|
||||
Method oldMethod = this.mappedMethods.put(exceptionType, method);
|
||||
if (oldMethod != null && !oldMethod.equals(method)) {
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.messaging.annotation;
|
||||
|
||||
import java.lang.annotation.Documented;
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
|
||||
/**
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
*/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface MessageExceptionHandler {
|
||||
|
||||
/**
|
||||
* Exceptions handled by the annotation method. If empty, will default
|
||||
* to any exceptions listed in the method argument list.
|
||||
*/
|
||||
Class<? extends Throwable>[] value() default {};
|
||||
|
||||
}
|
|
@ -24,6 +24,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
|
@ -69,6 +70,9 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
|
|||
|
||||
private Map<MappingInfo, HandlerMethod> unsubscribeMethods = new HashMap<MappingInfo, HandlerMethod>();
|
||||
|
||||
private final Map<Class<?>, MessageExceptionHandlerMethodResolver> exceptionHandlerCache =
|
||||
new ConcurrentHashMap<Class<?>, MessageExceptionHandlerMethodResolver>(64);
|
||||
|
||||
private ArgumentResolverComposite<M> argumentResolvers = new ArgumentResolverComposite<M>();
|
||||
|
||||
private ReturnValueHandlerComposite<M> returnValueHandlers = new ReturnValueHandlerComposite<M>();
|
||||
|
@ -193,7 +197,7 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
|
|||
|
||||
HandlerMethod handlerMethod = match.createWithResolvedBean();
|
||||
|
||||
// TODO:
|
||||
// TODO: avoid re-creating invocableHandlerMethod
|
||||
InvocableMessageHandlerMethod<M> invocableHandlerMethod = new InvocableMessageHandlerMethod<M>(handlerMethod);
|
||||
invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
|
||||
|
||||
|
@ -209,15 +213,46 @@ public class AnnotationPubSubMessageHandler<M extends Message> extends AbstractP
|
|||
|
||||
this.returnValueHandlers.handleReturnValue(value, returnType, message);
|
||||
}
|
||||
catch (Throwable e) {
|
||||
// TODO: send error message, or add @ExceptionHandler-like capability
|
||||
e.printStackTrace();
|
||||
catch (Exception ex) {
|
||||
invokeExceptionHandler(message, handlerMethod, ex);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
// TODO
|
||||
ex.printStackTrace();
|
||||
}
|
||||
finally {
|
||||
MessageHolder.reset();
|
||||
}
|
||||
}
|
||||
|
||||
private void invokeExceptionHandler(M message, HandlerMethod handlerMethod, Exception ex) {
|
||||
|
||||
InvocableMessageHandlerMethod<M> invocableHandlerMethod;
|
||||
Class<?> beanType = handlerMethod.getBeanType();
|
||||
MessageExceptionHandlerMethodResolver resolver = this.exceptionHandlerCache.get(beanType);
|
||||
if (resolver == null) {
|
||||
resolver = new MessageExceptionHandlerMethodResolver(beanType);
|
||||
this.exceptionHandlerCache.put(beanType, resolver);
|
||||
}
|
||||
|
||||
Method method = resolver.resolveMethod(ex);
|
||||
if (method == null) {
|
||||
logger.error("Unhandled exception", ex);
|
||||
return;
|
||||
}
|
||||
|
||||
invocableHandlerMethod = new InvocableMessageHandlerMethod<M>(handlerMethod.getBean(), method);
|
||||
invocableHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
|
||||
|
||||
try {
|
||||
invocableHandlerMethod.invoke(message, ex);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
logger.error("Error while handling exception", t);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
protected HandlerMethod getHandlerMethod(String destination, Map<MappingInfo, HandlerMethod> handlerMethods) {
|
||||
for (MappingInfo key : handlerMethods.keySet()) {
|
||||
for (String mappingDestination : key.getDestinations()) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.web.messaging.service.method;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.springframework.core.GenericTypeResolver;
|
||||
|
@ -58,6 +59,13 @@ public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMet
|
|||
super(handlerMethod);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance from a bean instance and a method.
|
||||
*/
|
||||
public InvocableMessageHandlerMethod(Object bean, Method method) {
|
||||
super(bean, method);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new handler method with the given bean instance, method name and
|
||||
* parameters.
|
||||
|
@ -67,8 +75,9 @@ public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMet
|
|||
* @param parameterTypes the method parameter types
|
||||
* @throws NoSuchMethodException when the method cannot be found
|
||||
*/
|
||||
public InvocableMessageHandlerMethod(
|
||||
Object bean, String methodName, Class<?>... parameterTypes) throws NoSuchMethodException {
|
||||
public InvocableMessageHandlerMethod(Object bean, String methodName, Class<?>... parameterTypes)
|
||||
throws NoSuchMethodException {
|
||||
|
||||
super(bean, methodName, parameterTypes);
|
||||
}
|
||||
|
||||
|
@ -98,9 +107,9 @@ public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMet
|
|||
* @exception Exception raised if no suitable argument resolver can be found, or the
|
||||
* method raised an exception
|
||||
*/
|
||||
public final Object invoke(M message) throws Exception {
|
||||
public final Object invoke(M message, Object... providedArgs) throws Exception {
|
||||
|
||||
Object[] args = getMethodArgumentValues(message);
|
||||
Object[] args = getMethodArgumentValues(message, providedArgs);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
StringBuilder builder = new StringBuilder("Invoking [");
|
||||
|
@ -121,7 +130,7 @@ public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMet
|
|||
/**
|
||||
* Get the method argument values for the current request.
|
||||
*/
|
||||
private Object[] getMethodArgumentValues(M message) throws Exception {
|
||||
private Object[] getMethodArgumentValues(M message, Object... providedArgs) throws Exception {
|
||||
|
||||
MethodParameter[] parameters = getMethodParameters();
|
||||
Object[] args = new Object[parameters.length];
|
||||
|
@ -130,7 +139,7 @@ public class InvocableMessageHandlerMethod<M extends Message> extends HandlerMet
|
|||
parameter.initParameterNameDiscovery(parameterNameDiscoverer);
|
||||
GenericTypeResolver.resolveParameterType(parameter, getBean().getClass());
|
||||
|
||||
args[i] = resolveProvidedArgument(parameter);
|
||||
args[i] = resolveProvidedArgument(parameter, providedArgs);
|
||||
if (args[i] != null) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.messaging.service.method;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
import org.springframework.util.ReflectionUtils.MethodFilter;
|
||||
import org.springframework.web.messaging.annotation.MessageExceptionHandler;
|
||||
import org.springframework.web.method.annotation.ExceptionHandlerMethodResolver;
|
||||
|
||||
|
||||
/**
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
*/
|
||||
public class MessageExceptionHandlerMethodResolver extends ExceptionHandlerMethodResolver {
|
||||
|
||||
|
||||
public MessageExceptionHandlerMethodResolver(Class<?> handlerType) {
|
||||
super(handlerType);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected MethodFilter getExceptionHandlerMethods() {
|
||||
return MESSAGE_EXCEPTION_HANDLER_METHODS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void detectAnnotationExceptionMappings(Method method, List<Class<? extends Throwable>> result) {
|
||||
MessageExceptionHandler annotation = AnnotationUtils.findAnnotation(method, MessageExceptionHandler.class);
|
||||
result.addAll(Arrays.asList(annotation.value()));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A filter for selecting {@code @ExceptionHandler} methods.
|
||||
*/
|
||||
public final static MethodFilter MESSAGE_EXCEPTION_HANDLER_METHODS = new MethodFilter() {
|
||||
|
||||
@Override
|
||||
public boolean matches(Method method) {
|
||||
return AnnotationUtils.findAnnotation(method, MessageExceptionHandler.class) != null;
|
||||
}
|
||||
};
|
||||
}
|
|
@ -24,6 +24,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.http.MediaType;
|
||||
|
@ -147,7 +148,12 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
|
|||
public void stop() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
this.running = false;
|
||||
this.tcpClient.close();
|
||||
try {
|
||||
this.tcpClient.close().await(5000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ package org.springframework.web.messaging.stomp.support;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -29,6 +30,8 @@ import org.springframework.messaging.Message;
|
|||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.messaging.MessageType;
|
||||
import org.springframework.web.messaging.PubSubChannelRegistry;
|
||||
import org.springframework.web.messaging.converter.CompositeMessageConverter;
|
||||
|
@ -60,7 +63,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
|
|||
|
||||
private final StompMessageConverter<M> stompMessageConverter = new StompMessageConverter<M>();
|
||||
|
||||
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<String, WebSocketSession>();
|
||||
private final Map<String, SessionInfo> sessionInfos = new ConcurrentHashMap<String, SessionInfo>();
|
||||
|
||||
private MessageConverter payloadConverter = new CompositeMessageConverter(null);
|
||||
|
||||
|
@ -78,15 +81,11 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
|
|||
return this.stompMessageConverter;
|
||||
}
|
||||
|
||||
protected WebSocketSession getWebSocketSession(String sessionId) {
|
||||
return this.sessions.get(sessionId);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
Assert.notNull(this.outputChannel, "No output channel for STOMP messages.");
|
||||
this.sessions.put(session.getId(), session);
|
||||
this.sessionInfos.put(session.getId(), new SessionInfo(session));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -172,12 +171,23 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
|
|||
}
|
||||
|
||||
protected void handleSubscribe(M message) {
|
||||
|
||||
// TODO: need a way to communicate back if subscription was successfully created or
|
||||
// not in which case an ERROR should be sent back and close the connection
|
||||
// http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
|
||||
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
|
||||
String sessionId = headers.getSessionId();
|
||||
String destination = headers.getDestination();
|
||||
|
||||
SessionInfo sessionInfo = this.sessionInfos.get(sessionId);
|
||||
sessionInfo.addSubscription(destination, headers.getSubscriptionId());
|
||||
}
|
||||
|
||||
protected void handleUnsubscribe(M message) {
|
||||
|
||||
// TODO: remove subscription
|
||||
|
||||
}
|
||||
|
||||
protected void handleDisconnect(M stompMessage) {
|
||||
|
@ -202,7 +212,7 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
|
|||
|
||||
@Override
|
||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
|
||||
this.sessions.remove(session.getId());
|
||||
this.sessionInfos.remove(session.getId());
|
||||
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.create(MessageType.DISCONNECT);
|
||||
headers.setSessionId(session.getId());
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -228,11 +238,26 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
|
|||
if (sessionId == null) {
|
||||
logger.error("No \"sessionId\" header in message: " + message);
|
||||
}
|
||||
WebSocketSession session = getWebSocketSession(sessionId);
|
||||
|
||||
SessionInfo sessionInfo = this.sessionInfos.get(sessionId);
|
||||
WebSocketSession session = sessionInfo.getWebSocketSession();
|
||||
if (session == null) {
|
||||
logger.error("Session not found: " + message);
|
||||
}
|
||||
|
||||
if (headers.getSubscriptionId() == null) {
|
||||
String destination = headers.getDestination();
|
||||
Set<String> subs = sessionInfo.getSubscriptionsForDestination(destination);
|
||||
if (subs != null) {
|
||||
// TODO: send to all sub ids
|
||||
headers.setSubscriptionId(subs.iterator().next());
|
||||
}
|
||||
else {
|
||||
logger.error("No subscription id: " + message);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
byte[] payload;
|
||||
try {
|
||||
MediaType contentType = headers.getContentType();
|
||||
|
@ -263,4 +288,30 @@ public class StompWebSocketHandler<M extends Message> extends TextWebSocketHandl
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static class SessionInfo {
|
||||
|
||||
private final WebSocketSession session;
|
||||
|
||||
private final MultiValueMap<String, String> subscriptions = new LinkedMultiValueMap<String, String>(4);
|
||||
|
||||
|
||||
public SessionInfo(WebSocketSession session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
public WebSocketSession getWebSocketSession() {
|
||||
return this.session;
|
||||
}
|
||||
|
||||
public void addSubscription(String destination, String subscriptionId) {
|
||||
this.subscriptions.add(destination, subscriptionId);
|
||||
}
|
||||
|
||||
public Set<String> getSubscriptionsForDestination(String destination) {
|
||||
List<String> ids = this.subscriptions.get(destination);
|
||||
return (ids != null) ? new HashSet<String>(ids) : null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue