Listen to multiple async operations in ServerHttpObservationFilter

Prior to this commit, the `ServerHttpObservationFilter` was fixed to
re-enable instrumentation for async dispatches. This fix involves using
an AsyncListener to be notified of exchange completion.

This change was incomplete, as this would not work in some cases.
If another filter starts the async mode and initiates an ASYNC dispatch,
before async handling at the controller level, the async listener is not
registered against subsequent async starts.

This commit not only ensures that the async listener registers
against new async starts, but also ensure that the initial creation and
registration only happens during the initial REQUEST dispatch.

Fixes gh-33451
This commit is contained in:
Brian Clozel 2024-08-30 20:09:21 +02:00
parent fab889009a
commit debba6545b
2 changed files with 46 additions and 3 deletions

View File

@ -118,13 +118,13 @@ public class ServerHttpObservationFilter extends OncePerRequestFilter {
throw ex;
}
finally {
// If async is started, register a listener for completion notification.
if (request.isAsyncStarted()) {
// If async is started during the first dispatch, register a listener for completion notification.
if (request.isAsyncStarted() && request.getDispatcherType() == DispatcherType.REQUEST) {
request.getAsyncContext().addListener(new ObservationAsyncListener(observation));
}
// scope is opened for ASYNC dispatches, but the observation will be closed
// by the async listener.
else if (request.getDispatcherType() != DispatcherType.ASYNC){
else if (!isAsyncDispatch(request)) {
Throwable error = fetchException(request);
if (error != null) {
observation.error(error);
@ -168,6 +168,7 @@ public class ServerHttpObservationFilter extends OncePerRequestFilter {
@Override
public void onStartAsync(AsyncEvent event) {
event.getAsyncContext().addListener(this);
}
@Override

View File

@ -21,11 +21,16 @@ import java.io.IOException;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.RequestDispatcher;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@ -139,6 +144,21 @@ class ServerHttpObservationFilterTests {
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS").hasBeenStopped();
}
@Test
void shouldRegisterListenerForAsyncStarts() throws Exception {
CustomAsyncFilter customAsyncFilter = new CustomAsyncFilter();
this.mockFilterChain = new MockFilterChain(new NoOpServlet(), customAsyncFilter);
this.request.setAsyncSupported(true);
this.request.setDispatcherType(DispatcherType.REQUEST);
this.filter.doFilter(this.request, this.response, this.mockFilterChain);
customAsyncFilter.asyncContext.dispatch();
this.request.setDispatcherType(DispatcherType.ASYNC);
AsyncContext newAsyncContext = this.request.startAsync();
newAsyncContext.complete();
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS").hasBeenStopped();
}
@Test
void shouldCloseObservationAfterAsyncError() throws Exception {
this.request.setAsyncSupported(true);
@ -187,4 +207,26 @@ class ServerHttpObservationFilterTests {
}
}
@SuppressWarnings("serial")
static class NoOpServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
}
}
static class CustomAsyncFilter implements Filter {
AsyncContext asyncContext;
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
this.asyncContext = servletRequest.startAsync();
filterChain.doFilter(servletRequest, servletResponse);
}
}
}