From cb44ae62e9c49809a85bd44d82e1a5047dfd5290 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 8 Dec 2020 18:43:54 +0000 Subject: [PATCH] Additional DataBuffer hints See gh-26230 --- .../codec/json/AbstractJackson2Encoder.java | 2 ++ .../reactive/AbstractServerHttpResponse.java | 31 +++++++++++++------ .../reactive/ReactorServerHttpResponse.java | 17 ++++++++++ 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java index a5bfda5625f..aa6ac5a26ab 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java @@ -223,6 +223,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple byte[] bytes = byteBuilder.toByteArray(); DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length); buffer.write(bytes); + Hints.touchDataBuffer(buffer, hints, logger); return buffer; } @@ -267,6 +268,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length); buffer.write(bytes, offset, length); buffer.write(separator); + Hints.touchDataBuffer(buffer, hints, logger); return buffer; } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index 04bd7f2adb9..41b691c83a4 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -211,16 +211,18 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { // We must resolve value first however, for a chance to handle potential error. if (body instanceof Mono) { return ((Mono) body) - .flatMap(buffer -> - doCommit(() -> { - try { - return writeWithInternal(Mono.fromCallable(() -> buffer) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }).doOnError(ex -> DataBufferUtils.release(buffer))) + .flatMap(buffer -> { + touchDataBuffer(buffer); + return doCommit(() -> { + try { + return writeWithInternal(Mono.fromCallable(() -> buffer) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }).doOnError(ex -> DataBufferUtils.release(buffer)); + }) .doOnError(t -> getHeaders().clearContentHeaders()); } else { @@ -323,4 +325,13 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { */ protected abstract void applyCookies(); + /** + * Allow sub-classes to associate a hint with the data buffer if it is a + * pooled buffer and supports leak tracking. + * @param buffer the buffer to attach a hint to + * @since 5.3.2 + */ + protected void touchDataBuffer(DataBuffer buffer) { + } + } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index 64b0a1c3f1d..ccfa1f1f883 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -20,6 +20,9 @@ import java.nio.file.Path; import java.util.List; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelId; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -27,6 +30,7 @@ import reactor.netty.http.server.HttpServerResponse; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; @@ -43,6 +47,9 @@ import org.springframework.util.Assert; */ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage { + private static final Log logger = LogFactory.getLog(ReactorServerHttpResponse.class); + + private final HttpServerResponse response; @@ -115,4 +122,14 @@ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements Ze Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf); } + @Override + protected void touchDataBuffer(DataBuffer buffer) { + if (logger.isDebugEnabled()) { + this.response.withConnection(connection -> { + ChannelId id = connection.channel().id(); + DataBufferUtils.touch(buffer, "Channel id: " + id.asShortText()); + }); + } + } + }