parent
01fb4dbeba
commit
cb44ae62e9
|
|
@ -223,6 +223,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
|
|||
byte[] bytes = byteBuilder.toByteArray();
|
||||
DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length);
|
||||
buffer.write(bytes);
|
||||
Hints.touchDataBuffer(buffer, hints, logger);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
|
@ -267,6 +268,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
|
|||
DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length);
|
||||
buffer.write(bytes, offset, length);
|
||||
buffer.write(separator);
|
||||
Hints.touchDataBuffer(buffer, hints, logger);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -211,16 +211,18 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
|
|||
// We must resolve value first however, for a chance to handle potential error.
|
||||
if (body instanceof Mono) {
|
||||
return ((Mono<? extends DataBuffer>) body)
|
||||
.flatMap(buffer ->
|
||||
doCommit(() -> {
|
||||
try {
|
||||
return writeWithInternal(Mono.fromCallable(() -> buffer)
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
}).doOnError(ex -> DataBufferUtils.release(buffer)))
|
||||
.flatMap(buffer -> {
|
||||
touchDataBuffer(buffer);
|
||||
return doCommit(() -> {
|
||||
try {
|
||||
return writeWithInternal(Mono.fromCallable(() -> buffer)
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
}).doOnError(ex -> DataBufferUtils.release(buffer));
|
||||
})
|
||||
.doOnError(t -> getHeaders().clearContentHeaders());
|
||||
}
|
||||
else {
|
||||
|
|
@ -323,4 +325,13 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
|
|||
*/
|
||||
protected abstract void applyCookies();
|
||||
|
||||
/**
|
||||
* Allow sub-classes to associate a hint with the data buffer if it is a
|
||||
* pooled buffer and supports leak tracking.
|
||||
* @param buffer the buffer to attach a hint to
|
||||
* @since 5.3.2
|
||||
*/
|
||||
protected void touchDataBuffer(DataBuffer buffer) {
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,9 @@ import java.nio.file.Path;
|
|||
import java.util.List;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelId;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
|
@ -27,6 +30,7 @@ import reactor.netty.http.server.HttpServerResponse;
|
|||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
|
@ -43,6 +47,9 @@ import org.springframework.util.Assert;
|
|||
*/
|
||||
class ReactorServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactorServerHttpResponse.class);
|
||||
|
||||
|
||||
private final HttpServerResponse response;
|
||||
|
||||
|
||||
|
|
@ -115,4 +122,14 @@ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements Ze
|
|||
Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void touchDataBuffer(DataBuffer buffer) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
this.response.withConnection(connection -> {
|
||||
ChannelId id = connection.channel().id();
|
||||
DataBufferUtils.touch(buffer, "Channel id: " + id.asShortText());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue