Consolidate AsyncListener registration

Previously we registered 3 AsyncListener's from the request, from the
response, and from the Servlet adapter.

After this change, only the Servlet adapter registers a listener and
the others are delegated to. This consolidates the handling of
AsyncListener events so that it's easier to discover, trace, and
enforce the order of handling.

See gh-26434
This commit is contained in:
Rossen Stoyanchev 2021-03-03 16:28:02 +00:00
parent b9a612b637
commit d23a108e76
3 changed files with 107 additions and 34 deletions

View File

@ -167,8 +167,12 @@ public class ServletHttpHandlerAdapter implements Servlet {
asyncContext.setTimeout(-1); asyncContext.setTimeout(-1);
ServletServerHttpRequest httpRequest; ServletServerHttpRequest httpRequest;
AsyncListener requestListener;
String logPrefix;
try { try {
httpRequest = createRequest(((HttpServletRequest) request), asyncContext); httpRequest = createRequest(((HttpServletRequest) request), asyncContext);
requestListener = httpRequest.getAsyncListener();
logPrefix = httpRequest.getLogPrefix();
} }
catch (URISyntaxException ex) { catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -180,15 +184,17 @@ public class ServletHttpHandlerAdapter implements Servlet {
} }
ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext, httpRequest); ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext, httpRequest);
AsyncListener responseListener = ((ServletServerHttpResponse) httpResponse).getAsyncListener();
if (httpRequest.getMethod() == HttpMethod.HEAD) { if (httpRequest.getMethod() == HttpMethod.HEAD) {
httpResponse = new HttpHeadResponseDecorator(httpResponse); httpResponse = new HttpHeadResponseDecorator(httpResponse);
} }
AtomicBoolean isCompleted = new AtomicBoolean(); AtomicBoolean completionFlag = new AtomicBoolean();
HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, isCompleted, httpRequest); HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, completionFlag, logPrefix);
HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted, httpRequest, subscriber);
asyncContext.addListener(new HttpHandlerAsyncListener(
requestListener, responseListener, subscriber, completionFlag, logPrefix));
asyncContext.addListener(listener);
this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber); this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber);
} }
@ -244,27 +250,38 @@ public class ServletHttpHandlerAdapter implements Servlet {
* cancel the write Publisher and signal onError/onComplete downstream to * cancel the write Publisher and signal onError/onComplete downstream to
* the writing result Subscriber. * the writing result Subscriber.
*/ */
private static class HandlerResultAsyncListener implements AsyncListener { private static class HttpHandlerAsyncListener implements AsyncListener {
private final AtomicBoolean isCompleted; private final AsyncListener requestAsyncListener;
private final String logPrefix; private final AsyncListener responseAsyncListener;
// We cannot have AsyncListener and HandlerResultSubscriber until WildFly 12+: // We cannot have AsyncListener and HandlerResultSubscriber until WildFly 12+:
// https://issues.jboss.org/browse/WFLY-8515 // https://issues.jboss.org/browse/WFLY-8515
private final Runnable handlerDisposeTask; private final Runnable handlerDisposeTask;
public HandlerResultAsyncListener( private final AtomicBoolean completionFlag;
AtomicBoolean isCompleted, ServletServerHttpRequest request, Runnable handlerDisposeTask) {
this.isCompleted = isCompleted; private final String logPrefix;
this.logPrefix = request.getLogPrefix();
public HttpHandlerAsyncListener(
AsyncListener requestAsyncListener, AsyncListener responseAsyncListener,
Runnable handlerDisposeTask, AtomicBoolean completionFlag, String logPrefix) {
this.requestAsyncListener = requestAsyncListener;
this.responseAsyncListener = responseAsyncListener;
this.handlerDisposeTask = handlerDisposeTask; this.handlerDisposeTask = handlerDisposeTask;
this.completionFlag = completionFlag;
this.logPrefix = logPrefix;
} }
@Override @Override
public void onTimeout(AsyncEvent event) { public void onTimeout(AsyncEvent event) {
logger.debug(this.logPrefix + "Timeout notification"); logger.debug(this.logPrefix + "Timeout notification");
delegateTimeout(this.requestAsyncListener, event);
delegateTimeout(this.responseAsyncListener, event);
handleTimeoutOrError(event); handleTimeoutOrError(event);
} }
@ -272,12 +289,47 @@ public class ServletHttpHandlerAdapter implements Servlet {
public void onError(AsyncEvent event) { public void onError(AsyncEvent event) {
Throwable ex = event.getThrowable(); Throwable ex = event.getThrowable();
logger.debug(this.logPrefix + "Error notification: " + (ex != null ? ex : "<no Throwable>")); logger.debug(this.logPrefix + "Error notification: " + (ex != null ? ex : "<no Throwable>"));
delegateError(this.requestAsyncListener, event);
delegateError(this.responseAsyncListener, event);
handleTimeoutOrError(event); handleTimeoutOrError(event);
} }
@Override
public void onComplete(AsyncEvent event) {
delegateComplete(this.requestAsyncListener, event);
delegateComplete(this.responseAsyncListener, event);
}
private static void delegateTimeout(AsyncListener listener, AsyncEvent event) {
try {
listener.onTimeout(event);
}
catch (Exception ex) {
// Ignore
}
}
private static void delegateError(AsyncListener listener, AsyncEvent event) {
try {
listener.onError(event);
}
catch (Exception ex) {
// Ignore
}
}
private static void delegateComplete(AsyncListener listener, AsyncEvent event) {
try {
listener.onComplete(event);
}
catch (Exception ex) {
// Ignore
}
}
private void handleTimeoutOrError(AsyncEvent event) { private void handleTimeoutOrError(AsyncEvent event) {
AsyncContext context = event.getAsyncContext(); AsyncContext context = event.getAsyncContext();
runIfAsyncNotComplete(context, this.isCompleted, () -> { runIfAsyncNotComplete(context, this.completionFlag, () -> {
try { try {
this.handlerDisposeTask.run(); this.handlerDisposeTask.run();
} }
@ -291,11 +343,6 @@ public class ServletHttpHandlerAdapter implements Servlet {
public void onStartAsync(AsyncEvent event) { public void onStartAsync(AsyncEvent event) {
// no-op // no-op
} }
@Override
public void onComplete(AsyncEvent event) {
// no-op
}
} }
@ -303,7 +350,7 @@ public class ServletHttpHandlerAdapter implements Servlet {
private final AsyncContext asyncContext; private final AsyncContext asyncContext;
private final AtomicBoolean isCompleted; private final AtomicBoolean completionFlag;
private final String logPrefix; private final String logPrefix;
@ -311,11 +358,11 @@ public class ServletHttpHandlerAdapter implements Servlet {
private volatile Subscription subscription; private volatile Subscription subscription;
public HandlerResultSubscriber( public HandlerResultSubscriber(
AsyncContext asyncContext, AtomicBoolean isCompleted, ServletServerHttpRequest httpRequest) { AsyncContext asyncContext, AtomicBoolean completionFlag, String logPrefix) {
this.asyncContext = asyncContext; this.asyncContext = asyncContext;
this.isCompleted = isCompleted; this.completionFlag = completionFlag;
this.logPrefix = httpRequest.getLogPrefix(); this.logPrefix = logPrefix;
} }
@Override @Override
@ -332,7 +379,7 @@ public class ServletHttpHandlerAdapter implements Servlet {
@Override @Override
public void onError(Throwable ex) { public void onError(Throwable ex) {
logger.trace(this.logPrefix + "Failed to complete: " + ex.getMessage()); logger.trace(this.logPrefix + "Failed to complete: " + ex.getMessage());
runIfAsyncNotComplete(this.asyncContext, this.isCompleted, () -> { runIfAsyncNotComplete(this.asyncContext, this.completionFlag, () -> {
if (this.asyncContext.getResponse().isCommitted()) { if (this.asyncContext.getResponse().isCommitted()) {
logger.trace(this.logPrefix + "Dispatch to container, to raise the error on servlet thread"); logger.trace(this.logPrefix + "Dispatch to container, to raise the error on servlet thread");
this.asyncContext.getRequest().setAttribute(WRITE_ERROR_ATTRIBUTE_NAME, ex); this.asyncContext.getRequest().setAttribute(WRITE_ERROR_ATTRIBUTE_NAME, ex);
@ -354,7 +401,7 @@ public class ServletHttpHandlerAdapter implements Servlet {
@Override @Override
public void onComplete() { public void onComplete() {
logger.trace(this.logPrefix + "Handling completed"); logger.trace(this.logPrefix + "Handling completed");
runIfAsyncNotComplete(this.asyncContext, this.isCompleted, this.asyncContext::complete); runIfAsyncNotComplete(this.asyncContext, this.completionFlag, this.asyncContext::complete);
} }
@Override @Override

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -73,6 +73,9 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
private final byte[] buffer; private final byte[] buffer;
private final AsyncListener asyncListener;
public ServletServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext, public ServletServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext,
String servletPath, DataBufferFactory bufferFactory, int bufferSize) String servletPath, DataBufferFactory bufferFactory, int bufferSize)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
@ -93,7 +96,7 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
this.bufferFactory = bufferFactory; this.bufferFactory = bufferFactory;
this.buffer = new byte[bufferSize]; this.buffer = new byte[bufferSize];
asyncContext.addListener(new RequestAsyncListener()); this.asyncListener = new RequestAsyncListener();
// Tomcat expects ReadListener registration on initial thread // Tomcat expects ReadListener registration on initial thread
ServletInputStream inputStream = request.getInputStream(); ServletInputStream inputStream = request.getInputStream();
@ -214,6 +217,22 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
return Flux.from(this.bodyPublisher); return Flux.from(this.bodyPublisher);
} }
@SuppressWarnings("unchecked")
@Override
public <T> T getNativeRequest() {
return (T) this.request;
}
/**
* Return an {@link RequestAsyncListener} that completes the request body
* Publisher when the Servlet container notifies that request input has ended.
* The listener is not actually registered but is rather exposed for
* {@link ServletHttpHandlerAdapter} to ensure events are delegated.
*/
AsyncListener getAsyncListener() {
return this.asyncListener;
}
/** /**
* Read from the request body InputStream and return a DataBuffer. * Read from the request body InputStream and return a DataBuffer.
* Invoked only when {@link ServletInputStream#isReady()} returns "true". * Invoked only when {@link ServletInputStream#isReady()} returns "true".
@ -245,12 +264,6 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
} }
} }
@SuppressWarnings("unchecked")
@Override
public <T> T getNativeRequest() {
return (T) this.request;
}
private final class RequestAsyncListener implements AsyncListener { private final class RequestAsyncListener implements AsyncListener {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -65,6 +65,9 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
private final ServletServerHttpRequest request; private final ServletServerHttpRequest request;
private final AsyncListener asyncListener;
public ServletServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, public ServletServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request) throws IOException { DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request) throws IOException {
@ -85,7 +88,7 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.request = request; this.request = request;
asyncContext.addListener(new ResponseAsyncListener()); this.asyncListener = new ResponseAsyncListener();
// Tomcat expects WriteListener registration on initial thread // Tomcat expects WriteListener registration on initial thread
response.getOutputStream().setWriteListener(new ResponseBodyWriteListener()); response.getOutputStream().setWriteListener(new ResponseBodyWriteListener());
@ -165,6 +168,16 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
} }
} }
/**
* Return an {@link ResponseAsyncListener} that notifies the response
* body Publisher and Subscriber of Servlet container events. The listener
* is not actually registered but is rather exposed for
* {@link ServletHttpHandlerAdapter} to ensure events are delegated.
*/
AsyncListener getAsyncListener() {
return this.asyncListener;
}
@Override @Override
protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlushProcessor() { protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlushProcessor() {
ResponseBodyFlushProcessor processor = new ResponseBodyFlushProcessor(); ResponseBodyFlushProcessor processor = new ResponseBodyFlushProcessor();