Allow "release on close" for DataBuffer.asInputStream

This commit introduces DataBuffer.asInputStream(boolean), that can
release the underlying buffer when the stream is closed.

Furthermore, this commit adds additional javadoc.

Issue: SPR-16444
This commit is contained in:
Arjen Poutsma 2018-02-08 10:11:53 +01:00
parent 4318710b9b
commit fb61af1882
5 changed files with 60 additions and 3 deletions

View File

@ -248,11 +248,25 @@ public interface DataBuffer {
/**
* Expose this buffer's data as an {@link InputStream}. Both data and read position are
* shared between the returned stream and this data buffer.
* shared between the returned stream and this data buffer. The underlying buffer will
* <strong>not</strong> be {@linkplain DataBufferUtils#release(DataBuffer) released} when the
* input stream is {@linkplain InputStream#close() closed}.
* @return this data buffer as an input stream
* @see #asInputStream(boolean)
*/
InputStream asInputStream();
/**
* Expose this buffer's data as an {@link InputStream}. Both data and read position are
* shared between the returned stream and this data buffer.
* @param releaseOnClose whether the underlying buffer will be
* {@linkplain DataBufferUtils#release(DataBuffer) released} when the input stream is
* {@linkplain InputStream#close() closed}.
* @return this data buffer as an input stream
* @since 5.0.4
*/
InputStream asInputStream(boolean releaseOnClose);
/**
* Expose this buffer's data as an {@link OutputStream}. Both data and write position are
* shared between the returned stream and this data buffer.

View File

@ -365,6 +365,11 @@ public class DefaultDataBuffer implements DataBuffer {
return new DefaultDataBufferInputStream();
}
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return new DefaultDataBufferInputStream();
}
@Override
public OutputStream asOutputStream() {
return new DefaultDataBufferOutputStream();

View File

@ -250,6 +250,11 @@ public class NettyDataBuffer implements PooledDataBuffer {
return new ByteBufInputStream(this.byteBuf);
}
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return new ByteBufInputStream(this.byteBuf, releaseOnClose);
}
@Override
public OutputStream asOutputStream() {
return new ByteBufOutputStream(this.byteBuf);

View File

@ -182,6 +182,27 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
release(buffer);
}
@Test
public void inputStreamReleaseOnClose() throws IOException {
DataBuffer buffer = createDataBuffer(3);
byte[] bytes = {'a', 'b', 'c'};
buffer.write(bytes);
InputStream inputStream = buffer.asInputStream(true);
try {
byte[] result = new byte[3];
int len = inputStream.read(result);
assertEquals(3, len);
assertArrayEquals(bytes, result);
} finally {
inputStream.close();
}
// AbstractDataBufferAllocatingTestCase.LeakDetector will verify the buffer's release
}
@Test
public void outputStream() throws IOException {
DataBuffer buffer = createDataBuffer(4);

View File

@ -35,6 +35,7 @@ import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
@ -210,8 +211,14 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override
public boolean release() {
this.pooledByteBuffer.close();
return this.pooledByteBuffer.isOpen();
boolean result;
try {
result = DataBufferUtils.release(this.dataBuffer);
}
finally {
this.pooledByteBuffer.close();
}
return result && this.pooledByteBuffer.isOpen();
}
@Override
@ -338,6 +345,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return this.dataBuffer.asInputStream();
}
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return this.dataBuffer.asInputStream(releaseOnClose);
}
@Override
public OutputStream asOutputStream() {
return this.dataBuffer.asOutputStream();