Cancel WebAsyncManager thread on request timeout
Issue: SPR-15852
This commit is contained in:
parent
9c3bd8ce85
commit
8b7a670821
|
|
@ -18,6 +18,7 @@ package org.springframework.web.context.request.async;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
@ -39,11 +40,19 @@ class CallableInterceptorChain {
|
||||||
|
|
||||||
private int preProcessIndex = -1;
|
private int preProcessIndex = -1;
|
||||||
|
|
||||||
|
private volatile Future<?> taskFuture;
|
||||||
|
|
||||||
|
|
||||||
public CallableInterceptorChain(List<CallableProcessingInterceptor> interceptors) {
|
public CallableInterceptorChain(List<CallableProcessingInterceptor> interceptors) {
|
||||||
this.interceptors = interceptors;
|
this.interceptors = interceptors;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void setTaskFuture(Future<?> taskFuture) {
|
||||||
|
this.taskFuture = taskFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void applyBeforeConcurrentHandling(NativeWebRequest request, Callable<?> task) throws Exception {
|
public void applyBeforeConcurrentHandling(NativeWebRequest request, Callable<?> task) throws Exception {
|
||||||
for (CallableProcessingInterceptor interceptor : this.interceptors) {
|
for (CallableProcessingInterceptor interceptor : this.interceptors) {
|
||||||
interceptor.beforeConcurrentHandling(request, task);
|
interceptor.beforeConcurrentHandling(request, task);
|
||||||
|
|
@ -77,6 +86,7 @@ class CallableInterceptorChain {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object triggerAfterTimeout(NativeWebRequest request, Callable<?> task) {
|
public Object triggerAfterTimeout(NativeWebRequest request, Callable<?> task) {
|
||||||
|
cancelTask();
|
||||||
for (CallableProcessingInterceptor interceptor : this.interceptors) {
|
for (CallableProcessingInterceptor interceptor : this.interceptors) {
|
||||||
try {
|
try {
|
||||||
Object result = interceptor.handleTimeout(request, task);
|
Object result = interceptor.handleTimeout(request, task);
|
||||||
|
|
@ -94,7 +104,20 @@ class CallableInterceptorChain {
|
||||||
return CallableProcessingInterceptor.RESULT_NONE;
|
return CallableProcessingInterceptor.RESULT_NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cancelTask() {
|
||||||
|
Future<?> future = this.taskFuture;
|
||||||
|
if (future != null) {
|
||||||
|
try {
|
||||||
|
future.cancel(true);
|
||||||
|
}
|
||||||
|
catch (Throwable ex) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Object triggerAfterError(NativeWebRequest request, Callable<?> task, Throwable throwable) {
|
public Object triggerAfterError(NativeWebRequest request, Callable<?> task, Throwable throwable) {
|
||||||
|
cancelTask();
|
||||||
for (CallableProcessingInterceptor interceptor : this.interceptors) {
|
for (CallableProcessingInterceptor interceptor : this.interceptors) {
|
||||||
try {
|
try {
|
||||||
Object result = interceptor.handleError(request, task, throwable);
|
Object result = interceptor.handleError(request, task, throwable);
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
|
||||||
|
|
@ -314,7 +315,7 @@ public final class WebAsyncManager {
|
||||||
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
|
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
|
||||||
startAsyncProcessing(processingContext);
|
startAsyncProcessing(processingContext);
|
||||||
try {
|
try {
|
||||||
this.taskExecutor.submit(() -> {
|
Future<?> future = this.taskExecutor.submit(() -> {
|
||||||
Object result = null;
|
Object result = null;
|
||||||
try {
|
try {
|
||||||
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
|
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
|
||||||
|
|
@ -328,6 +329,7 @@ public final class WebAsyncManager {
|
||||||
}
|
}
|
||||||
setConcurrentResultAndDispatch(result);
|
setConcurrentResultAndDispatch(result);
|
||||||
});
|
});
|
||||||
|
interceptorChain.setTaskFuture(future);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
|
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@
|
||||||
package org.springframework.web.context.request.async;
|
package org.springframework.web.context.request.async;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import javax.servlet.AsyncEvent;
|
import javax.servlet.AsyncEvent;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
@ -30,9 +31,12 @@ import org.springframework.web.context.request.NativeWebRequest;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.BDDMockito.given;
|
import static org.mockito.BDDMockito.given;
|
||||||
import static org.mockito.BDDMockito.mock;
|
import static org.mockito.BDDMockito.mock;
|
||||||
import static org.mockito.BDDMockito.verify;
|
import static org.mockito.BDDMockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
import static org.springframework.web.context.request.async.CallableProcessingInterceptor.RESULT_NONE;
|
import static org.springframework.web.context.request.async.CallableProcessingInterceptor.RESULT_NONE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -151,6 +155,27 @@ public class WebAsyncManagerTimeoutTests {
|
||||||
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable);
|
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void startCallableProcessingTimeoutAndCheckThreadInterrupted() throws Exception {
|
||||||
|
|
||||||
|
StubCallable callable = new StubCallable();
|
||||||
|
Future future = mock(Future.class);
|
||||||
|
|
||||||
|
AsyncTaskExecutor executor = mock(AsyncTaskExecutor.class);
|
||||||
|
when(executor.submit(any(Runnable.class))).thenReturn(future);
|
||||||
|
|
||||||
|
this.asyncManager.setTaskExecutor(executor);
|
||||||
|
this.asyncManager.startCallableProcessing(callable);
|
||||||
|
|
||||||
|
this.asyncWebRequest.onTimeout(ASYNC_EVENT);
|
||||||
|
|
||||||
|
assertTrue(this.asyncManager.hasConcurrentResult());
|
||||||
|
|
||||||
|
verify(future).cancel(true);
|
||||||
|
verifyNoMoreInteractions(future);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void startDeferredResultProcessingTimeoutAndComplete() throws Exception {
|
public void startDeferredResultProcessingTimeoutAndComplete() throws Exception {
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue