diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java
new file mode 100644
index 00000000000..d5bdf0feedc
--- /dev/null
+++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2002-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.core.codec;
+
+import java.util.Map;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.lang.Nullable;
+import org.springframework.util.MimeType;
+
+/**
+ * Abstract base class for {@code Decoder} implementations that can decode
+ * a {@code DataBuffer} directly to the target element type.
+ *
+ *
Sub-classes must implement {@link #decodeDataBuffer} to provide a way to
+ * transform a {@code DataBuffer} to the target data type. The default
+ * {@link #decode} implementation transforms each individual data buffer while
+ * {@link #decodeToMono} applies "reduce" and transforms the aggregated buffer.
+ *
+ *
Sub-classes can override {@link #decode} in order to split the input stream
+ * along different boundaries (e.g. on new line characters for {@code String})
+ * or always reduce to a single data buffer (e.g. {@code Resource}).
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public abstract class AbstractDataBufferDecoder extends AbstractDecoder {
+
+
+ protected AbstractDataBufferDecoder(MimeType... supportedMimeTypes) {
+ super(supportedMimeTypes);
+ }
+
+
+ @Override
+ public Flux decode(Publisher inputStream, ResolvableType elementType,
+ @Nullable MimeType mimeType, @Nullable Map hints) {
+
+ return Flux.from(inputStream).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
+ }
+
+ @Override
+ public Mono decodeToMono(Publisher inputStream, ResolvableType elementType,
+ @Nullable MimeType mimeType, @Nullable Map hints) {
+
+ return Flux.from(inputStream)
+ .reduce(DataBuffer::write)
+ .map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
+ }
+
+ /**
+ * How to decode a {@code DataBuffer} to the target element type.
+ */
+ protected abstract T decodeDataBuffer(DataBuffer buffer, ResolvableType elementType,
+ @Nullable MimeType mimeType, @Nullable Map hints);
+
+}
diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java
index d2587a8385a..cf8a34c1056 100644
--- a/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java
+++ b/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java
@@ -18,9 +18,6 @@ package org.springframework.core.codec;
import java.util.Map;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
-
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
@@ -32,9 +29,11 @@ import org.springframework.util.MimeTypeUtils;
* Decoder for {@code byte} arrays.
*
* @author Arjen Poutsma
+ * @author Rossen Stoyanchev
* @since 5.0
*/
-public class ByteArrayDecoder extends AbstractDecoder {
+public class ByteArrayDecoder extends AbstractDataBufferDecoder {
+
public ByteArrayDecoder() {
super(MimeTypeUtils.ALL);
@@ -48,16 +47,13 @@ public class ByteArrayDecoder extends AbstractDecoder {
}
@Override
- public Flux decode(Publisher inputStream, ResolvableType elementType,
+ protected byte[] decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map hints) {
- return Flux.from(inputStream).map((dataBuffer) -> {
- byte[] result = new byte[dataBuffer.readableByteCount()];
- dataBuffer.read(result);
- DataBufferUtils.release(dataBuffer);
- return result ;
- });
+ byte[] result = new byte[dataBuffer.readableByteCount()];
+ dataBuffer.read(result);
+ DataBufferUtils.release(dataBuffer);
+ return result;
}
-
}
diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java
index 391a2fcdc66..df4a642f540 100644
--- a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java
+++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java
@@ -19,9 +19,6 @@ package org.springframework.core.codec;
import java.nio.ByteBuffer;
import java.util.Map;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
-
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
@@ -34,9 +31,11 @@ import org.springframework.util.MimeTypeUtils;
*
* @author Sebastien Deleuze
* @author Arjen Poutsma
+ * @author Rossen Stoyanchev
* @since 5.0
*/
-public class ByteBufferDecoder extends AbstractDecoder {
+public class ByteBufferDecoder extends AbstractDataBufferDecoder {
+
public ByteBufferDecoder() {
super(MimeTypeUtils.ALL);
@@ -50,16 +49,14 @@ public class ByteBufferDecoder extends AbstractDecoder {
}
@Override
- public Flux decode(Publisher inputStream, ResolvableType elementType,
+ protected ByteBuffer decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map hints) {
- return Flux.from(inputStream).map((dataBuffer) -> {
- ByteBuffer copy = ByteBuffer.allocate(dataBuffer.readableByteCount());
- copy.put(dataBuffer.asByteBuffer());
- copy.flip();
- DataBufferUtils.release(dataBuffer);
- return copy;
- });
+ ByteBuffer copy = ByteBuffer.allocate(dataBuffer.readableByteCount());
+ copy.put(dataBuffer.asByteBuffer());
+ copy.flip();
+ DataBufferUtils.release(dataBuffer);
+ return copy;
}
}
diff --git a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java
index 02fa331691d..25e2d5f482c 100644
--- a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java
+++ b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java
@@ -34,9 +34,11 @@ import org.springframework.util.MimeTypeUtils;
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}.
*
* @author Arjen Poutsma
+ * @author Rossen Stoyanchev
* @since 5.0
*/
-public class DataBufferDecoder extends AbstractDecoder {
+public class DataBufferDecoder extends AbstractDataBufferDecoder {
+
public DataBufferDecoder() {
super(MimeTypeUtils.ALL);
@@ -56,4 +58,11 @@ public class DataBufferDecoder extends AbstractDecoder {
return Flux.from(inputStream);
}
+ @Override
+ protected DataBuffer decodeDataBuffer(DataBuffer buffer, ResolvableType elementType,
+ @Nullable MimeType mimeType, @Nullable Map hints) {
+
+ return buffer;
+ }
+
}
diff --git a/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java
index 75b49c6f9ab..e46839085a6 100644
--- a/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java
+++ b/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java
@@ -21,7 +21,6 @@ import java.util.Map;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
@@ -38,9 +37,11 @@ import org.springframework.util.MimeTypeUtils;
* Decoder for {@link Resource}s.
*
* @author Arjen Poutsma
+ * @author Rossen Stoyanchev
* @since 5.0
*/
-public class ResourceDecoder extends AbstractDecoder {
+public class ResourceDecoder extends AbstractDataBufferDecoder {
+
public ResourceDecoder() {
super(MimeTypeUtils.ALL);
@@ -63,30 +64,24 @@ public class ResourceDecoder extends AbstractDecoder {
}
@Override
- public Mono decodeToMono(Publisher inputStream, ResolvableType elementType,
+ protected Resource decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map hints) {
+ byte[] bytes = new byte[dataBuffer.readableByteCount()];
+ dataBuffer.read(bytes);
+ DataBufferUtils.release(dataBuffer);
+
Class> clazz = elementType.getRawClass();
Assert.state(clazz != null, "No resource class");
- Mono byteArray = Flux.from(inputStream).
- reduce(DataBuffer::write).
- map(dataBuffer -> {
- byte[] bytes = new byte[dataBuffer.readableByteCount()];
- dataBuffer.read(bytes);
- DataBufferUtils.release(dataBuffer);
- return bytes;
- });
-
-
if (InputStreamResource.class == clazz) {
- return Mono.from(byteArray.map(ByteArrayInputStream::new).map(InputStreamResource::new));
+ return new InputStreamResource(new ByteArrayInputStream(bytes));
}
else if (clazz.isAssignableFrom(ByteArrayResource.class)) {
- return Mono.from(byteArray.map(ByteArrayResource::new));
+ return new ByteArrayResource(bytes);
}
else {
- return Mono.error(new IllegalStateException("Unsupported resource class: " + clazz));
+ throw new IllegalStateException("Unsupported resource class: " + clazz);
}
}
diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java
index 555430e77d3..fc403bebd9a 100644
--- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java
+++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java
@@ -26,7 +26,6 @@ import java.util.function.IntPredicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
@@ -49,7 +48,7 @@ import org.springframework.util.MimeTypeUtils;
* @since 5.0
* @see CharSequenceEncoder
*/
-public class StringDecoder extends AbstractDecoder {
+public class StringDecoder extends AbstractDataBufferDecoder {
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
@@ -84,16 +83,7 @@ public class StringDecoder extends AbstractDecoder {
if (this.splitOnNewline) {
inputFlux = Flux.from(inputStream).flatMap(StringDecoder::splitOnNewline);
}
- return inputFlux.map(buffer -> decodeDataBuffer(buffer, mimeType));
- }
-
- @Override
- public Mono decodeToMono(Publisher inputStream, ResolvableType elementType,
- @Nullable MimeType mimeType, @Nullable Map hints) {
-
- return Flux.from(inputStream)
- .reduce(DataBuffer::write)
- .map(buffer -> decodeDataBuffer(buffer, mimeType));
+ return super.decode(inputFlux, elementType, mimeType, hints);
}
private static Flux splitOnNewline(DataBuffer dataBuffer) {
@@ -113,7 +103,10 @@ public class StringDecoder extends AbstractDecoder {
return Flux.fromIterable(results);
}
- private String decodeDataBuffer(DataBuffer dataBuffer, @Nullable MimeType mimeType) {
+ @Override
+ protected String decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType,
+ @Nullable MimeType mimeType, @Nullable Map hints) {
+
Charset charset = getCharset(mimeType);
CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer());
DataBufferUtils.release(dataBuffer);
diff --git a/spring-core/src/test/java/org/springframework/core/codec/ByteArrayDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ByteArrayDecoderTests.java
index 7f6aed0aa7c..2635ea2dbca 100644
--- a/spring-core/src/test/java/org/springframework/core/codec/ByteArrayDecoderTests.java
+++ b/spring-core/src/test/java/org/springframework/core/codec/ByteArrayDecoderTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import java.util.Collections;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
@@ -39,6 +40,7 @@ public class ByteArrayDecoderTests extends AbstractDataBufferAllocatingTestCase
private final ByteArrayDecoder decoder = new ByteArrayDecoder();
+
@Test
public void canDecode() {
assertTrue(this.decoder.canDecode(ResolvableType.forClass(byte[].class),
@@ -65,4 +67,19 @@ public class ByteArrayDecoderTests extends AbstractDataBufferAllocatingTestCase
.verify();
}
+ @Test
+ public void decodeToMono() {
+ DataBuffer fooBuffer = stringBuffer("foo");
+ DataBuffer barBuffer = stringBuffer("bar");
+ Flux source = Flux.just(fooBuffer, barBuffer);
+ Mono output = this.decoder.decodeToMono(source,
+ ResolvableType.forClassWithGenerics(Publisher.class, byte[].class),
+ null, Collections.emptyMap());
+
+ StepVerifier.create(output)
+ .consumeNextWith(bytes -> assertArrayEquals("foobar".getBytes(), bytes))
+ .expectComplete()
+ .verify();
+ }
+
}
\ No newline at end of file
diff --git a/spring-core/src/test/java/org/springframework/core/codec/ByteBufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ByteBufferDecoderTests.java
index 34e98037168..4068822e6af 100644
--- a/spring-core/src/test/java/org/springframework/core/codec/ByteBufferDecoderTests.java
+++ b/spring-core/src/test/java/org/springframework/core/codec/ByteBufferDecoderTests.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
@@ -63,4 +64,19 @@ public class ByteBufferDecoderTests extends AbstractDataBufferAllocatingTestCase
.expectComplete()
.verify();
}
+
+ @Test
+ public void decodeToMono() {
+ DataBuffer fooBuffer = stringBuffer("foo");
+ DataBuffer barBuffer = stringBuffer("bar");
+ Flux source = Flux.just(fooBuffer, barBuffer);
+ Mono output = this.decoder.decodeToMono(source,
+ ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class),
+ null, Collections.emptyMap());
+
+ StepVerifier.create(output)
+ .expectNext(ByteBuffer.wrap("foobar".getBytes()))
+ .expectComplete()
+ .verify();
+ }
}
diff --git a/spring-core/src/test/java/org/springframework/core/codec/DataBufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/DataBufferDecoderTests.java
index 704db7135fd..b6c277f399d 100644
--- a/spring-core/src/test/java/org/springframework/core/codec/DataBufferDecoderTests.java
+++ b/spring-core/src/test/java/org/springframework/core/codec/DataBufferDecoderTests.java
@@ -17,15 +17,19 @@
package org.springframework.core.codec;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Collections;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import org.springframework.util.MimeTypeUtils;
import static org.junit.Assert.*;
@@ -53,11 +57,25 @@ public class DataBufferDecoderTests extends AbstractDataBufferAllocatingTestCase
DataBuffer barBuffer = stringBuffer("bar");
Flux source = Flux.just(fooBuffer, barBuffer);
Flux output = this.decoder.decode(source,
- ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class),
+ ResolvableType.forClassWithGenerics(Publisher.class, DataBuffer.class),
null, Collections.emptyMap());
assertSame(source, output);
release(fooBuffer, barBuffer);
}
-}
+
+ @Test
+ public void decodeToMono() {
+ DataBuffer fooBuffer = stringBuffer("foo");
+ DataBuffer barBuffer = stringBuffer("bar");
+ Flux source = Flux.just(fooBuffer, barBuffer);
+ Mono output = this.decoder.decodeToMono(source,
+ ResolvableType.forClassWithGenerics(Publisher.class, DataBuffer.class),
+ null, Collections.emptyMap());
+
+ DataBuffer outputBuffer = output.block(Duration.ofSeconds(5));
+ assertEquals("foobar", DataBufferTestUtils.dumpString(outputBuffer, StandardCharsets.UTF_8));
+
+ release(outputBuffer);
+ }}