Register callback for HttpComponentsClientHttpConnector
This commit ensures that we register a result callback when executing the request (next to the existing response callback), which gets notified of invalid hosts and other connection issues. Closes gh-29156
This commit is contained in:
parent
f5a39308a0
commit
0ddb394fd0
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2022 the original author or authors.
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -20,6 +20,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
|
@ -122,9 +123,9 @@ public class HttpComponentsClientHttpConnector implements ClientHttpConnector, C
|
|||
|
||||
return Mono.create(sink -> {
|
||||
ReactiveResponseConsumer reactiveResponseConsumer =
|
||||
new ReactiveResponseConsumer(new MonoFutureCallbackAdapter(sink, this.dataBufferFactory, context));
|
||||
new ReactiveResponseConsumer(new ResponseCallback(sink, this.dataBufferFactory, context));
|
||||
|
||||
this.client.execute(requestProducer, reactiveResponseConsumer, context, null);
|
||||
this.client.execute(requestProducer, reactiveResponseConsumer, context, new ResultCallback(sink));
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -133,7 +134,10 @@ public class HttpComponentsClientHttpConnector implements ClientHttpConnector, C
|
|||
this.client.close();
|
||||
}
|
||||
|
||||
private static class MonoFutureCallbackAdapter
|
||||
/**
|
||||
* Callback that invoked when a response is received.
|
||||
*/
|
||||
private static class ResponseCallback
|
||||
implements FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> {
|
||||
|
||||
private final MonoSink<ClientHttpResponse> sink;
|
||||
|
|
@ -142,7 +146,8 @@ public class HttpComponentsClientHttpConnector implements ClientHttpConnector, C
|
|||
|
||||
private final HttpClientContext context;
|
||||
|
||||
public MonoFutureCallbackAdapter(MonoSink<ClientHttpResponse> sink,
|
||||
|
||||
public ResponseCallback(MonoSink<ClientHttpResponse> sink,
|
||||
DataBufferFactory dataBufferFactory, HttpClientContext context) {
|
||||
this.sink = sink;
|
||||
this.dataBufferFactory = dataBufferFactory;
|
||||
|
|
@ -164,6 +169,37 @@ public class HttpComponentsClientHttpConnector implements ClientHttpConnector, C
|
|||
|
||||
@Override
|
||||
public void cancelled() {
|
||||
this.sink.error(new CancellationException());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Callback that invoked when a request is executed.
|
||||
*/
|
||||
private static class ResultCallback implements FutureCallback<Void> {
|
||||
|
||||
private final MonoSink<?> sink;
|
||||
|
||||
|
||||
public ResultCallback(MonoSink<?> sink) {
|
||||
this.sink = sink;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completed(Void result) {
|
||||
this.sink.success();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception ex) {
|
||||
Throwable t = (ex instanceof HttpStreamResetException hsre ? hsre.getCause() : ex);
|
||||
this.sink.error(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelled() {
|
||||
this.sink.error(new CancellationException());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue