Reactor2TcpClient cleans up TcpClient instances

Issue: SPR-14231
This commit is contained in:
Rossen Stoyanchev 2016-04-28 15:26:35 -04:00
parent 977840884b
commit 220711d45b
1 changed files with 29 additions and 5 deletions

View File

@ -163,7 +163,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
TcpClient<Message<P>, Message<P>> tcpClient;
final TcpClient<Message<P>, Message<P>> tcpClient;
Runnable cleanupTask;
synchronized (this.tcpClients) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
@ -172,9 +173,18 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
this.tcpClients.add(tcpClient);
cleanupTask = new Runnable() {
@Override
public void run() {
synchronized (tcpClients) {
tcpClients.remove(tcpClient);
}
}
};
}
Promise<Void> promise = tcpClient.start(new MessageChannelStreamHandler<P>(connectionHandler));
Promise<Void> promise = tcpClient.start(
new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask));
return new PassThroughPromiseToListenableFutureAdapter<Void>(
promise.onError(new Consumer<Throwable>() {
@ -191,7 +201,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
Assert.notNull(strategy, "ReconnectStrategy must not be null");
TcpClient<Message<P>, Message<P>> tcpClient;
final TcpClient<Message<P>, Message<P>> tcpClient;
Runnable cleanupTask;
synchronized (this.tcpClients) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
@ -200,10 +211,18 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
this.tcpClients.add(tcpClient);
cleanupTask = new Runnable() {
@Override
public void run() {
synchronized (tcpClients) {
tcpClients.remove(tcpClient);
}
}
};
}
Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
new MessageChannelStreamHandler<P>(connectionHandler),
new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask),
new ReactorReconnectAdapter(strategy));
return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after());
@ -249,6 +268,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
});
promise = eventLoopPromise;
}
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
}
@ -278,8 +298,11 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
private final TcpConnectionHandler<P> connectionHandler;
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler) {
private final Runnable cleanupTask;
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler, Runnable cleanupTask) {
this.connectionHandler = connectionHandler;
this.cleanupTask = cleanupTask;
}
@Override
@ -290,6 +313,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
.finallyDo(new Consumer<Signal<Message<P>>>() {
@Override
public void accept(Signal<Message<P>> signal) {
cleanupTask.run();
if (signal.isOnError()) {
connectionHandler.handleFailure(signal.getThrowable());
}