Ensure that client responses are observed when filters fail
Prior to this commit, an error thrown by a `ExchangeFilterFunction` configured on a `WebClient` instance would be recorded as such by the client observation, but the response details would be missing from the observation. All filter functions and the exchange function (performing the HTTP call) would be merged into a single `ExchangeFunction`; this instance was instrumented and osberved. As a result, the instrumentation would only get the error signal returned by the filter function and would not see the HTTP response even if it was received. This means that the recorded observation would not have the relevant information for the HTTP status. This commit ensures that between the configured `ExchangeFilterFunction` and the `ExchangeFunction`, an instrumentation `ExchangeFilterFunction` is inserted. This allows to set the client response to the observation context, even if a later error signal is thrown by a filter function. Note that with this change, an error signal sent by a filter function will be still recorded in the observation. See gh-30059
This commit is contained in:
parent
8fca258207
commit
d451d6adcc
|
@ -78,6 +78,9 @@ class DefaultWebClient implements WebClient {
|
|||
|
||||
private final ExchangeFunction exchangeFunction;
|
||||
|
||||
@Nullable
|
||||
private final ExchangeFilterFunction filterFunctions;
|
||||
|
||||
private final UriBuilderFactory uriBuilderFactory;
|
||||
|
||||
@Nullable
|
||||
|
@ -93,19 +96,21 @@ class DefaultWebClient implements WebClient {
|
|||
|
||||
private final ObservationRegistry observationRegistry;
|
||||
|
||||
@Nullable
|
||||
private final ClientRequestObservationConvention observationConvention;
|
||||
|
||||
private final DefaultWebClientBuilder builder;
|
||||
|
||||
|
||||
DefaultWebClient(ExchangeFunction exchangeFunction, UriBuilderFactory uriBuilderFactory,
|
||||
DefaultWebClient(ExchangeFunction exchangeFunction, @Nullable ExchangeFilterFunction filterFunctions, UriBuilderFactory uriBuilderFactory,
|
||||
@Nullable HttpHeaders defaultHeaders, @Nullable MultiValueMap<String, String> defaultCookies,
|
||||
@Nullable Consumer<RequestHeadersSpec<?>> defaultRequest,
|
||||
@Nullable Map<Predicate<HttpStatusCode>, Function<ClientResponse, Mono<? extends Throwable>>> statusHandlerMap,
|
||||
ObservationRegistry observationRegistry, ClientRequestObservationConvention observationConvention,
|
||||
ObservationRegistry observationRegistry, @Nullable ClientRequestObservationConvention observationConvention,
|
||||
DefaultWebClientBuilder builder) {
|
||||
|
||||
this.exchangeFunction = exchangeFunction;
|
||||
this.filterFunctions = filterFunctions;
|
||||
this.uriBuilderFactory = uriBuilderFactory;
|
||||
this.defaultHeaders = defaultHeaders;
|
||||
this.defaultCookies = defaultCookies;
|
||||
|
@ -438,16 +443,21 @@ class DefaultWebClient implements WebClient {
|
|||
observation
|
||||
.parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null))
|
||||
.start();
|
||||
ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext);
|
||||
if (filterFunctions != null) {
|
||||
filterFunction = filterFunctions.andThen(filterFunction);
|
||||
}
|
||||
ClientRequest request = requestBuilder.build();
|
||||
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
|
||||
observationContext.setRequest(request);
|
||||
Mono<ClientResponse> responseMono = exchangeFunction.exchange(request)
|
||||
Mono<ClientResponse> responseMono = filterFunction.apply(exchangeFunction)
|
||||
.exchange(request)
|
||||
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
|
||||
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
|
||||
if (this.contextModifier != null) {
|
||||
responseMono = responseMono.contextWrite(this.contextModifier);
|
||||
}
|
||||
return responseMono.doOnNext(observationContext::setResponse)
|
||||
return responseMono
|
||||
.doOnError(observationContext::setError)
|
||||
.doOnCancel(() -> {
|
||||
observationContext.setAborted(true);
|
||||
|
@ -718,4 +728,19 @@ class DefaultWebClient implements WebClient {
|
|||
}
|
||||
}
|
||||
|
||||
private static class ObservationFilterFunction implements ExchangeFilterFunction {
|
||||
|
||||
private final ClientRequestObservationContext observationContext;
|
||||
|
||||
public ObservationFilterFunction(ClientRequestObservationContext observationContext) {
|
||||
this.observationContext = observationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
|
||||
return next.exchange(request)
|
||||
.doOnNext(this.observationContext::setResponse);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -314,16 +314,17 @@ final class DefaultWebClientBuilder implements WebClient.Builder {
|
|||
ExchangeFunctions.create(connectorToUse, initExchangeStrategies()) :
|
||||
this.exchangeFunction);
|
||||
|
||||
ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream()
|
||||
ExchangeFilterFunction filterFunctions = (this.filters != null ? this.filters.stream()
|
||||
.reduce(ExchangeFilterFunction::andThen)
|
||||
.map(filter -> filter.apply(exchange))
|
||||
.orElse(exchange) : exchange);
|
||||
.orElse(null) : null);
|
||||
|
||||
HttpHeaders defaultHeaders = copyDefaultHeaders();
|
||||
|
||||
MultiValueMap<String, String> defaultCookies = copyDefaultCookies();
|
||||
|
||||
return new DefaultWebClient(filteredExchange, initUriBuilderFactory(),
|
||||
return new DefaultWebClient(exchange,
|
||||
filterFunctions,
|
||||
initUriBuilderFactory(),
|
||||
defaultHeaders,
|
||||
defaultCookies,
|
||||
this.defaultRequest,
|
||||
|
|
|
@ -133,6 +133,19 @@ class WebClientObservationTests {
|
|||
verifyAndGetRequest();
|
||||
}
|
||||
|
||||
@Test
|
||||
void recordsObservationWithResponseDetailsWhenFilterFunctionErrors() {
|
||||
ExchangeFilterFunction errorFunction = (req, next) -> next.exchange(req).then(Mono.error(new IllegalStateException()));
|
||||
WebClient client = this.builder.filter(errorFunction).build();
|
||||
Mono<Void> responseMono = client.get().uri("/path").retrieve().bodyToMono(Void.class);
|
||||
StepVerifier.create(responseMono)
|
||||
.expectError(IllegalStateException.class)
|
||||
.verify(Duration.ofSeconds(5));
|
||||
assertThatHttpObservation()
|
||||
.hasLowCardinalityKeyValue("exception", "IllegalStateException")
|
||||
.hasLowCardinalityKeyValue("status", "200");
|
||||
}
|
||||
|
||||
private TestObservationRegistryAssert.TestObservationRegistryAssertReturningObservationContextAssert assertThatHttpObservation() {
|
||||
return TestObservationRegistryAssert.assertThat(this.observationRegistry)
|
||||
.hasObservationWithNameEqualTo("http.client.requests").that();
|
||||
|
|
Loading…
Reference in New Issue