Improve TCP connection info logging.

After the recent changes to expose configuring TcpOperations, it no
longer makes sense to automatically log the relayHost/Port since that's
mutually exclusive with a custom TcpOperations.

Instead we delegate to TcpOperations.toString().

Issue: SPR-16801
This commit is contained in:
Rossen Stoyanchev 2018-05-16 11:14:24 -04:00
parent 4ff4d5a181
commit 37b0ed9fcb
4 changed files with 34 additions and 15 deletions

View File

@ -89,4 +89,8 @@ public class ReactorNettyTcpStompClient extends StompClientSupport {
this.tcpClient.shutdown();
}
@Override
public String toString() {
return "ReactorNettyTcpStompClient[" + this.tcpClient + "]";
}
}

View File

@ -82,12 +82,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public static final String SYSTEM_SESSION_ID = "_system_";
// STOMP recommends error of margin for receiving heartbeats
/** STOMP recommended error of margin for receiving heartbeats */
private static final long HEARTBEAT_MULTIPLIER = 3;
/**
* A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings
* we need. If we don't receive CONNECTED within a minute, the connection is closed proactively.
* Heartbeat starts once CONNECTED frame with heartbeat settings is received.
* If CONNECTED doesn't arrive within a minute, we'll close the connection.
*/
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
@ -403,7 +403,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
if (logger.isInfoEnabled()) {
logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort);
logger.info("Starting \"system\" session, " + toString());
}
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
@ -552,7 +552,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public String toString() {
return "StompBrokerRelay[" + this.relayHost + ":" + this.relayPort + "]";
return "StompBrokerRelay[" + getTcpClientInfo() + "]";
}
private String getTcpClientInfo() {
return this.tcpClient != null ? this.tcpClient.toString() : this.relayHost + ":" + this.relayPort;
}
@ -987,7 +991,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static class VoidCallable implements Callable<Void> {
@Override
public Void call() throws Exception {
public Void call() {
return null;
}
}
@ -1014,7 +1018,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
public String toString() {
return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
return (connectionHandlers.size() + " sessions, " + getTcpClientInfo() +
(isBrokerAvailable() ? " (available)" : " (not available)") +
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,6 +30,8 @@ import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
@ -65,6 +67,8 @@ import org.springframework.util.concurrent.SettableListenableFuture;
*/
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class);
private static final int PUBLISH_ON_BUFFER_SIZE = 16;
@ -201,7 +205,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
.doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(NettyContext::onClose) // post-connect issues
.flatMap(NettyContext::onClose) // post-connect issues
.retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy))
.subscribe();
@ -281,6 +285,11 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
});
}
@Override
public String toString() {
return "ReactorNettyTcpClient[" + this.tcpClient + "]";
}
private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
@ -293,6 +302,9 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
if (logger.isDebugEnabled()) {
logger.debug("Connected to " + inbound.remoteAddress());
}
DirectProcessor<Void> completion = DirectProcessor.create();
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
@ -321,7 +333,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
Collection<Message<P>> messages = codec.decode(in);
out.addAll(messages);
}

View File

@ -279,9 +279,9 @@ public class MessageBrokerBeanDefinitionParserTests {
assertEquals(5000, messageBroker.getSystemHeartbeatSendInterval());
assertThat(messageBroker.getDestinationPrefixes(), Matchers.containsInAnyOrder("/topic","/queue"));
List<Class<? extends MessageHandler>> subscriberTypes =
Arrays.<Class<? extends MessageHandler>>asList(SimpAnnotationMethodMessageHandler.class,
UserDestinationMessageHandler.class, StompBrokerRelayMessageHandler.class);
List<Class<? extends MessageHandler>> subscriberTypes = Arrays.asList(
SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class,
StompBrokerRelayMessageHandler.class);
testChannel("clientInboundChannel", subscriberTypes, 2);
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
@ -289,8 +289,7 @@ public class MessageBrokerBeanDefinitionParserTests {
testChannel("clientOutboundChannel", subscriberTypes, 1);
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(
StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class);
subscriberTypes = Arrays.asList(StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class);
testChannel("brokerChannel", subscriberTypes, 1);
try {
this.appContext.getBean("brokerChannelExecutor", ThreadPoolTaskExecutor.class);