From c2137a0d2fb74127b799b37bd8d41ccff44a0b3f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 26 Feb 2021 14:28:07 +0000 Subject: [PATCH] Cancel handling onError/Timeout in ServletHttpHandlerAdapter This commit ensures handling is cancelled in case of onError/Timeout callback from the Servlet container. Separately we detect the same in ServletServerHttpRequest and ServletServerHttpResponse, which signal onError to the read publisher and cancel writing, but if the onError/Timeout arrives after reading is done and before writing has started (e.g. longer handling), then neither will reach handling. See gh-26434, gh-26407 --- .../reactive/ServletHttpHandlerAdapter.java | 49 ++++++++++++++----- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 54961a8449d..660968a957c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -185,10 +185,10 @@ public class ServletHttpHandlerAdapter implements Servlet { } AtomicBoolean isCompleted = new AtomicBoolean(); - HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted, httpRequest); - asyncContext.addListener(listener); - HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, isCompleted, httpRequest); + HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted, httpRequest, subscriber); + + asyncContext.addListener(listener); this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber); } @@ -222,10 +222,6 @@ public class ServletHttpHandlerAdapter implements Servlet { } - /** - * We cannot combine ERROR_LISTENER and HandlerResultSubscriber due to: - * https://issues.jboss.org/browse/WFLY-8515. - */ private static void runIfAsyncNotComplete(AsyncContext asyncContext, AtomicBoolean isCompleted, Runnable task) { try { if (asyncContext.getRequest().isAsyncStarted() && isCompleted.compareAndSet(false, true)) { @@ -254,24 +250,41 @@ public class ServletHttpHandlerAdapter implements Servlet { private final String logPrefix; - public HandlerResultAsyncListener(AtomicBoolean isCompleted, ServletServerHttpRequest request) { + // We cannot have AsyncListener and HandlerResultSubscriber until WildFly 12+: + // https://issues.jboss.org/browse/WFLY-8515 + private final Runnable handlerDisposeTask; + + public HandlerResultAsyncListener( + AtomicBoolean isCompleted, ServletServerHttpRequest request, Runnable handlerDisposeTask) { + this.isCompleted = isCompleted; this.logPrefix = request.getLogPrefix(); + this.handlerDisposeTask = handlerDisposeTask; } @Override public void onTimeout(AsyncEvent event) { logger.debug(this.logPrefix + "Timeout notification"); - AsyncContext context = event.getAsyncContext(); - runIfAsyncNotComplete(context, this.isCompleted, context::complete); + handleTimeoutOrError(event); } @Override public void onError(AsyncEvent event) { Throwable ex = event.getThrowable(); logger.debug(this.logPrefix + "Error notification: " + (ex != null ? ex : "")); + handleTimeoutOrError(event); + } + + private void handleTimeoutOrError(AsyncEvent event) { AsyncContext context = event.getAsyncContext(); - runIfAsyncNotComplete(context, this.isCompleted, context::complete); + runIfAsyncNotComplete(context, this.isCompleted, () -> { + try { + this.handlerDisposeTask.run(); + } + finally { + context.complete(); + } + }); } @Override @@ -286,7 +299,7 @@ public class ServletHttpHandlerAdapter implements Servlet { } - private static class HandlerResultSubscriber implements Subscriber { + private static class HandlerResultSubscriber implements Subscriber, Runnable { private final AsyncContext asyncContext; @@ -294,6 +307,9 @@ public class ServletHttpHandlerAdapter implements Servlet { private final String logPrefix; + @Nullable + private volatile Subscription subscription; + public HandlerResultSubscriber( AsyncContext asyncContext, AtomicBoolean isCompleted, ServletServerHttpRequest httpRequest) { @@ -304,6 +320,7 @@ public class ServletHttpHandlerAdapter implements Servlet { @Override public void onSubscribe(Subscription subscription) { + this.subscription = subscription; subscription.request(Long.MAX_VALUE); } @@ -339,6 +356,14 @@ public class ServletHttpHandlerAdapter implements Servlet { logger.trace(this.logPrefix + "Handling completed"); runIfAsyncNotComplete(this.asyncContext, this.isCompleted, this.asyncContext::complete); } + + @Override + public void run() { + Subscription s = this.subscription; + if (s != null) { + s.cancel(); + } + } } }