From df5d22e1203eb342afd2668aead549a0c816515f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 12 Nov 2013 16:34:25 -0500 Subject: [PATCH] Improve logging in spring-messaging Before this change the amount of logging was too little or too much with TRACE turned on. This change separates useful debugging information and logs it under DEBUG and leaves more detailed information to be logged under TRACE. --- .../method/AbstractMethodMessageHandler.java | 22 ++++++-- .../messaging/simp/SimpMessagingTemplate.java | 2 +- ...cketMessageBrokerConfigurationSupport.java | 15 ++++-- .../handler/AbstractBrokerMessageHandler.java | 8 +-- .../handler/AbstractSubscriptionRegistry.java | 9 ++-- .../DefaultUserDestinationResolver.java | 32 ++++++++---- .../SimpAnnotationMethodMessageHandler.java | 7 --- .../handler/SimpleBrokerMessageHandler.java | 8 +-- .../UserDestinationMessageHandler.java | 11 +--- .../stomp/StompBrokerRelayMessageHandler.java | 36 ++++++++++--- .../messaging/simp/stomp/StompDecoder.java | 14 +++--- .../messaging/simp/stomp/StompEncoder.java | 19 ++++--- .../simp/stomp/StompProtocolHandler.java | 11 ++-- .../channel/AbstractMessageChannel.java | 3 +- .../channel/ChannelInterceptorChain.java | 7 +-- .../channel/ExecutorSubscribableChannel.java | 5 -- .../support/tcp/ReactorNettyTcpClient.java | 10 ++++ .../messaging/StubMessageChannel.java | 50 +++++++++++++++++++ .../UserDestinationMessageHandlerTests.java | 3 ++ .../StompBrokerRelayMessageHandlerTests.java | 28 +---------- .../LoggingWebSocketHandlerDecorator.java | 4 +- 21 files changed, 187 insertions(+), 117 deletions(-) create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/StubMessageChannel.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/method/AbstractMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/method/AbstractMethodMessageHandler.java index 70220df6495..4dd3734d7a9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/method/AbstractMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/method/AbstractMethodMessageHandler.java @@ -318,17 +318,21 @@ public abstract class AbstractMethodMessageHandler public void handleMessage(Message message) throws MessagingException { String destination = getDestination(message); - String lookupDestination = getLookupDestination(destination); + if (destination == null) { + logger.trace("Ignoring message, no destination"); + return; + } + String lookupDestination = getLookupDestination(destination); if (lookupDestination == null) { if (logger.isTraceEnabled()) { - logger.trace("Ignoring message with destination=" + destination); + logger.trace("Ignoring message to destination=" + destination); } return; } - if (logger.isTraceEnabled()) { - logger.trace("Handling message " + message); + if (logger.isDebugEnabled()) { + logger.debug("Handling message, lookupDestination=" + lookupDestination); } message = MessageBuilder.fromMessage(message).setHeader(LOOKUP_DESTINATION_HEADER, lookupDestination).build(); @@ -438,6 +442,10 @@ public abstract class AbstractMethodMessageHandler protected void handleMatch(T mapping, HandlerMethod handlerMethod, String lookupDestination, Message message) { + if (logger.isDebugEnabled()) { + logger.debug("Message matched to " + handlerMethod); + } + handlerMethod = handlerMethod.createWithResolvedBean(); InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod); invocable.setMessageMethodArgumentResolvers(this.argumentResolvers); @@ -495,7 +503,11 @@ public abstract class AbstractMethodMessageHandler protected abstract AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(Class beanType); - protected abstract void handleNoMatch(Set ts, String lookupDestination, Message message); + protected void handleNoMatch(Set ts, String lookupDestination, Message message) { + if (logger.isDebugEnabled()) { + logger.debug("No matching method found"); + } + } /** diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java index e32d0e4e150..d36fe79324d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java @@ -96,7 +96,7 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate message) { - if (!this.running) { if (logger.isTraceEnabled()) { - logger.trace("STOMP broker relay not running. Ignoring message id=" + message.getHeaders().getId()); + logger.trace("Message broker is not running. Ignoring message id=" + message.getHeaders().getId()); } return; } - - if (logger.isTraceEnabled()) { - logger.trace("Handling message " + message); - } - handleMessageInternal(message); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java index 0da9e7a521f..0c1d55d6aa2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java @@ -97,20 +97,17 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist public final MultiValueMap findSubscriptions(Message message) { SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); if (!SimpMessageType.MESSAGE.equals(headers.getMessageType())) { - logger.error("Unexpected message type: " + message); + logger.trace("Ignoring message type " + headers.getMessageType()); return null; } String destination = headers.getDestination(); if (destination == null) { - logger.error("Ignoring destination. No destination in message: " + message); + logger.trace("Ignoring message, no destination"); return null; } - if (logger.isTraceEnabled()) { - logger.trace("Find subscriptions, destination=" + headers.getDestination()); - } MultiValueMap result = findSubscriptionsInternal(destination, message); if (logger.isTraceEnabled()) { - logger.trace("Found " + result.size() + " subscriptions"); + logger.trace("Found " + result.size() + " subscriptions for destination=" + headers.getDestination()); } return result; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java index d5a3e88b2f6..a3e55f0a67b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java @@ -120,10 +120,6 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { private UserDestinationInfo getUserDestinationInfo(SimpMessageHeaderAccessor headers) { String destination = headers.getDestination(); - if (destination == null) { - logger.trace("Ignoring message, no destination"); - return null; - } String targetUser; String targetDestination; @@ -132,20 +128,18 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { SimpMessageType messageType = headers.getMessageType(); if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) { - if (user == null) { - logger.trace("Ignoring (un)subscribe message, no user information"); + if (!checkDestination(destination, this.subscriptionDestinationPrefix)) { return null; } - if (!destination.startsWith(this.subscriptionDestinationPrefix)) { - logger.trace("Ignoring (un)subscribe message, not a \"user\" destination"); + if (user == null) { + logger.warn("Ignoring message, no user information"); return null; } targetUser = user.getName(); targetDestination = destination.substring(this.destinationPrefix.length()-1); } else if (SimpMessageType.MESSAGE.equals(messageType)) { - if (!destination.startsWith(this.destinationPrefix)) { - logger.trace("Ignoring message, not a \"user\" destination"); + if (!checkDestination(destination, this.destinationPrefix)) { return null; } int startIndex = this.destinationPrefix.length(); @@ -156,13 +150,29 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { } else { - logger.trace("Ignoring message, not of the right message type"); + if (logger.isTraceEnabled()) { + logger.trace("Ignoring " + messageType + " message"); + } return null; } return new UserDestinationInfo(targetUser, targetDestination); } + protected boolean checkDestination(String destination, String requiredPrefix) { + if (destination == null) { + logger.trace("Ignoring message, no destination"); + return false; + } + if (!destination.startsWith(requiredPrefix)) { + if (logger.isTraceEnabled()) { + logger.trace("Ignoring message to " + destination + ", not a \"user\" destination"); + } + return false; + } + return true; + } + private static class UserDestinationInfo { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandler.java index a0dccc54fb2..8a990af1572 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandler.java @@ -295,13 +295,6 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan super.handleMatch(mapping, handlerMethod, lookupDestination, message); } - @Override - protected void handleNoMatch(Set set, String lookupDestination, Message message) { - if (logger.isTraceEnabled()) { - logger.trace("No match for " + lookupDestination); - } - } - @Override protected AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(Class beanType) { return new AnnotationExceptionHandlerMethodResolver(beanType); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java index efa57e3797f..192aa6adeda 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java @@ -83,7 +83,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { if (!checkDestinationPrefix(destination)) { if (logger.isTraceEnabled()) { - logger.trace("Ingoring message with destination " + destination); + logger.trace("Ingoring message to destination=" + destination); } return; } @@ -113,13 +113,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { protected void sendMessageToSubscribers(String destination, Message message) { MultiValueMap subscriptions = this.subscriptionRegistry.findSubscriptions(message); + if ((subscriptions.size() > 0) && logger.isDebugEnabled()) { + logger.debug("Sending message with destination=" + destination + + " to " + subscriptions.size() + " subscriber(s)"); + } for (String sessionId : subscriptions.keySet()) { for (String subscriptionId : subscriptions.get(sessionId)) { - SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); headers.setSessionId(sessionId); headers.setSubscriptionId(subscriptionId); - Object payload = message.getPayload(); Message clientMessage = MessageBuilder.withPayload(payload).setHeaders(headers).build(); try { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java index d1da249399c..ab499f450a6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java @@ -87,22 +87,15 @@ public class UserDestinationMessageHandler implements MessageHandler { @Override public void handleMessage(Message message) throws MessagingException { - if (logger.isTraceEnabled()) { - logger.trace("Handling message " + message); - } - Set destinations = this.userDestinationResolver.resolveDestination(message); if (CollectionUtils.isEmpty(destinations)) { return; } for (String targetDestination : destinations) { - if (logger.isTraceEnabled()) { - logger.trace("Sending message to resolved user destination: " + targetDestination); + if (logger.isDebugEnabled()) { + logger.debug("Sending message to resolved destination=" + targetDestination); } - SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); - headers.setDestination(targetDestination); - message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); this.messagingTemplate.send(targetDestination, message); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 20ce19c6059..9c1de3f3c16 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -306,15 +306,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } if (sessionId == null) { - logger.error("No sessionId, ignoring message: " + message); + if (logger.isWarnEnabled()) { + logger.warn("No sessionId, ignoring message: " + message); + } return; } if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) { + if (logger.isTraceEnabled()) { + logger.trace("Ignoring message to destination=" + destination); + } return; } if (SimpMessageType.CONNECT.equals(messageType)) { + logger.debug("Processing CONNECT in session=" + sessionId); if (getVirtualHost() != null) { headers.setHost(getVirtualHost()); } @@ -335,7 +341,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler else { StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { - logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message: " + message); + logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message"); return; } handler.forward(message); @@ -424,11 +430,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void handleMessage(Message message) { - if (logger.isTraceEnabled()) { - logger.trace("Reading message for sessionId=" + this.sessionId + ", " + message); - } StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); + if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) { + logger.trace("Received broker heartbeat"); + } + else if (logger.isDebugEnabled()) { + logger.debug("Received broker message in session=" + this.sessionId); + } + if (StompCommand.CONNECTED == headers.getCommand()) { afterStompConnected(headers); } @@ -500,7 +510,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (!this.isStompConnected) { if (logger.isWarnEnabled()) { - logger.warn("Connection to broker inactive or not ready, ignoring message=" + message); + logger.warn("Connection to broker inactive or not ready. Ignoring message"); } return new ListenableFutureTask(new Callable() { @Override @@ -510,8 +520,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler }); } - if (logger.isTraceEnabled()) { - logger.trace("Forwarding message to broker: " + message); + if (logger.isDebugEnabled()) { + StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); + if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) { + logger.trace("Forwarding heartbeat to broker"); + } + else { + logger.debug("Forwarding message to broker"); + } } @SuppressWarnings("unchecked") @@ -548,6 +564,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + @Override + public String toString() { + return "StompConnectionHandler{" + "sessionId=" + this.sessionId + "}"; + } } private class SystemStompConnectionHandler extends StompConnectionHandler { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java index 00f3a59b339..7fd310ca1f0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java @@ -69,14 +69,17 @@ public class StompDecoder { decodedMessage = MessageBuilder.withPayload(payload) .setHeaders(StompHeaderAccessor.create(stompCommand, headers)).build(); + + if (logger.isDebugEnabled()) { + logger.debug("Decoded " + decodedMessage); + } } else { decodedMessage = MessageBuilder.withPayload(HEARTBEAT_PAYLOAD).setHeaders( StompHeaderAccessor.create(SimpMessageType.HEARTBEAT)).build(); - } - - if (logger.isTraceEnabled()) { - logger.trace("Decoded " + decodedMessage); + if (logger.isTraceEnabled()) { + logger.trace("Decoded heartbeat"); + } } return decodedMessage; @@ -101,7 +104,7 @@ public class StompDecoder { if (headerStream.size() > 0) { String header = new String(headerStream.toByteArray(), UTF8_CHARSET); int colonIndex = header.indexOf(':'); - if (colonIndex <= 0 || colonIndex == header.length() - 1) { + if ((colonIndex <= 0) || (colonIndex == header.length() - 1)) { throw new StompConversionException( "Illegal header: '" + header + "'. A header must be of the form : message) { try { - if (logger.isTraceEnabled()) { - logger.trace("Encoding " + message); - } ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(baos); @@ -90,7 +88,14 @@ public final class StompEncoder { private void writeHeaders(StompHeaderAccessor headers, Message message, DataOutputStream output) throws IOException { - for (Entry> entry : headers.toStompHeaderMap().entrySet()) { + Map> stompHeaders = headers.toStompHeaderMap(); + if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) { + logger.trace("Encoded heartbeat"); + } + else if (logger.isDebugEnabled()) { + logger.debug("Encoded STOMP command=" + headers.getCommand() + " headers=" + stompHeaders); + } + for (Entry> entry : stompHeaders.entrySet()) { byte[] key = getUtf8BytesEscapingIfNecessary(entry.getKey(), headers); for (String value : entry.getValue()) { output.write(key); @@ -99,9 +104,9 @@ public final class StompEncoder { output.write(LF); } } - if (headers.getCommand() == StompCommand.SEND || - headers.getCommand() == StompCommand.MESSAGE || - headers.getCommand() == StompCommand.ERROR) { + if ((headers.getCommand() == StompCommand.SEND) || (headers.getCommand() == StompCommand.MESSAGE) || + (headers.getCommand() == StompCommand.ERROR)) { + output.write("content-length:".getBytes(UTF8_CHARSET)); output.write(Integer.toString(message.getPayload().length).getBytes(UTF8_CHARSET)); output.write(LF); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java index 17e4d51ebeb..4aa333b0b8c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java @@ -104,12 +104,15 @@ public class StompProtocolHandler implements SubProtocolHandler { return; } - if (logger.isTraceEnabled()) { - logger.trace("Message " + message); - } - try { StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); + if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) { + logger.trace("Received heartbeat from client session=" + session.getId()); + } + else { + logger.trace("Received message from client session=" + session.getId()); + } + headers.setSessionId(session.getId()); headers.setUser(session.getPrincipal()); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractMessageChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractMessageChannel.java index 8f24167b4bc..81416e79311 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractMessageChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractMessageChannel.java @@ -102,8 +102,9 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName public final boolean send(Message message, long timeout) { Assert.notNull(message, "Message must not be null"); + if (logger.isTraceEnabled()) { - logger.trace("[" + this.beanName + "] send message " + message); + logger.trace("[" + this.beanName + "] sending message id=" + message.getHeaders().getId()); } message = this.interceptorChain.preSend(message, this); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ChannelInterceptorChain.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ChannelInterceptorChain.java index ab89d2d9ca4..7aba045e050 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ChannelInterceptorChain.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ChannelInterceptorChain.java @@ -59,9 +59,6 @@ class ChannelInterceptorChain { public Message preSend(Message message, MessageChannel channel) { UUID originalId = message.getHeaders().getId(); - if (logger.isTraceEnabled()) { - logger.trace("preSend message id " + originalId); - } for (ChannelInterceptor interceptor : this.interceptors) { message = interceptor.preSend(message, channel); if (message == null) { @@ -71,9 +68,9 @@ class ChannelInterceptorChain { return null; } } - if (logger.isTraceEnabled()) { + if (logger.isDebugEnabled()) { if (!message.getHeaders().getId().equals(originalId)) { - logger.trace("preSend returned modified message " + message); + logger.debug("preSend returned modified message, new message id=" + message.getHeaders().getId()); } } return message; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java index 319beaeff96..75d45c26839 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java @@ -64,11 +64,6 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { @Override public boolean sendInternal(final Message message, long timeout) { - - if (logger.isTraceEnabled()) { - logger.trace("subscribers " + this.handlers); - } - for (final MessageHandler handler : this.handlers) { if (this.executor == null) { handler.handleMessage(message); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java index 28ad81f9a5a..a38b7923c26 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java @@ -18,6 +18,8 @@ package org.springframework.messaging.support.tcp; import java.net.InetSocketAddress; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.messaging.Message; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; @@ -48,6 +50,8 @@ import reactor.tuple.Tuple2; */ public class ReactorNettyTcpClient

implements TcpOperations

{ + private final static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class); + private Environment environment; private TcpClient, Message

> tcpClient; @@ -120,6 +124,12 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ connectionHandler.handleMessage(message); } }); + connection.when(Throwable.class, new Consumer() { + @Override + public void accept(Throwable t) { + logger.error("Exception on connection " + connectionHandler, t); + } + }); connectionHandler.afterConnected(new ReactorTcpConnection

(connection)); } }); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/StubMessageChannel.java b/spring-messaging/src/test/java/org/springframework/messaging/StubMessageChannel.java new file mode 100644 index 00000000000..7440cd2cbc4 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/StubMessageChannel.java @@ -0,0 +1,50 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging; + +import java.util.ArrayList; +import java.util.List; + +/** + * A stub MessageChannel that saves all sent messages. + * + * @author Rossen Stoyanchev + */ +public class StubMessageChannel implements MessageChannel { + + private final List> messages = new ArrayList<>(); + + + public List> getMessages() { + return this.messages; + } + + @Override + @SuppressWarnings("unchecked") + public boolean send(Message message) { + this.messages.add((Message) message); + return true; + } + + @Override + @SuppressWarnings("unchecked") + public boolean send(Message message, long timeout) { + this.messages.add((Message) message); + return true; + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java index c5324e2c369..7ef9a3ee5e7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java @@ -16,6 +16,7 @@ package org.springframework.messaging.simp.handler; +import org.apache.activemq.transport.stomp.Stomp; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -25,6 +26,8 @@ import org.springframework.messaging.core.MessageSendingOperations; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.TestPrincipal; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.MessageBuilder; import static org.junit.Assert.assertEquals; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java index de5adabd29b..0eeff202f02 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java @@ -23,7 +23,7 @@ import java.util.concurrent.Callable; import org.junit.Before; import org.junit.Test; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.StubMessageChannel; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; @@ -47,16 +47,13 @@ public class StompBrokerRelayMessageHandlerTests { private StubTcpOperations tcpClient; - private StubMessageChannel responseChannel; - @Before public void setup() { - this.responseChannel = new StubMessageChannel(); this.tcpClient = new StubTcpOperations(); - this.brokerRelay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/topic")); + this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(), Arrays.asList("/topic")); this.brokerRelay.setTcpClient(tcpClient); } @@ -161,25 +158,4 @@ public class StompBrokerRelayMessageHandlerTests { } } - - private static class StubMessageChannel implements MessageChannel { - - private final List> messages = new ArrayList<>(); - - - @Override - @SuppressWarnings("unchecked") - public boolean send(Message message) { - this.messages.add((Message) message); - return true; - } - - @Override - @SuppressWarnings("unchecked") - public boolean send(Message message, long timeout) { - this.messages.add((Message) message); - return true; - } - } - } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java index a7777af8679..e12641ecd56 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java @@ -49,8 +49,8 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug(message + ", " + session); + if (logger.isTraceEnabled()) { + logger.trace(message + ", " + session); } super.handleMessage(session, message); }