Uncomment buffer leak tests in DataBufferUtils

Following a response on
https://github.com/reactor/reactor-core/issues/1634 apparently
FluxSink is respecting doOnDiscard but there is a race condition
in DataBufferUtils with file reading.

This commit makes two changes:
1) Add more checks for onDispose to detect cancellation signals
as well as to deal with potentially concurrent such signal.
2) Do not close the channel through the Flux.using callback but
rather allow the current I/O callback to take place and only then
close the channel or else the buffer is left hanging.

Despite this tests still can fail due to a suspected issue in Reactor
itself with the doOnDiscard callback for FluxSink. That's tracked under
the same issue https://github.com/reactor/reactor-core/issues/1634
and for now the use of DefaultDataBufferFactory is enforced as a
workaround until the issue is resolved.

See gh-22107
This commit is contained in:
Rossen Stoyanchev 2019-03-28 17:18:40 -04:00
parent 3cf2c04406
commit beae1fbb12
5 changed files with 38 additions and 16 deletions

View File

@ -57,6 +57,12 @@ public abstract class DataBufferUtils {
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release; private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
/**
* Workaround to disable use of pooled buffers:
* https://github.com/reactor/reactor-core/issues/1634
*/
private static final DataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
//--------------------------------------------------------------------- //---------------------------------------------------------------------
// Reading // Reading
@ -132,16 +138,21 @@ public abstract class DataBufferUtils {
Assert.isTrue(position >= 0, "'position' must be >= 0"); Assert.isTrue(position >= 0, "'position' must be >= 0");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
DataBufferFactory bufferFactoryToUse = defaultDataBufferFactory;
Flux<DataBuffer> flux = Flux.using(channelSupplier, Flux<DataBuffer> flux = Flux.using(channelSupplier,
channel -> Flux.create(sink -> { channel -> Flux.create(sink -> {
ReadCompletionHandler handler = ReadCompletionHandler handler =
new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize);
DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize); DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
channel.read(byteBuffer, position, dataBuffer, handler); channel.read(byteBuffer, position, dataBuffer, handler);
sink.onDispose(handler::dispose); sink.onDispose(handler::dispose);
}), }),
DataBufferUtils::closeChannel); channel -> {
// Do not close channel from here, rather wait for the current read callback
// and then complete after releasing the DataBuffer.
});
return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }
@ -505,26 +516,40 @@ public abstract class DataBufferUtils {
@Override @Override
public void completed(Integer read, DataBuffer dataBuffer) { public void completed(Integer read, DataBuffer dataBuffer) {
if (read != -1) { if (read != -1 && !this.disposed.get()) {
long pos = this.position.addAndGet(read); long pos = this.position.addAndGet(read);
dataBuffer.writePosition(read); dataBuffer.writePosition(read);
this.sink.next(dataBuffer); this.sink.next(dataBuffer);
if (!this.disposed.get()) { // It's possible for cancellation to happen right before the push into the sink
if (this.disposed.get()) {
// TODO:
// This is not ideal since we already passed the buffer into the sink and
// releasing may cause something reading to fail. Maybe we won't have to
// do this after https://github.com/reactor/reactor-core/issues/1634
complete(dataBuffer);
}
else {
DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize); ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize);
this.channel.read(newByteBuffer, pos, newDataBuffer, this); this.channel.read(newByteBuffer, pos, newDataBuffer, this);
} }
} }
else { else {
complete(dataBuffer);
}
}
private void complete(DataBuffer dataBuffer) {
release(dataBuffer); release(dataBuffer);
this.sink.complete(); this.sink.complete();
} closeChannel(this.channel);
} }
@Override @Override
public void failed(Throwable exc, DataBuffer dataBuffer) { public void failed(Throwable exc, DataBuffer dataBuffer) {
release(dataBuffer); release(dataBuffer);
this.sink.error(exc); this.sink.error(exc);
closeChannel(this.channel);
} }
public void dispose() { public void dispose() {

View File

@ -80,8 +80,7 @@ public class ResourceRegionEncoderTests {
.expectComplete() .expectComplete()
.verify(); .verify();
// TODO: https://github.com/reactor/reactor-core/issues/1634 this.bufferFactory.checkForLeaks();
// this.bufferFactory.checkForLeaks();
} }
@Test @Test
@ -122,11 +121,10 @@ public class ResourceRegionEncoderTests {
.expectComplete() .expectComplete()
.verify(); .verify();
// TODO: https://github.com/reactor/reactor-core/issues/1634 this.bufferFactory.checkForLeaks();
// this.bufferFactory.checkForLeaks();
} }
@Test // gh- @Test // gh-22107
public void cancelWithoutDemandForMultipleResourceRegions() { public void cancelWithoutDemandForMultipleResourceRegions() {
Resource resource = new ClassPathResource("ResourceRegionEncoderTests.txt", getClass()); Resource resource = new ClassPathResource("ResourceRegionEncoderTests.txt", getClass());
Flux<ResourceRegion> regions = Flux.just( Flux<ResourceRegion> regions = Flux.just(
@ -173,8 +171,7 @@ public class ResourceRegionEncoderTests {
.expectError(EncodingException.class) .expectError(EncodingException.class)
.verify(); .verify();
// TODO: https://github.com/reactor/reactor-core/issues/1634 this.bufferFactory.checkForLeaks();
// this.bufferFactory.checkForLeaks();
} }
protected Consumer<DataBuffer> stringConsumer(String expected) { protected Consumer<DataBuffer> stringConsumer(String expected) {

View File

@ -135,6 +135,7 @@ public abstract class AbstractDataBufferAllocatingTestCase {
catch (InterruptedException ex) { catch (InterruptedException ex) {
// ignore // ignore
} }
continue;
} }
assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total); assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total);
} }

View File

@ -187,8 +187,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.verify(); .verify();
} }
// TODO: Remove ignore after https://github.com/reactor/reactor-core/issues/1634
@Ignore
@Test // gh-22107 @Test // gh-22107
public void readAsynchronousFileChannelCancelWithoutDemand() throws Exception { public void readAsynchronousFileChannelCancelWithoutDemand() throws Exception {
URI uri = this.resource.getURI(); URI uri = this.resource.getURI();

View File

@ -85,6 +85,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
catch (InterruptedException ex) { catch (InterruptedException ex) {
// ignore // ignore
} }
continue;
} }
List<AssertionError> errors = this.created.stream() List<AssertionError> errors = this.created.stream()
.filter(LeakAwareDataBuffer::isAllocated) .filter(LeakAwareDataBuffer::isAllocated)