Various DataBufferUtils improvements

- Made sure that takeUntilByteCount is usable in multiple subscriptions.
- Added composite Matcher
This commit is contained in:
Arjen Poutsma 2019-07-10 12:14:22 +02:00
parent 77c24aac2f
commit ef19a6bf8e
1 changed files with 92 additions and 14 deletions

View File

@ -419,19 +419,21 @@ public abstract class DataBufferUtils {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.map(buffer -> {
long remainder = countDown.addAndGet(-buffer.readableByteCount());
if (remainder < 0) {
int length = buffer.readableByteCount() + (int) remainder;
return buffer.slice(0, length);
}
else {
return buffer;
}
})
.takeUntil(buffer -> countDown.get() <= 0);
return Flux.defer(() -> {
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.map(buffer -> {
long remainder = countDown.addAndGet(-buffer.readableByteCount());
if (remainder < 0) {
int length = buffer.readableByteCount() + (int) remainder;
return buffer.slice(0, length);
}
else {
return buffer;
}
})
.takeUntil(buffer -> countDown.get() <= 0);
});
// No doOnDiscard as operators used do not cache (and drop) buffers
}
@ -539,7 +541,7 @@ public abstract class DataBufferUtils {
}
/**
* Return a {@link Matcher} for the given delimiters. The matcher can be used to find the
* Return a {@link Matcher} for the given delimiter. The matcher can be used to find the
* delimiters in data buffers.
* @param delimiter the delimiter bytes to find
* @return the matcher
@ -550,6 +552,26 @@ public abstract class DataBufferUtils {
return new KnuthMorrisPrattMatcher(delimiter);
}
/** Return a {@link Matcher} for the given delimiters. The matcher can be used to find the
* delimiters in data buffers.
* @param delimiters the delimiters bytes to find
* @return the matcher
* @since 5.2
*/
public static Matcher matcher(byte[]... delimiters) {
Assert.isTrue(delimiters.length > 0, "Delimiters must not be empty");
if (delimiters.length == 1) {
return matcher(delimiters[0]);
}
else {
Matcher[] matchers = new Matcher[delimiters.length];
for (int i = 0; i < delimiters.length; i++) {
matchers[i] = matcher(delimiters[i]);
}
return new CompositeMatcher(matchers);
}
}
/**
* Splits the given stream of data buffers around the given delimiter.
* The returned flux contains data buffers that are terminated by the given delimiter,
@ -1030,6 +1052,62 @@ public abstract class DataBufferUtils {
}
}
/**
* Implementation of {@link Matcher} that wraps several other matchers.
*/
private static class CompositeMatcher implements Matcher {
private static final byte[] NO_DELIMITER = new byte[0];
private final Matcher[] matchers;
byte[] longestDelimiter = NO_DELIMITER;
public CompositeMatcher(Matcher[] matchers) {
this.matchers = matchers;
}
@Override
public int match(DataBuffer dataBuffer) {
this.longestDelimiter = NO_DELIMITER;
int bestEndIdx = Integer.MAX_VALUE;
for (Matcher matcher : this.matchers) {
int endIdx = matcher.match(dataBuffer);
if (endIdx != -1 &&
endIdx <= bestEndIdx &&
matcher.delimiter().length > this.longestDelimiter.length) {
bestEndIdx = endIdx;
this.longestDelimiter = matcher.delimiter();
}
}
if (bestEndIdx == Integer.MAX_VALUE) {
this.longestDelimiter = NO_DELIMITER;
return -1;
}
else {
reset();
return bestEndIdx;
}
}
@Override
public byte[] delimiter() {
Assert.state(this.longestDelimiter != NO_DELIMITER, "Illegal state!");
return this.longestDelimiter;
}
@Override
public void reset() {
for (Matcher matcher : this.matchers) {
matcher.reset();
}
}
}
private static class EndFrameBuffer implements DataBuffer {