Avoid deprecated Scheduler.shutdown() in favor of Scheduler.dispose()

This commit is contained in:
Juergen Hoeller 2016-12-22 22:35:54 +01:00
parent 3719f75d3b
commit ae62341fa3
1 changed files with 9 additions and 20 deletions

View File

@ -50,7 +50,7 @@ import org.springframework.util.concurrent.ListenableFuture;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the TCP client support of the Reactor project.
* <p>
*
* <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
* i.e. a separate (Reactor) client instance for each connection.
*
@ -78,7 +78,6 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
* threads will be shared amongst the active clients.
* <p>Also see the constructor accepting a {@link Consumer} of
* {@link ClientOptions} for additional options.
*
* @param host the host to connect to
* @param port the port to connect to
* @param codec for encoding and decoding messages
@ -88,17 +87,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
/**
* A constructor with a configurator {@link Consumer} that will receive
* default {@link ClientOptions} from {@link TcpClient}. This might be used
* to add SSL or specific network parameters to the generated client
* configuration.
*
* A constructor with a configurator {@link Consumer} that will receive default
* {@link ClientOptions} from {@link TcpClient}. This might be used to add SSL
* or specific network parameters to the generated client configuration.
* @param tcpOptions callback for configuring shared {@link ClientOptions}
* @param codec for encoding and decoding messages
*/
public ReactorNettyTcpClient(Consumer<? super ClientOptions> tcpOptions,
ReactorNettyCodec<P> codec) {
public ReactorNettyTcpClient(Consumer<? super ClientOptions> tcpOptions, ReactorNettyCodec<P> codec) {
Assert.notNull(codec, "'codec' is required");
this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group)));
@ -165,14 +160,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
this.stopping = true;
Mono<Void> completion = FutureMono.from(this.group.close())
.doAfterTerminate((x, e) -> this.scheduler.shutdown());
.doAfterTerminate((x, e) -> this.scheduler.dispose());
return new MonoToListenableFutureAdapter<>(completion);
}
private static final class MessageHandler<P>
implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
private static final class MessageHandler<P> implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
private final TcpConnectionHandler<P> connectionHandler;
@ -180,10 +174,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final Scheduler scheduler;
MessageHandler(TcpConnectionHandler<P> handler, ReactorNettyCodec<P> codec,
Scheduler scheduler) {
MessageHandler(TcpConnectionHandler<P> handler, ReactorNettyCodec<P> codec, Scheduler scheduler) {
this.connectionHandler = handler;
this.codec = codec;
this.scheduler = scheduler;
@ -192,7 +183,6 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override
public Publisher<Void> apply(NettyInbound in, NettyOutbound out) {
Flux<Collection<Message<P>>> inbound = in.receive().map(this.codec.getDecoder());
DirectProcessor<Void> closeProcessor = DirectProcessor.create();
TcpConnection<P> tcpConnection =
@ -209,14 +199,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
return closeProcessor;
}
}
private static final class Reconnector<T> implements Function<Flux<T>, Publisher<?>> {
private final ReconnectStrategy strategy;
Reconnector(ReconnectStrategy strategy) {
this.strategy = strategy;
}