Ensure Environment.shutdown() in Reactor2TcpClient
Issue: SPR-14229
This commit is contained in:
parent
d48eeb2c84
commit
f7ace54488
|
@ -84,6 +84,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
|
|||
|
||||
private final EventLoopGroup eventLoopGroup;
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
|
||||
|
||||
private final List<TcpClient<Message<P>, Message<P>>> tcpClients =
|
||||
|
@ -108,12 +110,13 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
|
|||
// Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
|
||||
final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
|
||||
this.eventLoopGroup = nioEventLoopGroup;
|
||||
this.environment = new Environment(new SynchronousDispatcherConfigReader());
|
||||
|
||||
this.tcpClientSpecFactory = new TcpClientFactory<Message<P>, Message<P>>() {
|
||||
@Override
|
||||
public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
|
||||
return spec
|
||||
.env(new Environment(new SynchronousDispatcherConfigReader()))
|
||||
.env(environment)
|
||||
.codec(codec)
|
||||
.connect(host, port)
|
||||
.options(createClientSocketOptions());
|
||||
|
@ -139,6 +142,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
|
|||
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
|
||||
this.tcpClientSpecFactory = tcpClientSpecFactory;
|
||||
this.eventLoopGroup = null;
|
||||
this.environment = null;
|
||||
}
|
||||
|
||||
|
||||
|
@ -269,6 +273,15 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
|
|||
promise = eventLoopPromise;
|
||||
}
|
||||
|
||||
if (this.environment != null) {
|
||||
promise.onComplete(new Consumer<Promise<Void>>() {
|
||||
@Override
|
||||
public void accept(Promise<Void> voidPromise) {
|
||||
environment.shutdown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue