Improve limit handling in StringDecoder

The case of one data buffer containing multiple lines can could cause
a buffer leak due to a suspected issue in concatMapIterable. This
commit adds workarounds for that until the underlying issue is
addressed.

Closes gh-24339
This commit is contained in:
Rossen Stoyanchev 2020-01-13 14:55:29 +00:00
parent 850cbf032b
commit a741ae422b
2 changed files with 113 additions and 58 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -94,20 +94,44 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
byte[][] delimiterBytes = getDelimiterBytes(mimeType);
// TODO: Drop Consumer and use bufferUntil with Supplier<LimistedDataBufferList> (reactor-core#1925)
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize());
Flux<DataBuffer> inputFlux = Flux.defer(() -> {
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
return Flux.from(input)
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
.doOnNext(limiter)
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
if (getMaxInMemorySize() != -1) {
// Passing limiter into endFrameAfterDelimiter helps to ensure that in case of one DataBuffer
// containing multiple lines, the limit is checked and raised immediately without accumulating
// subsequent lines. This is necessary because concatMapIterable doesn't respect doOnDiscard.
// When reactor-core#1925 is resolved, we could replace bufferUntil with:
// .windowUntil(buffer -> buffer instanceof EndFrameBuffer)
// .concatMap(fluxes -> fluxes.collect(() -> new LimitedDataBufferList(getMaxInMemorySize()), LimitedDataBufferList::add))
LimitedDataBufferList limiter = new LimitedDataBufferList(getMaxInMemorySize());
return Flux.from(input)
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher, limiter))
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
else {
// When the decoder is unlimited (-1), concatMapIterable will cache buffers that may not
// be released if cancel is signalled before they are turned into String lines
// (see test maxInMemoryLimitReleasesUnprocessedLinesWhenUnlimited).
// When reactor-core#1925 is resolved, the workaround can be removed and the entire
// else clause possibly dropped.
ConcatMapIterableDiscardWorkaroundCache cache = new ConcatMapIterableDiscardWorkaroundCache();
return Flux.from(input)
.concatMapIterable(buffer -> cache.addAll(endFrameAfterDelimiter(buffer, matcher, null)))
.doOnNext(cache)
.doOnCancel(cache)
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
});
return super.decode(inputFlux, elementType, mimeType, hints);
@ -152,29 +176,49 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
*
* @param dataBuffer the buffer to find delimiters in
* @param matcher used to find the first delimiters
* @param limiter to enforce maxInMemorySize with
* @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was
* found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable)
* results in memory leaks due to pre-fetching.
*/
private static List<DataBuffer> endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) {
List<DataBuffer> result = new ArrayList<>();
do {
int endIdx = matcher.match(dataBuffer);
if (endIdx != -1) {
int readPosition = dataBuffer.readPosition();
int length = endIdx - readPosition + 1;
result.add(dataBuffer.retainedSlice(readPosition, length));
result.add(new EndFrameBuffer(matcher.delimiter()));
dataBuffer.readPosition(endIdx + 1);
}
else {
result.add(DataBufferUtils.retain(dataBuffer));
break;
}
}
while (dataBuffer.readableByteCount() > 0);
private static List<DataBuffer> endFrameAfterDelimiter(
DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, @Nullable LimitedDataBufferList limiter) {
DataBufferUtils.release(dataBuffer);
List<DataBuffer> result = new ArrayList<>();
try {
do {
int endIdx = matcher.match(dataBuffer);
if (endIdx != -1) {
int readPosition = dataBuffer.readPosition();
int length = (endIdx - readPosition + 1);
DataBuffer slice = dataBuffer.retainedSlice(readPosition, length);
result.add(slice);
result.add(new EndFrameBuffer(matcher.delimiter()));
dataBuffer.readPosition(endIdx + 1);
if (limiter != null) {
limiter.add(slice); // enforce the limit
limiter.clear();
}
}
else {
result.add(DataBufferUtils.retain(dataBuffer));
if (limiter != null) {
limiter.add(dataBuffer);
}
break;
}
}
while (dataBuffer.readableByteCount() > 0);
}
catch (DataBufferLimitException ex) {
if (limiter != null) {
limiter.releaseAndClear();
}
throw ex;
}
finally {
DataBufferUtils.release(dataBuffer);
}
return result;
}
@ -288,34 +332,32 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
}
/**
* Temporary measure for reactor-core#1925.
* Consumer that adds to a {@link LimitedDataBufferList} to enforce limits.
*/
private static class LimitedDataBufferConsumer implements Consumer<DataBuffer> {
private class ConcatMapIterableDiscardWorkaroundCache implements Consumer<DataBuffer>, Runnable {
private final LimitedDataBufferList bufferList;
private final List<DataBuffer> buffers = new ArrayList<>();
public LimitedDataBufferConsumer(int maxInMemorySize) {
this.bufferList = new LimitedDataBufferList(maxInMemorySize);
public List<DataBuffer> addAll(List<DataBuffer> buffersToAdd) {
this.buffers.addAll(buffersToAdd);
return buffersToAdd;
}
@Override
public void accept(DataBuffer buffer) {
if (buffer instanceof EndFrameBuffer) {
this.bufferList.clear();
}
else {
public void accept(DataBuffer dataBuffer) {
this.buffers.remove(dataBuffer);
}
@Override
public void run() {
this.buffers.forEach(buffer -> {
try {
this.bufferList.add(buffer);
}
catch (DataBufferLimitException ex) {
DataBufferUtils.release(buffer);
throw ex;
}
}
catch (Throwable ex) {
// Keep going..
}
});
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -130,17 +130,30 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> {
}
@Test
void decodeNewLineWithLimit() {
void maxInMemoryLimit() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("abc\n"),
stringBuffer("defg\n"),
stringBuffer("hijkl\n")
);
this.decoder.setMaxInMemorySize(5);
stringBuffer("abc\n"), stringBuffer("defg\n"), stringBuffer("hijkl\n"));
this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step ->
step.expectNext("abc", "defg")
.verifyError(DataBufferLimitException.class));
step.expectNext("abc", "defg").verifyError(DataBufferLimitException.class));
}
@Test // gh-24312
void maxInMemoryLimitReleaseUnprocessedLinesFromCurrentBuffer() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n"));
this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step -> step.verifyError(DataBufferLimitException.class));
}
@Test // gh-24339
void maxInMemoryLimitReleaseUnprocessedLinesWhenUnlimited() {
Flux<DataBuffer> input = Flux.just(stringBuffer("Line 1\nLine 2\nLine 3\n"));
this.decoder.setMaxInMemorySize(-1);
testDecodeCancel(input, ResolvableType.forClass(String.class), null, Collections.emptyMap());
}
@Test