Polishing in WebAsyncManager

See gh-34192
This commit is contained in:
rstoyanchev 2025-01-02 13:32:41 +00:00
parent 181db1db75
commit 5a44897c55
4 changed files with 107 additions and 97 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2024 the original author or authors. * Copyright 2002-2025 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -288,10 +288,28 @@ public class DeferredResult<T> {
} }
final DeferredResultProcessingInterceptor getInterceptor() { final DeferredResultProcessingInterceptor getLifecycleInterceptor() {
return new DeferredResultProcessingInterceptor() { return new LifecycleInterceptor();
}
/**
* Handles a DeferredResult value when set.
*/
@FunctionalInterface
public interface DeferredResultHandler {
void handleResult(@Nullable Object result);
}
/**
* Instance interceptor to receive Servlet container notifications.
*/
private class LifecycleInterceptor implements DeferredResultProcessingInterceptor {
@Override @Override
public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> deferredResult) { public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> result) {
boolean continueProcessing = true; boolean continueProcessing = true;
try { try {
if (timeoutCallback != null) { if (timeoutCallback != null) {
@ -312,8 +330,9 @@ public class DeferredResult<T> {
} }
return continueProcessing; return continueProcessing;
} }
@Override @Override
public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> deferredResult, Throwable t) { public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> result, Throwable t) {
try { try {
if (errorCallback != null) { if (errorCallback != null) {
errorCallback.accept(t); errorCallback.accept(t);
@ -329,24 +348,15 @@ public class DeferredResult<T> {
} }
return false; return false;
} }
@Override @Override
public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> deferredResult) { public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> result) {
expired = true; expired = true;
if (completionCallback != null) { if (completionCallback != null) {
completionCallback.run(); completionCallback.run();
} }
} }
};
}
/**
* Handles a DeferredResult value when set.
*/
@FunctionalInterface
public interface DeferredResultHandler {
void handleResult(@Nullable Object result);
} }
} }

View File

@ -383,36 +383,6 @@ public final class WebAsyncManager {
} }
} }
private void setConcurrentResultAndDispatch(@Nullable Object result) {
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
synchronized (WebAsyncManager.this) {
if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) {
if (logger.isDebugEnabled()) {
logger.debug("Async result already set: [" + this.state.get() +
"], ignored result for " + formatUri(this.asyncWebRequest));
}
return;
}
this.concurrentResult = result;
if (logger.isDebugEnabled()) {
logger.debug("Async result set for " + formatUri(this.asyncWebRequest));
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest));
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest));
}
this.asyncWebRequest.dispatch();
}
}
/** /**
* Start concurrent request processing and initialize the given * Start concurrent request processing and initialize the given
* {@link DeferredResult} with a {@link DeferredResultHandler} that saves * {@link DeferredResult} with a {@link DeferredResultHandler} that saves
@ -445,7 +415,7 @@ public final class WebAsyncManager {
} }
List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>(); List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(deferredResult.getInterceptor()); interceptors.add(deferredResult.getLifecycleInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values()); interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor); interceptors.add(timeoutDeferredResultInterceptor);
@ -510,6 +480,36 @@ public final class WebAsyncManager {
this.asyncWebRequest.startAsync(); this.asyncWebRequest.startAsync();
} }
private void setConcurrentResultAndDispatch(@Nullable Object result) {
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
synchronized (WebAsyncManager.this) {
if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) {
if (logger.isDebugEnabled()) {
logger.debug("Async result already set: [" + this.state.get() + "], " +
"ignored result for " + formatUri(this.asyncWebRequest));
}
return;
}
this.concurrentResult = result;
if (logger.isDebugEnabled()) {
logger.debug("Async result set for " + formatUri(this.asyncWebRequest));
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest));
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest));
}
this.asyncWebRequest.dispatch();
}
}
private static String formatUri(AsyncWebRequest asyncWebRequest) { private static String formatUri(AsyncWebRequest asyncWebRequest) {
HttpServletRequest request = asyncWebRequest.getNativeRequest(HttpServletRequest.class); HttpServletRequest request = asyncWebRequest.getNativeRequest(HttpServletRequest.class);
return (request != null ? "\"" + request.getRequestURI() + "\"" : "servlet container"); return (request != null ? "\"" + request.getRequestURI() + "\"" : "servlet container");
@ -519,13 +519,13 @@ public final class WebAsyncManager {
/** /**
* Represents a state for {@link WebAsyncManager} to be in. * Represents a state for {@link WebAsyncManager} to be in.
* <p><pre> * <p><pre>
* NOT_STARTED <------+ * +------> NOT_STARTED <------+
* | | * | | |
* v | * | v |
* ASYNC_PROCESSING | * | ASYNC_PROCESSING |
* | | * | | |
* v | * | v |
* RESULT_SET -------+ * <-------+ RESULT_SET -------+
* </pre> * </pre>
* @since 5.3.33 * @since 5.3.33
*/ */

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2024 the original author or authors. * Copyright 2002-2025 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -93,7 +93,7 @@ class DeferredResultTests {
DeferredResult<String> result = new DeferredResult<>(); DeferredResult<String> result = new DeferredResult<>();
result.onCompletion(() -> sb.append("completion event")); result.onCompletion(() -> sb.append("completion event"));
result.getInterceptor().afterCompletion(null, null); result.getLifecycleInterceptor().afterCompletion(null, null);
assertThat(result.isSetOrExpired()).isTrue(); assertThat(result.isSetOrExpired()).isTrue();
assertThat(sb.toString()).isEqualTo("completion event"); assertThat(sb.toString()).isEqualTo("completion event");
@ -109,7 +109,7 @@ class DeferredResultTests {
result.setResultHandler(handler); result.setResultHandler(handler);
result.onTimeout(() -> sb.append("timeout event")); result.onTimeout(() -> sb.append("timeout event"));
result.getInterceptor().handleTimeout(null, null); result.getLifecycleInterceptor().handleTimeout(null, null);
assertThat(sb.toString()).isEqualTo("timeout event"); assertThat(sb.toString()).isEqualTo("timeout event");
assertThat(result.setResult("hello")).as("Should not be able to set result a second time").isFalse(); assertThat(result.setResult("hello")).as("Should not be able to set result a second time").isFalse();
@ -127,7 +127,7 @@ class DeferredResultTests {
Exception e = new Exception(); Exception e = new Exception();
result.onError(t -> sb.append("error event")); result.onError(t -> sb.append("error event"));
result.getInterceptor().handleError(null, null, e); result.getLifecycleInterceptor().handleError(null, null, e);
assertThat(sb.toString()).isEqualTo("error event"); assertThat(sb.toString()).isEqualTo("error event");
assertThat(result.setResult("hello")).as("Should not be able to set result a second time").isFalse(); assertThat(result.setResult("hello")).as("Should not be able to set result a second time").isFalse();

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2024 the original author or authors. * Copyright 2002-2025 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -360,8 +360,8 @@ class ReactiveTypeHandler {
this.subscription.request(1); this.subscription.request(1);
} }
catch (final Throwable ex) { catch (final Throwable ex) {
if (logger.isTraceEnabled()) { if (logger.isDebugEnabled()) {
logger.trace("Send for " + this.emitter + " failed: " + ex); logger.debug("Send for " + this.emitter + " failed: " + ex);
} }
terminate(); terminate();
this.emitter.completeWithError(ex); this.emitter.completeWithError(ex);
@ -374,8 +374,8 @@ class ReactiveTypeHandler {
Throwable ex = this.error; Throwable ex = this.error;
this.error = null; this.error = null;
if (ex != null) { if (ex != null) {
if (logger.isTraceEnabled()) { if (logger.isDebugEnabled()) {
logger.trace("Publisher for " + this.emitter + " failed: " + ex); logger.debug("Publisher for " + this.emitter + " failed: " + ex);
} }
this.emitter.completeWithError(ex); this.emitter.completeWithError(ex);
} }