diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java index fe86fe85c6..21293e72d3 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java @@ -27,54 +27,59 @@ import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.codec.CodecException; -import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; -import org.springframework.core.io.buffer.DefaultDataBufferAllocator; +import org.springframework.util.Assert; import org.springframework.util.MimeType; /** * Encode from an {@code Object} stream to a byte stream of JSON objects. * * @author Sebastien Deleuze + * @author Arjen Poutsma * @see JacksonJsonDecoder */ public class JacksonJsonEncoder extends AbstractEncoder { private final ObjectMapper mapper; - private Encoder postProcessor; - public JacksonJsonEncoder() { - this(new ObjectMapper(), null); + this(new ObjectMapper()); } - public JacksonJsonEncoder(Encoder postProcessor) { - this(new ObjectMapper(), postProcessor); - } - - public JacksonJsonEncoder(ObjectMapper mapper, - Encoder postProcessor) { + public JacksonJsonEncoder(ObjectMapper mapper) { super(new MimeType("application", "json", StandardCharsets.UTF_8), new MimeType("application", "*+json", StandardCharsets.UTF_8)); + Assert.notNull(mapper, "'mapper' must not be null"); + this.mapper = mapper; - this.postProcessor = postProcessor; } @Override public Flux encode(Publisher inputStream, DataBufferAllocator allocator, ResolvableType type, MimeType mimeType, Object... hints) { + if (inputStream instanceof Mono) { + // single object + return Flux.from(inputStream).map(value -> serialize(value, allocator)); + } + else { + // array + Mono startArray = Mono.just(charBuffer('[', allocator)); + Flux arraySeparators = + Flux.create(sub -> sub.onNext(charBuffer(',', allocator))); + Mono endArray = Mono.just(charBuffer(']', allocator)); - Publisher stream = (inputStream instanceof Mono ? - ((Mono) inputStream).map(value -> serialize(value, allocator)) : - Flux.from(inputStream).map(value -> serialize(value, allocator))); - // TODO: figure out why using the parameter allocator for the postprocessor - // commits the response too early - DefaultDataBufferAllocator tempAllocator = new DefaultDataBufferAllocator(); + Flux serializedObjects = + Flux.from(inputStream).map(value -> serialize(value, allocator)); - return (this.postProcessor == null ? Flux.from(stream) : - this.postProcessor.encode(stream, tempAllocator, type, mimeType, hints)); + Flux array = Flux.zip(serializedObjects, arraySeparators) + .flatMap(tuple -> Flux.just(tuple.getT1(), tuple.getT2())); + + Flux arrayWithoutLastSeparator = Flux.from(array).skipLast(1); + + return Flux.concat(startArray, arrayWithoutLastSeparator, endArray); + } } private DataBuffer serialize(Object value, DataBufferAllocator allocator) { @@ -89,4 +94,11 @@ public class JacksonJsonEncoder extends AbstractEncoder { return buffer; } + private DataBuffer charBuffer(char ch, DataBufferAllocator allocator) { + DataBuffer buffer = allocator.allocateBuffer(1); + buffer.write((byte) ch); + return buffer; + } + + } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectDecoder.java index 4c5a8c32be..42d204916a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectDecoder.java @@ -43,7 +43,6 @@ import org.springframework.util.MimeType; * Based on Netty JsonObjectDecoder * * @author Sebastien Deleuze - * @see JsonObjectEncoder */ public class JsonObjectDecoder extends AbstractDecoder { @@ -74,7 +73,6 @@ public class JsonObjectDecoder extends AbstractDecoder { /** - * @param allocator * @param maxObjectLength maximum number of bytes a JSON object/array may * use (including braces and all). Objects exceeding this length are dropped * and an {@link IllegalStateException} is thrown. diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java deleted file mode 100644 index 804856bb1e..0000000000 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JsonObjectEncoder.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.core.codec.support; - -import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.subscriber.SubscriberBarrier; -import reactor.core.util.BackpressureUtils; - -import org.springframework.core.ResolvableType; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferAllocator; -import org.springframework.util.MimeType; - -/** - * Encode a byte stream of individual JSON element to a byte stream representing: - * - the same JSON object than the input stream if it is a {@link Mono} - * - a JSON array for other kinds of {@link Publisher} - * - * @author Sebastien Deleuze - * @author Stephane Maldini - * - * @see JsonObjectDecoder - */ -public class JsonObjectEncoder extends AbstractEncoder { - - public JsonObjectEncoder() { - super(new MimeType("application", "json", StandardCharsets.UTF_8), - new MimeType("application", "*+json", StandardCharsets.UTF_8)); - } - - @Override - public Flux encode(Publisher inputStream, - DataBufferAllocator allocator, - ResolvableType type, MimeType mimeType, Object... hints) { - if (inputStream instanceof Mono) { - return Flux.from(inputStream); - } - return Flux.from(inputStream) - .lift(s -> new JsonArrayEncoderBarrier(s, allocator)); - } - - private static class JsonArrayEncoderBarrier - extends SubscriberBarrier { - - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "requested"); - - static final AtomicIntegerFieldUpdater TERMINATED = - AtomicIntegerFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "terminated"); - - private final DataBufferAllocator allocator; - - private DataBuffer prev = null; - - private long count = 0; - - private volatile long requested; - - private volatile int terminated; - - public JsonArrayEncoderBarrier(Subscriber subscriber, - DataBufferAllocator allocator) { - super(subscriber); - this.allocator = allocator; - } - - - @Override - protected void doRequest(long n) { - BackpressureUtils.getAndAdd(REQUESTED, this, n); - if(TERMINATED.compareAndSet(this, 1, 2)){ - drainLast(); - } - else { - super.doRequest(n); - } - } - - @Override - protected void doNext(DataBuffer next) { - this.count++; - - DataBuffer tmp = this.prev; - this.prev = next; - DataBuffer buffer = allocator.allocateBuffer(); - if (this.count == 1) { - buffer.write((byte) '['); - } - if (tmp != null) { - buffer.write(tmp); - } - if (this.count > 1) { - buffer.write((byte) ','); - } - - BackpressureUtils.getAndSub(REQUESTED, this, 1L); - subscriber.onNext(buffer); - } - - protected void drainLast(){ - if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) { - DataBuffer buffer = allocator.allocateBuffer(); - buffer.write(this.prev); - buffer.write((byte) ']'); - subscriber.onNext(buffer); - super.doComplete(); - } - } - - @Override - protected void doComplete() { - if(TERMINATED.compareAndSet(this, 0, 1)) { - drainLast(); - } - } - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilder.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilder.java index c6105e2582..c6e19a414c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilder.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilder.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.codec.Encoder; @@ -55,7 +56,7 @@ public class DefaultHttpRequestBuilder implements HttpRequestBuilder { protected URI url; - protected Flux contentPublisher; + protected Publisher contentPublisher; protected List> messageEncoders; @@ -127,7 +128,7 @@ public class DefaultHttpRequestBuilder implements HttpRequestBuilder { } public DefaultHttpRequestBuilder content(Object content) { - this.contentPublisher = Flux.just(content); + this.contentPublisher = Mono.just(content); return this; } diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/JacksonJsonEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/JacksonJsonEncoderTests.java index 53383547f4..0ea1c99f68 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/JacksonJsonEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/JacksonJsonEncoderTests.java @@ -56,8 +56,8 @@ public class JacksonJsonEncoderTests extends AbstractAllocatingTestCase { }); TestSubscriber testSubscriber = new TestSubscriber<>(); testSubscriber.bindTo(output) - .assertValues("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", - "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"); + .assertValues("[", "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", ",", + "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", "]"); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/JsonObjectEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/JsonObjectEncoderTests.java deleted file mode 100644 index 0d4c9f1add..0000000000 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/JsonObjectEncoderTests.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.core.codec.support; - -import java.nio.charset.StandardCharsets; - -import org.junit.Before; -import org.junit.Test; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.test.TestSubscriber; - -import org.springframework.core.io.buffer.DataBuffer; - -/** - * @author Sebastien Deleuze - */ -public class JsonObjectEncoderTests extends AbstractAllocatingTestCase { - - private JsonObjectEncoder encoder; - - @Before - public void createEncoder() { - encoder = new JsonObjectEncoder(); - } - - @Test - public void encodeSingleElementFlux() throws InterruptedException { - Flux source = - Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}")); - Flux output = - Flux.from(encoder.encode(source, allocator, null, null)).map(chunk -> { - byte[] b = new byte[chunk.readableByteCount()]; - chunk.read(b); - return new String(b, StandardCharsets.UTF_8); - }); - TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(output) - .assertValues("[", "{\"foo\": \"foofoo\", \"bar\": \"barbar\"}]"); - } - - @Test - public void encodeSingleElementMono() throws InterruptedException { - Mono source = - Mono.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}")); - Flux output = - Flux.from(encoder.encode(source, allocator, null, null)).map(chunk -> { - byte[] b = new byte[chunk.readableByteCount()]; - chunk.read(b); - return new String(b, StandardCharsets.UTF_8); - }); - TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(output) - .assertValues("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"); - } - - @Test - public void encodeTwoElementsFlux() throws InterruptedException { - Flux source = - Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"), - stringBuffer("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}")); - Flux output = - Flux.from(encoder.encode(source, allocator, null, null)).map(chunk -> { - byte[] b = new byte[chunk.readableByteCount()]; - chunk.read(b); - return new String(b, StandardCharsets.UTF_8); - }); - TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(output) - .assertValues("[", - "{\"foo\": \"foofoo\", \"bar\": \"barbar\"},", - "{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"); - } - - @Test - public void encodeThreeElementsFlux() throws InterruptedException { - Flux source = - Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"), - stringBuffer("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}"), - stringBuffer("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}") - ); - Flux output = - Flux.from(encoder.encode(source, allocator, null, null)).map(chunk -> { - byte[] b = new byte[chunk.readableByteCount()]; - chunk.read(b); - return new String(b, StandardCharsets.UTF_8); - }); - TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(output) - .assertValues("[", - "{\"foo\": \"foofoo\", \"bar\": \"barbar\"},", - "{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"},", - "{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}]"); - } - -} diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java index 2ecc53a10c..c49861f30a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java @@ -40,7 +40,6 @@ import org.springframework.core.ResolvableType; import org.springframework.core.codec.Encoder; import org.springframework.core.codec.support.ByteBufferEncoder; import org.springframework.core.codec.support.JacksonJsonEncoder; -import org.springframework.core.codec.support.JsonObjectEncoder; import org.springframework.core.codec.support.StringEncoder; import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.support.GenericConversionService; @@ -380,7 +379,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @Bean public ResponseBodyResultHandler responseBodyResultHandler() { List> encoders = Arrays.asList(new ByteBufferEncoder(), - new StringEncoder(), new JacksonJsonEncoder(new JsonObjectEncoder())); + new StringEncoder(), new JacksonJsonEncoder()); ResponseBodyResultHandler resultHandler = new ResponseBodyResultHandler(encoders, conversionService()); resultHandler.setOrder(1); return resultHandler;