Reconnect failures delegated to TcpConnectionHandler
When connecting with a ReconnectStrategy we can only report the outcome of the first connect to the ListenableFuture<Void> return value. Failures for all subsequent attempts to reconnect however must be channeled to TcpConnectHandler#afterConnectFailure which is used in the STOMP broker relay for example to publish BroadcastAvailability(true/false) events.
This commit is contained in:
parent
ea274ebc0a
commit
698c885e06
|
@ -116,12 +116,15 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
|
||||||
return handleShuttingDownConnectFailure(handler);
|
return handleShuttingDownConnectFailure(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Report first connect to the ListenableFuture
|
||||||
MonoProcessor<Void> connectMono = MonoProcessor.create();
|
MonoProcessor<Void> connectMono = MonoProcessor.create();
|
||||||
|
|
||||||
this.tcpClient
|
this.tcpClient
|
||||||
.newHandler(new ReactorNettyHandler(handler))
|
.newHandler(new ReactorNettyHandler(handler))
|
||||||
.doOnNext(connectFailureConsumer(connectMono))
|
.doOnNext(updateConnectMono(connectMono))
|
||||||
.doOnError(connectFailureConsumer(connectMono))
|
.doOnError(updateConnectMono(connectMono))
|
||||||
.then(NettyContext::onClose)
|
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
|
||||||
|
.then(NettyContext::onClose) // post-connect issues
|
||||||
.retryWhen(reconnectFunction(strategy))
|
.retryWhen(reconnectFunction(strategy))
|
||||||
.repeatWhen(reconnectFunction(strategy))
|
.repeatWhen(reconnectFunction(strategy))
|
||||||
.subscribe();
|
.subscribe();
|
||||||
|
@ -135,7 +138,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
|
||||||
return new MonoToListenableFutureAdapter<>(Mono.error(ex));
|
return new MonoToListenableFutureAdapter<>(Mono.error(ex));
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> Consumer<T> connectFailureConsumer(MonoProcessor<Void> connectMono) {
|
private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
|
||||||
return o -> {
|
return o -> {
|
||||||
if (!connectMono.isTerminated()) {
|
if (!connectMono.isTerminated()) {
|
||||||
if (o instanceof Throwable) {
|
if (o instanceof Throwable) {
|
||||||
|
|
Loading…
Reference in New Issue