Various DataBuffer improvements
- Added fromIndex parameter to indexOf and lastIndexOf - Moved DataBufferUtils.tokenize to StringDecoder, as that's the only place it's used.
This commit is contained in:
parent
622d11dbce
commit
ea21643a29
|
@ -19,7 +19,9 @@ package org.springframework.core.codec.support;
|
|||
import java.nio.CharBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.IntPredicate;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -47,6 +49,8 @@ public class StringDecoder extends AbstractDecoder<String> {
|
|||
|
||||
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
||||
|
||||
private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';
|
||||
|
||||
private final boolean splitOnNewline;
|
||||
|
||||
/**
|
||||
|
@ -80,11 +84,7 @@ public class StringDecoder extends AbstractDecoder<String> {
|
|||
MimeType mimeType, Object... hints) {
|
||||
Flux<DataBuffer> inputFlux = Flux.from(inputStream);
|
||||
if (this.splitOnNewline) {
|
||||
inputFlux = inputFlux.flatMap(dataBuffer -> {
|
||||
List<DataBuffer> tokens =
|
||||
DataBufferUtils.tokenize(dataBuffer, b -> b == '\n' || b == '\r');
|
||||
return Flux.fromIterable(tokens);
|
||||
});
|
||||
inputFlux = inputFlux.flatMap(StringDecoder::splitOnNewline);
|
||||
}
|
||||
Charset charset = getCharset(mimeType);
|
||||
return inputFlux.map(dataBuffer -> {
|
||||
|
@ -94,6 +94,24 @@ public class StringDecoder extends AbstractDecoder<String> {
|
|||
});
|
||||
}
|
||||
|
||||
private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
|
||||
List<DataBuffer> results = new ArrayList<DataBuffer>();
|
||||
int startIdx = 0;
|
||||
int endIdx = 0;
|
||||
final int limit = dataBuffer.readableByteCount();
|
||||
do {
|
||||
endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx);
|
||||
int length = endIdx != -1 ? endIdx - startIdx + 1 : limit - startIdx;
|
||||
DataBuffer token = dataBuffer.slice(startIdx, length);
|
||||
results.add(DataBufferUtils.retain(token));
|
||||
startIdx = endIdx + 1;
|
||||
}
|
||||
while (startIdx < limit && endIdx != -1);
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return Flux.fromIterable(results);
|
||||
}
|
||||
|
||||
|
||||
private Charset getCharset(MimeType mimeType) {
|
||||
if (mimeType != null && mimeType.getCharset() != null) {
|
||||
return mimeType.getCharset();
|
||||
|
|
|
@ -38,19 +38,21 @@ public interface DataBuffer {
|
|||
* Returns the index of the first byte in this buffer that matches the given
|
||||
* predicate.
|
||||
* @param predicate the predicate to match
|
||||
* @param fromIndex the index to start the search from
|
||||
* @return the index of the first byte that matches {@code predicate}; or {@code -1}
|
||||
* if none match
|
||||
*/
|
||||
int indexOf(IntPredicate predicate);
|
||||
int indexOf(IntPredicate predicate, int fromIndex);
|
||||
|
||||
/**
|
||||
* Returns the index of the last byte in this buffer that matches the given
|
||||
* predicate.
|
||||
* @param predicate the predicate to match
|
||||
* @param fromIndex the index to start the search from
|
||||
* @return the index of the last byte that matches {@code predicate}; or {@code -1}
|
||||
* if none match
|
||||
*/
|
||||
int lastIndexOf(IntPredicate predicate);
|
||||
int lastIndexOf(IntPredicate predicate, int fromIndex);
|
||||
|
||||
/**
|
||||
* Returns the number of bytes that can be read from this data buffer.
|
||||
|
|
|
@ -84,8 +84,16 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int indexOf(IntPredicate predicate) {
|
||||
for (int i = 0; i < readableByteCount(); i++) {
|
||||
public int indexOf(IntPredicate predicate, int fromIndex) {
|
||||
Assert.notNull(predicate, "'predicate' must not be null");
|
||||
|
||||
if (fromIndex < 0) {
|
||||
fromIndex = 0;
|
||||
}
|
||||
else if (fromIndex >= this.writePosition) {
|
||||
return -1;
|
||||
}
|
||||
for (int i = fromIndex; i < this.writePosition; i++) {
|
||||
byte b = this.byteBuffer.get(i);
|
||||
if (predicate.test(b)) {
|
||||
return i;
|
||||
|
@ -95,8 +103,10 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int lastIndexOf(IntPredicate predicate) {
|
||||
for (int i = readableByteCount() - 1; i >= 0; i--) {
|
||||
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
|
||||
Assert.notNull(predicate, "'predicate' must not be null");
|
||||
int i = Math.min(fromIndex, this.writePosition - 1);
|
||||
for (; i >= 0; i--) {
|
||||
byte b = this.byteBuffer.get(i);
|
||||
if (predicate.test(b)) {
|
||||
return i;
|
||||
|
|
|
@ -69,15 +69,28 @@ public class NettyDataBuffer implements PooledDataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int indexOf(IntPredicate predicate) {
|
||||
IntPredicate negated = predicate.negate();
|
||||
return this.byteBuf.forEachByte(negated::test);
|
||||
public int indexOf(IntPredicate predicate, int fromIndex) {
|
||||
Assert.notNull(predicate, "'predicate' must not be null");
|
||||
if (fromIndex < 0) {
|
||||
fromIndex = 0;
|
||||
}
|
||||
else if (fromIndex >= this.byteBuf.writerIndex()) {
|
||||
return -1;
|
||||
}
|
||||
int length = this.byteBuf.writerIndex() - fromIndex;
|
||||
|
||||
return this.byteBuf.forEachByte(fromIndex, length, predicate.negate()::test);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lastIndexOf(IntPredicate predicate) {
|
||||
IntPredicate negated = predicate.negate();
|
||||
return this.byteBuf.forEachByteDesc(negated::test);
|
||||
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
|
||||
Assert.notNull(predicate, "'predicate' must not be null");
|
||||
if (fromIndex < 0) {
|
||||
return -1;
|
||||
}
|
||||
fromIndex = Math.min(fromIndex, this.byteBuf.writerIndex() - 1);
|
||||
|
||||
return this.byteBuf.forEachByteDesc(0, fromIndex, predicate.negate()::test);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,13 +21,9 @@ import java.io.InputStream;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.IntPredicate;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -124,45 +120,6 @@ public abstract class DataBufferUtils {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Tokenize the {@link DataBuffer} using the given delimiter
|
||||
* function. Does not include the delimiter in the result.
|
||||
* @param dataBuffer the data buffer to tokenize
|
||||
* @param delimiter the delimiter function
|
||||
* @return the tokens
|
||||
*/
|
||||
public static List<DataBuffer> tokenize(DataBuffer dataBuffer,
|
||||
IntPredicate delimiter) {
|
||||
Assert.notNull(dataBuffer, "'dataBuffer' must not be null");
|
||||
Assert.notNull(delimiter, "'delimiter' must not be null");
|
||||
|
||||
List<DataBuffer> results = new ArrayList<DataBuffer>();
|
||||
int idx;
|
||||
do {
|
||||
idx = dataBuffer.indexOf(delimiter);
|
||||
if (idx < 0) {
|
||||
results.add(dataBuffer);
|
||||
}
|
||||
else {
|
||||
if (idx > 0) {
|
||||
DataBuffer slice = dataBuffer.slice(0, idx);
|
||||
slice = retain(slice);
|
||||
results.add(slice);
|
||||
}
|
||||
int remainingLen = dataBuffer.readableByteCount() - (idx + 1);
|
||||
if (remainingLen > 0) {
|
||||
dataBuffer = dataBuffer.slice(idx + 1, remainingLen);
|
||||
}
|
||||
else {
|
||||
release(dataBuffer);
|
||||
idx = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
while (idx != -1);
|
||||
return Collections.unmodifiableList(results);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retains the given data buffer, it it is a {@link PooledDataBuffer}.
|
||||
* @param dataBuffer the data buffer to retain
|
||||
|
|
|
@ -82,8 +82,7 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase {
|
|||
TestSubscriber
|
||||
.subscribe(output)
|
||||
.assertNoError()
|
||||
.assertComplete()
|
||||
.assertValues("foo", "bar", "baz");
|
||||
.assertComplete().assertValues("\n", "foo\r", "\n", "bar\r", "\n", "baz");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -190,10 +190,16 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
|
|||
DataBuffer buffer = createDataBuffer(3);
|
||||
buffer.write(new byte[]{'a', 'b', 'c'});
|
||||
|
||||
int result = buffer.indexOf(b -> b == 'c');
|
||||
int result = buffer.indexOf(b -> b == 'c', 0);
|
||||
assertEquals(2, result);
|
||||
|
||||
result = buffer.indexOf(b -> b == 'z');
|
||||
result = buffer.indexOf(b -> b == 'c', Integer.MIN_VALUE);
|
||||
assertEquals(2, result);
|
||||
|
||||
result = buffer.indexOf(b -> b == 'c', Integer.MAX_VALUE);
|
||||
assertEquals(-1, result);
|
||||
|
||||
result = buffer.indexOf(b -> b == 'z', 0);
|
||||
assertEquals(-1, result);
|
||||
|
||||
release(buffer);
|
||||
|
@ -204,10 +210,16 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
|
|||
DataBuffer buffer = createDataBuffer(3);
|
||||
buffer.write(new byte[]{'a', 'b', 'c'});
|
||||
|
||||
int result = buffer.lastIndexOf(b -> b == 'b');
|
||||
int result = buffer.lastIndexOf(b -> b == 'b', 3);
|
||||
assertEquals(1, result);
|
||||
|
||||
result = buffer.lastIndexOf(b -> b == 'z');
|
||||
result = buffer.lastIndexOf(b -> b == 'b', Integer.MAX_VALUE);
|
||||
assertEquals(1, result);
|
||||
|
||||
result = buffer.lastIndexOf(b -> b == 'b', Integer.MIN_VALUE);
|
||||
assertEquals(-1, result);
|
||||
|
||||
result = buffer.lastIndexOf(b -> b == 'z', 0);
|
||||
assertEquals(-1, result);
|
||||
|
||||
release(buffer);
|
||||
|
|
|
@ -19,10 +19,8 @@ package org.springframework.core.io.buffer.support;
|
|||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -31,7 +29,6 @@ import reactor.core.test.TestSubscriber;
|
|||
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
/**
|
||||
|
@ -112,23 +109,4 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
|
|||
release(baz);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void tokenize() {
|
||||
DataBuffer dataBuffer = stringBuffer("-foo--bar-");
|
||||
|
||||
List<DataBuffer> results = DataBufferUtils.tokenize(dataBuffer, b -> b == '-');
|
||||
assertEquals(2, results.size());
|
||||
|
||||
DataBuffer result = results.get(0);
|
||||
String value = DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8);
|
||||
assertEquals("foo", value);
|
||||
|
||||
result = results.get(1);
|
||||
value = DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8);
|
||||
assertEquals("bar", value);
|
||||
|
||||
results.stream().forEach(b -> release(b));
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue