From 7418c4b7b70a1af4acb15c82c1205de6d3227991 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 7 Dec 2020 22:28:45 +0000 Subject: [PATCH] Fix buffer leak in AbstractServerHttpResponse See gh-26232 --- .../reactive/AbstractServerHttpResponse.java | 13 +++++++-- .../reactive/ServerHttpResponseTests.java | 29 +++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index a98b9695b4a..04bd7f2adb9 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -211,9 +211,16 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { // We must resolve value first however, for a chance to handle potential error. if (body instanceof Mono) { return ((Mono) body) - .flatMap(buffer -> doCommit(() -> - writeWithInternal(Mono.fromCallable(() -> buffer) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)))) + .flatMap(buffer -> + doCommit(() -> { + try { + return writeWithInternal(Mono.fromCallable(() -> buffer) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }).doOnError(ex -> DataBufferUtils.release(buffer))) .doOnError(t -> getHeaders().clearContentHeaders()); } else { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java index 16c96fc643f..a12825b5c1a 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java @@ -19,7 +19,9 @@ package org.springframework.http.server.reactive; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; @@ -27,15 +29,23 @@ import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.channel.AbortedException; import reactor.test.StepVerifier; +import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DefaultDataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.testfixture.io.buffer.LeakAwareDataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseCookie; +import org.springframework.http.codec.EncoderHttpMessageWriter; +import org.springframework.http.codec.HttpMessageWriter; +import org.springframework.http.codec.json.Jackson2JsonEncoder; +import org.springframework.web.testfixture.http.server.reactive.MockServerHttpRequest; +import org.springframework.web.testfixture.http.server.reactive.MockServerHttpResponse; import static org.assertj.core.api.Assertions.assertThat; @@ -186,6 +196,25 @@ public class ServerHttpResponseTests { }); } + @Test // gh-26232 + void monoResponseShouldNotLeakIfCancelled() { + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); + MockServerHttpRequest request = MockServerHttpRequest.get("/").build(); + MockServerHttpResponse response = new MockServerHttpResponse(bufferFactory); + response.setWriteHandler(flux -> { + throw AbortedException.beforeSend(); + }); + + HttpMessageWriter messageWriter = new EncoderHttpMessageWriter<>(new Jackson2JsonEncoder()); + Mono result = messageWriter.write(Mono.just(Collections.singletonMap("foo", "bar")), + ResolvableType.forClass(Mono.class), ResolvableType.forClass(Map.class), null, + request, response, Collections.emptyMap()); + + StepVerifier.create(result).expectError(AbortedException.class).verify(); + + bufferFactory.checkForLeaks(); + } + private DefaultDataBuffer wrap(String a) { return DefaultDataBufferFactory.sharedInstance.wrap(ByteBuffer.wrap(a.getBytes(StandardCharsets.UTF_8)));