Merge branch '6.0.x'
This commit is contained in:
commit
579f29857f
|
|
@ -18,12 +18,14 @@ package org.springframework.web.filter.reactive;
|
|||
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import io.micrometer.observation.Observation;
|
||||
import io.micrometer.observation.ObservationRegistry;
|
||||
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.observability.DefaultSignalListener;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.context.Context;
|
||||
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.http.server.reactive.observation.DefaultServerRequestObservationConvention;
|
||||
|
|
@ -102,40 +104,69 @@ public class ServerHttpObservationFilter implements WebFilter {
|
|||
ServerRequestObservationContext observationContext = new ServerRequestObservationContext(exchange.getRequest(),
|
||||
exchange.getResponse(), exchange.getAttributes());
|
||||
exchange.getAttributes().put(CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE, observationContext);
|
||||
return chain.filter(exchange).transformDeferred(call -> filter(exchange, observationContext, call));
|
||||
return chain.filter(exchange).tap(() -> new ObservationSignalListener(observationContext));
|
||||
}
|
||||
|
||||
private Publisher<Void> filter(ServerWebExchange exchange, ServerRequestObservationContext observationContext, Mono<Void> call) {
|
||||
Observation observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(this.observationConvention,
|
||||
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, this.observationRegistry);
|
||||
observation.start();
|
||||
return call.doOnEach(signal -> {
|
||||
Throwable throwable = signal.getThrowable();
|
||||
if (throwable != null) {
|
||||
if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(throwable.getClass().getSimpleName())) {
|
||||
observationContext.setConnectionAborted(true);
|
||||
}
|
||||
observationContext.setError(throwable);
|
||||
}
|
||||
onTerminalSignal(observation, exchange);
|
||||
})
|
||||
.doOnCancel(() -> {
|
||||
observationContext.setConnectionAborted(true);
|
||||
observation.stop();
|
||||
})
|
||||
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
|
||||
}
|
||||
private final class ObservationSignalListener extends DefaultSignalListener<Void> {
|
||||
|
||||
private void onTerminalSignal(Observation observation, ServerWebExchange exchange) {
|
||||
ServerHttpResponse response = exchange.getResponse();
|
||||
if (response.isCommitted()) {
|
||||
observation.stop();
|
||||
private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS = Set.of("AbortedException",
|
||||
"ClientAbortException", "EOFException", "EofException");
|
||||
|
||||
private final ServerRequestObservationContext observationContext;
|
||||
|
||||
private final Observation observation;
|
||||
|
||||
private AtomicBoolean observationRecorded = new AtomicBoolean();
|
||||
|
||||
public ObservationSignalListener(ServerRequestObservationContext observationContext) {
|
||||
this.observationContext = observationContext;
|
||||
this.observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(observationConvention,
|
||||
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
|
||||
}
|
||||
else {
|
||||
response.beforeCommit(() -> {
|
||||
observation.stop();
|
||||
return Mono.empty();
|
||||
});
|
||||
|
||||
@Override
|
||||
public void doOnSubscription() throws Throwable {
|
||||
this.observation.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Context addToContext(Context originalContext) {
|
||||
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doOnCancel() throws Throwable {
|
||||
if (this.observationRecorded.compareAndSet(false, true)) {
|
||||
this.observationContext.setConnectionAborted(true);
|
||||
this.observation.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doOnComplete() throws Throwable {
|
||||
if (this.observationRecorded.compareAndSet(false, true)) {
|
||||
ServerHttpResponse response = this.observationContext.getResponse();
|
||||
if (response.isCommitted()) {
|
||||
this.observation.stop();
|
||||
}
|
||||
else {
|
||||
response.beforeCommit(() -> {
|
||||
this.observation.stop();
|
||||
return Mono.empty();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doOnError(Throwable error) throws Throwable {
|
||||
if (this.observationRecorded.compareAndSet(false, true)) {
|
||||
if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(error.getClass().getSimpleName())) {
|
||||
this.observationContext.setConnectionAborted(true);
|
||||
}
|
||||
this.observationContext.setError(error);
|
||||
this.observation.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue