diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java index da7856a571a..d39e530f732 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java @@ -25,8 +25,12 @@ import org.springframework.reactive.codec.decoder.JsonObjectDecoder; import reactor.core.subscriber.SubscriberBarrier; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import static reactor.Publishers.*; + +import reactor.core.support.BackpressureUtils; import reactor.io.buffer.Buffer; /** @@ -49,24 +53,44 @@ public class JsonObjectEncoder implements MessageToByteEncoder { @Override public Publisher encode(Publisher messageStream, ResolvableType type, MediaType mediaType, Object... hints) { - return lift(messageStream, sub -> new JsonEncoderBarrier(sub)); + return lift(messageStream, bbs -> new JsonEncoderBarrier(bbs)); } private static class JsonEncoderBarrier extends SubscriberBarrier { + private volatile long requested; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(JsonEncoderBarrier.class, "requested"); + + private volatile int terminated; + static final AtomicIntegerFieldUpdater TERMINATED = + AtomicIntegerFieldUpdater.newUpdater(JsonEncoderBarrier.class, "terminated"); + + ByteBuffer prev = null; + long count = 0; + public JsonEncoderBarrier(Subscriber subscriber) { super(subscriber); } - ByteBuffer prev = null; - long count = 0; + @Override + protected void doRequest(long n) { + BackpressureUtils.getAndAdd(REQUESTED, this, n); + if(TERMINATED.compareAndSet(this, 1, 2)){ + drainLast(); + } + else { + super.doRequest(n); + } + } @Override protected void doNext(ByteBuffer next) { count++; if (count == 1) { prev = next; - doRequest(1); + super.doRequest(1); return; } @@ -79,19 +103,29 @@ public class JsonObjectEncoder implements MessageToByteEncoder { buffer.append(tmp); buffer.append(","); buffer.flip(); + + BackpressureUtils.getAndSub(REQUESTED, this, 1L); subscriber.onNext(buffer.byteBuffer()); } + protected void drainLast(){ + if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) { + Buffer buffer = new Buffer(); + buffer.append(prev); + if (count > 1) { + buffer.append("]"); + } + buffer.flip(); + subscriber.onNext(buffer.byteBuffer()); + super.doComplete(); + } + } + @Override protected void doComplete() { - Buffer buffer = new Buffer(); - buffer.append(prev); - if (count > 1) { - buffer.append("]"); + if(TERMINATED.compareAndSet(this, 0, 1)){ + drainLast(); } - buffer.flip(); - subscriber.onNext(buffer.byteBuffer()); - subscriber.onComplete(); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java index 604f8570e60..e03b75180d7 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java @@ -40,9 +40,9 @@ public abstract class AbstractHttpHandlerIntegrationTests { @Parameterized.Parameters(name = "server [{0}]") public static Object[][] arguments() { return new Object[][] { - /*{new JettyHttpServer()}, + {new JettyHttpServer()}, {new RxNettyHttpServer()}, - {new ReactorHttpServer()},*/ + {new ReactorHttpServer()}, {new TomcatHttpServer()} }; }