Shutdown Reactor env when relay handler is stopped
The Reactor Environment (that's used by the TcpClient) manages a number of threads. To ensure that these threads are cleaned up Environment.shutdown() must be called when the Environment is no longer needed.
This commit is contained in:
parent
d650e909b2
commit
e24b71e700
|
@ -73,6 +73,8 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
|
||||||
|
|
||||||
private MessageConverter payloadConverter;
|
private MessageConverter payloadConverter;
|
||||||
|
|
||||||
|
private Environment environment;
|
||||||
|
|
||||||
private TcpClient<String, String> tcpClient;
|
private TcpClient<String, String> tcpClient;
|
||||||
|
|
||||||
private final Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();
|
private final Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();
|
||||||
|
@ -181,9 +183,9 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
synchronized (this.lifecycleMonitor) {
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
this.environment = new Environment();
|
||||||
this.tcpClient = new TcpClient.Spec<String, String>(NettyTcpClient.class)
|
this.tcpClient = new TcpClient.Spec<String, String>(NettyTcpClient.class)
|
||||||
.using(new Environment())
|
.using(this.environment)
|
||||||
.codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC))
|
.codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC))
|
||||||
.connect(this.relayHost, this.relayPort)
|
.connect(this.relayHost, this.relayPort)
|
||||||
.get();
|
.get();
|
||||||
|
@ -214,6 +216,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
|
||||||
this.running = false;
|
this.running = false;
|
||||||
try {
|
try {
|
||||||
this.tcpClient.close().await(5000, TimeUnit.MILLISECONDS);
|
this.tcpClient.close().await(5000, TimeUnit.MILLISECONDS);
|
||||||
|
this.environment.shutdown();
|
||||||
}
|
}
|
||||||
catch (InterruptedException e) {
|
catch (InterruptedException e) {
|
||||||
// ignore
|
// ignore
|
||||||
|
|
Loading…
Reference in New Issue