StringDecoder shouldn't chop off strings randomly
Issue: SPR-16337
This commit is contained in:
parent
cfe7ff1c81
commit
609f173ebc
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -20,26 +20,31 @@ import java.nio.CharBuffer;
|
|||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.IntPredicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
/**
|
||||
* Decode from a bytes stream to a {@code String} stream.
|
||||
* Decode from a data buffer stream to a {@code String} stream. Before decoding, this decoder
|
||||
* realigns the incoming data buffers so that each buffer ends with a newline.
|
||||
* This is to make sure that multibyte characters are decoded properly, and do not cross buffer
|
||||
* boundaries. The default delimiters ({@code \n}, {@code \r\n})can be customized.
|
||||
*
|
||||
* <p>By default, this decoder will split the received {@link DataBuffer}s
|
||||
* along newline characters ({@code \r\n}), but this can be changed by
|
||||
* passing {@code false} as a constructor argument.
|
||||
* <p>Partially inspired by Netty's {@code DelimiterBasedFrameDecoder}.
|
||||
*
|
||||
* @author Sebastien Deleuze
|
||||
* @author Brian Clozel
|
||||
|
|
@ -50,22 +55,28 @@ import org.springframework.util.MimeTypeUtils;
|
|||
*/
|
||||
public class StringDecoder extends AbstractDataBufferDecoder<String> {
|
||||
|
||||
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
||||
|
||||
private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';
|
||||
|
||||
|
||||
private final boolean splitOnNewline;
|
||||
|
||||
private static final DataBuffer END_FRAME = new DefaultDataBufferFactory().wrap(new byte[0]);
|
||||
|
||||
/**
|
||||
* Create a {@code StringDecoder} that decodes a bytes stream to a String stream
|
||||
* @param splitOnNewline whether this decoder should split the received data buffers
|
||||
* along newline characters
|
||||
* The default charset to use, i.e. "UTF-8".
|
||||
*/
|
||||
private StringDecoder(boolean splitOnNewline, MimeType... mimeTypes) {
|
||||
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
||||
|
||||
/**
|
||||
* The default delimiter strings to use, i.e. {@code \n} and {@code \r\n}.
|
||||
*/
|
||||
public static final List<String> DEFAULT_DELIMITERS = Arrays.asList("\r\n", "\n");
|
||||
|
||||
|
||||
private final List<String> delimiters;
|
||||
|
||||
private final boolean stripDelimiter;
|
||||
|
||||
private StringDecoder(List<String> delimiters, boolean stripDelimiter, MimeType... mimeTypes) {
|
||||
super(mimeTypes);
|
||||
this.splitOnNewline = splitOnNewline;
|
||||
Assert.notEmpty(delimiters, "'delimiters' must not be empty");
|
||||
this.delimiters = new ArrayList<>(delimiters);
|
||||
this.stripDelimiter = stripDelimiter;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -79,28 +90,112 @@ public class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
Flux<DataBuffer> inputFlux = Flux.from(inputStream);
|
||||
if (this.splitOnNewline) {
|
||||
inputFlux = Flux.from(inputStream).flatMap(StringDecoder::splitOnNewline);
|
||||
}
|
||||
List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
|
||||
|
||||
Flux<DataBuffer> inputFlux = Flux.from(inputStream)
|
||||
.flatMap(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
|
||||
.bufferUntil(StringDecoder::isEndFrame)
|
||||
.flatMap(StringDecoder::joinUntilEndFrame);
|
||||
return super.decode(inputFlux, elementType, mimeType, hints);
|
||||
}
|
||||
|
||||
private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
|
||||
List<DataBuffer> results = new ArrayList<>();
|
||||
int startIdx = 0;
|
||||
int endIdx;
|
||||
final int limit = dataBuffer.readableByteCount();
|
||||
private List<byte[]> getDelimiterBytes(@Nullable MimeType mimeType) {
|
||||
Charset charset = getCharset(mimeType);
|
||||
return this.delimiters.stream()
|
||||
.map(s -> s.getBytes(charset))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Splits the given data buffer on delimiter boundaries. The returned Flux contains a
|
||||
* {@link #END_FRAME} buffer after each delimiter.
|
||||
*/
|
||||
private Flux<DataBuffer> splitOnDelimiter(DataBuffer dataBuffer, List<byte[]> delimiterBytes) {
|
||||
List<DataBuffer> frames = new ArrayList<>();
|
||||
do {
|
||||
endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx);
|
||||
int length = endIdx != -1 ? endIdx - startIdx + 1 : limit - startIdx;
|
||||
DataBuffer token = dataBuffer.slice(startIdx, length);
|
||||
results.add(DataBufferUtils.retain(token));
|
||||
startIdx = endIdx + 1;
|
||||
int length = Integer.MAX_VALUE;
|
||||
byte[] matchingDelimiter = null;
|
||||
for (byte[] delimiter : delimiterBytes) {
|
||||
int idx = indexOf(dataBuffer, delimiter);
|
||||
if (idx >= 0 && idx < length) {
|
||||
length = idx;
|
||||
matchingDelimiter = delimiter;
|
||||
}
|
||||
}
|
||||
DataBuffer frame;
|
||||
int readPosition = dataBuffer.readPosition();
|
||||
if (matchingDelimiter != null) {
|
||||
if (this.stripDelimiter) {
|
||||
frame = dataBuffer.slice(readPosition, length);
|
||||
}
|
||||
else {
|
||||
frame = dataBuffer.slice(readPosition, length + matchingDelimiter.length);
|
||||
}
|
||||
dataBuffer.readPosition(readPosition + length + matchingDelimiter.length);
|
||||
|
||||
frames.add(DataBufferUtils.retain(frame));
|
||||
frames.add(END_FRAME);
|
||||
}
|
||||
else {
|
||||
frame = dataBuffer.slice(readPosition, dataBuffer.readableByteCount());
|
||||
dataBuffer.readPosition(readPosition + dataBuffer.readableByteCount());
|
||||
frames.add(DataBufferUtils.retain(frame));
|
||||
}
|
||||
}
|
||||
while (startIdx < limit && endIdx != -1);
|
||||
while (dataBuffer.readableByteCount() > 0);
|
||||
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return Flux.fromIterable(results);
|
||||
return Flux.fromIterable(frames);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the given delimiter in the given data buffer. Return the index of the delimiter, or
|
||||
* -1 if not found.
|
||||
*/
|
||||
private static int indexOf(DataBuffer dataBuffer, byte[] delimiter) {
|
||||
for (int i = dataBuffer.readPosition(); i < dataBuffer.writePosition(); i++) {
|
||||
int dataBufferPos = i;
|
||||
int delimiterPos = 0;
|
||||
while (delimiterPos < delimiter.length) {
|
||||
if (dataBuffer.getByte(dataBufferPos) != delimiter[delimiterPos]) {
|
||||
break;
|
||||
}
|
||||
else {
|
||||
dataBufferPos++;
|
||||
if (dataBufferPos == dataBuffer.writePosition() &&
|
||||
delimiterPos != delimiter.length - 1) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
delimiterPos++;
|
||||
}
|
||||
|
||||
if (delimiterPos == delimiter.length) {
|
||||
return i - dataBuffer.readPosition();
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the given buffer is {@link #END_FRAME}.
|
||||
*/
|
||||
private static boolean isEndFrame(DataBuffer dataBuffer) {
|
||||
return dataBuffer == END_FRAME;
|
||||
}
|
||||
|
||||
/**
|
||||
* Joins the given list of buffers into a single buffer.
|
||||
*/
|
||||
private static Mono<DataBuffer> joinUntilEndFrame(List<DataBuffer> dataBuffers) {
|
||||
if (dataBuffers.size() > 0) {
|
||||
int lastIdx = dataBuffers.size() - 1;
|
||||
if (isEndFrame(dataBuffers.get(lastIdx))) {
|
||||
dataBuffers.remove(lastIdx);
|
||||
}
|
||||
}
|
||||
Flux<DataBuffer> flux = Flux.fromIterable(dataBuffers);
|
||||
return DataBufferUtils.join(flux);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -113,7 +208,7 @@ public class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
return charBuffer.toString();
|
||||
}
|
||||
|
||||
private Charset getCharset(@Nullable MimeType mimeType) {
|
||||
private static Charset getCharset(@Nullable MimeType mimeType) {
|
||||
if (mimeType != null && mimeType.getCharset() != null) {
|
||||
return mimeType.getCharset();
|
||||
}
|
||||
|
|
@ -125,19 +220,55 @@ public class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
|
||||
/**
|
||||
* Create a {@code StringDecoder} for {@code "text/plain"}.
|
||||
* @param splitOnNewline whether to split the byte stream into lines
|
||||
* @param ignored ignored
|
||||
* @deprecated as of Spring 5.0.4, in favor of {@link #textPlainOnly()} or
|
||||
* {@link #textPlainOnly(List, boolean)}.
|
||||
*/
|
||||
public static StringDecoder textPlainOnly(boolean splitOnNewline) {
|
||||
return new StringDecoder(splitOnNewline, new MimeType("text", "plain", DEFAULT_CHARSET));
|
||||
@Deprecated
|
||||
public static StringDecoder textPlainOnly(boolean ignored) {
|
||||
return textPlainOnly();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@code StringDecoder} for {@code "text/plain"}.
|
||||
*/
|
||||
public static StringDecoder textPlainOnly() {
|
||||
return textPlainOnly(DEFAULT_DELIMITERS, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@code StringDecoder} for {@code "text/plain"}.
|
||||
*/
|
||||
public static StringDecoder textPlainOnly(List<String> delimiters, boolean stripDelimiter) {
|
||||
return new StringDecoder(delimiters, stripDelimiter,
|
||||
new MimeType("text", "plain", DEFAULT_CHARSET));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@code StringDecoder} that supports all MIME types.
|
||||
* @param splitOnNewline whether to split the byte stream into lines
|
||||
* @param ignored ignored
|
||||
* @deprecated as of Spring 5.0.4, in favor of {@link #allMimeTypes()} or
|
||||
* {@link #allMimeTypes(List, boolean)}.
|
||||
*/
|
||||
public static StringDecoder allMimeTypes(boolean splitOnNewline) {
|
||||
return new StringDecoder(splitOnNewline,
|
||||
@Deprecated
|
||||
public static StringDecoder allMimeTypes(boolean ignored) {
|
||||
return allMimeTypes();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@code StringDecoder} that supports all MIME types.
|
||||
*/
|
||||
public static StringDecoder allMimeTypes() {
|
||||
return allMimeTypes(DEFAULT_DELIMITERS, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@code StringDecoder} that supports all MIME types.
|
||||
*/
|
||||
public static StringDecoder allMimeTypes(List<String> delimiters, boolean stripDelimiter) {
|
||||
return new StringDecoder(delimiters, stripDelimiter,
|
||||
new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -140,7 +140,16 @@ public interface DataBuffer {
|
|||
DataBuffer writePosition(int writePosition);
|
||||
|
||||
/**
|
||||
* Read a single byte from the current reading position of this data buffer.
|
||||
* Read a single byte at the given index from this data buffer.
|
||||
* @param index the index at which the byte will be read
|
||||
* @return the byte at the given index
|
||||
* @throws IndexOutOfBoundsException when {@code index} is out of bounds
|
||||
* @since 5.0.4
|
||||
*/
|
||||
byte getByte(int index);
|
||||
|
||||
/**
|
||||
* Read a single byte from the current reading position from this data buffer.
|
||||
* @return the byte at this buffer's current reading position
|
||||
*/
|
||||
byte read();
|
||||
|
|
|
|||
|
|
@ -515,7 +515,7 @@ public abstract class DataBufferUtils {
|
|||
* @return a buffer that is composed from the {@code dataBuffers} argument
|
||||
* @since 5.0.3
|
||||
*/
|
||||
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
|
||||
public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
|
||||
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
|
||||
|
||||
return Flux.from(dataBuffers)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -148,7 +148,7 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer readPosition(int readPosition) {
|
||||
public DefaultDataBuffer readPosition(int readPosition) {
|
||||
assertIndex(readPosition >= 0, "'readPosition' %d must be >= 0", readPosition);
|
||||
assertIndex(readPosition <= this.writePosition, "'readPosition' %d must be <= %d",
|
||||
readPosition, this.writePosition);
|
||||
|
|
@ -163,7 +163,7 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer writePosition(int writePosition) {
|
||||
public DefaultDataBuffer writePosition(int writePosition) {
|
||||
assertIndex(writePosition >= this.readPosition, "'writePosition' %d must be >= %d",
|
||||
writePosition, this.readPosition);
|
||||
assertIndex(writePosition <= this.capacity, "'writePosition' %d must be <= %d",
|
||||
|
|
@ -179,7 +179,7 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer capacity(int newCapacity) {
|
||||
public DefaultDataBuffer capacity(int newCapacity) {
|
||||
Assert.isTrue(newCapacity > 0,
|
||||
String.format("'newCapacity' %d must be higher than 0", newCapacity));
|
||||
|
||||
|
|
@ -222,6 +222,15 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
return direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getByte(int index) {
|
||||
assertIndex(index >= 0, "index %d must be >= 0", index);
|
||||
assertIndex(index <= this.writePosition - 1, "index %d must be <= %d",
|
||||
index, this.writePosition - 1);
|
||||
|
||||
return this.byteBuffer.get(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte read() {
|
||||
assertIndex(this.readPosition <= this.writePosition - 1, "readPosition %d must be <= %d",
|
||||
|
|
@ -286,7 +295,7 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer write(DataBuffer... buffers) {
|
||||
public DefaultDataBuffer write(DataBuffer... buffers) {
|
||||
if (!ObjectUtils.isEmpty(buffers)) {
|
||||
ByteBuffer[] byteBuffers =
|
||||
Arrays.stream(buffers).map(DataBuffer::asByteBuffer)
|
||||
|
|
@ -315,7 +324,7 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer slice(int index, int length) {
|
||||
public DefaultDataBuffer slice(int index, int length) {
|
||||
checkIndex(index, length);
|
||||
int oldPosition = this.byteBuffer.position();
|
||||
// Explicit access via Buffer base type for compatibility
|
||||
|
|
@ -488,7 +497,7 @@ public class DefaultDataBuffer implements DataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer capacity(int newCapacity) {
|
||||
public DefaultDataBuffer capacity(int newCapacity) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Changing the capacity of a sliced buffer is not supported");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ public class NettyDataBuffer implements PooledDataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer readPosition(int readPosition) {
|
||||
public NettyDataBuffer readPosition(int readPosition) {
|
||||
this.byteBuf.readerIndex(readPosition);
|
||||
return this;
|
||||
}
|
||||
|
|
@ -118,18 +118,23 @@ public class NettyDataBuffer implements PooledDataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer writePosition(int writePosition) {
|
||||
public NettyDataBuffer writePosition(int writePosition) {
|
||||
this.byteBuf.writerIndex(writePosition);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getByte(int index) {
|
||||
return this.byteBuf.getByte(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int capacity() {
|
||||
return this.byteBuf.capacity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer capacity(int capacity) {
|
||||
public NettyDataBuffer capacity(int capacity) {
|
||||
this.byteBuf.capacity(capacity);
|
||||
return this;
|
||||
}
|
||||
|
|
@ -225,7 +230,7 @@ public class NettyDataBuffer implements PooledDataBuffer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataBuffer slice(int index, int length) {
|
||||
public NettyDataBuffer slice(int index, int length) {
|
||||
ByteBuf slice = this.byteBuf.slice(index, length);
|
||||
return new NettyDataBuffer(slice, this.dataBufferFactory);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -16,7 +16,10 @@
|
|||
|
||||
package org.springframework.core.codec;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
|
@ -28,8 +31,7 @@ import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
|
|||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* @author Sebastien Deleuze
|
||||
|
|
@ -38,7 +40,7 @@ import static org.junit.Assert.assertTrue;
|
|||
*/
|
||||
public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase {
|
||||
|
||||
private StringDecoder decoder = StringDecoder.allMimeTypes(true);
|
||||
private StringDecoder decoder = StringDecoder.allMimeTypes();
|
||||
|
||||
|
||||
@Test
|
||||
|
|
@ -56,36 +58,88 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void decode() throws InterruptedException {
|
||||
this.decoder = StringDecoder.allMimeTypes(false);
|
||||
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz"));
|
||||
public void decodeMultibyteCharacter() {
|
||||
String s = "üéø";
|
||||
Flux<DataBuffer> source = toSingleByteDataBuffers(s);
|
||||
|
||||
Flux<String> output = this.decoder.decode(source, ResolvableType.forClass(String.class),
|
||||
null, Collections.emptyMap());
|
||||
StepVerifier.create(output)
|
||||
.expectNext(s)
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
private Flux<DataBuffer> toSingleByteDataBuffers(String s) {
|
||||
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
List<DataBuffer> dataBuffers = new ArrayList<>();
|
||||
for (byte b : bytes) {
|
||||
dataBuffers.add(this.bufferFactory.wrap(new byte[]{b}));
|
||||
}
|
||||
return Flux.fromIterable(dataBuffers);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void decodeNewLine() {
|
||||
Flux<DataBuffer> source = Flux.just(
|
||||
stringBuffer("\r\nabc\n"),
|
||||
stringBuffer("def"),
|
||||
stringBuffer("ghi\r\n\n"),
|
||||
stringBuffer("jkl"),
|
||||
stringBuffer("mno\npqr\n"),
|
||||
stringBuffer("stu"),
|
||||
stringBuffer("vw"),
|
||||
stringBuffer("xyz")
|
||||
);
|
||||
|
||||
Flux<String> output = this.decoder.decode(source, ResolvableType.forClass(String.class),
|
||||
null, Collections.emptyMap());
|
||||
|
||||
StepVerifier.create(output)
|
||||
.expectNext("foo", "bar", "baz")
|
||||
.expectNext("")
|
||||
.expectNext("abc")
|
||||
.expectNext("defghi")
|
||||
.expectNext("")
|
||||
.expectNext("jklmno")
|
||||
.expectNext("pqr")
|
||||
.expectNext("stuvwxyz")
|
||||
.expectComplete()
|
||||
.verify();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void decodeNewLine() throws InterruptedException {
|
||||
DataBuffer fooBar = stringBuffer("\nfoo\r\nbar\r");
|
||||
DataBuffer baz = stringBuffer("\nbaz");
|
||||
Flux<DataBuffer> source = Flux.just(fooBar, baz);
|
||||
Flux<String> output = decoder.decode(source, ResolvableType.forClass(String.class),
|
||||
public void decodeNewLineIncludeDelimiters() {
|
||||
|
||||
decoder = StringDecoder.allMimeTypes(StringDecoder.DEFAULT_DELIMITERS, false);
|
||||
|
||||
Flux<DataBuffer> source = Flux.just(
|
||||
stringBuffer("\r\nabc\n"),
|
||||
stringBuffer("def"),
|
||||
stringBuffer("ghi\r\n\n"),
|
||||
stringBuffer("jkl"),
|
||||
stringBuffer("mno\npqr\n"),
|
||||
stringBuffer("stu"),
|
||||
stringBuffer("vw"),
|
||||
stringBuffer("xyz")
|
||||
);
|
||||
|
||||
Flux<String> output = this.decoder.decode(source, ResolvableType.forClass(String.class),
|
||||
null, Collections.emptyMap());
|
||||
|
||||
StepVerifier.create(output)
|
||||
.expectNext("\n", "foo\r", "\n", "bar\r", "\n", "baz")
|
||||
.expectNext("\r\n")
|
||||
.expectNext("abc\n")
|
||||
.expectNext("defghi\r\n")
|
||||
.expectNext("\n")
|
||||
.expectNext("jklmno\n")
|
||||
.expectNext("pqr\n")
|
||||
.expectNext("stuvwxyz")
|
||||
.expectComplete()
|
||||
.verify();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void decodeEmptyFlux() throws InterruptedException {
|
||||
public void decodeEmptyFlux() {
|
||||
Flux<DataBuffer> source = Flux.empty();
|
||||
Flux<String> output = this.decoder.decode(source, ResolvableType.forClass(String.class),
|
||||
null, Collections.emptyMap());
|
||||
|
|
@ -98,7 +152,7 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void decodeEmptyString() throws InterruptedException {
|
||||
public void decodeEmptyDataBuffer() {
|
||||
Flux<DataBuffer> source = Flux.just(stringBuffer(""));
|
||||
Flux<String> output = this.decoder.decode(source,
|
||||
ResolvableType.forClass(String.class), null, Collections.emptyMap());
|
||||
|
|
@ -110,8 +164,7 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void decodeToMono() throws InterruptedException {
|
||||
this.decoder = StringDecoder.allMimeTypes(false);
|
||||
public void decodeToMono() {
|
||||
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz"));
|
||||
Mono<String> output = this.decoder.decodeToMono(source,
|
||||
ResolvableType.forClass(String.class), null, Collections.emptyMap());
|
||||
|
|
|
|||
|
|
@ -492,5 +492,28 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
|
|||
release(composite);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getByte() {
|
||||
DataBuffer buffer = stringBuffer("abc");
|
||||
|
||||
assertEquals('a', buffer.getByte(0));
|
||||
assertEquals('b', buffer.getByte(1));
|
||||
assertEquals('c', buffer.getByte(2));
|
||||
try {
|
||||
buffer.getByte(-1);
|
||||
fail("IndexOutOfBoundsException expected");
|
||||
}
|
||||
catch (IndexOutOfBoundsException ignored) {
|
||||
}
|
||||
|
||||
try {
|
||||
buffer.getByte(3);
|
||||
fail("IndexOutOfBoundsException expected");
|
||||
}
|
||||
catch (IndexOutOfBoundsException ignored) {
|
||||
}
|
||||
|
||||
release(buffer);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -335,5 +335,4 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
|
|||
release(result);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -40,7 +40,7 @@ import org.springframework.http.MediaType;
|
|||
import org.springframework.http.ReactiveHttpInputMessage;
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
import static java.util.stream.Collectors.*;
|
||||
import static java.util.stream.Collectors.joining;
|
||||
|
||||
/**
|
||||
* Reader that supports a stream of {@link ServerSentEvent}s and also plain
|
||||
|
|
@ -56,7 +56,7 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
|
|||
|
||||
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
|
||||
|
||||
private static final StringDecoder stringDecoder = StringDecoder.textPlainOnly(false);
|
||||
private static final StringDecoder stringDecoder = StringDecoder.textPlainOnly();
|
||||
|
||||
|
||||
@Nullable
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -188,12 +188,10 @@ abstract class AbstractCodecConfigurer implements CodecConfigurer {
|
|||
result.add(new DecoderHttpMessageReader<>(new ByteBufferDecoder()));
|
||||
result.add(new DecoderHttpMessageReader<>(new DataBufferDecoder()));
|
||||
result.add(new DecoderHttpMessageReader<>(new ResourceDecoder()));
|
||||
result.add(new DecoderHttpMessageReader<>(StringDecoder.textPlainOnly(splitTextOnNewLine())));
|
||||
result.add(new DecoderHttpMessageReader<>(StringDecoder.textPlainOnly()));
|
||||
return result;
|
||||
}
|
||||
|
||||
abstract boolean splitTextOnNewLine();
|
||||
|
||||
List<HttpMessageReader<?>> getObjectReaders() {
|
||||
if (!this.registerDefaults) {
|
||||
return Collections.emptyList();
|
||||
|
|
@ -216,7 +214,7 @@ abstract class AbstractCodecConfigurer implements CodecConfigurer {
|
|||
return Collections.emptyList();
|
||||
}
|
||||
List<HttpMessageReader<?>> result = new ArrayList<>();
|
||||
result.add(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes(splitTextOnNewLine())));
|
||||
result.add(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes()));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -70,11 +70,6 @@ public class DefaultClientCodecConfigurer extends AbstractCodecConfigurer implem
|
|||
this.sseDecoder = decoder;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean splitTextOnNewLine() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
List<HttpMessageReader<?>> getObjectReaders() {
|
||||
if (!shouldRegisterDefaults()) {
|
||||
|
|
@ -110,8 +105,7 @@ public class DefaultClientCodecConfigurer extends AbstractCodecConfigurer implem
|
|||
}
|
||||
else {
|
||||
DefaultCustomCodecs customCodecs = getCustomCodecs();
|
||||
partWriters = new ArrayList<>();
|
||||
partWriters.addAll(super.getTypedWriters());
|
||||
partWriters = new ArrayList<>(super.getTypedWriters());
|
||||
if (customCodecs != null) {
|
||||
partWriters.addAll(customCodecs.getTypedWriters());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -66,11 +66,6 @@ public class DefaultServerCodecConfigurer extends AbstractCodecConfigurer implem
|
|||
this.sseEncoder = encoder;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean splitTextOnNewLine() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
List<HttpMessageReader<?>> getTypedReaders() {
|
||||
if (!shouldRegisterDefaults()) {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -269,6 +269,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
|||
return this.dataBuffer.capacity(newCapacity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getByte(int index) {
|
||||
return this.dataBuffer.getByte(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte read() {
|
||||
return this.dataBuffer.read();
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -18,6 +18,7 @@ package org.springframework.http.codec.support;
|
|||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
|
@ -55,10 +56,7 @@ import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
|
|||
import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.springframework.core.ResolvableType.forClass;
|
||||
|
||||
/**
|
||||
|
|
@ -141,7 +139,7 @@ public class ClientCodecConfigurerTests {
|
|||
Flux.just(new DefaultDataBufferFactory().wrap("line1\nline2".getBytes(StandardCharsets.UTF_8))),
|
||||
ResolvableType.forClass(String.class), MimeTypeUtils.TEXT_PLAIN, Collections.emptyMap());
|
||||
|
||||
assertEquals(Collections.singletonList("line1\nline2"), decoded.collectList().block(Duration.ZERO));
|
||||
assertEquals(Arrays.asList("line1", "line2"), decoded.collectList().block(Duration.ZERO));
|
||||
}
|
||||
|
||||
private void assertStringEncoder(Encoder<?> encoder, boolean textOnly) {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -50,7 +50,7 @@ import org.springframework.util.MimeTypeUtils;
|
|||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.springframework.core.ResolvableType.*;
|
||||
import static org.springframework.core.ResolvableType.forClass;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link AbstractCodecConfigurer.AbstractDefaultCodecs}.
|
||||
|
|
@ -292,10 +292,6 @@ public class CodecConfigurerTests {
|
|||
|
||||
private static class TestDefaultCodecs extends AbstractDefaultCodecs {
|
||||
|
||||
@Override
|
||||
boolean splitTextOnNewLine() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -59,7 +59,7 @@ import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
|
|||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.springframework.core.ResolvableType.*;
|
||||
import static org.springframework.core.ResolvableType.forClass;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link ServerCodecConfigurer}.
|
||||
|
|
@ -143,7 +143,7 @@ public class ServerCodecConfigurerTests {
|
|||
Flux.just(new DefaultDataBufferFactory().wrap("line1\nline2".getBytes(StandardCharsets.UTF_8))),
|
||||
ResolvableType.forClass(String.class), MimeTypeUtils.TEXT_PLAIN, Collections.emptyMap());
|
||||
|
||||
assertEquals(Arrays.asList("line1\n", "line2"), flux.collectList().block(Duration.ZERO));
|
||||
assertEquals(Arrays.asList("line1", "line2"), flux.collectList().block(Duration.ZERO));
|
||||
}
|
||||
|
||||
private void assertStringEncoder(Encoder<?> encoder, boolean textOnly) {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -34,7 +34,7 @@ import org.springframework.http.server.reactive.ServerHttpRequest;
|
|||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* @author Sebastien Deleuze
|
||||
|
|
@ -121,7 +121,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
if (path.endsWith("write-and-flush")) {
|
||||
Flux<Publisher<DataBuffer>> responseBody = Flux
|
||||
.interval(Duration.ofMillis(50))
|
||||
.map(l -> toDataBuffer("data" + l, response.bufferFactory()))
|
||||
.map(l -> toDataBuffer("data" + l + "\n", response.bufferFactory()))
|
||||
.take(2)
|
||||
.map(Flux::just);
|
||||
responseBody = responseBody.concatWith(Flux.never());
|
||||
|
|
@ -131,14 +131,14 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
|
|||
Flux<DataBuffer> responseBody = Flux
|
||||
.just("0123456789")
|
||||
.repeat(20000)
|
||||
.map(value -> toDataBuffer(value, response.bufferFactory()));
|
||||
.map(value -> toDataBuffer(value + "\n", response.bufferFactory()));
|
||||
return response.writeWith(responseBody);
|
||||
}
|
||||
else if (path.endsWith("write-and-never-complete")) {
|
||||
Flux<DataBuffer> responseBody = Flux
|
||||
.just("0123456789")
|
||||
.repeat(20000)
|
||||
.map(value -> toDataBuffer(value, response.bufferFactory()))
|
||||
.map(value -> toDataBuffer(value + "\n", response.bufferFactory()))
|
||||
.mergeWith(Flux.never());
|
||||
return response.writeWith(responseBody);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -47,12 +47,7 @@ import org.springframework.web.reactive.BindingContext;
|
|||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.ServerWebInputException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.springframework.core.ResolvableType.forClassWithGenerics;
|
||||
import static org.springframework.http.MediaType.TEXT_PLAIN;
|
||||
import static org.springframework.mock.http.server.reactive.test.MockServerHttpRequest.post;
|
||||
|
|
@ -282,9 +277,9 @@ public class HttpEntityArgumentResolverTests {
|
|||
|
||||
assertEquals(exchange.getRequest().getHeaders(), httpEntity.getHeaders());
|
||||
StepVerifier.create(httpEntity.getBody())
|
||||
.expectNext("line1\n")
|
||||
.expectNext("line2\n")
|
||||
.expectNext("line3\n")
|
||||
.expectNext("line1")
|
||||
.expectNext("line2")
|
||||
.expectNext("line3")
|
||||
.expectComplete()
|
||||
.verify();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue