diff --git a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java index 174cde705ac..5cc36eeeed8 100644 --- a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java +++ b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java @@ -121,16 +121,17 @@ public class ServerHttpObservationFilter implements WebFilter { DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry); } - @Override - public void doOnSubscription() throws Throwable { - this.observation.start(); - } @Override public Context addToContext(Context originalContext) { return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation); } + @Override + public void doFirst() throws Throwable { + this.observation.start(); + } + @Override public void doOnCancel() throws Throwable { if (this.observationRecorded.compareAndSet(false, true)) { @@ -142,16 +143,7 @@ public class ServerHttpObservationFilter implements WebFilter { @Override public void doOnComplete() throws Throwable { if (this.observationRecorded.compareAndSet(false, true)) { - ServerHttpResponse response = this.observationContext.getResponse(); - if (response.isCommitted()) { - this.observation.stop(); - } - else { - response.beforeCommit(() -> { - this.observation.stop(); - return Mono.empty(); - }); - } + doOnTerminate(this.observationContext); } } @@ -162,8 +154,21 @@ public class ServerHttpObservationFilter implements WebFilter { this.observationContext.setConnectionAborted(true); } this.observationContext.setError(error); + doOnTerminate(this.observationContext); + } + } + + private void doOnTerminate(ServerRequestObservationContext context) { + ServerHttpResponse response = context.getResponse(); + if (response.isCommitted()) { this.observation.stop(); } + else { + response.beforeCommit(() -> { + this.observation.stop(); + return Mono.empty(); + }); + } } } diff --git a/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java b/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java index 0eacc76fdfe..34f4f754c04 100644 --- a/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java @@ -374,13 +374,13 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa } @Override - public void doOnSubscription() throws Throwable { - this.observation.start(); + public Context addToContext(Context originalContext) { + return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation); } @Override - public Context addToContext(Context originalContext) { - return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation); + public void doFirst() throws Throwable { + this.observation.start(); } @Override @@ -394,21 +394,12 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa @Override public void doOnComplete() throws Throwable { if (this.observationRecorded.compareAndSet(false, true)) { - ServerHttpResponse response = this.observationContext.getResponse(); Throwable throwable = (Throwable) this.observationContext.getAttributes() .get(ExceptionHandlingWebHandler.HANDLED_WEB_EXCEPTION); if (throwable != null) { this.observation.error(throwable); } - if (response.isCommitted()) { - this.observation.stop(); - } - else { - response.beforeCommit(() -> { - this.observation.stop(); - return Mono.empty(); - }); - } + doOnTerminate(this.observationContext); } } @@ -416,8 +407,22 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa public void doOnError(Throwable error) throws Throwable { if (this.observationRecorded.compareAndSet(false, true)) { this.observationContext.setError(error); + doOnTerminate(this.observationContext); + } + } + + + private void doOnTerminate(ServerRequestObservationContext context) { + ServerHttpResponse response = context.getResponse(); + if (response.isCommitted()) { this.observation.stop(); } + else { + response.beforeCommit(() -> { + this.observation.stop(); + return Mono.empty(); + }); + } } } diff --git a/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java b/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java index b6a6d98cce0..fc1d3e75f7c 100644 --- a/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java +++ b/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java @@ -19,6 +19,7 @@ package org.springframework.web.filter.reactive; import java.util.Optional; +import io.micrometer.observation.Observation; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.micrometer.observation.tck.TestObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistryAssert; @@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.observation.ServerRequestObservationContext; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; @@ -66,7 +68,10 @@ class ServerHttpObservationFilterTests { ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource")); exchange.getResponse().setRawStatusCode(200); WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> { - assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent(); + Observation observation = contextView.get(ObservationThreadLocalAccessor.KEY); + assertThat(observation).isNotNull(); + // check that the observation was started + assertThat(observation.getContext().getLowCardinalityKeyValue("outcome")).isNotNull(); return Mono.empty(); }); this.filter.filter(exchange, filterChain).block(); @@ -100,6 +105,25 @@ class ServerHttpObservationFilterTests { assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "UNKNOWN"); } + @Test + void filterShouldStopObservationOnResponseCommit() { + ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource")); + WebFilterChain filterChain = createFilterChain(filterExchange -> { + throw new IllegalArgumentException("server error"); + }); + StepVerifier.create(this.filter.filter(exchange, filterChain).doOnError(throwable -> { + ServerHttpResponse response = exchange.getResponse(); + response.setRawStatusCode(500); + response.setComplete().block(); + })) + .expectError(IllegalArgumentException.class) + .verify(); + Optional observationContext = ServerHttpObservationFilter.findObservationContext(exchange); + assertThat(observationContext.get().getError()).isInstanceOf(IllegalArgumentException.class); + assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SERVER_ERROR"); + } + + private WebFilterChain createFilterChain(ThrowingConsumer exchangeConsumer) { return filterExchange -> { try { diff --git a/spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java b/spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java index 7e44daad549..8f83da58f54 100644 --- a/spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java +++ b/spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java @@ -20,13 +20,13 @@ package org.springframework.web.server.adapter; import java.util.List; import java.util.Optional; +import io.micrometer.observation.Observation; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.micrometer.observation.tck.TestObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistryAssert; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import reactor.util.context.ContextView; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.observation.ServerRequestObservationContext; @@ -66,7 +66,8 @@ class HttpWebHandlerAdapterObservabilityTests { void handlerShouldSetCurrentObservationInReactorContext() { ReactorContextWebHandler targetHandler = new ReactorContextWebHandler(); createWebHandler(targetHandler).handle(this.request, this.response).block(); - assertThat(targetHandler.contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent(); + assertThat(targetHandler.currentObservation).isNotNull(); + assertThat(targetHandler.observationStarted).isTrue(); assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS"); } @@ -120,13 +121,16 @@ class HttpWebHandlerAdapterObservabilityTests { private static class ReactorContextWebHandler implements WebHandler { - ContextView contextView; + Observation currentObservation; + + boolean observationStarted; @Override public Mono handle(ServerWebExchange exchange) { exchange.getResponse().setStatusCode(HttpStatus.OK); return Mono.deferContextual(contextView -> { - this.contextView = contextView; + this.currentObservation = contextView.get(ObservationThreadLocalAccessor.KEY); + this.observationStarted = this.currentObservation.getContext().getLowCardinalityKeyValue("outcome") != null; return Mono.empty(); }); }