Use Mono semantics for JSON object/array serialization

Before this commit, a handler method returning a stream with a JSON
content-type was producing a JSON object for single element streams
or a JSON array for multiple elements streams.

This kind of dynamic change of the output based on the number of
elements was difficult to handle on client side and not consistent
with Spring MVC behavior.

With this commit, we achieve a more consistent behavior by using
the Mono semantics to control this behavior. Mono (and Promise/Single)
are serialized to JSON object and Flux (and Observable/Stream) are
serialized to JSON array.
This commit is contained in:
Sebastien Deleuze 2016-01-12 11:31:18 +01:00
parent c3cde84e6b
commit d9b67f5e72
3 changed files with 68 additions and 53 deletions

View File

@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.Flux; import reactor.Flux;
import reactor.Mono;
import reactor.io.buffer.Buffer; import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
@ -64,22 +65,23 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
public Flux<ByteBuffer> encode(Publisher<? extends Object> inputStream, public Flux<ByteBuffer> encode(Publisher<? extends Object> inputStream,
ResolvableType type, MimeType mimeType, Object... hints) { ResolvableType type, MimeType mimeType, Object... hints) {
Flux<ByteBuffer> stream = Flux.from(inputStream).map(value -> { Publisher<ByteBuffer> stream = (inputStream instanceof Mono ?
Buffer buffer = new Buffer(); ((Mono<?>)inputStream).map(this::serialize) :
BufferOutputStream outputStream = new BufferOutputStream(buffer); Flux.from(inputStream).map(this::serialize));
try { return (this.postProcessor == null ? Flux.from(stream) : this.postProcessor.encode(stream, type, mimeType, hints));
this.mapper.writeValue(outputStream, value); }
}
catch (IOException e) { private ByteBuffer serialize(Object value) {
throw new CodecException("Error while writing the data", e); Buffer buffer = new Buffer();
} BufferOutputStream outputStream = new BufferOutputStream(buffer);
buffer.flip(); try {
return buffer.byteBuffer(); this.mapper.writeValue(outputStream, value);
}); }
if (this.postProcessor != null) { catch (IOException e) {
stream = this.postProcessor.encode(stream, type, mimeType, hints); throw new CodecException("Error while writing the data", e);
}; }
return stream; buffer.flip();
return buffer.byteBuffer();
} }
} }

View File

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import reactor.Flux; import reactor.Flux;
import reactor.Mono;
import reactor.core.subscriber.SubscriberBarrier; import reactor.core.subscriber.SubscriberBarrier;
import reactor.core.support.BackpressureUtils; import reactor.core.support.BackpressureUtils;
import reactor.io.buffer.Buffer; import reactor.io.buffer.Buffer;
@ -32,8 +33,9 @@ import org.springframework.core.ResolvableType;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
/** /**
* Encode a byte stream of individual JSON element to a byte stream representing * Encode a byte stream of individual JSON element to a byte stream representing:
* a single JSON array when if it contains more than one element. * - the same JSON object than the input stream if it is a {@link Mono}
* - a JSON array for other kinds of {@link Publisher}
* *
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @author Stephane Maldini * @author Stephane Maldini
@ -48,22 +50,24 @@ public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
} }
@Override @Override
public Flux<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream, public Flux<ByteBuffer> encode(Publisher<? extends ByteBuffer> inputStream,
ResolvableType type, MimeType mimeType, Object... hints) { ResolvableType type, MimeType mimeType, Object... hints) {
//noinspection Convert2MethodRef if (inputStream instanceof Mono) {
return Flux.from(messageStream).lift(bbs -> new JsonEncoderBarrier(bbs)); return Flux.from(inputStream);
}
return Flux.from(inputStream).lift(s -> new JsonArrayEncoderBarrier(s));
} }
private static class JsonEncoderBarrier extends SubscriberBarrier<ByteBuffer, ByteBuffer> { private static class JsonArrayEncoderBarrier extends SubscriberBarrier<ByteBuffer, ByteBuffer> {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<JsonEncoderBarrier> REQUESTED = static final AtomicLongFieldUpdater<JsonArrayEncoderBarrier> REQUESTED =
AtomicLongFieldUpdater.newUpdater(JsonEncoderBarrier.class, "requested"); AtomicLongFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "requested");
static final AtomicIntegerFieldUpdater<JsonEncoderBarrier> TERMINATED = static final AtomicIntegerFieldUpdater<JsonArrayEncoderBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(JsonEncoderBarrier.class, "terminated"); AtomicIntegerFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "terminated");
private ByteBuffer prev = null; private ByteBuffer prev = null;
@ -75,7 +79,7 @@ public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
private volatile int terminated; private volatile int terminated;
public JsonEncoderBarrier(Subscriber<? super ByteBuffer> subscriber) { public JsonArrayEncoderBarrier(Subscriber<? super ByteBuffer> subscriber) {
super(subscriber); super(subscriber);
} }
@ -94,20 +98,19 @@ public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
@Override @Override
protected void doNext(ByteBuffer next) { protected void doNext(ByteBuffer next) {
this.count++; this.count++;
if (this.count == 1) {
this.prev = next;
super.doRequest(1);
return;
}
ByteBuffer tmp = this.prev; ByteBuffer tmp = this.prev;
this.prev = next; this.prev = next;
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
if (this.count == 2) { if (this.count == 1) {
buffer.append("["); buffer.append("[");
} }
buffer.append(tmp); if (tmp != null) {
buffer.append(","); buffer.append(tmp);
}
if (this.count > 1) {
buffer.append(",");
}
buffer.flip(); buffer.flip();
BackpressureUtils.getAndSub(REQUESTED, this, 1L); BackpressureUtils.getAndSub(REQUESTED, this, 1L);
@ -118,9 +121,7 @@ public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) { if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
buffer.append(this.prev); buffer.append(this.prev);
if (this.count > 1) { buffer.append("]");
buffer.append("]");
}
buffer.flip(); buffer.flip();
subscriber.onNext(buffer.byteBuffer()); subscriber.onNext(buffer.byteBuffer());
super.doComplete(); super.doComplete();

View File

@ -18,13 +18,12 @@ package org.springframework.reactive.codec.encoder;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import org.junit.Test; import org.junit.Test;
import reactor.Flux;
import reactor.Mono;
import reactor.io.buffer.Buffer; import reactor.io.buffer.Buffer;
import reactor.rx.Stream;
import reactor.rx.Streams;
import org.springframework.core.codec.support.JsonObjectEncoder; import org.springframework.core.codec.support.JsonObjectEncoder;
@ -34,46 +33,59 @@ import org.springframework.core.codec.support.JsonObjectEncoder;
public class JsonObjectEncoderTests { public class JsonObjectEncoderTests {
@Test @Test
public void encodeSingleElement() throws InterruptedException { public void encodeSingleElementFlux() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder(); JsonObjectEncoder encoder = new JsonObjectEncoder();
Stream<ByteBuffer> source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); Flux<ByteBuffer> source = Flux.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
List<String> results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { Iterable<String> results = Flux.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()]; byte[] b = new byte[chunk.remaining()];
chunk.get(b); chunk.get(b);
return new String(b, StandardCharsets.UTF_8); return new String(b, StandardCharsets.UTF_8);
}).toList().get(); }).toIterable();
String result = String.join("", results);
assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"}]", result);
}
@Test
public void encodeSingleElementMono() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder();
Mono<ByteBuffer> source = Mono.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
Iterable<String> results = Flux.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
}).toIterable();
String result = String.join("", results); String result = String.join("", results);
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", result); assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", result);
} }
@Test @Test
public void encodeTwoElements() throws InterruptedException { public void encodeTwoElementsFlux() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder(); JsonObjectEncoder encoder = new JsonObjectEncoder();
Stream<ByteBuffer> source = Streams.just( Flux<ByteBuffer> source = Flux.just(
Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(),
Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer()); Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer());
List<String> results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { Iterable<String> results = Flux.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()]; byte[] b = new byte[chunk.remaining()];
chunk.get(b); chunk.get(b);
return new String(b, StandardCharsets.UTF_8); return new String(b, StandardCharsets.UTF_8);
}).toList().get(); }).toIterable();
String result = String.join("", results); String result = String.join("", results);
assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]", result); assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]", result);
} }
@Test @Test
public void encodeThreeElements() throws InterruptedException { public void encodeThreeElementsFlux() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder(); JsonObjectEncoder encoder = new JsonObjectEncoder();
Stream<ByteBuffer> source = Streams.just( Flux<ByteBuffer> source = Flux.just(
Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(),
Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer(),
Buffer.wrap("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}").byteBuffer() Buffer.wrap("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}").byteBuffer()
); );
List<String> results = Streams.from(encoder.encode(source, null, null)).map(chunk -> { Iterable<String> results = Flux.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()]; byte[] b = new byte[chunk.remaining()];
chunk.get(b); chunk.get(b);
return new String(b, StandardCharsets.UTF_8); return new String(b, StandardCharsets.UTF_8);
}).toList().get(); }).toIterable();
String result = String.join("", results); String result = String.join("", results);
assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"},{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}]", result); assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"},{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}]", result);
} }