Improve decodeToMono support
This commit ensures uniform support for decodeToMono across the various byte and String related decoders. Issue: SPR-16253
This commit is contained in:
parent
8e253a316d
commit
f4d8c7cc2b
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.core.codec;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* Abstract base class for {@code Decoder} implementations that can decode
|
||||
* a {@code DataBuffer} directly to the target element type.
|
||||
*
|
||||
* <p>Sub-classes must implement {@link #decodeDataBuffer} to provide a way to
|
||||
* transform a {@code DataBuffer} to the target data type. The default
|
||||
* {@link #decode} implementation transforms each individual data buffer while
|
||||
* {@link #decodeToMono} applies "reduce" and transforms the aggregated buffer.
|
||||
*
|
||||
* <p>Sub-classes can override {@link #decode} in order to split the input stream
|
||||
* along different boundaries (e.g. on new line characters for {@code String})
|
||||
* or always reduce to a single data buffer (e.g. {@code Resource}).
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
|
||||
|
||||
|
||||
protected AbstractDataBufferDecoder(MimeType... supportedMimeTypes) {
|
||||
super(supportedMimeTypes);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return Flux.from(inputStream).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<T> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return Flux.from(inputStream)
|
||||
.reduce(DataBuffer::write)
|
||||
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
|
||||
}
|
||||
|
||||
/**
|
||||
* How to decode a {@code DataBuffer} to the target element type.
|
||||
*/
|
||||
protected abstract T decodeDataBuffer(DataBuffer buffer, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints);
|
||||
|
||||
}
|
||||
|
|
@ -18,9 +18,6 @@ package org.springframework.core.codec;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
|
|
@ -32,9 +29,11 @@ import org.springframework.util.MimeTypeUtils;
|
|||
* Decoder for {@code byte} arrays.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public class ByteArrayDecoder extends AbstractDecoder<byte[]> {
|
||||
public class ByteArrayDecoder extends AbstractDataBufferDecoder<byte[]> {
|
||||
|
||||
|
||||
public ByteArrayDecoder() {
|
||||
super(MimeTypeUtils.ALL);
|
||||
|
|
@ -48,16 +47,13 @@ public class ByteArrayDecoder extends AbstractDecoder<byte[]> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Flux<byte[]> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
protected byte[] decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return Flux.from(inputStream).map((dataBuffer) -> {
|
||||
byte[] result = new byte[dataBuffer.readableByteCount()];
|
||||
dataBuffer.read(result);
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return result ;
|
||||
});
|
||||
byte[] result = new byte[dataBuffer.readableByteCount()];
|
||||
dataBuffer.read(result);
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,9 +19,6 @@ package org.springframework.core.codec;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
|
|
@ -34,9 +31,11 @@ import org.springframework.util.MimeTypeUtils;
|
|||
*
|
||||
* @author Sebastien Deleuze
|
||||
* @author Arjen Poutsma
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public class ByteBufferDecoder extends AbstractDecoder<ByteBuffer> {
|
||||
public class ByteBufferDecoder extends AbstractDataBufferDecoder<ByteBuffer> {
|
||||
|
||||
|
||||
public ByteBufferDecoder() {
|
||||
super(MimeTypeUtils.ALL);
|
||||
|
|
@ -50,16 +49,14 @@ public class ByteBufferDecoder extends AbstractDecoder<ByteBuffer> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Flux<ByteBuffer> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
protected ByteBuffer decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return Flux.from(inputStream).map((dataBuffer) -> {
|
||||
ByteBuffer copy = ByteBuffer.allocate(dataBuffer.readableByteCount());
|
||||
copy.put(dataBuffer.asByteBuffer());
|
||||
copy.flip();
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return copy;
|
||||
});
|
||||
ByteBuffer copy = ByteBuffer.allocate(dataBuffer.readableByteCount());
|
||||
copy.put(dataBuffer.asByteBuffer());
|
||||
copy.flip();
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return copy;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,9 +34,11 @@ import org.springframework.util.MimeTypeUtils;
|
|||
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public class DataBufferDecoder extends AbstractDecoder<DataBuffer> {
|
||||
public class DataBufferDecoder extends AbstractDataBufferDecoder<DataBuffer> {
|
||||
|
||||
|
||||
public DataBufferDecoder() {
|
||||
super(MimeTypeUtils.ALL);
|
||||
|
|
@ -56,4 +58,11 @@ public class DataBufferDecoder extends AbstractDecoder<DataBuffer> {
|
|||
return Flux.from(inputStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DataBuffer decodeDataBuffer(DataBuffer buffer, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ import java.util.Map;
|
|||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.ByteArrayResource;
|
||||
|
|
@ -38,9 +37,11 @@ import org.springframework.util.MimeTypeUtils;
|
|||
* Decoder for {@link Resource}s.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
*/
|
||||
public class ResourceDecoder extends AbstractDecoder<Resource> {
|
||||
public class ResourceDecoder extends AbstractDataBufferDecoder<Resource> {
|
||||
|
||||
|
||||
public ResourceDecoder() {
|
||||
super(MimeTypeUtils.ALL);
|
||||
|
|
@ -63,30 +64,24 @@ public class ResourceDecoder extends AbstractDecoder<Resource> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<Resource> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
protected Resource decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
byte[] bytes = new byte[dataBuffer.readableByteCount()];
|
||||
dataBuffer.read(bytes);
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
|
||||
Class<?> clazz = elementType.getRawClass();
|
||||
Assert.state(clazz != null, "No resource class");
|
||||
|
||||
Mono<byte[]> byteArray = Flux.from(inputStream).
|
||||
reduce(DataBuffer::write).
|
||||
map(dataBuffer -> {
|
||||
byte[] bytes = new byte[dataBuffer.readableByteCount()];
|
||||
dataBuffer.read(bytes);
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return bytes;
|
||||
});
|
||||
|
||||
|
||||
if (InputStreamResource.class == clazz) {
|
||||
return Mono.from(byteArray.map(ByteArrayInputStream::new).map(InputStreamResource::new));
|
||||
return new InputStreamResource(new ByteArrayInputStream(bytes));
|
||||
}
|
||||
else if (clazz.isAssignableFrom(ByteArrayResource.class)) {
|
||||
return Mono.from(byteArray.map(ByteArrayResource::new));
|
||||
return new ByteArrayResource(bytes);
|
||||
}
|
||||
else {
|
||||
return Mono.error(new IllegalStateException("Unsupported resource class: " + clazz));
|
||||
throw new IllegalStateException("Unsupported resource class: " + clazz);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import java.util.function.IntPredicate;
|
|||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
|
|
@ -49,7 +48,7 @@ import org.springframework.util.MimeTypeUtils;
|
|||
* @since 5.0
|
||||
* @see CharSequenceEncoder
|
||||
*/
|
||||
public class StringDecoder extends AbstractDecoder<String> {
|
||||
public class StringDecoder extends AbstractDataBufferDecoder<String> {
|
||||
|
||||
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
||||
|
||||
|
|
@ -84,16 +83,7 @@ public class StringDecoder extends AbstractDecoder<String> {
|
|||
if (this.splitOnNewline) {
|
||||
inputFlux = Flux.from(inputStream).flatMap(StringDecoder::splitOnNewline);
|
||||
}
|
||||
return inputFlux.map(buffer -> decodeDataBuffer(buffer, mimeType));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<String> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return Flux.from(inputStream)
|
||||
.reduce(DataBuffer::write)
|
||||
.map(buffer -> decodeDataBuffer(buffer, mimeType));
|
||||
return super.decode(inputFlux, elementType, mimeType, hints);
|
||||
}
|
||||
|
||||
private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
|
||||
|
|
@ -113,7 +103,10 @@ public class StringDecoder extends AbstractDecoder<String> {
|
|||
return Flux.fromIterable(results);
|
||||
}
|
||||
|
||||
private String decodeDataBuffer(DataBuffer dataBuffer, @Nullable MimeType mimeType) {
|
||||
@Override
|
||||
protected String decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
Charset charset = getCharset(mimeType);
|
||||
CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer());
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -21,6 +21,7 @@ import java.util.Collections;
|
|||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
|
|
@ -39,6 +40,7 @@ public class ByteArrayDecoderTests extends AbstractDataBufferAllocatingTestCase
|
|||
|
||||
private final ByteArrayDecoder decoder = new ByteArrayDecoder();
|
||||
|
||||
|
||||
@Test
|
||||
public void canDecode() {
|
||||
assertTrue(this.decoder.canDecode(ResolvableType.forClass(byte[].class),
|
||||
|
|
@ -65,4 +67,19 @@ public class ByteArrayDecoderTests extends AbstractDataBufferAllocatingTestCase
|
|||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void decodeToMono() {
|
||||
DataBuffer fooBuffer = stringBuffer("foo");
|
||||
DataBuffer barBuffer = stringBuffer("bar");
|
||||
Flux<DataBuffer> source = Flux.just(fooBuffer, barBuffer);
|
||||
Mono<byte[]> output = this.decoder.decodeToMono(source,
|
||||
ResolvableType.forClassWithGenerics(Publisher.class, byte[].class),
|
||||
null, Collections.emptyMap());
|
||||
|
||||
StepVerifier.create(output)
|
||||
.consumeNextWith(bytes -> assertArrayEquals("foobar".getBytes(), bytes))
|
||||
.expectComplete()
|
||||
.verify();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -22,6 +22,7 @@ import java.util.Collections;
|
|||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
|
|
@ -63,4 +64,19 @@ public class ByteBufferDecoderTests extends AbstractDataBufferAllocatingTestCase
|
|||
.expectComplete()
|
||||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void decodeToMono() {
|
||||
DataBuffer fooBuffer = stringBuffer("foo");
|
||||
DataBuffer barBuffer = stringBuffer("bar");
|
||||
Flux<DataBuffer> source = Flux.just(fooBuffer, barBuffer);
|
||||
Mono<ByteBuffer> output = this.decoder.decodeToMono(source,
|
||||
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class),
|
||||
null, Collections.emptyMap());
|
||||
|
||||
StepVerifier.create(output)
|
||||
.expectNext(ByteBuffer.wrap("foobar".getBytes()))
|
||||
.expectComplete()
|
||||
.verify();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,15 +17,19 @@
|
|||
package org.springframework.core.codec;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
|
@ -53,11 +57,25 @@ public class DataBufferDecoderTests extends AbstractDataBufferAllocatingTestCase
|
|||
DataBuffer barBuffer = stringBuffer("bar");
|
||||
Flux<DataBuffer> source = Flux.just(fooBuffer, barBuffer);
|
||||
Flux<DataBuffer> output = this.decoder.decode(source,
|
||||
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class),
|
||||
ResolvableType.forClassWithGenerics(Publisher.class, DataBuffer.class),
|
||||
null, Collections.emptyMap());
|
||||
|
||||
assertSame(source, output);
|
||||
|
||||
release(fooBuffer, barBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void decodeToMono() {
|
||||
DataBuffer fooBuffer = stringBuffer("foo");
|
||||
DataBuffer barBuffer = stringBuffer("bar");
|
||||
Flux<DataBuffer> source = Flux.just(fooBuffer, barBuffer);
|
||||
Mono<DataBuffer> output = this.decoder.decodeToMono(source,
|
||||
ResolvableType.forClassWithGenerics(Publisher.class, DataBuffer.class),
|
||||
null, Collections.emptyMap());
|
||||
|
||||
DataBuffer outputBuffer = output.block(Duration.ofSeconds(5));
|
||||
assertEquals("foobar", DataBufferTestUtils.dumpString(outputBuffer, StandardCharsets.UTF_8));
|
||||
|
||||
release(outputBuffer);
|
||||
}}
|
||||
|
|
|
|||
Loading…
Reference in New Issue