Improve StringDecoder.

This commit is contained in:
Arjen Poutsma 2016-03-31 11:31:02 +02:00
parent d927d72204
commit 75d006d2f9
5 changed files with 40 additions and 286 deletions

View File

@ -1,240 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.core.util.BackpressureUtils;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
/**
* Abstract {@link Decoder} that plugs a {@link SubscriberBarrier} into the {@code Flux}
* pipeline in order to apply splitting/aggregation operations on the stream of data.
*
* @author Brian Clozel
*/
public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T> {
private final DataBufferAllocator allocator;
public AbstractRawByteStreamDecoder(DataBufferAllocator allocator,
MimeType... supportedMimeTypes) {
super(supportedMimeTypes);
Assert.notNull(allocator, "'allocator' must not be null");
this.allocator = allocator;
}
@Override
public Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
return decodeInternal(Flux.from(inputStream).lift(bbs -> subscriberBarrier(bbs)),
type, mimeType, hints);
}
/**
* Create a {@link SubscriberBarrier} instance that will be plugged into the Publisher pipeline
*
* <p>Implementations should provide their own {@link SubscriberBarrier} or use one of the
* provided implementations by this class
*/
public abstract SubscriberBarrier<DataBuffer, DataBuffer> subscriberBarrier(
Subscriber<? super DataBuffer> subscriber);
public abstract Flux<T> decodeInternal(Publisher<DataBuffer> inputStream,
ResolvableType type
, MimeType mimeType, Object... hints);
/**
* {@code SubscriberBarrier} implementation that buffers all received elements and emits a single
* {@code DataBuffer} once the incoming stream has been completed
*/
public static class ReduceSingleByteStreamBarrier
extends SubscriberBarrier<DataBuffer, DataBuffer> {
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ReduceSingleByteStreamBarrier> REQUESTED =
AtomicLongFieldUpdater.newUpdater(ReduceSingleByteStreamBarrier.class, "requested");
static final AtomicIntegerFieldUpdater<ReduceSingleByteStreamBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(ReduceSingleByteStreamBarrier.class, "terminated");
private volatile long requested;
private volatile int terminated;
private DataBuffer buffer;
public ReduceSingleByteStreamBarrier(Subscriber<? super DataBuffer> subscriber,
DataBufferAllocator allocator) {
super(subscriber);
this.buffer = allocator.allocateBuffer();
}
@Override
protected void doRequest(long n) {
BackpressureUtils.getAndAdd(REQUESTED, this, n);
if (TERMINATED.compareAndSet(this, 1, 2)) {
drainLast();
}
else {
super.doRequest(Long.MAX_VALUE);
}
}
@Override
protected void doComplete() {
if (TERMINATED.compareAndSet(this, 0, 1)) {
drainLast();
}
}
/*
* TODO: when available, wrap buffers with a single buffer and avoid copying data for every method call.
*/
@Override
protected void doNext(DataBuffer dataBuffer) {
this.buffer.write(dataBuffer);
}
protected void drainLast() {
if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
subscriber.onNext(this.buffer);
super.doComplete();
}
}
}
/**
* {@code SubscriberBarrier} implementation that splits incoming elements
* using line return delimiters: {@code "\n"} and {@code "\r\n"}
*/
public static class SplitLinesByteStreamBarrier
extends SubscriberBarrier<DataBuffer, DataBuffer> {
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<SplitLinesByteStreamBarrier> REQUESTED =
AtomicLongFieldUpdater.newUpdater(SplitLinesByteStreamBarrier.class, "requested");
static final AtomicIntegerFieldUpdater<SplitLinesByteStreamBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(SplitLinesByteStreamBarrier.class, "terminated");
private final DataBufferAllocator allocator;
private volatile long requested;
private volatile int terminated;
private DataBuffer buffer;
public SplitLinesByteStreamBarrier(Subscriber<? super DataBuffer> subscriber,
DataBufferAllocator allocator) {
super(subscriber);
this.allocator = allocator;
this.buffer = allocator.allocateBuffer();
}
@Override
protected void doRequest(long n) {
BackpressureUtils.getAndAdd(REQUESTED, this, n);
if (TERMINATED.compareAndSet(this, 1, 2)) {
drainLast();
}
else {
super.doRequest(n);
}
}
@Override
protected void doComplete() {
if (TERMINATED.compareAndSet(this, 0, 1)) {
drainLast();
}
}
/*
* TODO: when available, wrap buffers with a single buffer and avoid copying data for every method call.
*/
@Override
protected void doNext(DataBuffer dataBuffer) {
this.buffer.write(dataBuffer);
while (REQUESTED.get(this) > 0) {
int separatorIndex = findEndOfLine(this.buffer);
if (separatorIndex != -1) {
if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
byte[] message = new byte[separatorIndex];
this.buffer.read(message);
consumeSeparator(this.buffer);
// this.buffer = this.buffer.slice();
DataBuffer buffer2 = allocator.allocateBuffer(message.length);
buffer2.write(message);
super.doNext(buffer2);
}
}
else {
super.doRequest(1);
}
}
}
protected int findEndOfLine(DataBuffer buffer) {
final int n = buffer.readableByteCount();
for (int i = 0; i < n; i++) {
final byte b = buffer.get(i);
if (b == '\n') {
return i;
}
else if (b == '\r' && i < n - 1 && buffer.get(i + 1) == '\n') {
return i;
}
}
return -1;
}
protected void consumeSeparator(DataBuffer buffer) {
byte sep = buffer.read();
if (sep == '\r') {
buffer.read();
}
}
protected void drainLast() {
if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
subscriber.onNext(this.buffer);
super.doComplete();
}
}
}
}

View File

@ -20,13 +20,10 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.subscriber.SubscriberBarrier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.MimeType;
/**
@ -40,15 +37,14 @@ import org.springframework.util.MimeType;
*
* @author Sebastien Deleuze
* @author Brian Clozel
* @author Arjen Poutsma
* @see StringEncoder
*/
public class StringDecoder extends AbstractRawByteStreamDecoder<String> {
public class StringDecoder extends AbstractDecoder<String> {
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
public final boolean reduceToSingleBuffer;
private final DataBufferAllocator allocator;
private final boolean reduceToSingleBuffer;
/**
* Create a {@code StringDecoder} that decodes a bytes stream to a String stream
@ -56,8 +52,8 @@ public class StringDecoder extends AbstractRawByteStreamDecoder<String> {
* <p>By default, this decoder will buffer bytes and
* emit a single String as a result.
*/
public StringDecoder(DataBufferAllocator allocator) {
this(allocator, true);
public StringDecoder() {
this(true);
}
/**
@ -66,45 +62,39 @@ public class StringDecoder extends AbstractRawByteStreamDecoder<String> {
* @param reduceToSingleBuffer whether this decoder should buffer all received items
* and decode a single consolidated String or re-emit items as they are provided
*/
public StringDecoder(DataBufferAllocator allocator, boolean reduceToSingleBuffer) {
super(allocator, new MimeType("text", "plain", DEFAULT_CHARSET));
public StringDecoder(boolean reduceToSingleBuffer) {
super(new MimeType("text", "plain", DEFAULT_CHARSET));
this.reduceToSingleBuffer = reduceToSingleBuffer;
this.allocator = allocator;
}
@Override
public boolean canDecode(ResolvableType type, MimeType mimeType, Object... hints) {
return super.canDecode(type, mimeType, hints)
&& String.class.isAssignableFrom(type.getRawClass());
return super.canDecode(type, mimeType, hints) &&
String.class.equals(type.getRawClass());
}
@Override
public SubscriberBarrier<DataBuffer, DataBuffer> subscriberBarrier(
Subscriber<? super DataBuffer> subscriber) {
if (reduceToSingleBuffer) {
return new ReduceSingleByteStreamBarrier(subscriber, allocator);
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
Flux<DataBuffer> inputFlux = Flux.from(inputStream);
if (this.reduceToSingleBuffer) {
inputFlux = Flux.from(inputFlux.reduce(DataBuffer::write));
}
else {
return new SubscriberBarrier<DataBuffer, DataBuffer>(subscriber);
}
}
@Override
public Flux<String> decodeInternal(Publisher<DataBuffer> inputStream,
ResolvableType type, MimeType mimeType, Object... hints) {
Charset charset;
if (mimeType != null && mimeType.getCharSet() != null) {
charset = mimeType.getCharSet();
}
else {
charset = DEFAULT_CHARSET;
}
return Flux.from(inputStream).map(content -> {
Charset charset = getCharset(mimeType);
return inputFlux.map(content -> {
byte[] bytes = new byte[content.readableByteCount()];
content.read(bytes);
return new String(bytes, charset);
});
}
private Charset getCharset(MimeType mimeType) {
if (mimeType != null && mimeType.getCharSet() != null) {
return mimeType.getCharSet();
}
else {
return DEFAULT_CHARSET;
}
}
}

View File

@ -88,7 +88,7 @@ public final class WebClient {
DataBufferAllocator allocator = new DefaultDataBufferAllocator();
this.messageEncoders = Arrays.asList(new ByteBufferEncoder(), new StringEncoder(),
new JacksonJsonEncoder());
this.messageDecoders = Arrays.asList(new ByteBufferDecoder(), new StringDecoder(allocator),
this.messageDecoders = Arrays.asList(new ByteBufferDecoder(), new StringDecoder(),
new JacksonJsonDecoder(new JsonObjectDecoder()));
}

View File

@ -100,7 +100,7 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
if (ObjectUtils.isEmpty(this.argumentResolvers)) {
List<Decoder<?>> decoders = Arrays.asList(new ByteBufferDecoder(),
new StringDecoder(allocator),
new StringDecoder(),
new JacksonJsonDecoder(new JsonObjectDecoder()));
this.argumentResolvers.add(new RequestParamArgumentResolver());

View File

@ -18,9 +18,9 @@ package org.springframework.core.codec.support;
import org.junit.Before;
import org.junit.Test;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.core.test.TestSubscriber;
import rx.Single;
@ -40,7 +40,7 @@ public class StringDecoderTests extends AbstractAllocatingTestCase {
@Before
public void createEncoder() {
decoder = new StringDecoder(allocator);
decoder = new StringDecoder();
}
@ -53,29 +53,33 @@ public class StringDecoderTests extends AbstractAllocatingTestCase {
@Test
public void decode() throws InterruptedException {
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"));
Flux<String> output = this.decoder.decode(source, ResolvableType.forClassWithGenerics(Flux.class, String.class), null);
Flux<DataBuffer> source =
Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz"));
Flux<String> output =
this.decoder.decode(source, ResolvableType.forClass(String.class), null);
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output).assertValues("foobar");
testSubscriber.bindTo(output).assertValues("foobarbaz");
}
@Test
public void decodeDoNotBuffer() throws InterruptedException {
StringDecoder decoder = new StringDecoder(allocator, false);
StringDecoder decoder = new StringDecoder(false);
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"));
Flux<String> output = decoder.decode(source, ResolvableType.forClassWithGenerics(Flux.class, String.class), null);
Flux<String> output =
decoder.decode(source, ResolvableType.forClass(String.class), null);
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output).assertValues("foo", "bar");
}
@Test
public void decodeMono() throws InterruptedException {
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"));
Flux<DataBuffer> source =
Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz"));
Mono<String> mono = Mono.from(this.decoder.decode(source,
ResolvableType.forClassWithGenerics(Mono.class, String.class),
MediaType.TEXT_PLAIN));
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(mono).assertValues("foobar");
testSubscriber.bindTo(mono).assertValues("foobarbaz");
}
@Test