Explicitly manage event loop in Reactor2TcpStompClient

Reactor2TcpStompClient now explicitly manages a Netty EventLoopGroup
which prevents resource leaks on attempts to reconnect.

Issue: SPR-15035
This commit is contained in:
Rossen Stoyanchev 2017-01-17 14:19:36 -05:00
parent d92f69747b
commit 48f57e3bb2
3 changed files with 76 additions and 20 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,30 +20,38 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import io.netty.channel.EventLoopGroup;
import reactor.Environment; import reactor.Environment;
import reactor.core.config.ConfigurationReader; import reactor.core.config.ConfigurationReader;
import reactor.core.config.DispatcherConfiguration; import reactor.core.config.DispatcherConfiguration;
import reactor.core.config.DispatcherType; import reactor.core.config.DispatcherType;
import reactor.core.config.ReactorConfiguration; import reactor.core.config.ReactorConfiguration;
import reactor.io.net.NetStreams; import reactor.io.net.NetStreams.TcpClientFactory;
import reactor.io.net.Spec.TcpClientSpec; import reactor.io.net.Spec.TcpClientSpec;
import reactor.io.net.impl.netty.NettyClientSocketOptions;
import org.springframework.context.Lifecycle;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpOperations; import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
/** /**
* A STOMP over TCP client that uses * A STOMP over TCP client that uses {@link Reactor2TcpClient}.
* {@link Reactor2TcpClient}.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.2 * @since 4.2
*/ */
public class Reactor2TcpStompClient extends StompClientSupport { public class Reactor2TcpStompClient extends StompClientSupport implements Lifecycle {
private final TcpOperations<byte[]> tcpClient; private final TcpOperations<byte[]> tcpClient;
private final EventLoopGroup eventLoopGroup;
private final Environment environment;
private volatile boolean running = false;
/** /**
* Create an instance with host "127.0.0.1" and port 61613. * Create an instance with host "127.0.0.1" and port 61613.
@ -57,11 +65,11 @@ public class Reactor2TcpStompClient extends StompClientSupport {
* @param host the host * @param host the host
* @param port the port * @param port the port
*/ */
public Reactor2TcpStompClient(final String host, final int port) { public Reactor2TcpStompClient(String host, int port) {
ConfigurationReader reader = new StompClientDispatcherConfigReader(); this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
Environment environment = new Environment(reader).assignErrorJournal(); this.environment = new Environment();
StompTcpClientSpecFactory factory = new StompTcpClientSpecFactory(environment, host, port); this.tcpClient = new Reactor2TcpClient<byte[]>(
this.tcpClient = new Reactor2TcpClient<byte[]>(factory); new StompTcpClientSpecFactory(host, port, this.eventLoopGroup, this.environment));
} }
/** /**
@ -70,6 +78,43 @@ public class Reactor2TcpStompClient extends StompClientSupport {
*/ */
public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) { public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient; this.tcpClient = tcpClient;
this.eventLoopGroup = null;
this.environment = null;
}
@Override
public void start() {
if (!isRunning()) {
this.running = true;
}
}
@Override
public void stop() {
if (isRunning()) {
this.running = false;
try {
if (this.eventLoopGroup != null) {
this.eventLoopGroup.shutdownGracefully().await(5000);
}
if (this.environment != null) {
this.environment.shutdown();
}
}
catch (InterruptedException ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to shutdown gracefully", ex);
}
}
}
}
@Override
public boolean isRunning() {
return this.running;
} }
@ -120,30 +165,36 @@ public class Reactor2TcpStompClient extends StompClientSupport {
} }
private static class StompTcpClientSpecFactory private static class StompTcpClientSpecFactory implements TcpClientFactory<Message<byte[]>, Message<byte[]>> {
implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
private final Environment environment;
private final String host; private final String host;
private final int port; private final int port;
public StompTcpClientSpecFactory(Environment environment, String host, int port) { private final EventLoopGroup eventLoopGroup;
this.environment = environment;
private final Environment environment;
public StompTcpClientSpecFactory(String host, int port, EventLoopGroup group, Environment environment) {
this.host = host; this.host = host;
this.port = port; this.port = port;
this.eventLoopGroup = group;
this.environment = environment;
} }
@Override @Override
public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply( public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) { TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
final Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder());
return tcpClientSpec return tcpClientSpec
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.env(this.environment) .env(this.environment)
.dispatcher(this.environment.getCachedDispatchers("StompClient").get()) .dispatcher(this.environment.getDispatcher(Environment.WORK_QUEUE))
.connect(this.host, this.port); .connect(this.host, this.port)
.codec(codec)
.options(new NettyClientSocketOptions().eventLoopGroup(this.eventLoopGroup));
} }
} }

View File

@ -18,6 +18,9 @@ package org.springframework.messaging.simp.stomp;
import java.util.Arrays; import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter; import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
@ -40,6 +43,8 @@ import org.springframework.util.Assert;
*/ */
public abstract class StompClientSupport { public abstract class StompClientSupport {
protected Log logger = LogFactory.getLog(getClass());
private MessageConverter messageConverter = new SimpleMessageConverter(); private MessageConverter messageConverter = new SimpleMessageConverter();
private TaskScheduler taskScheduler; private TaskScheduler taskScheduler;

View File

@ -146,7 +146,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
} }
private static NioEventLoopGroup initEventLoopGroup() { public static NioEventLoopGroup initEventLoopGroup() {
int ioThreadCount; int ioThreadCount;
try { try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));