From ef19a6bf8ecd58c60a041640302ecb7b68351527 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 10 Jul 2019 12:14:22 +0200 Subject: [PATCH] Various DataBufferUtils improvements - Made sure that takeUntilByteCount is usable in multiple subscriptions. - Added composite Matcher --- .../core/io/buffer/DataBufferUtils.java | 106 +++++++++++++++--- 1 file changed, 92 insertions(+), 14 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 0ba1fc20661..83f39f5d818 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -419,19 +419,21 @@ public abstract class DataBufferUtils { Assert.notNull(publisher, "Publisher must not be null"); Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); - AtomicLong countDown = new AtomicLong(maxByteCount); - return Flux.from(publisher) - .map(buffer -> { - long remainder = countDown.addAndGet(-buffer.readableByteCount()); - if (remainder < 0) { - int length = buffer.readableByteCount() + (int) remainder; - return buffer.slice(0, length); - } - else { - return buffer; - } - }) - .takeUntil(buffer -> countDown.get() <= 0); + return Flux.defer(() -> { + AtomicLong countDown = new AtomicLong(maxByteCount); + return Flux.from(publisher) + .map(buffer -> { + long remainder = countDown.addAndGet(-buffer.readableByteCount()); + if (remainder < 0) { + int length = buffer.readableByteCount() + (int) remainder; + return buffer.slice(0, length); + } + else { + return buffer; + } + }) + .takeUntil(buffer -> countDown.get() <= 0); + }); // No doOnDiscard as operators used do not cache (and drop) buffers } @@ -539,7 +541,7 @@ public abstract class DataBufferUtils { } /** - * Return a {@link Matcher} for the given delimiters. The matcher can be used to find the + * Return a {@link Matcher} for the given delimiter. The matcher can be used to find the * delimiters in data buffers. * @param delimiter the delimiter bytes to find * @return the matcher @@ -550,6 +552,26 @@ public abstract class DataBufferUtils { return new KnuthMorrisPrattMatcher(delimiter); } + /** Return a {@link Matcher} for the given delimiters. The matcher can be used to find the + * delimiters in data buffers. + * @param delimiters the delimiters bytes to find + * @return the matcher + * @since 5.2 + */ + public static Matcher matcher(byte[]... delimiters) { + Assert.isTrue(delimiters.length > 0, "Delimiters must not be empty"); + if (delimiters.length == 1) { + return matcher(delimiters[0]); + } + else { + Matcher[] matchers = new Matcher[delimiters.length]; + for (int i = 0; i < delimiters.length; i++) { + matchers[i] = matcher(delimiters[i]); + } + return new CompositeMatcher(matchers); + } + } + /** * Splits the given stream of data buffers around the given delimiter. * The returned flux contains data buffers that are terminated by the given delimiter, @@ -1030,6 +1052,62 @@ public abstract class DataBufferUtils { } } + /** + * Implementation of {@link Matcher} that wraps several other matchers. + */ + private static class CompositeMatcher implements Matcher { + + private static final byte[] NO_DELIMITER = new byte[0]; + + private final Matcher[] matchers; + + byte[] longestDelimiter = NO_DELIMITER; + + + public CompositeMatcher(Matcher[] matchers) { + this.matchers = matchers; + } + + @Override + public int match(DataBuffer dataBuffer) { + this.longestDelimiter = NO_DELIMITER; + int bestEndIdx = Integer.MAX_VALUE; + + + for (Matcher matcher : this.matchers) { + int endIdx = matcher.match(dataBuffer); + if (endIdx != -1 && + endIdx <= bestEndIdx && + matcher.delimiter().length > this.longestDelimiter.length) { + bestEndIdx = endIdx; + this.longestDelimiter = matcher.delimiter(); + } + } + if (bestEndIdx == Integer.MAX_VALUE) { + this.longestDelimiter = NO_DELIMITER; + return -1; + } + else { + reset(); + return bestEndIdx; + } + } + + @Override + public byte[] delimiter() { + Assert.state(this.longestDelimiter != NO_DELIMITER, "Illegal state!"); + return this.longestDelimiter; + } + + @Override + public void reset() { + for (Matcher matcher : this.matchers) { + matcher.reset(); + } + } + } + + private static class EndFrameBuffer implements DataBuffer {