diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java index d1f78ee5d5..bd4426ce39 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import com.fasterxml.jackson.databind.ObjectMapper; import org.reactivestreams.Publisher; import reactor.Flux; +import reactor.Mono; import reactor.io.buffer.Buffer; import org.springframework.core.ResolvableType; @@ -64,22 +65,23 @@ public class JacksonJsonEncoder extends AbstractEncoder { public Flux encode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { - Flux stream = Flux.from(inputStream).map(value -> { - Buffer buffer = new Buffer(); - BufferOutputStream outputStream = new BufferOutputStream(buffer); - try { - this.mapper.writeValue(outputStream, value); - } - catch (IOException e) { - throw new CodecException("Error while writing the data", e); - } - buffer.flip(); - return buffer.byteBuffer(); - }); - if (this.postProcessor != null) { - stream = this.postProcessor.encode(stream, type, mimeType, hints); - }; - return stream; + Publisher stream = (inputStream instanceof Mono ? + ((Mono)inputStream).map(this::serialize) : + Flux.from(inputStream).map(this::serialize)); + return (this.postProcessor == null ? Flux.from(stream) : this.postProcessor.encode(stream, type, mimeType, hints)); + } + + private ByteBuffer serialize(Object value) { + Buffer buffer = new Buffer(); + BufferOutputStream outputStream = new BufferOutputStream(buffer); + try { + this.mapper.writeValue(outputStream, value); + } + catch (IOException e) { + throw new CodecException("Error while writing the data", e); + } + buffer.flip(); + return buffer.byteBuffer(); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java index d00ef799a9..9b8f09e910 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import reactor.Flux; +import reactor.Mono; import reactor.core.subscriber.SubscriberBarrier; import reactor.core.support.BackpressureUtils; import reactor.io.buffer.Buffer; @@ -32,8 +33,9 @@ import org.springframework.core.ResolvableType; import org.springframework.util.MimeType; /** - * Encode a byte stream of individual JSON element to a byte stream representing - * a single JSON array when if it contains more than one element. + * Encode a byte stream of individual JSON element to a byte stream representing: + * - the same JSON object than the input stream if it is a {@link Mono} + * - a JSON array for other kinds of {@link Publisher} * * @author Sebastien Deleuze * @author Stephane Maldini @@ -48,22 +50,24 @@ public class JsonObjectEncoder extends AbstractEncoder { } @Override - public Flux encode(Publisher messageStream, + public Flux encode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { - //noinspection Convert2MethodRef - return Flux.from(messageStream).lift(bbs -> new JsonEncoderBarrier(bbs)); + if (inputStream instanceof Mono) { + return Flux.from(inputStream); + } + return Flux.from(inputStream).lift(s -> new JsonArrayEncoderBarrier(s)); } - private static class JsonEncoderBarrier extends SubscriberBarrier { + private static class JsonArrayEncoderBarrier extends SubscriberBarrier { @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(JsonEncoderBarrier.class, "requested"); + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "requested"); - static final AtomicIntegerFieldUpdater TERMINATED = - AtomicIntegerFieldUpdater.newUpdater(JsonEncoderBarrier.class, "terminated"); + static final AtomicIntegerFieldUpdater TERMINATED = + AtomicIntegerFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "terminated"); private ByteBuffer prev = null; @@ -75,7 +79,7 @@ public class JsonObjectEncoder extends AbstractEncoder { private volatile int terminated; - public JsonEncoderBarrier(Subscriber subscriber) { + public JsonArrayEncoderBarrier(Subscriber subscriber) { super(subscriber); } @@ -94,20 +98,19 @@ public class JsonObjectEncoder extends AbstractEncoder { @Override protected void doNext(ByteBuffer next) { this.count++; - if (this.count == 1) { - this.prev = next; - super.doRequest(1); - return; - } ByteBuffer tmp = this.prev; this.prev = next; Buffer buffer = new Buffer(); - if (this.count == 2) { + if (this.count == 1) { buffer.append("["); } - buffer.append(tmp); - buffer.append(","); + if (tmp != null) { + buffer.append(tmp); + } + if (this.count > 1) { + buffer.append(","); + } buffer.flip(); BackpressureUtils.getAndSub(REQUESTED, this, 1L); @@ -118,9 +121,7 @@ public class JsonObjectEncoder extends AbstractEncoder { if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) { Buffer buffer = new Buffer(); buffer.append(this.prev); - if (this.count > 1) { - buffer.append("]"); - } + buffer.append("]"); buffer.flip(); subscriber.onNext(buffer.byteBuffer()); super.doComplete(); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java index a41f62ad98..0d31bfda8a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java @@ -18,13 +18,12 @@ package org.springframework.reactive.codec.encoder; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.List; import static org.junit.Assert.assertEquals; import org.junit.Test; +import reactor.Flux; +import reactor.Mono; import reactor.io.buffer.Buffer; -import reactor.rx.Stream; -import reactor.rx.Streams; import org.springframework.core.codec.support.JsonObjectEncoder; @@ -34,46 +33,59 @@ import org.springframework.core.codec.support.JsonObjectEncoder; public class JsonObjectEncoderTests { @Test - public void encodeSingleElement() throws InterruptedException { + public void encodeSingleElementFlux() throws InterruptedException { JsonObjectEncoder encoder = new JsonObjectEncoder(); - Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); - List results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { + Flux source = Flux.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); + Iterable results = Flux.from(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().get(); + }).toIterable(); + String result = String.join("", results); + assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"}]", result); + } + + @Test + public void encodeSingleElementMono() throws InterruptedException { + JsonObjectEncoder encoder = new JsonObjectEncoder(); + Mono source = Mono.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); + Iterable results = Flux.from(encoder.encode(source, null, null)).map(chunk -> { + byte[] b = new byte[chunk.remaining()]; + chunk.get(b); + return new String(b, StandardCharsets.UTF_8); + }).toIterable(); String result = String.join("", results); assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", result); } @Test - public void encodeTwoElements() throws InterruptedException { + public void encodeTwoElementsFlux() throws InterruptedException { JsonObjectEncoder encoder = new JsonObjectEncoder(); - Stream source = Streams.just( + Flux source = Flux.just( Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer()); - List results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { + Iterable results = Flux.from(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().get(); + }).toIterable(); String result = String.join("", results); assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]", result); } @Test - public void encodeThreeElements() throws InterruptedException { + public void encodeThreeElementsFlux() throws InterruptedException { JsonObjectEncoder encoder = new JsonObjectEncoder(); - Stream source = Streams.just( + Flux source = Flux.just( Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}").byteBuffer() ); - List results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { + Iterable results = Flux.from(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); return new String(b, StandardCharsets.UTF_8); - }).toList().get(); + }).toIterable(); String result = String.join("", results); assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"},{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}]", result); }