parent
1c67ef4bed
commit
f738273486
|
|
@ -33,6 +33,7 @@ import org.springframework.core.ResolvableType;
|
|||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||
import org.springframework.core.log.LogFormatUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
|
@ -96,7 +97,8 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
Flux<DataBuffer> inputFlux = Flux.from(inputStream)
|
||||
.flatMap(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
|
||||
.bufferUntil(StringDecoder::isEndFrame)
|
||||
.flatMap(StringDecoder::joinUntilEndFrame);
|
||||
.flatMap(StringDecoder::joinUntilEndFrame)
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
||||
return super.decode(inputFlux, elementType, mimeType, hints);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -176,8 +176,9 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase {
|
|||
@Test
|
||||
public void decodeError() {
|
||||
DataBuffer fooBuffer = stringBuffer("foo\n");
|
||||
DataBuffer barBuffer = stringBuffer("bar");
|
||||
Flux<DataBuffer> source =
|
||||
Flux.just(fooBuffer).concatWith(Flux.error(new RuntimeException()));
|
||||
Flux.just(fooBuffer, barBuffer).concatWith(Flux.error(new RuntimeException()));
|
||||
|
||||
Flux<String> output = this.decoder.decode(source,
|
||||
ResolvableType.forClass(String.class), null, Collections.emptyMap());
|
||||
|
|
|
|||
Loading…
Reference in New Issue