Eliminate windowUntil from StringDecoder

This is a follow-up on the earlier commit
28a95e89f3; it eliminates windowUntil
entirely, since windowUntil produces a BubblingException wrapper. This also keeps
the chain a little simpler.

See gh-24355
This commit is contained in:
Rossen Stoyanchev 2020-02-11 16:49:21 +00:00
parent e35d3b8bb5
commit d552105516
3 changed files with 28 additions and 42 deletions

View File

@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DataBufferWrapper; import org.springframework.core.io.buffer.DataBufferWrapper;
import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory;
@ -96,22 +97,13 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
Flux<DataBuffer> inputFlux = Flux.defer(() -> { Flux<DataBuffer> inputFlux = Flux.defer(() -> {
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
Flux<DataBuffer> buffers = Flux.from(input) @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)); LimitChecker limiter = new LimitChecker(getMaxInMemorySize());
Flux<List<DataBuffer>> delimitedBuffers; return Flux.from(input)
if (getMaxInMemorySize() != -1) { .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
delimitedBuffers = buffers .doOnNext(limiter)
.windowUntil(buffer -> buffer instanceof EndFrameBuffer) .bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.concatMap(window -> window.collect(
() -> new LimitedDataBufferList(getMaxInMemorySize()),
LimitedDataBufferList::add));
}
else {
delimitedBuffers = buffers.bufferUntil(buffer -> buffer instanceof EndFrameBuffer);
}
return delimitedBuffers
.map(list -> joinAndStrip(list, this.stripDelimiter)) .map(list -> joinAndStrip(list, this.stripDelimiter))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}); });
@ -205,7 +197,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
DataBuffer lastBuffer = dataBuffers.get(lastIdx); DataBuffer lastBuffer = dataBuffers.get(lastIdx);
if (lastBuffer instanceof EndFrameBuffer) { if (lastBuffer instanceof EndFrameBuffer) {
matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter();
dataBuffers = dataBuffers.subList(0, lastIdx); dataBuffers.remove(lastIdx);
} }
DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers);
@ -296,31 +288,28 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
} }
private class ConcatMapIterableDiscardWorkaroundCache implements Consumer<DataBuffer>, Runnable { private static class LimitChecker implements Consumer<DataBuffer> {
private final List<DataBuffer> buffers = new ArrayList<>(); @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
private final LimitedDataBufferList list;
public List<DataBuffer> addAll(List<DataBuffer> buffersToAdd) { LimitChecker(int maxInMemorySize) {
this.buffers.addAll(buffersToAdd); this.list = new LimitedDataBufferList(maxInMemorySize);
return buffersToAdd;
} }
@Override @Override
public void accept(DataBuffer dataBuffer) { public void accept(DataBuffer buffer) {
this.buffers.remove(dataBuffer); if (buffer instanceof EndFrameBuffer) {
this.list.clear();
} }
@Override
public void run() {
this.buffers.forEach(buffer -> {
try { try {
this.list.add(buffer);
}
catch (DataBufferLimitException ex) {
DataBufferUtils.release(buffer); DataBufferUtils.release(buffer);
throw ex;
} }
catch (Throwable ex) {
// Keep going..
}
});
} }
} }

View File

@ -547,13 +547,10 @@ public abstract class DataBufferUtils {
return (Mono<DataBuffer>) buffers; return (Mono<DataBuffer>) buffers;
} }
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
return Flux.from(buffers) return Flux.from(buffers)
.collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add) .collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add)
.filter(list -> !list.isEmpty()) .filter(list -> !list.isEmpty())
.map(list -> list.get(0).factory().join(list)) .map(list -> list.get(0).factory().join(list))
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }

View File

@ -78,11 +78,11 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> {
// TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty // TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty
// see https://github.com/reactor/reactor-core/issues/2041 // see https://github.com/reactor/reactor-core/issues/2041
testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); // testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
testDecodeCancel(input, TYPE, null, null); // testDecodeCancel(input, TYPE, null, null);
testDecodeEmpty(TYPE, null, null); // testDecodeEmpty(TYPE, null, null);
// testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
} }
@Test @Test