Use preset content-type for streaming response

Closes gh-35130
This commit is contained in:
rstoyanchev 2025-06-30 15:34:00 +01:00
parent e88b70e6ad
commit 55634f972c
4 changed files with 57 additions and 14 deletions

View File

@ -138,7 +138,8 @@ class ReactiveTypeHandler {
* with a {@link DeferredResult} * with a {@link DeferredResult}
*/ */
@Nullable @Nullable
public ResponseBodyEmitter handleValue(Object returnValue, MethodParameter returnType, public ResponseBodyEmitter handleValue(
Object returnValue, MethodParameter returnType, @Nullable MediaType presetContentType,
ModelAndViewContainer mav, NativeWebRequest request) throws Exception { ModelAndViewContainer mav, NativeWebRequest request) throws Exception {
Assert.notNull(returnValue, "Expected return value"); Assert.notNull(returnValue, "Expected return value");
@ -157,7 +158,7 @@ class ReactiveTypeHandler {
ResolvableType elementType = ResolvableType.forMethodParameter(returnType).getGeneric(); ResolvableType elementType = ResolvableType.forMethodParameter(returnType).getGeneric();
Class<?> elementClass = elementType.toClass(); Class<?> elementClass = elementType.toClass();
Collection<MediaType> mediaTypes = getMediaTypes(request); Collection<MediaType> mediaTypes = getMediaTypes(request, presetContentType);
Optional<MediaType> mediaType = mediaTypes.stream().filter(MimeType::isConcrete).findFirst(); Optional<MediaType> mediaType = mediaTypes.stream().filter(MimeType::isConcrete).findFirst();
if (adapter.isMultiValue()) { if (adapter.isMultiValue()) {
@ -228,14 +229,25 @@ class ReactiveTypeHandler {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Collection<MediaType> getMediaTypes(NativeWebRequest request) private Collection<MediaType> getMediaTypes(NativeWebRequest request, @Nullable MediaType contentType)
throws HttpMediaTypeNotAcceptableException { throws HttpMediaTypeNotAcceptableException {
Collection<MediaType> mediaTypes = (Collection<MediaType>) request.getAttribute( Collection<MediaType> producibleMediaTypes = (Collection<MediaType>) request.getAttribute(
HandlerMapping.PRODUCIBLE_MEDIA_TYPES_ATTRIBUTE, RequestAttributes.SCOPE_REQUEST); HandlerMapping.PRODUCIBLE_MEDIA_TYPES_ATTRIBUTE, RequestAttributes.SCOPE_REQUEST);
return CollectionUtils.isEmpty(mediaTypes) ? Collection<MediaType> mediaTypes = (CollectionUtils.isEmpty(producibleMediaTypes) ?
this.contentNegotiationManager.resolveMediaTypes(request) : mediaTypes; this.contentNegotiationManager.resolveMediaTypes(request) : producibleMediaTypes);
if (contentType != null) {
for (MediaType mediaType : mediaTypes) {
if (mediaType.isConcrete()) {
return mediaTypes;
}
}
return List.of(contentType);
}
return mediaTypes;
} }
private ResponseBodyEmitter getEmitter(MediaType mediaType) { private ResponseBodyEmitter getEmitter(MediaType mediaType) {

View File

@ -186,10 +186,12 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class); HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
Assert.state(response != null, "No HttpServletResponse"); Assert.state(response != null, "No HttpServletResponse");
ServerHttpResponse outputMessage = new ServletServerHttpResponse(response); ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
MediaType contentType = null;
if (returnValue instanceof ResponseEntity<?> responseEntity) { if (returnValue instanceof ResponseEntity<?> responseEntity) {
response.setStatus(responseEntity.getStatusCode().value()); response.setStatus(responseEntity.getStatusCode().value());
outputMessage.getHeaders().putAll(responseEntity.getHeaders()); outputMessage.getHeaders().putAll(responseEntity.getHeaders());
contentType = responseEntity.getHeaders().getContentType();
returnValue = responseEntity.getBody(); returnValue = responseEntity.getBody();
returnType = returnType.nested(); returnType = returnType.nested();
if (returnValue == null) { if (returnValue == null) {
@ -207,7 +209,7 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
emitter = responseBodyEmitter; emitter = responseBodyEmitter;
} }
else { else {
emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest); emitter = this.reactiveHandler.handleValue(returnValue, returnType, contentType, mavContainer, webRequest);
if (emitter == null) { if (emitter == null) {
// We're not streaming; write headers without committing response // We're not streaming; write headers without committing response
outputMessage.getHeaders().forEach((headerName, headerValues) -> { outputMessage.getHeaders().forEach((headerName, headerValues) -> {

View File

@ -242,19 +242,22 @@ class ReactiveTypeHandlerTests {
// Media type from request // Media type from request
this.servletRequest.addHeader("Accept", "text/event-stream"); this.servletRequest.addHeader("Accept", "text/event-stream");
testSseResponse(true); testSseResponse(true, null);
// Media type from "produces" attribute // Media type from "produces" attribute
Set<MediaType> types = Collections.singleton(MediaType.TEXT_EVENT_STREAM); Set<MediaType> types = Collections.singleton(MediaType.TEXT_EVENT_STREAM);
this.servletRequest.setAttribute(HandlerMapping.PRODUCIBLE_MEDIA_TYPES_ATTRIBUTE, types); this.servletRequest.setAttribute(HandlerMapping.PRODUCIBLE_MEDIA_TYPES_ATTRIBUTE, types);
testSseResponse(true); testSseResponse(true, null);
// Preset media type // gh-35130
testSseResponse(true, MediaType.TEXT_EVENT_STREAM);
// No media type preferences // No media type preferences
testSseResponse(false); testSseResponse(false, null);
} }
private void testSseResponse(boolean expectSseEmitter) throws Exception { private void testSseResponse(boolean expectSseEmitter, @Nullable MediaType contentType) throws Exception {
ResponseBodyEmitter emitter = handleValue(Flux.empty(), Flux.class, forClass(String.class)); ResponseBodyEmitter emitter = handleValue(Flux.empty(), Flux.class, forClass(String.class), contentType);
Object actual = emitter instanceof SseEmitter; Object actual = emitter instanceof SseEmitter;
assertThat(actual).isEqualTo(expectSseEmitter); assertThat(actual).isEqualTo(expectSseEmitter);
resetRequest(); resetRequest();
@ -450,7 +453,7 @@ class ReactiveTypeHandlerTests {
try { try {
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer(); Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
ResponseBodyEmitter emitter = handler.handleValue(sink.asFlux(), returnType, mavContainer, this.webRequest); ResponseBodyEmitter emitter = handler.handleValue(sink.asFlux(), returnType, null, mavContainer, this.webRequest);
ContextEmitterHandler emitterHandler = new ContextEmitterHandler(); ContextEmitterHandler emitterHandler = new ContextEmitterHandler();
emitter.initialize(emitterHandler); emitter.initialize(emitterHandler);
@ -497,9 +500,15 @@ class ReactiveTypeHandlerTests {
private ResponseBodyEmitter handleValue(Object returnValue, Class<?> asyncType, private ResponseBodyEmitter handleValue(Object returnValue, Class<?> asyncType,
ResolvableType genericType) throws Exception { ResolvableType genericType) throws Exception {
return handleValue(returnValue, asyncType, genericType, null);
}
private ResponseBodyEmitter handleValue(Object returnValue, Class<?> asyncType,
ResolvableType genericType, @Nullable MediaType contentType) throws Exception {
ModelAndViewContainer mavContainer = new ModelAndViewContainer(); ModelAndViewContainer mavContainer = new ModelAndViewContainer();
MethodParameter returnType = on(TestController.class).resolveReturnType(asyncType, genericType); MethodParameter returnType = on(TestController.class).resolveReturnType(asyncType, genericType);
return this.handler.handleValue(returnValue, returnType, mavContainer, this.webRequest); return this.handler.handleValue(returnValue, returnType, contentType, mavContainer, this.webRequest);
} }

View File

@ -28,12 +28,14 @@ import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshot.Scope; import io.micrometer.context.ContextSnapshot.Scope;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.context.request.NativeWebRequest; import org.springframework.web.context.request.NativeWebRequest;
@ -342,6 +344,21 @@ class ResponseBodyEmitterReturnValueHandlerTests {
assertThat(this.response.isCommitted()).isFalse(); assertThat(this.response.isCommitted()).isFalse();
} }
@Test // gh-35130
void responseEntityFluxSseWithPresetContentType() throws Exception {
ResponseEntity<Publisher<?>> entity =
ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(Flux.just("foo", "bar"));
MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, Publisher.class);
this.handler.handleReturnValue(entity, type, this.mavContainer, this.webRequest);
assertThat(this.request.isAsyncStarted()).isTrue();
assertThat(this.response.getStatus()).isEqualTo(200);
assertThat(this.response.getContentType()).isEqualTo("text/event-stream");
assertThat(this.response.getContentAsString()).isEqualTo("data:foo\n\ndata:bar\n\n");
}
@SuppressWarnings({"unused", "ConstantConditions"}) @SuppressWarnings({"unused", "ConstantConditions"})
private static class TestController { private static class TestController {
@ -365,6 +382,9 @@ class ResponseBodyEmitterReturnValueHandlerTests {
private ResponseEntity<Flux<String>> h9() { return null; } private ResponseEntity<Flux<String>> h9() { return null; }
private ResponseEntity<Flux<SimpleBean>> h10() { return null; } private ResponseEntity<Flux<SimpleBean>> h10() { return null; }
private ResponseEntity<Publisher<?>> h11() { return null; }
} }