Fix buffer leak in AbstractServerHttpResponse

See gh-26232
This commit is contained in:
Rossen Stoyanchev 2020-12-07 22:28:45 +00:00
parent ad42010785
commit 7418c4b7b7
2 changed files with 39 additions and 3 deletions

View File

@ -211,9 +211,16 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
// We must resolve value first however, for a chance to handle potential error.
if (body instanceof Mono) {
return ((Mono<? extends DataBuffer>) body)
.flatMap(buffer -> doCommit(() ->
writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release))))
.flatMap(buffer ->
doCommit(() -> {
try {
return writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
}
catch (Throwable ex) {
return Mono.error(ex);
}
}).doOnError(ex -> DataBufferUtils.release(buffer)))
.doOnError(t -> getHeaders().clearContentHeaders());
}
else {

View File

@ -19,7 +19,9 @@ package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -27,15 +29,23 @@ import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.channel.AbortedException;
import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.testfixture.io.buffer.LeakAwareDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
import org.springframework.http.codec.EncoderHttpMessageWriter;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.testfixture.http.server.reactive.MockServerHttpRequest;
import org.springframework.web.testfixture.http.server.reactive.MockServerHttpResponse;
import static org.assertj.core.api.Assertions.assertThat;
@ -186,6 +196,25 @@ public class ServerHttpResponseTests {
});
}
@Test // gh-26232
void monoResponseShouldNotLeakIfCancelled() {
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
MockServerHttpRequest request = MockServerHttpRequest.get("/").build();
MockServerHttpResponse response = new MockServerHttpResponse(bufferFactory);
response.setWriteHandler(flux -> {
throw AbortedException.beforeSend();
});
HttpMessageWriter<Object> messageWriter = new EncoderHttpMessageWriter<>(new Jackson2JsonEncoder());
Mono<Void> result = messageWriter.write(Mono.just(Collections.singletonMap("foo", "bar")),
ResolvableType.forClass(Mono.class), ResolvableType.forClass(Map.class), null,
request, response, Collections.emptyMap());
StepVerifier.create(result).expectError(AbortedException.class).verify();
bufferFactory.checkForLeaks();
}
private DefaultDataBuffer wrap(String a) {
return DefaultDataBufferFactory.sharedInstance.wrap(ByteBuffer.wrap(a.getBytes(StandardCharsets.UTF_8)));