Fix reactive HTTP server Observation instrumentation

Prior to this commit, regressions were introduced with gh-31417:

1. the observation keyvalues would be inconsistent with the HTTP response
2. the observation scope would not cover all controller handlers, causing
  traceIds to be missing

The first issue is caused by the fact that in case of error signals, the
observation was stopped before the response was fully committed, which
means further processing could happen and update the response status.
This commit delays the stop event until the response is committed in
case of errors.

The second problem is caused by the change from a `contextWrite`
operator to using the `tap` operator with a `SignalListener`. The
observation was started in the `doOnSubscription` callback, which is too
late in some cases. If the WebFlux controller handler is synchronous
non-blocking, the execution of the handler is performed before the
subscription happens. This means that for those handlers, the
observation was not started, even if the current observation was
present in the reactor context. This commit changes the
`doOnSubscription` to `doFirst` to ensure that the observation is
started at the right time.

Fixes gh-31703
Fixes gh-31706
This commit is contained in:
Brian Clozel 2023-11-29 14:39:56 +01:00
parent c8e6315a67
commit 35fcbae8c6
4 changed files with 71 additions and 33 deletions

View File

@ -121,16 +121,17 @@ public class ServerHttpObservationFilter implements WebFilter {
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
}
@Override
public void doOnSubscription() throws Throwable {
this.observation.start();
}
@Override
public Context addToContext(Context originalContext) {
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
}
@Override
public void doFirst() throws Throwable {
this.observation.start();
}
@Override
public void doOnCancel() throws Throwable {
if (this.observationRecorded.compareAndSet(false, true)) {
@ -142,16 +143,7 @@ public class ServerHttpObservationFilter implements WebFilter {
@Override
public void doOnComplete() throws Throwable {
if (this.observationRecorded.compareAndSet(false, true)) {
ServerHttpResponse response = this.observationContext.getResponse();
if (response.isCommitted()) {
this.observation.stop();
}
else {
response.beforeCommit(() -> {
this.observation.stop();
return Mono.empty();
});
}
doOnTerminate(this.observationContext);
}
}
@ -162,8 +154,21 @@ public class ServerHttpObservationFilter implements WebFilter {
this.observationContext.setConnectionAborted(true);
}
this.observationContext.setError(error);
doOnTerminate(this.observationContext);
}
}
private void doOnTerminate(ServerRequestObservationContext context) {
ServerHttpResponse response = context.getResponse();
if (response.isCommitted()) {
this.observation.stop();
}
else {
response.beforeCommit(() -> {
this.observation.stop();
return Mono.empty();
});
}
}
}

View File

@ -374,13 +374,13 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa
}
@Override
public void doOnSubscription() throws Throwable {
this.observation.start();
public Context addToContext(Context originalContext) {
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
}
@Override
public Context addToContext(Context originalContext) {
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
public void doFirst() throws Throwable {
this.observation.start();
}
@Override
@ -394,21 +394,12 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa
@Override
public void doOnComplete() throws Throwable {
if (this.observationRecorded.compareAndSet(false, true)) {
ServerHttpResponse response = this.observationContext.getResponse();
Throwable throwable = (Throwable) this.observationContext.getAttributes()
.get(ExceptionHandlingWebHandler.HANDLED_WEB_EXCEPTION);
if (throwable != null) {
this.observation.error(throwable);
}
if (response.isCommitted()) {
this.observation.stop();
}
else {
response.beforeCommit(() -> {
this.observation.stop();
return Mono.empty();
});
}
doOnTerminate(this.observationContext);
}
}
@ -416,8 +407,22 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa
public void doOnError(Throwable error) throws Throwable {
if (this.observationRecorded.compareAndSet(false, true)) {
this.observationContext.setError(error);
doOnTerminate(this.observationContext);
}
}
private void doOnTerminate(ServerRequestObservationContext context) {
ServerHttpResponse response = context.getResponse();
if (response.isCommitted()) {
this.observation.stop();
}
else {
response.beforeCommit(() -> {
this.observation.stop();
return Mono.empty();
});
}
}
}

View File

@ -19,6 +19,7 @@ package org.springframework.web.filter.reactive;
import java.util.Optional;
import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.observation.ServerRequestObservationContext;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
@ -66,7 +68,10 @@ class ServerHttpObservationFilterTests {
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
exchange.getResponse().setRawStatusCode(200);
WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> {
assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent();
Observation observation = contextView.get(ObservationThreadLocalAccessor.KEY);
assertThat(observation).isNotNull();
// check that the observation was started
assertThat(observation.getContext().getLowCardinalityKeyValue("outcome")).isNotNull();
return Mono.empty();
});
this.filter.filter(exchange, filterChain).block();
@ -100,6 +105,25 @@ class ServerHttpObservationFilterTests {
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "UNKNOWN");
}
@Test
void filterShouldStopObservationOnResponseCommit() {
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
WebFilterChain filterChain = createFilterChain(filterExchange -> {
throw new IllegalArgumentException("server error");
});
StepVerifier.create(this.filter.filter(exchange, filterChain).doOnError(throwable -> {
ServerHttpResponse response = exchange.getResponse();
response.setRawStatusCode(500);
response.setComplete().block();
}))
.expectError(IllegalArgumentException.class)
.verify();
Optional<ServerRequestObservationContext> observationContext = ServerHttpObservationFilter.findObservationContext(exchange);
assertThat(observationContext.get().getError()).isInstanceOf(IllegalArgumentException.class);
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SERVER_ERROR");
}
private WebFilterChain createFilterChain(ThrowingConsumer<ServerWebExchange> exchangeConsumer) {
return filterExchange -> {
try {

View File

@ -20,13 +20,13 @@ package org.springframework.web.server.adapter;
import java.util.List;
import java.util.Optional;
import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.context.ContextView;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.observation.ServerRequestObservationContext;
@ -66,7 +66,8 @@ class HttpWebHandlerAdapterObservabilityTests {
void handlerShouldSetCurrentObservationInReactorContext() {
ReactorContextWebHandler targetHandler = new ReactorContextWebHandler();
createWebHandler(targetHandler).handle(this.request, this.response).block();
assertThat(targetHandler.contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent();
assertThat(targetHandler.currentObservation).isNotNull();
assertThat(targetHandler.observationStarted).isTrue();
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS");
}
@ -120,13 +121,16 @@ class HttpWebHandlerAdapterObservabilityTests {
private static class ReactorContextWebHandler implements WebHandler {
ContextView contextView;
Observation currentObservation;
boolean observationStarted;
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
exchange.getResponse().setStatusCode(HttpStatus.OK);
return Mono.deferContextual(contextView -> {
this.contextView = contextView;
this.currentObservation = contextView.get(ObservationThreadLocalAccessor.KEY);
this.observationStarted = this.currentObservation.getContext().getLowCardinalityKeyValue("outcome") != null;
return Mono.empty();
});
}