Use doOnDiscard to free internally queued data buffers

Issue: SPR-17246
This commit is contained in:
Arjen Poutsma 2018-09-11 13:24:03 +02:00
parent 1756f83701
commit 8a4835368d
5 changed files with 45 additions and 15 deletions

View File

@ -101,8 +101,8 @@ public abstract class DataBufferUtils {
bufferSize); bufferSize);
return Flux.generate(generator); return Flux.generate(generator);
}, },
DataBufferUtils::closeChannel DataBufferUtils::closeChannel)
); .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }
/** /**
@ -140,14 +140,16 @@ public abstract class DataBufferUtils {
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize); DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
return Flux.using(channelSupplier, Flux<DataBuffer> result = Flux.using(channelSupplier,
channel -> Flux.create(sink -> { channel -> Flux.create(sink -> {
CompletionHandler<Integer, DataBuffer> completionHandler = CompletionHandler<Integer, DataBuffer> completionHandler =
new AsynchronousFileChannelReadCompletionHandler(channel, new AsynchronousFileChannelReadCompletionHandler(channel,
sink, position, dataBufferFactory, bufferSize); sink, position, dataBufferFactory, bufferSize);
channel.read(byteBuffer, position, dataBuffer, completionHandler); channel.read(byteBuffer, position, dataBuffer, completionHandler);
}), }),
DataBufferUtils::closeChannel); DataBufferUtils::closeChannel);
return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }
/** /**
@ -391,12 +393,19 @@ public abstract class DataBufferUtils {
} }
/** /**
* Release the given data buffer, if it is a {@link PooledDataBuffer}. * Release the given data buffer, if it is a {@link PooledDataBuffer} and
* has been {@linkplain PooledDataBuffer#isAllocated() allocated}.
* @param dataBuffer the data buffer to release * @param dataBuffer the data buffer to release
* @return {@code true} if the buffer was released; {@code false} otherwise. * @return {@code true} if the buffer was released; {@code false} otherwise.
*/ */
public static boolean release(@Nullable DataBuffer dataBuffer) { public static boolean release(@Nullable DataBuffer dataBuffer) {
return (dataBuffer instanceof PooledDataBuffer && ((PooledDataBuffer) dataBuffer).release()); if (dataBuffer instanceof PooledDataBuffer) {
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
if (pooledDataBuffer.isAllocated()) {
return pooledDataBuffer.release();
}
}
return false;
} }
/** /**

View File

@ -259,6 +259,11 @@ public class NettyDataBuffer implements PooledDataBuffer {
return new ByteBufOutputStream(this.byteBuf); return new ByteBufOutputStream(this.byteBuf);
} }
@Override
public boolean isAllocated() {
return this.byteBuf.refCnt() > 0;
}
@Override @Override
public PooledDataBuffer retain() { public PooledDataBuffer retain() {
return new NettyDataBuffer(this.byteBuf.retain(), this.dataBufferFactory); return new NettyDataBuffer(this.byteBuf.retain(), this.dataBufferFactory);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -25,6 +25,12 @@ package org.springframework.core.io.buffer;
*/ */
public interface PooledDataBuffer extends DataBuffer { public interface PooledDataBuffer extends DataBuffer {
/**
* Return {@code true} if this buffer is allocated; {@code false} if it has been deallocated.
* @since 5.1
*/
boolean isAllocated();
/** /**
* Increase the reference count for this buffer by one. * Increase the reference count for this buffer by one.
* @return this buffer * @return this buffer
@ -32,9 +38,9 @@ public interface PooledDataBuffer extends DataBuffer {
PooledDataBuffer retain(); PooledDataBuffer retain();
/** /**
* Decrease the reference count for this buffer by one, and release it * Decrease the reference count for this buffer by one, and deallocate it
* once the count reaches zero. * once the count reaches zero.
* @return {@code true} if the buffer was released; {@code false} otherwise. * @return {@code true} if the buffer was deallocated; {@code false} otherwise.
*/ */
boolean release(); boolean release();

View File

@ -29,7 +29,9 @@ import reactor.netty.Connection;
import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerRequest;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpCookie; import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
@ -153,6 +155,7 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest {
return this.request.remoteAddress(); return this.request.remoteAddress();
} }
@Override
@Nullable @Nullable
protected SslInfo initSslInfo() { protected SslInfo initSslInfo() {
SslHandler sslHandler = ((Connection) this.request).channel().pipeline().get(SslHandler.class); SslHandler sslHandler = ((Connection) this.request).channel().pipeline().get(SslHandler.class);
@ -165,7 +168,8 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest {
@Override @Override
public Flux<DataBuffer> getBody() { public Flux<DataBuffer> getBody() {
return this.request.receive().retain().map(this.bufferFactory::wrap); Flux<DataBuffer> body = this.request.receive().retain().map(this.bufferFactory::wrap);
return body.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -119,7 +119,8 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override @Override
public Flux<DataBuffer> getBody() { public Flux<DataBuffer> getBody() {
return Flux.from(this.body); return Flux.from(this.body)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -216,6 +217,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
this.pooledByteBuffer = pooledByteBuffer; this.pooledByteBuffer = pooledByteBuffer;
} }
@Override
public boolean isAllocated() {
return this.pooledByteBuffer.isOpen();
}
@Override @Override
public PooledDataBuffer retain() { public PooledDataBuffer retain() {
return this; return this;