Use singleOrEmpty to avoid upstream cancel
Closes gh-22952
This commit is contained in:
		
							parent
							
								
									d3110c452e
								
							
						
					
					
						commit
						0274752fe9
					
				| 
						 | 
					@ -120,7 +120,8 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (inputStream instanceof Mono) {
 | 
							if (inputStream instanceof Mono) {
 | 
				
			||||||
			HttpHeaders headers = message.getHeaders();
 | 
								HttpHeaders headers = message.getHeaders();
 | 
				
			||||||
			return Mono.from(body)
 | 
								return body
 | 
				
			||||||
 | 
										.singleOrEmpty()
 | 
				
			||||||
					.switchIfEmpty(Mono.defer(() -> {
 | 
										.switchIfEmpty(Mono.defer(() -> {
 | 
				
			||||||
						headers.setContentLength(0);
 | 
											headers.setContentLength(0);
 | 
				
			||||||
						return message.setComplete().then(Mono.empty());
 | 
											return message.setComplete().then(Mono.empty());
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -33,6 +33,7 @@ import reactor.core.publisher.Flux;
 | 
				
			||||||
import reactor.core.publisher.Mono;
 | 
					import reactor.core.publisher.Mono;
 | 
				
			||||||
import reactor.test.StepVerifier;
 | 
					import reactor.test.StepVerifier;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.springframework.core.codec.CharSequenceEncoder;
 | 
				
			||||||
import org.springframework.core.io.buffer.DataBuffer;
 | 
					import org.springframework.core.io.buffer.DataBuffer;
 | 
				
			||||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
 | 
					import org.springframework.core.io.buffer.DefaultDataBufferFactory;
 | 
				
			||||||
import org.springframework.http.MediaType;
 | 
					import org.springframework.http.MediaType;
 | 
				
			||||||
| 
						 | 
					@ -131,9 +132,7 @@ public class EncoderHttpMessageWriterTests {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Test
 | 
						@Test
 | 
				
			||||||
	public void useNegotiatedMediaTypeCharset() {
 | 
						public void useNegotiatedMediaTypeCharset() {
 | 
				
			||||||
 | 
					 | 
				
			||||||
		MediaType negotiatedMediaType = new MediaType("text", "html", ISO_8859_1);
 | 
							MediaType negotiatedMediaType = new MediaType("text", "html", ISO_8859_1);
 | 
				
			||||||
 | 
					 | 
				
			||||||
		HttpMessageWriter<String> writer = getWriter(TEXT_PLAIN_UTF_8, TEXT_HTML);
 | 
							HttpMessageWriter<String> writer = getWriter(TEXT_PLAIN_UTF_8, TEXT_HTML);
 | 
				
			||||||
		writer.write(Mono.just("body"), forClass(String.class), negotiatedMediaType, this.response, NO_HINTS);
 | 
							writer.write(Mono.just("body"), forClass(String.class), negotiatedMediaType, this.response, NO_HINTS);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -143,7 +142,6 @@ public class EncoderHttpMessageWriterTests {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Test
 | 
						@Test
 | 
				
			||||||
	public void useHttpOutputMessageMediaType() {
 | 
						public void useHttpOutputMessageMediaType() {
 | 
				
			||||||
 | 
					 | 
				
			||||||
		MediaType outputMessageMediaType = MediaType.TEXT_HTML;
 | 
							MediaType outputMessageMediaType = MediaType.TEXT_HTML;
 | 
				
			||||||
		this.response.getHeaders().setContentType(outputMessageMediaType);
 | 
							this.response.getHeaders().setContentType(outputMessageMediaType);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -156,16 +154,25 @@ public class EncoderHttpMessageWriterTests {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Test
 | 
						@Test
 | 
				
			||||||
	public void setContentLengthForMonoBody() {
 | 
						public void setContentLengthForMonoBody() {
 | 
				
			||||||
 | 
					 | 
				
			||||||
		DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
 | 
							DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
 | 
				
			||||||
		DataBuffer buffer = factory.wrap("body".getBytes(StandardCharsets.UTF_8));
 | 
							DataBuffer buffer = factory.wrap("body".getBytes(StandardCharsets.UTF_8));
 | 
				
			||||||
		HttpMessageWriter<String> writer = getWriter(Flux.just(buffer), MimeTypeUtils.TEXT_PLAIN);
 | 
							HttpMessageWriter<String> writer = getWriter(Flux.just(buffer), MimeTypeUtils.TEXT_PLAIN);
 | 
				
			||||||
 | 
					 | 
				
			||||||
		writer.write(Mono.just("body"), forClass(String.class), TEXT_PLAIN, this.response, NO_HINTS).block();
 | 
							writer.write(Mono.just("body"), forClass(String.class), TEXT_PLAIN, this.response, NO_HINTS).block();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		assertEquals(4, this.response.getHeaders().getContentLength());
 | 
							assertEquals(4, this.response.getHeaders().getContentLength());
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Test // gh-22952
 | 
				
			||||||
 | 
						public void monoBodyDoesNotCancelEncodedFlux() {
 | 
				
			||||||
 | 
							Mono<String> inputStream = Mono.just("body")
 | 
				
			||||||
 | 
									.doOnCancel(() -> {
 | 
				
			||||||
 | 
										throw new AssertionError("Cancel signal not expected");
 | 
				
			||||||
 | 
									});
 | 
				
			||||||
 | 
							new EncoderHttpMessageWriter<>(CharSequenceEncoder.allMimeTypes())
 | 
				
			||||||
 | 
									.write(inputStream, forClass(String.class), TEXT_PLAIN, this.response, NO_HINTS)
 | 
				
			||||||
 | 
									.block();
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Test // SPR-17220
 | 
						@Test // SPR-17220
 | 
				
			||||||
	public void emptyBodyWritten() {
 | 
						public void emptyBodyWritten() {
 | 
				
			||||||
		HttpMessageWriter<String> writer = getWriter(MimeTypeUtils.TEXT_PLAIN);
 | 
							HttpMessageWriter<String> writer = getWriter(MimeTypeUtils.TEXT_PLAIN);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue