Fix JSON encoding issue

This commit is contained in:
Stephane Maldini 2015-10-29 01:14:46 +00:00 committed by Sebastien Deleuze
parent fd52ae999b
commit 3864fc24ff
2 changed files with 47 additions and 13 deletions

View File

@ -25,8 +25,12 @@ import org.springframework.reactive.codec.decoder.JsonObjectDecoder;
import reactor.core.subscriber.SubscriberBarrier;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import static reactor.Publishers.*;
import reactor.core.support.BackpressureUtils;
import reactor.io.buffer.Buffer;
/**
@ -49,24 +53,44 @@ public class JsonObjectEncoder implements MessageToByteEncoder<ByteBuffer> {
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream,
ResolvableType type, MediaType mediaType, Object... hints) {
return lift(messageStream, sub -> new JsonEncoderBarrier(sub));
return lift(messageStream, bbs -> new JsonEncoderBarrier(bbs));
}
private static class JsonEncoderBarrier extends SubscriberBarrier<ByteBuffer, ByteBuffer> {
private volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<JsonEncoderBarrier> REQUESTED =
AtomicLongFieldUpdater.newUpdater(JsonEncoderBarrier.class, "requested");
private volatile int terminated;
static final AtomicIntegerFieldUpdater<JsonEncoderBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(JsonEncoderBarrier.class, "terminated");
ByteBuffer prev = null;
long count = 0;
public JsonEncoderBarrier(Subscriber<? super ByteBuffer> subscriber) {
super(subscriber);
}
ByteBuffer prev = null;
long count = 0;
@Override
protected void doRequest(long n) {
BackpressureUtils.getAndAdd(REQUESTED, this, n);
if(TERMINATED.compareAndSet(this, 1, 2)){
drainLast();
}
else {
super.doRequest(n);
}
}
@Override
protected void doNext(ByteBuffer next) {
count++;
if (count == 1) {
prev = next;
doRequest(1);
super.doRequest(1);
return;
}
@ -79,19 +103,29 @@ public class JsonObjectEncoder implements MessageToByteEncoder<ByteBuffer> {
buffer.append(tmp);
buffer.append(",");
buffer.flip();
BackpressureUtils.getAndSub(REQUESTED, this, 1L);
subscriber.onNext(buffer.byteBuffer());
}
protected void drainLast(){
if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
Buffer buffer = new Buffer();
buffer.append(prev);
if (count > 1) {
buffer.append("]");
}
buffer.flip();
subscriber.onNext(buffer.byteBuffer());
super.doComplete();
}
}
@Override
protected void doComplete() {
Buffer buffer = new Buffer();
buffer.append(prev);
if (count > 1) {
buffer.append("]");
if(TERMINATED.compareAndSet(this, 0, 1)){
drainLast();
}
buffer.flip();
subscriber.onNext(buffer.byteBuffer());
subscriber.onComplete();
}
}

View File

@ -40,9 +40,9 @@ public abstract class AbstractHttpHandlerIntegrationTests {
@Parameterized.Parameters(name = "server [{0}]")
public static Object[][] arguments() {
return new Object[][] {
/*{new JettyHttpServer()},
{new JettyHttpServer()},
{new RxNettyHttpServer()},
{new ReactorHttpServer()},*/
{new ReactorHttpServer()},
{new TomcatHttpServer()}
};
}