diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java index a9673665c4..6ef47d1184 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java @@ -19,7 +19,9 @@ package org.springframework.core.codec.support; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; +import java.util.function.IntPredicate; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -47,6 +49,8 @@ public class StringDecoder extends AbstractDecoder { public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; + private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r'; + private final boolean splitOnNewline; /** @@ -80,11 +84,7 @@ public class StringDecoder extends AbstractDecoder { MimeType mimeType, Object... hints) { Flux inputFlux = Flux.from(inputStream); if (this.splitOnNewline) { - inputFlux = inputFlux.flatMap(dataBuffer -> { - List tokens = - DataBufferUtils.tokenize(dataBuffer, b -> b == '\n' || b == '\r'); - return Flux.fromIterable(tokens); - }); + inputFlux = inputFlux.flatMap(StringDecoder::splitOnNewline); } Charset charset = getCharset(mimeType); return inputFlux.map(dataBuffer -> { @@ -94,6 +94,24 @@ public class StringDecoder extends AbstractDecoder { }); } + private static Flux splitOnNewline(DataBuffer dataBuffer) { + List results = new ArrayList(); + int startIdx = 0; + int endIdx = 0; + final int limit = dataBuffer.readableByteCount(); + do { + endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx); + int length = endIdx != -1 ? endIdx - startIdx + 1 : limit - startIdx; + DataBuffer token = dataBuffer.slice(startIdx, length); + results.add(DataBufferUtils.retain(token)); + startIdx = endIdx + 1; + } + while (startIdx < limit && endIdx != -1); + DataBufferUtils.release(dataBuffer); + return Flux.fromIterable(results); + } + + private Charset getCharset(MimeType mimeType) { if (mimeType != null && mimeType.getCharset() != null) { return mimeType.getCharset(); diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/DataBuffer.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/DataBuffer.java index 8fd84476ff..822221bdb1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/DataBuffer.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/DataBuffer.java @@ -38,19 +38,21 @@ public interface DataBuffer { * Returns the index of the first byte in this buffer that matches the given * predicate. * @param predicate the predicate to match + * @param fromIndex the index to start the search from * @return the index of the first byte that matches {@code predicate}; or {@code -1} * if none match */ - int indexOf(IntPredicate predicate); + int indexOf(IntPredicate predicate, int fromIndex); /** * Returns the index of the last byte in this buffer that matches the given * predicate. * @param predicate the predicate to match + * @param fromIndex the index to start the search from * @return the index of the last byte that matches {@code predicate}; or {@code -1} * if none match */ - int lastIndexOf(IntPredicate predicate); + int lastIndexOf(IntPredicate predicate, int fromIndex); /** * Returns the number of bytes that can be read from this data buffer. diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java index b46a03ce90..5b13e7e1a2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java @@ -84,8 +84,16 @@ public class DefaultDataBuffer implements DataBuffer { } @Override - public int indexOf(IntPredicate predicate) { - for (int i = 0; i < readableByteCount(); i++) { + public int indexOf(IntPredicate predicate, int fromIndex) { + Assert.notNull(predicate, "'predicate' must not be null"); + + if (fromIndex < 0) { + fromIndex = 0; + } + else if (fromIndex >= this.writePosition) { + return -1; + } + for (int i = fromIndex; i < this.writePosition; i++) { byte b = this.byteBuffer.get(i); if (predicate.test(b)) { return i; @@ -95,8 +103,10 @@ public class DefaultDataBuffer implements DataBuffer { } @Override - public int lastIndexOf(IntPredicate predicate) { - for (int i = readableByteCount() - 1; i >= 0; i--) { + public int lastIndexOf(IntPredicate predicate, int fromIndex) { + Assert.notNull(predicate, "'predicate' must not be null"); + int i = Math.min(fromIndex, this.writePosition - 1); + for (; i >= 0; i--) { byte b = this.byteBuffer.get(i); if (predicate.test(b)) { return i; diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java index 4686490fd1..628f7bec66 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java @@ -69,15 +69,28 @@ public class NettyDataBuffer implements PooledDataBuffer { } @Override - public int indexOf(IntPredicate predicate) { - IntPredicate negated = predicate.negate(); - return this.byteBuf.forEachByte(negated::test); + public int indexOf(IntPredicate predicate, int fromIndex) { + Assert.notNull(predicate, "'predicate' must not be null"); + if (fromIndex < 0) { + fromIndex = 0; + } + else if (fromIndex >= this.byteBuf.writerIndex()) { + return -1; + } + int length = this.byteBuf.writerIndex() - fromIndex; + + return this.byteBuf.forEachByte(fromIndex, length, predicate.negate()::test); } @Override - public int lastIndexOf(IntPredicate predicate) { - IntPredicate negated = predicate.negate(); - return this.byteBuf.forEachByteDesc(negated::test); + public int lastIndexOf(IntPredicate predicate, int fromIndex) { + Assert.notNull(predicate, "'predicate' must not be null"); + if (fromIndex < 0) { + return -1; + } + fromIndex = Math.min(fromIndex, this.byteBuf.writerIndex() - 1); + + return this.byteBuf.forEachByteDesc(0, fromIndex, predicate.negate()::test); } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java index ae990bb96a..249eb1146a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java @@ -21,13 +21,9 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.IntPredicate; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -124,45 +120,6 @@ public abstract class DataBufferUtils { }); } - /** - * Tokenize the {@link DataBuffer} using the given delimiter - * function. Does not include the delimiter in the result. - * @param dataBuffer the data buffer to tokenize - * @param delimiter the delimiter function - * @return the tokens - */ - public static List tokenize(DataBuffer dataBuffer, - IntPredicate delimiter) { - Assert.notNull(dataBuffer, "'dataBuffer' must not be null"); - Assert.notNull(delimiter, "'delimiter' must not be null"); - - List results = new ArrayList(); - int idx; - do { - idx = dataBuffer.indexOf(delimiter); - if (idx < 0) { - results.add(dataBuffer); - } - else { - if (idx > 0) { - DataBuffer slice = dataBuffer.slice(0, idx); - slice = retain(slice); - results.add(slice); - } - int remainingLen = dataBuffer.readableByteCount() - (idx + 1); - if (remainingLen > 0) { - dataBuffer = dataBuffer.slice(idx + 1, remainingLen); - } - else { - release(dataBuffer); - idx = -1; - } - } - } - while (idx != -1); - return Collections.unmodifiableList(results); - } - /** * Retains the given data buffer, it it is a {@link PooledDataBuffer}. * @param dataBuffer the data buffer to retain diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java index 2178eab29c..f89be25a22 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java @@ -82,8 +82,7 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase { TestSubscriber .subscribe(output) .assertNoError() - .assertComplete() - .assertValues("foo", "bar", "baz"); + .assertComplete().assertValues("\n", "foo\r", "\n", "bar\r", "\n", "baz"); } @Test diff --git a/spring-web-reactive/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-web-reactive/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index 4014fd629a..df4a02d789 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -190,10 +190,16 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b', 'c'}); - int result = buffer.indexOf(b -> b == 'c'); + int result = buffer.indexOf(b -> b == 'c', 0); assertEquals(2, result); - result = buffer.indexOf(b -> b == 'z'); + result = buffer.indexOf(b -> b == 'c', Integer.MIN_VALUE); + assertEquals(2, result); + + result = buffer.indexOf(b -> b == 'c', Integer.MAX_VALUE); + assertEquals(-1, result); + + result = buffer.indexOf(b -> b == 'z', 0); assertEquals(-1, result); release(buffer); @@ -204,10 +210,16 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b', 'c'}); - int result = buffer.lastIndexOf(b -> b == 'b'); + int result = buffer.lastIndexOf(b -> b == 'b', 3); assertEquals(1, result); - result = buffer.lastIndexOf(b -> b == 'z'); + result = buffer.lastIndexOf(b -> b == 'b', Integer.MAX_VALUE); + assertEquals(1, result); + + result = buffer.lastIndexOf(b -> b == 'b', Integer.MIN_VALUE); + assertEquals(-1, result); + + result = buffer.lastIndexOf(b -> b == 'z', 0); assertEquals(-1, result); release(buffer); diff --git a/spring-web-reactive/src/test/java/org/springframework/core/io/buffer/support/DataBufferUtilsTests.java b/spring-web-reactive/src/test/java/org/springframework/core/io/buffer/support/DataBufferUtilsTests.java index 1a2cba08ec..63029c5882 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/io/buffer/support/DataBufferUtilsTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/io/buffer/support/DataBufferUtilsTests.java @@ -19,10 +19,8 @@ package org.springframework.core.io.buffer.support; import java.io.InputStream; import java.net.URI; import java.nio.channels.FileChannel; -import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.util.List; import org.junit.Test; import reactor.core.publisher.Flux; @@ -31,7 +29,6 @@ import reactor.core.test.TestSubscriber; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.DataBuffer; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; /** @@ -112,23 +109,4 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { release(baz); } - @Test - public void tokenize() { - DataBuffer dataBuffer = stringBuffer("-foo--bar-"); - - List results = DataBufferUtils.tokenize(dataBuffer, b -> b == '-'); - assertEquals(2, results.size()); - - DataBuffer result = results.get(0); - String value = DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8); - assertEquals("foo", value); - - result = results.get(1); - value = DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8); - assertEquals("bar", value); - - results.stream().forEach(b -> release(b)); - } - - } \ No newline at end of file