Merge branch '5.2.x' into master

This commit is contained in:
Rossen Stoyanchev 2020-10-16 19:16:11 +01:00
commit a4d0af802a
2 changed files with 41 additions and 15 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -72,18 +72,21 @@ public class ResponseBodyEmitter {
/** Store send data before handler is initialized. */ /** Store send data before handler is initialized. */
private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<>(8); private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<>(8);
/** Store complete invocation before handler is initialized. */ /** Store successful completion before the handler is initialized. */
private boolean complete; private boolean complete;
/** Store completeWithError invocation before handler is initialized. */ /** Store an error before the handler is initialized. */
@Nullable @Nullable
private Throwable failure; private Throwable failure;
/** /**
* After an IOException on send, the servlet container will provide an onError * After an I/O error, we don't call {@link #completeWithError} directly but
* callback that we'll handle as completeWithError (on container thread). * wait for the Servlet container to call us via {@code AsyncListener#onError}
* We use this flag to ignore competing attempts to completeWithError by * on a container thread at which point we call completeWithError.
* the application via try-catch. */ * This flag is used to ignore further calls to complete or completeWithError
* that may come for example from an application try-catch block on the
* thread of the I/O error.
*/
private boolean sendFailed; private boolean sendFailed;
private final DefaultCallback timeoutCallback = new DefaultCallback(); private final DefaultCallback timeoutCallback = new DefaultCallback();
@ -124,10 +127,14 @@ public class ResponseBodyEmitter {
synchronized void initialize(Handler handler) throws IOException { synchronized void initialize(Handler handler) throws IOException {
this.handler = handler; this.handler = handler;
try {
for (DataWithMediaType sendAttempt : this.earlySendAttempts) { for (DataWithMediaType sendAttempt : this.earlySendAttempts) {
sendInternal(sendAttempt.getData(), sendAttempt.getMediaType()); sendInternal(sendAttempt.getData(), sendAttempt.getMediaType());
} }
}
finally {
this.earlySendAttempts.clear(); this.earlySendAttempts.clear();
}
if (this.complete) { if (this.complete) {
if (this.failure != null) { if (this.failure != null) {
@ -144,6 +151,13 @@ public class ResponseBodyEmitter {
} }
} }
synchronized void initializeWithError(Throwable ex) {
this.complete = true;
this.failure = ex;
this.earlySendAttempts.clear();
this.errorCallback.accept(ex);
}
/** /**
* Invoked after the response is updated with the status code and headers, * Invoked after the response is updated with the status code and headers,
* if the ResponseBodyEmitter is wrapped in a ResponseEntity, but before the * if the ResponseBodyEmitter is wrapped in a ResponseEntity, but before the
@ -179,7 +193,9 @@ public class ResponseBodyEmitter {
* @throws java.lang.IllegalStateException wraps any other errors * @throws java.lang.IllegalStateException wraps any other errors
*/ */
public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException { public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException {
Assert.state(!this.complete, "ResponseBodyEmitter is already set complete"); Assert.state(!this.complete,
"ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : ""));
sendInternal(object, mediaType); sendInternal(object, mediaType);
} }
@ -280,7 +296,10 @@ public class ResponseBodyEmitter {
/** /**
* Handle sent objects and complete request processing. * Contract to handle the sending of event data, the completion of event
* sending, and the registration of callbacks to be invoked in case of
* timeout, error, and completion for any reason (including from the
* container side).
*/ */
interface Handler { interface Handler {

View File

@ -172,10 +172,17 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
// Headers will be flushed at the first write // Headers will be flushed at the first write
outputMessage = new StreamingServletServerHttpResponse(outputMessage); outputMessage = new StreamingServletServerHttpResponse(outputMessage);
HttpMessageConvertingHandler handler;
try {
DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout()); DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer); WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
}
catch (Throwable ex) {
emitter.initializeWithError(ex);
throw ex;
}
HttpMessageConvertingHandler handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
emitter.initialize(handler); emitter.initialize(handler);
} }