Removed DataBufferUtils::split
DataBuffers::split (and underlying algorithm) should not be returning a Flux<DataBuffer>, but rather a Flux<Flux<DataBuffer>>. In other words, it should not join all data buffers that come before a delimiter. Providing an implementation of a such a fully reactive split method proved to be beyond the scope of this release, so this commit removes the method altogether.
This commit is contained in:
parent
fdcf09dc2f
commit
09572e7b60
|
|
@ -27,21 +27,17 @@ import java.nio.channels.Channels;
|
|||
import java.nio.channels.CompletionHandler;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.OpenOption;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.IntPredicate;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscription;
|
||||
|
|
@ -572,147 +568,6 @@ public abstract class DataBufferUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Splits the given stream of data buffers around the given delimiter.
|
||||
* The returned flux contains data buffers that are terminated by the given delimiter,
|
||||
* though the delimiter itself is removed.
|
||||
* @param dataBuffers the input stream of data buffers
|
||||
* @param delimiter the delimiting byte array
|
||||
* @return the flux of data buffers created by splitting the given data buffers around the
|
||||
* given delimiter
|
||||
* @since 5.2
|
||||
*/
|
||||
public static Flux<DataBuffer> split(Publisher<DataBuffer> dataBuffers, byte[] delimiter) {
|
||||
return split(dataBuffers, delimiter, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Splits the given stream of data buffers around the given delimiter.
|
||||
* The returned flux contains data buffers that are terminated by the given delimiter,
|
||||
* which is included when {@code stripDelimiter} is {@code false}.
|
||||
* @param dataBuffers the input stream of data buffers
|
||||
* @param delimiter the delimiter bytes
|
||||
* @param stripDelimiter whether to include the delimiter at the end of each resulting buffer
|
||||
* @return the flux of data buffers created by splitting the given data buffers around the
|
||||
* given delimiter
|
||||
* @since 5.2
|
||||
*/
|
||||
public static Flux<DataBuffer> split(Publisher<DataBuffer> dataBuffers, byte[] delimiter,
|
||||
boolean stripDelimiter) {
|
||||
|
||||
return split(dataBuffers, new byte[][]{delimiter}, stripDelimiter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Splits the given stream of data buffers around the given delimiters.
|
||||
* The returned flux contains data buffers that are terminated by any of the given delimiters,
|
||||
* which are included when {@code stripDelimiter} is {@code false}.
|
||||
* @param dataBuffers the input stream of data buffers
|
||||
* @param delimiters the delimiters, one per element
|
||||
* @param stripDelimiter whether to include the delimiters at the end of each resulting buffer
|
||||
* @return the flux of data buffers created by splitting the given data buffers around the
|
||||
* given delimiters
|
||||
* @since 5.2
|
||||
*/
|
||||
public static Flux<DataBuffer> split(Publisher<DataBuffer> dataBuffers, byte[][] delimiters,
|
||||
boolean stripDelimiter) {
|
||||
Assert.notNull(dataBuffers, "DataBuffers must not be null");
|
||||
Assert.isTrue(delimiters.length > 0, "Delimiter must not be empty");
|
||||
|
||||
Matcher[] matchers = matchers(delimiters);
|
||||
|
||||
return Flux.from(dataBuffers)
|
||||
.flatMap(buffer -> endFrameAfterDelimiter(buffer, matchers))
|
||||
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
|
||||
.map(buffers -> joinAndStrip(buffers, stripDelimiter))
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
||||
}
|
||||
|
||||
private static Matcher[] matchers(byte[][] delimiters) {
|
||||
Assert.isTrue(delimiters.length > 0, "Delimiters must not be empty");
|
||||
Matcher[] result = new Matcher[delimiters.length];
|
||||
for (int i = 0; i < delimiters.length; i++) {
|
||||
result[i] = matcher(delimiters[i]);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the {@link Matcher} with the first match and longest delimiter, and inserts a
|
||||
* {@link EndFrameBuffer} just after its match.
|
||||
*
|
||||
* @param dataBuffer the buffer to find delimiters in
|
||||
* @param matchers used to find the first delimiters
|
||||
* @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was
|
||||
* found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable)
|
||||
* results in memory leaks due to pre-fetching.
|
||||
*/
|
||||
private static Flux<DataBuffer> endFrameAfterDelimiter(DataBuffer dataBuffer, Matcher[] matchers) {
|
||||
List<DataBuffer> result = new ArrayList<>();
|
||||
do {
|
||||
int matchedEndIdx = Integer.MAX_VALUE;
|
||||
byte[] matchedDelimiter = new byte[0];
|
||||
for (Matcher matcher : matchers) {
|
||||
int endIdx = matcher.match(dataBuffer);
|
||||
if (endIdx != -1 &&
|
||||
endIdx <= matchedEndIdx &&
|
||||
matcher.delimiter().length > matchedDelimiter.length) {
|
||||
matchedEndIdx = endIdx;
|
||||
matchedDelimiter = matcher.delimiter();
|
||||
}
|
||||
}
|
||||
if (matchedDelimiter.length > 0) {
|
||||
int readPosition = dataBuffer.readPosition();
|
||||
int length = matchedEndIdx + 1 - readPosition ;
|
||||
result.add(dataBuffer.retainedSlice(readPosition, length));
|
||||
result.add(new EndFrameBuffer(matchedDelimiter));
|
||||
dataBuffer.readPosition(matchedEndIdx + 1);
|
||||
|
||||
for (Matcher matcher : matchers) {
|
||||
matcher.reset();
|
||||
}
|
||||
}
|
||||
else {
|
||||
result.add(retain(dataBuffer));
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (dataBuffer.readableByteCount() > 0);
|
||||
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return Flux.fromIterable(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is
|
||||
* removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with
|
||||
* a delimiter, it is removed.
|
||||
* @param dataBuffers the data buffers to join
|
||||
* @param stripDelimiter whether to strip the delimiter
|
||||
* @return the joined buffer
|
||||
*/
|
||||
private static DataBuffer joinAndStrip(List<DataBuffer> dataBuffers,
|
||||
boolean stripDelimiter) {
|
||||
|
||||
Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty");
|
||||
|
||||
byte[] matchingDelimiter = null;
|
||||
|
||||
int lastIdx = dataBuffers.size() - 1;
|
||||
DataBuffer lastBuffer = dataBuffers.get(lastIdx);
|
||||
if (lastBuffer instanceof EndFrameBuffer) {
|
||||
matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter();
|
||||
dataBuffers.remove(lastIdx);
|
||||
}
|
||||
|
||||
DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers);
|
||||
|
||||
if (stripDelimiter && matchingDelimiter != null) {
|
||||
result.writePosition(result.writePosition() - matchingDelimiter.length);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Defines an object that matches a data buffer against a delimiter.
|
||||
|
|
@ -1107,167 +962,4 @@ public abstract class DataBufferUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static class EndFrameBuffer implements DataBuffer {
|
||||
|
||||
private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]);
|
||||
|
||||
private byte[] delimiter;
|
||||
|
||||
|
||||
public EndFrameBuffer(byte[] delimiter) {
|
||||
this.delimiter = delimiter;
|
||||
}
|
||||
|
||||
public byte[] delimiter() {
|
||||
return this.delimiter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBufferFactory factory() {
|
||||
return BUFFER.factory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int indexOf(IntPredicate predicate, int fromIndex) {
|
||||
return BUFFER.indexOf(predicate, fromIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
|
||||
return BUFFER.lastIndexOf(predicate, fromIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readableByteCount() {
|
||||
return BUFFER.readableByteCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int writableByteCount() {
|
||||
return BUFFER.writableByteCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int capacity() {
|
||||
return BUFFER.capacity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer capacity(int capacity) {
|
||||
return BUFFER.capacity(capacity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer ensureCapacity(int capacity) {
|
||||
return BUFFER.ensureCapacity(capacity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readPosition() {
|
||||
return BUFFER.readPosition();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer readPosition(int readPosition) {
|
||||
return BUFFER.readPosition(readPosition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int writePosition() {
|
||||
return BUFFER.writePosition();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer writePosition(int writePosition) {
|
||||
return BUFFER.writePosition(writePosition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getByte(int index) {
|
||||
return BUFFER.getByte(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte read() {
|
||||
return BUFFER.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer read(byte[] destination) {
|
||||
return BUFFER.read(destination);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer read(byte[] destination, int offset, int length) {
|
||||
return BUFFER.read(destination, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer write(byte b) {
|
||||
return BUFFER.write(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer write(byte[] source) {
|
||||
return BUFFER.write(source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer write(byte[] source, int offset, int length) {
|
||||
return BUFFER.write(source, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer write(DataBuffer... buffers) {
|
||||
return BUFFER.write(buffers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer write(ByteBuffer... buffers) {
|
||||
return BUFFER.write(buffers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer write(CharSequence charSequence, Charset charset) {
|
||||
return BUFFER.write(charSequence, charset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer slice(int index, int length) {
|
||||
return BUFFER.slice(index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer retainedSlice(int index, int length) {
|
||||
return BUFFER.retainedSlice(index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer asByteBuffer() {
|
||||
return BUFFER.asByteBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer asByteBuffer(int index, int length) {
|
||||
return BUFFER.asByteBuffer(index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream asInputStream() {
|
||||
return BUFFER.asInputStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream asInputStream(boolean releaseOnClose) {
|
||||
return BUFFER.asInputStream(releaseOnClose);
|
||||
}
|
||||
|
||||
@Override
|
||||
public OutputStream asOutputStream() {
|
||||
return BUFFER.asOutputStream();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -809,127 +809,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
|
|||
release(foo);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void split() {
|
||||
Mono<DataBuffer> source =
|
||||
deferStringBuffer("--foo--bar--baz--");
|
||||
|
||||
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.consumeNextWith(stringConsumer(""))
|
||||
.consumeNextWith(stringConsumer("foo"))
|
||||
.consumeNextWith(stringConsumer("bar"))
|
||||
.consumeNextWith(stringConsumer("baz"))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void splitIncludeDelimiter() {
|
||||
Mono<DataBuffer> source =
|
||||
deferStringBuffer("--foo--bar--baz--");
|
||||
|
||||
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter, false);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.consumeNextWith(stringConsumer("--"))
|
||||
.consumeNextWith(stringConsumer("foo--"))
|
||||
.consumeNextWith(stringConsumer("bar--"))
|
||||
.consumeNextWith(stringConsumer("baz--"))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void splitMultipleDelimiters() {
|
||||
Mono<DataBuffer> source =
|
||||
deferStringBuffer("foobar␍baz");
|
||||
|
||||
byte[][] delimiters = new byte[][]{
|
||||
"".getBytes(StandardCharsets.UTF_8),
|
||||
"␍".getBytes(StandardCharsets.UTF_8)
|
||||
};
|
||||
|
||||
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiters, false);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.consumeNextWith(stringConsumer("foo"))
|
||||
.consumeNextWith(stringConsumer("bar␍"))
|
||||
.consumeNextWith(stringConsumer("baz"))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void splitErrors() {
|
||||
Flux<DataBuffer> source = Flux.concat(
|
||||
deferStringBuffer("foo--"),
|
||||
deferStringBuffer("bar--"),
|
||||
Mono.error(new RuntimeException())
|
||||
);
|
||||
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.consumeNextWith(stringConsumer("foo"))
|
||||
.consumeNextWith(stringConsumer("bar"))
|
||||
.expectError(RuntimeException.class)
|
||||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void splitCanceled() {
|
||||
Flux<DataBuffer> source = Flux.concat(
|
||||
deferStringBuffer("foo--"),
|
||||
deferStringBuffer("bar--"),
|
||||
deferStringBuffer("baz")
|
||||
);
|
||||
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.thenCancel()
|
||||
.verify();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void splitWithoutDemand() {
|
||||
Flux<DataBuffer> source = Flux.concat(
|
||||
deferStringBuffer("foo--"),
|
||||
deferStringBuffer("bar--")
|
||||
);
|
||||
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
|
||||
|
||||
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
|
||||
result.subscribe(subscriber);
|
||||
subscriber.cancel();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void splitAcrossBuffer() {
|
||||
Flux<DataBuffer> source = Flux.concat(
|
||||
deferStringBuffer("foo-"),
|
||||
deferStringBuffer("-bar-"),
|
||||
deferStringBuffer("-baz"));
|
||||
|
||||
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
|
||||
|
||||
StepVerifier.create(result)
|
||||
.consumeNextWith(stringConsumer("foo"))
|
||||
.consumeNextWith(stringConsumer("bar"))
|
||||
.consumeNextWith(stringConsumer("baz"))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
|
||||
private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> {
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue