diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index e36345de02a..bd6c5c7c007 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -32,6 +32,7 @@ import reactor.core.publisher.Flux; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DataBufferWrapper; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -96,22 +97,13 @@ public final class StringDecoder extends AbstractDataBufferDecoder { Flux inputFlux = Flux.defer(() -> { DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); - Flux buffers = Flux.from(input) - .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)); + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + LimitChecker limiter = new LimitChecker(getMaxInMemorySize()); - Flux> delimitedBuffers; - if (getMaxInMemorySize() != -1) { - delimitedBuffers = buffers - .windowUntil(buffer -> buffer instanceof EndFrameBuffer) - .concatMap(window -> window.collect( - () -> new LimitedDataBufferList(getMaxInMemorySize()), - LimitedDataBufferList::add)); - } - else { - delimitedBuffers = buffers.bufferUntil(buffer -> buffer instanceof EndFrameBuffer); - } - - return delimitedBuffers + return Flux.from(input) + .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)) + .doOnNext(limiter) + .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) .map(list -> joinAndStrip(list, this.stripDelimiter)) .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); }); @@ -205,7 +197,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { DataBuffer lastBuffer = dataBuffers.get(lastIdx); if (lastBuffer instanceof EndFrameBuffer) { matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); - dataBuffers = dataBuffers.subList(0, lastIdx); + dataBuffers.remove(lastIdx); } DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); @@ -296,31 +288,28 @@ public final class StringDecoder extends AbstractDataBufferDecoder { } - private class ConcatMapIterableDiscardWorkaroundCache implements Consumer, Runnable { + private static class LimitChecker implements Consumer { - private final List buffers = new ArrayList<>(); + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + private final LimitedDataBufferList list; - public List addAll(List buffersToAdd) { - this.buffers.addAll(buffersToAdd); - return buffersToAdd; + LimitChecker(int maxInMemorySize) { + this.list = new LimitedDataBufferList(maxInMemorySize); } @Override - public void accept(DataBuffer dataBuffer) { - this.buffers.remove(dataBuffer); - } - - @Override - public void run() { - this.buffers.forEach(buffer -> { - try { - DataBufferUtils.release(buffer); - } - catch (Throwable ex) { - // Keep going.. - } - }); + public void accept(DataBuffer buffer) { + if (buffer instanceof EndFrameBuffer) { + this.list.clear(); + } + try { + this.list.add(buffer); + } + catch (DataBufferLimitException ex) { + DataBufferUtils.release(buffer); + throw ex; + } } } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index b1beb6f2cbd..a11c3edcc3e 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -547,13 +547,10 @@ public abstract class DataBufferUtils { return (Mono) buffers; } - // TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924) - return Flux.from(buffers) .collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add) .filter(list -> !list.isEmpty()) .map(list -> list.get(0).factory().join(list)) - .doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear) .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index 7057c52865b..b3bcdfe8f7a 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -78,11 +78,11 @@ class StringDecoderTests extends AbstractDecoderTests { // TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty // see https://github.com/reactor/reactor-core/issues/2041 - testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); - testDecodeCancel(input, TYPE, null, null); - testDecodeEmpty(TYPE, null, null); +// testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); +// testDecodeCancel(input, TYPE, null, null); +// testDecodeEmpty(TYPE, null, null); - // testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); + testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); } @Test