From d23a108e769ac30dba2afd2baf6802050f2a9b4b Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 3 Mar 2021 16:28:02 +0000 Subject: [PATCH] Consolidate AsyncListener registration Previously we registered 3 AsyncListener's from the request, from the response, and from the Servlet adapter. After this change, only the Servlet adapter registers a listener and the others are delegated to. This consolidates the handling of AsyncListener events so that it's easier to discover, trace, and enforce the order of handling. See gh-26434 --- .../reactive/ServletHttpHandlerAdapter.java | 95 ++++++++++++++----- .../reactive/ServletServerHttpRequest.java | 29 ++++-- .../reactive/ServletServerHttpResponse.java | 17 +++- 3 files changed, 107 insertions(+), 34 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 660968a957c..745a8b590fb 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -167,8 +167,12 @@ public class ServletHttpHandlerAdapter implements Servlet { asyncContext.setTimeout(-1); ServletServerHttpRequest httpRequest; + AsyncListener requestListener; + String logPrefix; try { httpRequest = createRequest(((HttpServletRequest) request), asyncContext); + requestListener = httpRequest.getAsyncListener(); + logPrefix = httpRequest.getLogPrefix(); } catch (URISyntaxException ex) { if (logger.isDebugEnabled()) { @@ -180,15 +184,17 @@ public class ServletHttpHandlerAdapter implements Servlet { } ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext, httpRequest); + AsyncListener responseListener = ((ServletServerHttpResponse) httpResponse).getAsyncListener(); if (httpRequest.getMethod() == HttpMethod.HEAD) { httpResponse = new HttpHeadResponseDecorator(httpResponse); } - AtomicBoolean isCompleted = new AtomicBoolean(); - HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, isCompleted, httpRequest); - HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted, httpRequest, subscriber); + AtomicBoolean completionFlag = new AtomicBoolean(); + HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, completionFlag, logPrefix); + + asyncContext.addListener(new HttpHandlerAsyncListener( + requestListener, responseListener, subscriber, completionFlag, logPrefix)); - asyncContext.addListener(listener); this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber); } @@ -244,27 +250,38 @@ public class ServletHttpHandlerAdapter implements Servlet { * cancel the write Publisher and signal onError/onComplete downstream to * the writing result Subscriber. */ - private static class HandlerResultAsyncListener implements AsyncListener { + private static class HttpHandlerAsyncListener implements AsyncListener { - private final AtomicBoolean isCompleted; + private final AsyncListener requestAsyncListener; - private final String logPrefix; + private final AsyncListener responseAsyncListener; - // We cannot have AsyncListener and HandlerResultSubscriber until WildFly 12+: + // We cannot have AsyncListener and HandlerResultSubscriber until WildFly 12+: // https://issues.jboss.org/browse/WFLY-8515 private final Runnable handlerDisposeTask; - public HandlerResultAsyncListener( - AtomicBoolean isCompleted, ServletServerHttpRequest request, Runnable handlerDisposeTask) { + private final AtomicBoolean completionFlag; - this.isCompleted = isCompleted; - this.logPrefix = request.getLogPrefix(); + private final String logPrefix; + + + public HttpHandlerAsyncListener( + AsyncListener requestAsyncListener, AsyncListener responseAsyncListener, + Runnable handlerDisposeTask, AtomicBoolean completionFlag, String logPrefix) { + + this.requestAsyncListener = requestAsyncListener; + this.responseAsyncListener = responseAsyncListener; this.handlerDisposeTask = handlerDisposeTask; + this.completionFlag = completionFlag; + this.logPrefix = logPrefix; } + @Override public void onTimeout(AsyncEvent event) { logger.debug(this.logPrefix + "Timeout notification"); + delegateTimeout(this.requestAsyncListener, event); + delegateTimeout(this.responseAsyncListener, event); handleTimeoutOrError(event); } @@ -272,12 +289,47 @@ public class ServletHttpHandlerAdapter implements Servlet { public void onError(AsyncEvent event) { Throwable ex = event.getThrowable(); logger.debug(this.logPrefix + "Error notification: " + (ex != null ? ex : "")); + delegateError(this.requestAsyncListener, event); + delegateError(this.responseAsyncListener, event); handleTimeoutOrError(event); } + @Override + public void onComplete(AsyncEvent event) { + delegateComplete(this.requestAsyncListener, event); + delegateComplete(this.responseAsyncListener, event); + } + + private static void delegateTimeout(AsyncListener listener, AsyncEvent event) { + try { + listener.onTimeout(event); + } + catch (Exception ex) { + // Ignore + } + } + + private static void delegateError(AsyncListener listener, AsyncEvent event) { + try { + listener.onError(event); + } + catch (Exception ex) { + // Ignore + } + } + + private static void delegateComplete(AsyncListener listener, AsyncEvent event) { + try { + listener.onComplete(event); + } + catch (Exception ex) { + // Ignore + } + } + private void handleTimeoutOrError(AsyncEvent event) { AsyncContext context = event.getAsyncContext(); - runIfAsyncNotComplete(context, this.isCompleted, () -> { + runIfAsyncNotComplete(context, this.completionFlag, () -> { try { this.handlerDisposeTask.run(); } @@ -291,11 +343,6 @@ public class ServletHttpHandlerAdapter implements Servlet { public void onStartAsync(AsyncEvent event) { // no-op } - - @Override - public void onComplete(AsyncEvent event) { - // no-op - } } @@ -303,7 +350,7 @@ public class ServletHttpHandlerAdapter implements Servlet { private final AsyncContext asyncContext; - private final AtomicBoolean isCompleted; + private final AtomicBoolean completionFlag; private final String logPrefix; @@ -311,11 +358,11 @@ public class ServletHttpHandlerAdapter implements Servlet { private volatile Subscription subscription; public HandlerResultSubscriber( - AsyncContext asyncContext, AtomicBoolean isCompleted, ServletServerHttpRequest httpRequest) { + AsyncContext asyncContext, AtomicBoolean completionFlag, String logPrefix) { this.asyncContext = asyncContext; - this.isCompleted = isCompleted; - this.logPrefix = httpRequest.getLogPrefix(); + this.completionFlag = completionFlag; + this.logPrefix = logPrefix; } @Override @@ -332,7 +379,7 @@ public class ServletHttpHandlerAdapter implements Servlet { @Override public void onError(Throwable ex) { logger.trace(this.logPrefix + "Failed to complete: " + ex.getMessage()); - runIfAsyncNotComplete(this.asyncContext, this.isCompleted, () -> { + runIfAsyncNotComplete(this.asyncContext, this.completionFlag, () -> { if (this.asyncContext.getResponse().isCommitted()) { logger.trace(this.logPrefix + "Dispatch to container, to raise the error on servlet thread"); this.asyncContext.getRequest().setAttribute(WRITE_ERROR_ATTRIBUTE_NAME, ex); @@ -354,7 +401,7 @@ public class ServletHttpHandlerAdapter implements Servlet { @Override public void onComplete() { logger.trace(this.logPrefix + "Handling completed"); - runIfAsyncNotComplete(this.asyncContext, this.isCompleted, this.asyncContext::complete); + runIfAsyncNotComplete(this.asyncContext, this.completionFlag, this.asyncContext::complete); } @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 693255b61b7..a84ddc6d6e3 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,6 +73,9 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { private final byte[] buffer; + private final AsyncListener asyncListener; + + public ServletServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext, String servletPath, DataBufferFactory bufferFactory, int bufferSize) throws IOException, URISyntaxException { @@ -93,7 +96,7 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { this.bufferFactory = bufferFactory; this.buffer = new byte[bufferSize]; - asyncContext.addListener(new RequestAsyncListener()); + this.asyncListener = new RequestAsyncListener(); // Tomcat expects ReadListener registration on initial thread ServletInputStream inputStream = request.getInputStream(); @@ -214,6 +217,22 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { return Flux.from(this.bodyPublisher); } + @SuppressWarnings("unchecked") + @Override + public T getNativeRequest() { + return (T) this.request; + } + + /** + * Return an {@link RequestAsyncListener} that completes the request body + * Publisher when the Servlet container notifies that request input has ended. + * The listener is not actually registered but is rather exposed for + * {@link ServletHttpHandlerAdapter} to ensure events are delegated. + */ + AsyncListener getAsyncListener() { + return this.asyncListener; + } + /** * Read from the request body InputStream and return a DataBuffer. * Invoked only when {@link ServletInputStream#isReady()} returns "true". @@ -245,12 +264,6 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { } } - @SuppressWarnings("unchecked") - @Override - public T getNativeRequest() { - return (T) this.request; - } - private final class RequestAsyncListener implements AsyncListener { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index a4e17276498..c621246f567 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,6 +65,9 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { private final ServletServerHttpRequest request; + private final AsyncListener asyncListener; + + public ServletServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request) throws IOException { @@ -85,7 +88,7 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { this.bufferSize = bufferSize; this.request = request; - asyncContext.addListener(new ResponseAsyncListener()); + this.asyncListener = new ResponseAsyncListener(); // Tomcat expects WriteListener registration on initial thread response.getOutputStream().setWriteListener(new ResponseBodyWriteListener()); @@ -165,6 +168,16 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { } } + /** + * Return an {@link ResponseAsyncListener} that notifies the response + * body Publisher and Subscriber of Servlet container events. The listener + * is not actually registered but is rather exposed for + * {@link ServletHttpHandlerAdapter} to ensure events are delegated. + */ + AsyncListener getAsyncListener() { + return this.asyncListener; + } + @Override protected Processor, Void> createBodyFlushProcessor() { ResponseBodyFlushProcessor processor = new ResponseBodyFlushProcessor();