Deferred handling of Flux error in Spring MVC

This commit defers flushing of the response until the first item is
emitted that needs to be written (and flushed) to the response.

This makes Spring MVC consistent with WebFlux in this regard.

Closes gh-21972
This commit is contained in:
Rossen Stoyanchev 2019-05-06 12:26:25 -04:00
parent 15e1af2281
commit 53cadf15e7
2 changed files with 37 additions and 9 deletions

View File

@ -152,9 +152,8 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
// At this point we know we're streaming..
ShallowEtagHeaderFilter.disableContentCaching(request);
// Commit the response and wrap to ignore further header changes
outputMessage.getBody();
outputMessage.flush();
// Wrap the response to ignore further header changes
// Headers will be flushed at the first write
outputMessage = new StreamingServletServerHttpResponse(outputMessage);
DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
@ -198,7 +197,13 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
@Override
public void complete() {
this.deferredResult.setResult(null);
try {
this.outputMessage.flush();
this.deferredResult.setResult(null);
}
catch (IOException ex) {
this.deferredResult.setErrorResult(ex);
}
}
@Override

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -40,6 +40,7 @@ import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.ServletWebRequest;
import org.springframework.web.context.request.async.AsyncWebRequest;
import org.springframework.web.context.request.async.StandardServletAsyncWebRequest;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.ModelAndViewContainer;
@ -197,7 +198,6 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
assertTrue(this.request.isAsyncStarted());
assertEquals(200, this.response.getStatus());
assertEquals("text/event-stream;charset=UTF-8", this.response.getContentType());
SimpleBean bean1 = new SimpleBean();
bean1.setId(1L);
@ -210,6 +210,7 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
emitter.send(SseEmitter.event().
comment("a test").name("update").id("1").reconnectTime(5000L).data(bean1).data(bean2));
assertEquals("text/event-stream;charset=UTF-8", this.response.getContentType());
assertEquals(":a test\n" +
"event:update\n" +
"id:1\n" +
@ -230,21 +231,43 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
assertTrue(this.request.isAsyncStarted());
assertEquals(200, this.response.getStatus());
assertEquals("text/event-stream;charset=UTF-8", this.response.getContentType());
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();
assertEquals("text/event-stream;charset=UTF-8", this.response.getContentType());
assertEquals("data:foo\n\ndata:bar\n\ndata:baz\n\n", this.response.getContentAsString());
}
@Test // gh-21972
public void responseBodyFluxWithError() throws Exception {
this.request.addHeader("Accept", "text/event-stream");
MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class);
EmitterProcessor<String> processor = EmitterProcessor.create();
this.handler.handleReturnValue(processor, type, this.mavContainer, this.webRequest);
assertTrue(this.request.isAsyncStarted());
IllegalStateException ex = new IllegalStateException("wah wah");
processor.onError(ex);
processor.onComplete();
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.webRequest);
assertSame(ex, asyncManager.getConcurrentResult());
assertNull(this.response.getContentType());
}
@Test
public void responseEntitySse() throws Exception {
MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, SseEmitter.class);
ResponseEntity<SseEmitter> entity = ResponseEntity.ok().header("foo", "bar").body(new SseEmitter());
SseEmitter emitter = new SseEmitter();
ResponseEntity<SseEmitter> entity = ResponseEntity.ok().header("foo", "bar").body(emitter);
this.handler.handleReturnValue(entity, type, this.mavContainer, this.webRequest);
emitter.complete();
assertTrue(this.request.isAsyncStarted());
assertEquals(200, this.response.getStatus());
@ -274,13 +297,13 @@ public class ResponseBodyEmitterReturnValueHandlerTests {
assertTrue(this.request.isAsyncStarted());
assertEquals(200, this.response.getStatus());
assertEquals("text/plain", this.response.getContentType());
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();
assertEquals("text/plain", this.response.getContentType());
assertEquals("foobarbaz", this.response.getContentAsString());
}