diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/AbstractRawByteStreamDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/AbstractRawByteStreamDecoder.java deleted file mode 100644 index 7fc77d8137e..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/AbstractRawByteStreamDecoder.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.core.codec.support; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import reactor.core.publisher.Flux; -import reactor.core.subscriber.SubscriberBarrier; -import reactor.core.util.BackpressureUtils; - -import org.springframework.core.ResolvableType; -import org.springframework.core.codec.Decoder; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferAllocator; -import org.springframework.util.Assert; -import org.springframework.util.MimeType; - -/** - * Abstract {@link Decoder} that plugs a {@link SubscriberBarrier} into the {@code Flux} - * pipeline in order to apply splitting/aggregation operations on the stream of data. - * - * @author Brian Clozel - */ -public abstract class AbstractRawByteStreamDecoder extends AbstractDecoder { - - private final DataBufferAllocator allocator; - - public AbstractRawByteStreamDecoder(DataBufferAllocator allocator, - MimeType... supportedMimeTypes) { - super(supportedMimeTypes); - Assert.notNull(allocator, "'allocator' must not be null"); - - this.allocator = allocator; - } - - @Override - public Flux decode(Publisher inputStream, ResolvableType type, - MimeType mimeType, Object... hints) { - - return decodeInternal(Flux.from(inputStream).lift(bbs -> subscriberBarrier(bbs)), - type, mimeType, hints); - } - - /** - * Create a {@link SubscriberBarrier} instance that will be plugged into the Publisher pipeline - * - *

Implementations should provide their own {@link SubscriberBarrier} or use one of the - * provided implementations by this class - */ - public abstract SubscriberBarrier subscriberBarrier( - Subscriber subscriber); - - public abstract Flux decodeInternal(Publisher inputStream, - ResolvableType type - , MimeType mimeType, Object... hints); - - - /** - * {@code SubscriberBarrier} implementation that buffers all received elements and emits a single - * {@code DataBuffer} once the incoming stream has been completed - */ - public static class ReduceSingleByteStreamBarrier - extends SubscriberBarrier { - - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(ReduceSingleByteStreamBarrier.class, "requested"); - - static final AtomicIntegerFieldUpdater TERMINATED = - AtomicIntegerFieldUpdater.newUpdater(ReduceSingleByteStreamBarrier.class, "terminated"); - - private volatile long requested; - - private volatile int terminated; - - private DataBuffer buffer; - - public ReduceSingleByteStreamBarrier(Subscriber subscriber, - DataBufferAllocator allocator) { - super(subscriber); - this.buffer = allocator.allocateBuffer(); - } - - @Override - protected void doRequest(long n) { - BackpressureUtils.getAndAdd(REQUESTED, this, n); - if (TERMINATED.compareAndSet(this, 1, 2)) { - drainLast(); - } - else { - super.doRequest(Long.MAX_VALUE); - } - } - - @Override - protected void doComplete() { - if (TERMINATED.compareAndSet(this, 0, 1)) { - drainLast(); - } - } - - /* - * TODO: when available, wrap buffers with a single buffer and avoid copying data for every method call. - */ - @Override - protected void doNext(DataBuffer dataBuffer) { - this.buffer.write(dataBuffer); - } - - protected void drainLast() { - if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) { - subscriber.onNext(this.buffer); - super.doComplete(); - } - } - } - - /** - * {@code SubscriberBarrier} implementation that splits incoming elements - * using line return delimiters: {@code "\n"} and {@code "\r\n"} - */ - public static class SplitLinesByteStreamBarrier - extends SubscriberBarrier { - - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(SplitLinesByteStreamBarrier.class, "requested"); - - static final AtomicIntegerFieldUpdater TERMINATED = - AtomicIntegerFieldUpdater.newUpdater(SplitLinesByteStreamBarrier.class, "terminated"); - - private final DataBufferAllocator allocator; - - - private volatile long requested; - - private volatile int terminated; - - private DataBuffer buffer; - - public SplitLinesByteStreamBarrier(Subscriber subscriber, - DataBufferAllocator allocator) { - super(subscriber); - this.allocator = allocator; - this.buffer = allocator.allocateBuffer(); - } - - @Override - protected void doRequest(long n) { - BackpressureUtils.getAndAdd(REQUESTED, this, n); - if (TERMINATED.compareAndSet(this, 1, 2)) { - drainLast(); - } - else { - super.doRequest(n); - } - } - - @Override - protected void doComplete() { - if (TERMINATED.compareAndSet(this, 0, 1)) { - drainLast(); - } - } - - /* - * TODO: when available, wrap buffers with a single buffer and avoid copying data for every method call. - */ - @Override - protected void doNext(DataBuffer dataBuffer) { - this.buffer.write(dataBuffer); - - while (REQUESTED.get(this) > 0) { - int separatorIndex = findEndOfLine(this.buffer); - if (separatorIndex != -1) { - if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) { - byte[] message = new byte[separatorIndex]; - this.buffer.read(message); - consumeSeparator(this.buffer); -// this.buffer = this.buffer.slice(); - DataBuffer buffer2 = allocator.allocateBuffer(message.length); - buffer2.write(message); - super.doNext(buffer2); - } - } - else { - super.doRequest(1); - } - } - } - - protected int findEndOfLine(DataBuffer buffer) { - - final int n = buffer.readableByteCount(); - for (int i = 0; i < n; i++) { - final byte b = buffer.get(i); - if (b == '\n') { - return i; - } - else if (b == '\r' && i < n - 1 && buffer.get(i + 1) == '\n') { - return i; - } - } - - return -1; - } - - protected void consumeSeparator(DataBuffer buffer) { - byte sep = buffer.read(); - if (sep == '\r') { - buffer.read(); - } - } - - protected void drainLast() { - if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) { - subscriber.onNext(this.buffer); - super.doComplete(); - } - } - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java index 7889e1593e2..51fc665aa96 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java @@ -20,13 +20,10 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import reactor.core.publisher.Flux; -import reactor.core.subscriber.SubscriberBarrier; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferAllocator; import org.springframework.util.MimeType; /** @@ -40,15 +37,14 @@ import org.springframework.util.MimeType; * * @author Sebastien Deleuze * @author Brian Clozel + * @author Arjen Poutsma * @see StringEncoder */ -public class StringDecoder extends AbstractRawByteStreamDecoder { +public class StringDecoder extends AbstractDecoder { public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; - public final boolean reduceToSingleBuffer; - - private final DataBufferAllocator allocator; + private final boolean reduceToSingleBuffer; /** * Create a {@code StringDecoder} that decodes a bytes stream to a String stream @@ -56,8 +52,8 @@ public class StringDecoder extends AbstractRawByteStreamDecoder { *

By default, this decoder will buffer bytes and * emit a single String as a result. */ - public StringDecoder(DataBufferAllocator allocator) { - this(allocator, true); + public StringDecoder() { + this(true); } /** @@ -66,45 +62,39 @@ public class StringDecoder extends AbstractRawByteStreamDecoder { * @param reduceToSingleBuffer whether this decoder should buffer all received items * and decode a single consolidated String or re-emit items as they are provided */ - public StringDecoder(DataBufferAllocator allocator, boolean reduceToSingleBuffer) { - super(allocator, new MimeType("text", "plain", DEFAULT_CHARSET)); + public StringDecoder(boolean reduceToSingleBuffer) { + super(new MimeType("text", "plain", DEFAULT_CHARSET)); this.reduceToSingleBuffer = reduceToSingleBuffer; - this.allocator = allocator; } @Override public boolean canDecode(ResolvableType type, MimeType mimeType, Object... hints) { - return super.canDecode(type, mimeType, hints) - && String.class.isAssignableFrom(type.getRawClass()); + return super.canDecode(type, mimeType, hints) && + String.class.equals(type.getRawClass()); } @Override - public SubscriberBarrier subscriberBarrier( - Subscriber subscriber) { - if (reduceToSingleBuffer) { - return new ReduceSingleByteStreamBarrier(subscriber, allocator); + public Flux decode(Publisher inputStream, ResolvableType type, + MimeType mimeType, Object... hints) { + Flux inputFlux = Flux.from(inputStream); + if (this.reduceToSingleBuffer) { + inputFlux = Flux.from(inputFlux.reduce(DataBuffer::write)); } - else { - return new SubscriberBarrier(subscriber); - } - - } - - @Override - public Flux decodeInternal(Publisher inputStream, - ResolvableType type, MimeType mimeType, Object... hints) { - Charset charset; - if (mimeType != null && mimeType.getCharSet() != null) { - charset = mimeType.getCharSet(); - } - else { - charset = DEFAULT_CHARSET; - } - return Flux.from(inputStream).map(content -> { + Charset charset = getCharset(mimeType); + return inputFlux.map(content -> { byte[] bytes = new byte[content.readableByteCount()]; content.read(bytes); return new String(bytes, charset); }); } + private Charset getCharset(MimeType mimeType) { + if (mimeType != null && mimeType.getCharSet() != null) { + return mimeType.getCharSet(); + } + else { + return DEFAULT_CHARSET; + } + } + } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebClient.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebClient.java index 317048314d5..ff825bc4ba1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebClient.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebClient.java @@ -88,7 +88,7 @@ public final class WebClient { DataBufferAllocator allocator = new DefaultDataBufferAllocator(); this.messageEncoders = Arrays.asList(new ByteBufferEncoder(), new StringEncoder(), new JacksonJsonEncoder()); - this.messageDecoders = Arrays.asList(new ByteBufferDecoder(), new StringDecoder(allocator), + this.messageDecoders = Arrays.asList(new ByteBufferDecoder(), new StringDecoder(), new JacksonJsonDecoder(new JsonObjectDecoder())); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java index 55de2dd7f85..75d98f2a717 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java @@ -100,7 +100,7 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin if (ObjectUtils.isEmpty(this.argumentResolvers)) { List> decoders = Arrays.asList(new ByteBufferDecoder(), - new StringDecoder(allocator), + new StringDecoder(), new JacksonJsonDecoder(new JsonObjectDecoder())); this.argumentResolvers.add(new RequestParamArgumentResolver()); diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java index 2a87fe86bee..d8494312b90 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java @@ -18,9 +18,9 @@ package org.springframework.core.codec.support; import org.junit.Before; import org.junit.Test; +import reactor.core.converter.RxJava1SingleConverter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.converter.RxJava1SingleConverter; import reactor.core.test.TestSubscriber; import rx.Single; @@ -40,7 +40,7 @@ public class StringDecoderTests extends AbstractAllocatingTestCase { @Before public void createEncoder() { - decoder = new StringDecoder(allocator); + decoder = new StringDecoder(); } @@ -53,29 +53,33 @@ public class StringDecoderTests extends AbstractAllocatingTestCase { @Test public void decode() throws InterruptedException { - Flux source = Flux.just(stringBuffer("foo"), stringBuffer("bar")); - Flux output = this.decoder.decode(source, ResolvableType.forClassWithGenerics(Flux.class, String.class), null); + Flux source = + Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz")); + Flux output = + this.decoder.decode(source, ResolvableType.forClass(String.class), null); TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(output).assertValues("foobar"); + testSubscriber.bindTo(output).assertValues("foobarbaz"); } @Test public void decodeDoNotBuffer() throws InterruptedException { - StringDecoder decoder = new StringDecoder(allocator, false); + StringDecoder decoder = new StringDecoder(false); Flux source = Flux.just(stringBuffer("foo"), stringBuffer("bar")); - Flux output = decoder.decode(source, ResolvableType.forClassWithGenerics(Flux.class, String.class), null); + Flux output = + decoder.decode(source, ResolvableType.forClass(String.class), null); TestSubscriber testSubscriber = new TestSubscriber<>(); testSubscriber.bindTo(output).assertValues("foo", "bar"); } @Test public void decodeMono() throws InterruptedException { - Flux source = Flux.just(stringBuffer("foo"), stringBuffer("bar")); + Flux source = + Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz")); Mono mono = Mono.from(this.decoder.decode(source, ResolvableType.forClassWithGenerics(Mono.class, String.class), MediaType.TEXT_PLAIN)); TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(mono).assertValues("foobar"); + testSubscriber.bindTo(mono).assertValues("foobarbaz"); } @Test