Updated Encoder and Decoder to use DataBuffer

This commit is contained in:
Arjen Poutsma 2016-01-26 14:45:08 +01:00
parent 38ab47f8a0
commit 2981b5e6e8
26 changed files with 509 additions and 297 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,13 +16,13 @@
package org.springframework.core.codec;
import java.nio.ByteBuffer;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.MimeType;
/**
@ -43,14 +43,14 @@ public interface Decoder<T> {
boolean canDecode(ResolvableType type, MimeType mimeType, Object... hints);
/**
* Decode an input {@link ByteBuffer} stream to an output stream of {@code T}.
* Decode an input {@link DataBuffer} stream to an output stream of {@code T}.
* @param inputStream the input stream to process.
* @param type the stream element type to process.
* @param mimeType the mime type to process.
* @param hints Additional information about how to do decode, optional.
* @return the output stream
*/
Flux<T> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints);
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.MimeType;
/**
@ -50,7 +51,7 @@ public interface Encoder<T> {
* @param hints Additional information about how to do decode, optional.
* @return the output stream
*/
Flux<ByteBuffer> encode(Publisher<? extends T> inputStream, ResolvableType type,
Flux<DataBuffer> encode(Publisher<? extends T> inputStream, ResolvableType type,
MimeType mimeType, Object... hints);
/**

View File

@ -0,0 +1,42 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
/**
* @author Arjen Poutsma
*/
public abstract class AbstractAllocatingEncoder<T> extends AbstractEncoder<T> {
private final DataBufferAllocator allocator;
public AbstractAllocatingEncoder(DataBufferAllocator allocator,
MimeType... supportedMimeTypes) {
super(supportedMimeTypes);
Assert.notNull(allocator, "'allocator' must not be null");
this.allocator = allocator;
}
public DataBufferAllocator allocator() {
return allocator;
}
}

View File

@ -16,7 +16,6 @@
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -28,6 +27,9 @@ import reactor.core.util.BackpressureUtils;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
/**
@ -38,12 +40,19 @@ import org.springframework.util.MimeType;
*/
public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T> {
public AbstractRawByteStreamDecoder(MimeType... supportedMimeTypes) {
private final DataBufferAllocator allocator;
public AbstractRawByteStreamDecoder(DataBufferAllocator allocator,
MimeType... supportedMimeTypes) {
super(supportedMimeTypes);
Assert.notNull(allocator, "'allocator' must not be null");
this.allocator = allocator;
}
@Override
public Flux<T> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MimeType mimeType, Object... hints) {
public Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
return decodeInternal(Flux.from(inputStream).lift(bbs -> subscriberBarrier(bbs)),
type, mimeType, hints);
@ -55,17 +64,20 @@ public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T>
* <p>Implementations should provide their own {@link SubscriberBarrier} or use one of the
* provided implementations by this class
*/
public abstract SubscriberBarrier<ByteBuffer, ByteBuffer> subscriberBarrier(Subscriber<? super ByteBuffer> subscriber);
public abstract SubscriberBarrier<DataBuffer, DataBuffer> subscriberBarrier(
Subscriber<? super DataBuffer> subscriber);
public abstract Flux<T> decodeInternal(Publisher<ByteBuffer> inputStream, ResolvableType type
public abstract Flux<T> decodeInternal(Publisher<DataBuffer> inputStream,
ResolvableType type
, MimeType mimeType, Object... hints);
/**
* {@code SubscriberBarrier} implementation that buffers all received elements and emits a single
* {@code ByteBuffer} once the incoming stream has been completed
* {@code DataBuffer} once the incoming stream has been completed
*/
public static class ReduceSingleByteStreamBarrier extends SubscriberBarrier<ByteBuffer, ByteBuffer> {
public static class ReduceSingleByteStreamBarrier
extends SubscriberBarrier<DataBuffer, DataBuffer> {
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ReduceSingleByteStreamBarrier> REQUESTED =
@ -74,16 +86,16 @@ public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T>
static final AtomicIntegerFieldUpdater<ReduceSingleByteStreamBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(ReduceSingleByteStreamBarrier.class, "terminated");
private volatile long requested;
private volatile int terminated;
private ByteBuffer buffer;
private DataBuffer buffer;
public ReduceSingleByteStreamBarrier(Subscriber<? super ByteBuffer> subscriber) {
public ReduceSingleByteStreamBarrier(Subscriber<? super DataBuffer> subscriber,
DataBufferAllocator allocator) {
super(subscriber);
this.buffer = ByteBuffer.allocate(0);
this.buffer = allocator.allocateBuffer();
}
@Override
@ -108,15 +120,12 @@ public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T>
* TODO: when available, wrap buffers with a single buffer and avoid copying data for every method call.
*/
@Override
protected void doNext(ByteBuffer byteBuffer) {
this.buffer = ByteBuffer.allocate(this.buffer.capacity() + byteBuffer.capacity())
.put(this.buffer).put(byteBuffer);
this.buffer.flip();
protected void doNext(DataBuffer dataBuffer) {
this.buffer.write(dataBuffer);
}
protected void drainLast() {
if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
this.buffer.flip();
subscriber.onNext(this.buffer);
super.doComplete();
}
@ -127,7 +136,8 @@ public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T>
* {@code SubscriberBarrier} implementation that splits incoming elements
* using line return delimiters: {@code "\n"} and {@code "\r\n"}
*/
public static class SplitLinesByteStreamBarrier extends SubscriberBarrier<ByteBuffer, ByteBuffer> {
public static class SplitLinesByteStreamBarrier
extends SubscriberBarrier<DataBuffer, DataBuffer> {
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<SplitLinesByteStreamBarrier> REQUESTED =
@ -136,16 +146,20 @@ public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T>
static final AtomicIntegerFieldUpdater<SplitLinesByteStreamBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(SplitLinesByteStreamBarrier.class, "terminated");
private final DataBufferAllocator allocator;
private volatile long requested;
private volatile int terminated;
private ByteBuffer buffer;
private DataBuffer buffer;
public SplitLinesByteStreamBarrier(Subscriber<? super ByteBuffer> subscriber) {
public SplitLinesByteStreamBarrier(Subscriber<? super DataBuffer> subscriber,
DataBufferAllocator allocator) {
super(subscriber);
this.buffer = ByteBuffer.allocate(0);
this.allocator = allocator;
this.buffer = allocator.allocateBuffer();
}
@Override
@ -170,19 +184,20 @@ public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T>
* TODO: when available, wrap buffers with a single buffer and avoid copying data for every method call.
*/
@Override
protected void doNext(ByteBuffer byteBuffer) {
this.buffer = ByteBuffer.allocate(this.buffer.capacity() + byteBuffer.capacity())
.put(this.buffer).put(byteBuffer);
protected void doNext(DataBuffer dataBuffer) {
this.buffer.write(dataBuffer);
while (REQUESTED.get(this) > 0) {
int separatorIndex = findEndOfLine(this.buffer);
if (separatorIndex != -1) {
if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
byte[] message = new byte[separatorIndex];
this.buffer.get(message);
this.buffer.read(message);
consumeSeparator(this.buffer);
this.buffer = this.buffer.slice();
super.doNext(ByteBuffer.wrap(message));
// this.buffer = this.buffer.slice();
DataBuffer buffer2 = allocator.allocateBuffer(message.length);
buffer2.write(message);
super.doNext(buffer2);
}
}
else {
@ -191,9 +206,9 @@ public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T>
}
}
protected int findEndOfLine(ByteBuffer buffer) {
protected int findEndOfLine(DataBuffer buffer) {
final int n = buffer.limit();
final int n = buffer.readableByteCount();
for (int i = 0; i < n; i++) {
final byte b = buffer.get(i);
if (b == '\n') {
@ -207,16 +222,15 @@ public abstract class AbstractRawByteStreamDecoder<T> extends AbstractDecoder<T>
return -1;
}
protected void consumeSeparator(ByteBuffer buffer) {
byte sep = buffer.get();
protected void consumeSeparator(DataBuffer buffer) {
byte sep = buffer.read();
if (sep == '\r') {
buffer.get();
buffer.read();
}
}
protected void drainLast() {
if (BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
this.buffer.flip();
subscriber.onNext(this.buffer);
super.doComplete();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@ -43,10 +44,9 @@ public class ByteBufferDecoder extends AbstractDecoder<ByteBuffer> {
}
@Override
public Flux<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<ByteBuffer> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
return Flux.from(inputStream);
return Flux.from(inputStream).map(DataBuffer::asByteBuffer);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,17 +22,18 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* @author Sebastien Deleuze
*/
public class ByteBufferEncoder extends AbstractEncoder<ByteBuffer> {
public class ByteBufferEncoder extends AbstractAllocatingEncoder<ByteBuffer> {
public ByteBufferEncoder() {
super(MimeTypeUtils.ALL);
public ByteBufferEncoder(DataBufferAllocator allocator) {
super(allocator, MimeTypeUtils.ALL);
}
@ -43,11 +44,16 @@ public class ByteBufferEncoder extends AbstractEncoder<ByteBuffer> {
}
@Override
public Flux<ByteBuffer> encode(Publisher<? extends ByteBuffer> inputStream, ResolvableType type,
public Flux<DataBuffer> encode(Publisher<? extends ByteBuffer> inputStream,
ResolvableType type,
MimeType mimeType, Object... hints) {
//noinspection unchecked
return Flux.from(inputStream);
return Flux.from(inputStream).map(byteBuffer -> {
DataBuffer dataBuffer = allocator().allocateBuffer(byteBuffer.remaining());
dataBuffer.write(byteBuffer);
return dataBuffer;
});
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package org.springframework.core.codec.support;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -28,7 +27,7 @@ import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.Decoder;
import org.springframework.util.ByteBufferInputStream;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.MimeType;
@ -42,39 +41,38 @@ public class JacksonJsonDecoder extends AbstractDecoder<Object> {
private final ObjectMapper mapper;
private Decoder<ByteBuffer> preProcessor;
private Decoder<DataBuffer> preProcessor;
public JacksonJsonDecoder() {
this(new ObjectMapper(), null);
}
public JacksonJsonDecoder(Decoder<ByteBuffer> preProcessor) {
public JacksonJsonDecoder(Decoder<DataBuffer> preProcessor) {
this(new ObjectMapper(), preProcessor);
}
public JacksonJsonDecoder(ObjectMapper mapper, Decoder<ByteBuffer> preProcessor) {
public JacksonJsonDecoder(ObjectMapper mapper, Decoder<DataBuffer> preProcessor) {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
this.mapper = mapper;
this.preProcessor = preProcessor;
}
@Override
public Flux<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
ObjectReader reader = this.mapper.readerFor(type.getRawClass());
Flux<ByteBuffer> stream = Flux.from(inputStream);
Flux<DataBuffer> stream = Flux.from(inputStream);
if (this.preProcessor != null) {
stream = this.preProcessor.decode(inputStream, type, mimeType, hints);
}
return stream.map(content -> {
try {
return reader.readValue(new ByteBufferInputStream(content));
return reader.readValue(content.asInputStream());
}
catch (IOException e) {
throw new CodecException("Error while reading the data", e);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,19 +17,19 @@
package org.springframework.core.codec.support;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.Encoder;
import org.springframework.util.BufferOutputStream;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.MimeType;
/**
@ -38,50 +38,49 @@ import org.springframework.util.MimeType;
* @author Sebastien Deleuze
* @see JacksonJsonDecoder
*/
public class JacksonJsonEncoder extends AbstractEncoder<Object> {
public class JacksonJsonEncoder extends AbstractAllocatingEncoder<Object> {
private final ObjectMapper mapper;
private Encoder<ByteBuffer> postProcessor;
private Encoder<DataBuffer> postProcessor;
public JacksonJsonEncoder() {
this(new ObjectMapper(), null);
public JacksonJsonEncoder(DataBufferAllocator allocator) {
this(allocator, new ObjectMapper(), null);
}
public JacksonJsonEncoder(Encoder<ByteBuffer> postProcessor) {
this(new ObjectMapper(), postProcessor);
public JacksonJsonEncoder(DataBufferAllocator allocator,
Encoder<DataBuffer> postProcessor) {
this(allocator, new ObjectMapper(), postProcessor);
}
public JacksonJsonEncoder(ObjectMapper mapper, Encoder<ByteBuffer> postProcessor) {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
public JacksonJsonEncoder(DataBufferAllocator allocator, ObjectMapper mapper,
Encoder<DataBuffer> postProcessor) {
super(allocator, new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
this.mapper = mapper;
this.postProcessor = postProcessor;
}
@Override
public Flux<ByteBuffer> encode(Publisher<? extends Object> inputStream,
public Flux<DataBuffer> encode(Publisher<? extends Object> inputStream,
ResolvableType type, MimeType mimeType, Object... hints) {
Publisher<ByteBuffer> stream = (inputStream instanceof Mono ?
Publisher<DataBuffer> stream = (inputStream instanceof Mono ?
((Mono<?>)inputStream).map(this::serialize) :
Flux.from(inputStream).map(this::serialize));
return (this.postProcessor == null ? Flux.from(stream) : this.postProcessor.encode(stream, type, mimeType, hints));
}
private ByteBuffer serialize(Object value) {
Buffer buffer = new Buffer();
BufferOutputStream outputStream = new BufferOutputStream(buffer);
private DataBuffer serialize(Object value) {
DataBuffer buffer = allocator().allocateBuffer();
OutputStream outputStream = buffer.asOutputStream();
try {
this.mapper.writeValue(outputStream, value);
}
catch (IOException e) {
throw new CodecException("Error while writing the data", e);
}
buffer.flip();
return buffer.byteBuffer();
return buffer;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.xml.bind.JAXBContext;
@ -38,8 +37,9 @@ import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.util.Assert;
import org.springframework.util.ByteBufferPublisherInputStream;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@ -60,12 +60,13 @@ public class Jaxb2Decoder extends AbstractDecoder<Object> {
@Override
public Flux<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
Class<?> outputClass = type.getRawClass();
try {
Source source = processSource(new StreamSource(new ByteBufferPublisherInputStream(inputStream)));
Source source = processSource(
new StreamSource(DataBufferUtils.toInputStream(inputStream)));
Unmarshaller unmarshaller = createUnmarshaller(outputClass);
if (outputClass.isAnnotationPresent(XmlRootElement.class)) {
return Flux.just(unmarshaller.unmarshal(source));

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,7 @@
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -27,12 +27,12 @@ import javax.xml.bind.Marshaller;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.Assert;
import org.springframework.util.BufferOutputStream;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@ -43,30 +43,29 @@ import org.springframework.util.MimeTypeUtils;
* @author Sebastien Deleuze
* @see Jaxb2Decoder
*/
public class Jaxb2Encoder extends AbstractEncoder<Object> {
public class Jaxb2Encoder extends AbstractAllocatingEncoder<Object> {
private final ConcurrentMap<Class<?>, JAXBContext> jaxbContexts = new ConcurrentHashMap<>(64);
public Jaxb2Encoder() {
super(MimeTypeUtils.APPLICATION_XML, MimeTypeUtils.TEXT_XML);
public Jaxb2Encoder(DataBufferAllocator allocator) {
super(allocator, MimeTypeUtils.APPLICATION_XML, MimeTypeUtils.TEXT_XML);
}
@Override
public Flux<ByteBuffer> encode(Publisher<? extends Object> messageStream, ResolvableType type,
public Flux<DataBuffer> encode(Publisher<? extends Object> messageStream,
ResolvableType type,
MimeType mimeType, Object... hints) {
return Flux.from(messageStream).map(value -> {
try {
Buffer buffer = new Buffer();
BufferOutputStream outputStream = new BufferOutputStream(buffer);
DataBuffer buffer = allocator().allocateBuffer(1024);
OutputStream outputStream = buffer.asOutputStream();
Class<?> clazz = ClassUtils.getUserClass(value);
Marshaller marshaller = createMarshaller(clazz);
marshaller.setProperty(Marshaller.JAXB_ENCODING, StandardCharsets.UTF_8.name());
marshaller.marshal(value, outputStream);
buffer.flip();
return buffer.byteBuffer();
return buffer;
}
catch (MarshalException ex) {
throw new CodecException("Could not marshal [" + value + "]: " + ex.getMessage(), ex);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -29,6 +28,8 @@ import reactor.core.publisher.Flux;
import reactor.fn.Function;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.MimeType;
/**
@ -44,7 +45,7 @@ import org.springframework.util.MimeType;
* @author Sebastien Deleuze
* @see JsonObjectEncoder
*/
public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
public class JsonObjectDecoder extends AbstractDecoder<DataBuffer> {
private static final int ST_CORRUPTED = -1;
@ -54,38 +55,40 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
private static final int ST_DECODING_ARRAY_STREAM = 2;
private final DataBufferAllocator allocator;
private final int maxObjectLength;
private final boolean streamArrayElements;
public JsonObjectDecoder() {
public JsonObjectDecoder(DataBufferAllocator allocator) {
// 1 MB
this(1024 * 1024);
this(allocator, 1024 * 1024);
}
public JsonObjectDecoder(int maxObjectLength) {
this(maxObjectLength, true);
public JsonObjectDecoder(DataBufferAllocator allocator, int maxObjectLength) {
this(allocator, maxObjectLength, true);
}
public JsonObjectDecoder(boolean streamArrayElements) {
this(1024 * 1024, streamArrayElements);
public JsonObjectDecoder(DataBufferAllocator allocator, boolean streamArrayElements) {
this(allocator, 1024 * 1024, streamArrayElements);
}
/**
* @param allocator
* @param maxObjectLength maximum number of bytes a JSON object/array may
* use (including braces and all). Objects exceeding this length are dropped
* and an {@link IllegalStateException} is thrown.
* @param streamArrayElements if set to true and the "top level" JSON object
* is an array, each of its entries is passed through the pipeline individually
* and immediately after it was fully received, allowing for arrays with
* "infinitely" many elements.
*/
public JsonObjectDecoder(int maxObjectLength, boolean streamArrayElements) {
public JsonObjectDecoder(DataBufferAllocator allocator, int maxObjectLength,
boolean streamArrayElements) {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
this.allocator = allocator;
if (maxObjectLength < 1) {
throw new IllegalArgumentException("maxObjectLength must be a positive int");
}
@ -94,10 +97,11 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
}
@Override
public Flux<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
public Flux<DataBuffer> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
return Flux.from(inputStream).flatMap(new Function<ByteBuffer, Publisher<? extends ByteBuffer>>() {
return Flux.from(inputStream)
.flatMap(new Function<DataBuffer, Publisher<? extends DataBuffer>>() {
int openBraces;
int index;
@ -107,14 +111,15 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
Integer writerIndex;
@Override
public Publisher<? extends ByteBuffer> apply(ByteBuffer b) {
List<ByteBuffer> chunks = new ArrayList<>();
public Publisher<? extends DataBuffer> apply(DataBuffer b) {
List<DataBuffer> chunks = new ArrayList<>();
if (this.input == null) {
this.input = Unpooled.copiedBuffer(b);
this.input = Unpooled.copiedBuffer(b.asByteBuffer());
this.writerIndex = this.input.writerIndex();
}
else {
this.input = Unpooled.copiedBuffer(this.input, Unpooled.copiedBuffer(b));
this.input = Unpooled.copiedBuffer(this.input,
Unpooled.copiedBuffer(b.asByteBuffer()));
this.writerIndex = this.input.writerIndex();
}
if (this.state == ST_CORRUPTED) {
@ -139,7 +144,7 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
ByteBuf json = extractObject(this.input, this.input.readerIndex(),
this.index + 1 - this.input.readerIndex());
if (json != null) {
chunks.add(json.nioBuffer());
chunks.add(allocator.wrap(json.nioBuffer()));
}
// The JSON object/array was extracted => discard the bytes from
@ -173,7 +178,7 @@ public class JsonObjectDecoder extends AbstractDecoder<ByteBuffer> {
idxNoSpaces + 1 - this.input.readerIndex());
if (json != null) {
chunks.add(json.nioBuffer());
chunks.add(allocator.wrap(json.nioBuffer()));
}
this.input.readerIndex(this.index + 1);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -27,9 +26,10 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.core.util.BackpressureUtils;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.MimeType;
/**
@ -42,25 +42,25 @@ import org.springframework.util.MimeType;
*
* @see JsonObjectDecoder
*/
public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
public class JsonObjectEncoder extends AbstractAllocatingEncoder<DataBuffer> {
public JsonObjectEncoder() {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
public JsonObjectEncoder(DataBufferAllocator allocator) {
super(allocator, new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
}
@Override
public Flux<ByteBuffer> encode(Publisher<? extends ByteBuffer> inputStream,
public Flux<DataBuffer> encode(Publisher<? extends DataBuffer> inputStream,
ResolvableType type, MimeType mimeType, Object... hints) {
if (inputStream instanceof Mono) {
return Flux.from(inputStream);
}
return Flux.from(inputStream).lift(s -> new JsonArrayEncoderBarrier(s));
return Flux.from(inputStream)
.lift(s -> new JsonArrayEncoderBarrier(s, allocator()));
}
private static class JsonArrayEncoderBarrier extends SubscriberBarrier<ByteBuffer, ByteBuffer> {
private static class JsonArrayEncoderBarrier
extends SubscriberBarrier<DataBuffer, DataBuffer> {
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<JsonArrayEncoderBarrier> REQUESTED =
@ -69,8 +69,9 @@ public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
static final AtomicIntegerFieldUpdater<JsonArrayEncoderBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "terminated");
private final DataBufferAllocator allocator;
private ByteBuffer prev = null;
private DataBuffer prev = null;
private long count = 0;
@ -78,9 +79,10 @@ public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
private volatile int terminated;
public JsonArrayEncoderBarrier(Subscriber<? super ByteBuffer> subscriber) {
public JsonArrayEncoderBarrier(Subscriber<? super DataBuffer> subscriber,
DataBufferAllocator allocator) {
super(subscriber);
this.allocator = allocator;
}
@ -96,34 +98,32 @@ public class JsonObjectEncoder extends AbstractEncoder<ByteBuffer> {
}
@Override
protected void doNext(ByteBuffer next) {
protected void doNext(DataBuffer next) {
this.count++;
ByteBuffer tmp = this.prev;
DataBuffer tmp = this.prev;
this.prev = next;
Buffer buffer = new Buffer();
DataBuffer buffer = allocator.allocateBuffer();
if (this.count == 1) {
buffer.append("[");
buffer.write((byte) '[');
}
if (tmp != null) {
buffer.append(tmp);
buffer.write(tmp);
}
if (this.count > 1) {
buffer.append(",");
buffer.write((byte) ',');
}
buffer.flip();
BackpressureUtils.getAndSub(REQUESTED, this, 1L);
subscriber.onNext(buffer.byteBuffer());
subscriber.onNext(buffer);
}
protected void drainLast(){
if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
Buffer buffer = new Buffer();
buffer.append(this.prev);
buffer.append("]");
buffer.flip();
subscriber.onNext(buffer.byteBuffer());
DataBuffer buffer = allocator.allocateBuffer();
buffer.write(this.prev);
buffer.write((byte) ']');
subscriber.onNext(buffer);
super.doComplete();
}
}

View File

@ -16,7 +16,6 @@
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@ -26,6 +25,8 @@ import reactor.core.publisher.Flux;
import reactor.core.subscriber.SubscriberBarrier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.MimeType;
/**
@ -47,14 +48,16 @@ public class StringDecoder extends AbstractRawByteStreamDecoder<String> {
public final boolean reduceToSingleBuffer;
private final DataBufferAllocator allocator;
/**
* Create a {@code StringDecoder} that decodes a bytes stream to a String stream
*
* <p>By default, this decoder will buffer bytes and
* emit a single String as a result.
*/
public StringDecoder() {
this(true);
public StringDecoder(DataBufferAllocator allocator) {
this(allocator, true);
}
/**
@ -63,9 +66,10 @@ public class StringDecoder extends AbstractRawByteStreamDecoder<String> {
* @param reduceToSingleBuffer whether this decoder should buffer all received items
* and decode a single consolidated String or re-emit items as they are provided
*/
public StringDecoder(boolean reduceToSingleBuffer) {
super(new MimeType("text", "plain", DEFAULT_CHARSET));
public StringDecoder(DataBufferAllocator allocator, boolean reduceToSingleBuffer) {
super(allocator, new MimeType("text", "plain", DEFAULT_CHARSET));
this.reduceToSingleBuffer = reduceToSingleBuffer;
this.allocator = allocator;
}
@Override
@ -75,18 +79,20 @@ public class StringDecoder extends AbstractRawByteStreamDecoder<String> {
}
@Override
public SubscriberBarrier<ByteBuffer, ByteBuffer> subscriberBarrier(Subscriber<? super ByteBuffer> subscriber) {
public SubscriberBarrier<DataBuffer, DataBuffer> subscriberBarrier(
Subscriber<? super DataBuffer> subscriber) {
if (reduceToSingleBuffer) {
return new ReduceSingleByteStreamBarrier(subscriber);
return new ReduceSingleByteStreamBarrier(subscriber, allocator);
}
else {
return new SubscriberBarrier(subscriber);
return new SubscriberBarrier<DataBuffer, DataBuffer>(subscriber);
}
}
@Override
public Flux<String> decodeInternal(Publisher<ByteBuffer> inputStream, ResolvableType type, MimeType mimeType, Object... hints) {
public Flux<String> decodeInternal(Publisher<DataBuffer> inputStream,
ResolvableType type, MimeType mimeType, Object... hints) {
Charset charset;
if (mimeType != null && mimeType.getCharSet() != null) {
charset = mimeType.getCharSet();
@ -94,7 +100,11 @@ public class StringDecoder extends AbstractRawByteStreamDecoder<String> {
else {
charset = DEFAULT_CHARSET;
}
return Flux.from(inputStream).map(content -> new String(content.duplicate().array(), charset));
return Flux.from(inputStream).map(content -> {
byte[] bytes = new byte[content.readableByteCount()];
content.read(bytes);
return new String(bytes, charset);
});
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@ -24,6 +23,8 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.MimeType;
/**
@ -32,13 +33,12 @@ import org.springframework.util.MimeType;
* @author Sebastien Deleuze
* @see StringDecoder
*/
public class StringEncoder extends AbstractEncoder<String> {
public class StringEncoder extends AbstractAllocatingEncoder<String> {
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
public StringEncoder() {
super(new MimeType("text", "plain", DEFAULT_CHARSET));
public StringEncoder(DataBufferAllocator allocator) {
super(allocator, new MimeType("text", "plain", DEFAULT_CHARSET));
}
@ -49,7 +49,7 @@ public class StringEncoder extends AbstractEncoder<String> {
}
@Override
public Flux<ByteBuffer> encode(Publisher<? extends String> elementStream,
public Flux<DataBuffer> encode(Publisher<? extends String> elementStream,
ResolvableType type, MimeType mimeType, Object... hints) {
Charset charset;
@ -59,7 +59,12 @@ public class StringEncoder extends AbstractEncoder<String> {
else {
charset = DEFAULT_CHARSET;
}
return Flux.from(elementStream).map(s -> ByteBuffer.wrap(s.getBytes(charset)));
return Flux.from(elementStream).map(s -> {
byte[] bytes = s.getBytes(charset);
DataBuffer dataBuffer = allocator().allocateBuffer(bytes.length);
dataBuffer.write(bytes);
return dataBuffer;
});
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import java.nio.charset.StandardCharsets;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
/**
* @author Arjen Poutsma
*/
@RunWith(Parameterized.class)
public abstract class AbstractAllocatingTestCase {
@Parameterized.Parameter
public DataBufferAllocator allocator;
@Parameterized.Parameters(name = "{0}")
public static Object[][] allocators() {
return new Object[][]{
{new NettyDataBufferAllocator(new UnpooledByteBufAllocator(true))},
{new NettyDataBufferAllocator(new UnpooledByteBufAllocator(false))},
{new NettyDataBufferAllocator(new PooledByteBufAllocator(true))},
{new NettyDataBufferAllocator(new PooledByteBufAllocator(false))},
{new DefaultDataBufferAllocator(true)},
{new DefaultDataBufferAllocator(false)}
};
}
protected DataBuffer stringBuffer(String value) {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = allocator.allocateBuffer(bytes.length);
buffer.write(bytes);
return buffer;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.decoder;
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.util.List;
@ -26,7 +26,7 @@ import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.support.ByteBufferDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import static java.util.stream.Collectors.toList;
@ -35,7 +35,7 @@ import static org.junit.Assert.*;
/**
* @author Sebastien Deleuze
*/
public class ByteBufferDecoderTests {
public class ByteBufferDecoderTests extends AbstractAllocatingTestCase {
private final ByteBufferDecoder decoder = new ByteBufferDecoder();
@ -48,14 +48,25 @@ public class ByteBufferDecoderTests {
@Test
public void decode() throws InterruptedException {
ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer();
ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer();
Flux<ByteBuffer> source = Flux.just(fooBuffer, barBuffer);
DataBuffer fooBuffer = stringBuffer("foo");
DataBuffer barBuffer = stringBuffer("bar");
Flux<DataBuffer> source = Flux.just(fooBuffer, barBuffer);
Flux<ByteBuffer> output = decoder.decode(source, ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null);
List<ByteBuffer> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(2, results.size());
assertEquals(fooBuffer, results.get(0));
assertEquals(barBuffer, results.get(1));
assertBufferEquals(fooBuffer, results.get(0));
assertBufferEquals(barBuffer, results.get(1));
}
public void assertBufferEquals(DataBuffer expected, ByteBuffer actual) {
byte[] byteBufferBytes = new byte[actual.remaining()];
actual.get(byteBufferBytes);
byte[] dataBufferBytes = new byte[expected.readableByteCount()];
expected.read(dataBufferBytes);
assertArrayEquals(dataBufferBytes, byteBufferBytes);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,19 +14,20 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.encoder;
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.StreamSupport;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.support.ByteBufferEncoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import static java.util.stream.Collectors.toList;
@ -35,27 +36,47 @@ import static org.junit.Assert.*;
/**
* @author Sebastien Deleuze
*/
public class ByteBufferEncoderTests {
public class ByteBufferEncoderTests extends AbstractAllocatingTestCase {
private final ByteBufferEncoder encoder = new ByteBufferEncoder();
private ByteBufferEncoder encoder;
@Before
public void createEncoder() {
encoder = new ByteBufferEncoder(allocator);
}
@Test
public void canDecode() {
public void canEncode() {
assertTrue(encoder.canEncode(ResolvableType.forClass(ByteBuffer.class), MediaType.TEXT_PLAIN));
assertFalse(encoder.canEncode(ResolvableType.forClass(Integer.class), MediaType.TEXT_PLAIN));
assertTrue(encoder.canEncode(ResolvableType.forClass(ByteBuffer.class), MediaType.APPLICATION_JSON));
}
@Test
public void decode() throws InterruptedException {
ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer();
ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer();
Flux<ByteBuffer> source = Flux.just(fooBuffer, barBuffer);
Flux<ByteBuffer> output = encoder.encode(source, ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null);
List<ByteBuffer> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
public void encode() throws Exception {
byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8);
byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8);
Flux<ByteBuffer> source =
Flux.just(ByteBuffer.wrap(fooBytes), ByteBuffer.wrap(barBytes));
Flux<DataBuffer> output = encoder.encode(source,
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class),
null);
List<DataBuffer> results =
StreamSupport.stream(output.toIterable().spliterator(), false)
.collect(toList());
assertEquals(2, results.size());
assertEquals(fooBuffer, results.get(0));
assertEquals(barBuffer, results.get(1));
assertEquals(3, results.get(0).readableByteCount());
assertEquals(3, results.get(1).readableByteCount());
byte[] buf = new byte[3];
results.get(0).read(buf);
assertArrayEquals(fooBytes, buf);
results.get(1).read(buf);
assertArrayEquals(barBytes, buf);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,9 +14,8 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.decoder;
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.StreamSupport;
@ -25,9 +24,8 @@ import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.support.JacksonJsonDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.reactive.codec.Pojo;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.*;
@ -35,7 +33,7 @@ import static org.junit.Assert.*;
/**
* @author Sebastien Deleuze
*/
public class JacksonJsonDecoderTests {
public class JacksonJsonDecoderTests extends AbstractAllocatingTestCase {
private final JacksonJsonDecoder decoder = new JacksonJsonDecoder();
@ -47,7 +45,8 @@ public class JacksonJsonDecoderTests {
@Test
public void decode() throws InterruptedException {
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
Flux<Object> output = decoder.decode(source, ResolvableType.forClass(Pojo.class), null);
List<Object> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(1, results.size());

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,18 +14,17 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.encoder;
package org.springframework.core.codec.support;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.StreamSupport;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import org.springframework.core.codec.support.JacksonJsonEncoder;
import org.springframework.http.MediaType;
import org.springframework.reactive.codec.Pojo;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.*;
@ -33,9 +32,14 @@ import static org.junit.Assert.*;
/**
* @author Sebastien Deleuze
*/
public class JacksonJsonEncoderTests {
public class JacksonJsonEncoderTests extends AbstractAllocatingTestCase {
private final JacksonJsonEncoder encoder = new JacksonJsonEncoder();
private JacksonJsonEncoder encoder;
@Before
public void createEncoder() {
encoder = new JacksonJsonEncoder(allocator);
}
@Test
public void canWrite() {
@ -47,8 +51,8 @@ public class JacksonJsonEncoderTests {
public void write() throws InterruptedException {
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
Flux<String> output = encoder.encode(source, null, null).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
});
List<String> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,9 +14,8 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.decoder;
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.StreamSupport;
@ -25,9 +24,8 @@ import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.support.Jaxb2Decoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.reactive.codec.Pojo;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.*;
@ -35,7 +33,7 @@ import static org.junit.Assert.*;
/**
* @author Sebastien Deleuze
*/
public class Jaxb2DecoderTests {
public class Jaxb2DecoderTests extends AbstractAllocatingTestCase {
private final Jaxb2Decoder decoder = new Jaxb2Decoder();
@ -48,7 +46,8 @@ public class Jaxb2DecoderTests {
@Test
public void decode() throws InterruptedException {
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("<?xml version=\"1.0\" encoding=\"UTF-8\"?><pojo><bar>barbar</bar><foo>foofoo</foo></pojo>").byteBuffer());
Flux<DataBuffer> source = Flux.just(stringBuffer(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?><pojo><bar>barbar</bar><foo>foofoo</foo></pojo>"));
Flux<Object> output = decoder.decode(source, ResolvableType.forClass(Pojo.class), null);
List<Object> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(1, results.size());

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,18 +14,17 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.encoder;
package org.springframework.core.codec.support;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.StreamSupport;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import org.springframework.core.codec.support.Jaxb2Encoder;
import org.springframework.http.MediaType;
import org.springframework.reactive.codec.Pojo;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.*;
@ -33,9 +32,14 @@ import static org.junit.Assert.*;
/**
* @author Sebastien Deleuze
*/
public class Jaxb2EncoderTests {
public class Jaxb2EncoderTests extends AbstractAllocatingTestCase {
private final Jaxb2Encoder encoder = new Jaxb2Encoder();
private Jaxb2Encoder encoder;
@Before
public void createEncoder() {
encoder = new Jaxb2Encoder(allocator);
}
@Test
public void canEncode() {
@ -48,8 +52,8 @@ public class Jaxb2EncoderTests {
public void encode() throws InterruptedException {
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
Flux<String> output = encoder.encode(source, null, null).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
});
List<String> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,18 +14,16 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.decoder;
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.StreamSupport;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import org.springframework.core.codec.support.JsonObjectDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
@ -33,17 +31,16 @@ import static org.junit.Assert.assertEquals;
/**
* @author Sebastien Deleuze
*/
public class JsonObjectDecoderTests {
public class JsonObjectDecoderTests extends AbstractAllocatingTestCase {
@Test
public void decodeSingleChunkToJsonObject() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
Flux<String> output = decoder.decode(source, null, null).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
});
JsonObjectDecoder decoder = new JsonObjectDecoder(allocator);
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
Flux<String> output =
decoder.decode(source, null, null).map(JsonObjectDecoderTests::toString);
List<Object> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(1, results.size());
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0));
@ -51,13 +48,11 @@ public class JsonObjectDecoderTests {
@Test
public void decodeMultipleChunksToJsonObject() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("{\"foo\": \"foofoo\"").byteBuffer(), Buffer.wrap(", \"bar\": \"barbar\"}").byteBuffer());
Flux<String> output = decoder.decode(source, null, null).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
});
JsonObjectDecoder decoder = new JsonObjectDecoder(allocator);
Flux<DataBuffer> source = Flux.just(stringBuffer("{\"foo\": \"foofoo\""),
stringBuffer(", \"bar\": \"barbar\"}"));
Flux<String> output =
decoder.decode(source, null, null).map(JsonObjectDecoderTests::toString);
List<String> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(1, results.size());
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0));
@ -65,13 +60,12 @@ public class JsonObjectDecoderTests {
@Test
public void decodeSingleChunkToArray() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]").byteBuffer());
Flux<String> output = decoder.decode(source, null, null).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
});
JsonObjectDecoder decoder = new JsonObjectDecoder(allocator);
Flux<DataBuffer> source = Flux.just(stringBuffer(
"[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"));
Flux<String> output =
decoder.decode(source, null, null).map(JsonObjectDecoderTests::toString);
List<String> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(2, results.size());
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0));
@ -80,17 +74,22 @@ public class JsonObjectDecoderTests {
@Test
public void decodeMultipleChunksToArray() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("[{\"foo\": \"foofoo\", \"bar\"").byteBuffer(), Buffer.wrap(": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]").byteBuffer());
Flux<String> output = decoder.decode(source, null, null).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
return new String(b, StandardCharsets.UTF_8);
});
JsonObjectDecoder decoder = new JsonObjectDecoder(allocator);
Flux<DataBuffer> source =
Flux.just(stringBuffer("[{\"foo\": \"foofoo\", \"bar\""), stringBuffer(
": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"));
Flux<String> output =
decoder.decode(source, null, null).map(JsonObjectDecoderTests::toString);
List<String> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(2, results.size());
assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0));
assertEquals("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}", results.get(1));
}
private static String toString(DataBuffer buffer) {
byte[] b = new byte[buffer.readableByteCount()];
buffer.read(b);
return new String(b, StandardCharsets.UTF_8);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,45 +14,52 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.encoder;
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import org.springframework.core.codec.support.JsonObjectEncoder;
import org.springframework.core.io.buffer.DataBuffer;
import static org.junit.Assert.assertEquals;
/**
* @author Sebastien Deleuze
*/
public class JsonObjectEncoderTests {
public class JsonObjectEncoderTests extends AbstractAllocatingTestCase {
private JsonObjectEncoder encoder;
@Before
public void createEncoder() {
encoder = new JsonObjectEncoder(allocator);
}
@Test
public void encodeSingleElementFlux() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder();
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
Iterable<String> results = Flux.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
}).toIterable();
String result = String.join("", results);
assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"}]", result);
}
@Test
public void encodeSingleElementMono() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder();
Mono<ByteBuffer> source = Mono.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer());
Mono<DataBuffer> source =
Mono.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
Iterable<String> results = Flux.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
}).toIterable();
String result = String.join("", results);
@ -61,13 +68,12 @@ public class JsonObjectEncoderTests {
@Test
public void encodeTwoElementsFlux() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder();
Flux<ByteBuffer> source = Flux.just(
Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(),
Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer());
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"),
stringBuffer("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}"));
Iterable<String> results = Flux.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
}).toIterable();
String result = String.join("", results);
@ -76,15 +82,15 @@ public class JsonObjectEncoderTests {
@Test
public void encodeThreeElementsFlux() throws InterruptedException {
JsonObjectEncoder encoder = new JsonObjectEncoder();
Flux<ByteBuffer> source = Flux.just(
Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(),
Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer(),
Buffer.wrap("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}").byteBuffer()
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"),
stringBuffer("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}"),
stringBuffer(
"{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}")
);
Iterable<String> results = Flux.from(encoder.encode(source, null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
}).toIterable();
String result = String.join("", results);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.reactive.codec;
package org.springframework.core.codec.support;
import javax.xml.bind.annotation.XmlRootElement;
@ -52,4 +52,20 @@ public class Pojo {
this.bar = bar;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof Pojo) {
Pojo other = (Pojo) o;
return this.foo.equals(other.foo) && this.bar.equals(other.bar);
}
return false;
}
@Override
public int hashCode() {
return 31 * foo.hashCode() + bar.hashCode();
}
}

View File

@ -14,12 +14,12 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.decoder;
package org.springframework.core.codec.support;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.StreamSupport;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -28,7 +28,7 @@ import reactor.io.buffer.Buffer;
import rx.Single;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.support.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import static java.util.stream.Collectors.toList;
@ -38,9 +38,15 @@ import static org.junit.Assert.*;
* @author Sebastien Deleuze
* @author Brian Clozel
*/
public class StringDecoderTests {
public class StringDecoderTests extends AbstractAllocatingTestCase {
private StringDecoder decoder;
@Before
public void createEncoder() {
decoder = new StringDecoder(allocator);
}
private final StringDecoder decoder = new StringDecoder();
@Test
public void canDecode() {
@ -51,7 +57,7 @@ public class StringDecoderTests {
@Test
public void decode() throws InterruptedException {
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("foo").byteBuffer(), Buffer.wrap("bar").byteBuffer());
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"));
Flux<String> output = this.decoder.decode(source, ResolvableType.forClassWithGenerics(Flux.class, String.class), null);
List<String> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(1, results.size());
@ -60,8 +66,8 @@ public class StringDecoderTests {
@Test
public void decodeDoNotBuffer() throws InterruptedException {
StringDecoder decoder = new StringDecoder(false);
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("foo").byteBuffer(), Buffer.wrap("bar").byteBuffer());
StringDecoder decoder = new StringDecoder(allocator, false);
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"));
Flux<String> output = decoder.decode(source, ResolvableType.forClassWithGenerics(Flux.class, String.class), null);
List<String> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());
assertEquals(2, results.size());
@ -71,7 +77,7 @@ public class StringDecoderTests {
@Test
public void decodeMono() throws InterruptedException {
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("foo").byteBuffer(), Buffer.wrap("bar").byteBuffer());
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"));
Mono<String> mono = Mono.from(this.decoder.decode(source,
ResolvableType.forClassWithGenerics(Mono.class, String.class),
MediaType.TEXT_PLAIN));
@ -81,7 +87,7 @@ public class StringDecoderTests {
@Test
public void decodeSingle() throws InterruptedException {
Flux<ByteBuffer> source = Flux.just(Buffer.wrap("foo").byteBuffer(), Buffer.wrap("bar").byteBuffer());
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"));
Single<String> single = RxJava1SingleConverter.from(this.decoder.decode(source,
ResolvableType.forClassWithGenerics(Single.class, String.class),
MediaType.TEXT_PLAIN));

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,17 +14,19 @@
* limitations under the License.
*/
package org.springframework.reactive.codec.encoder;
package org.springframework.core.codec.support;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.StreamSupport;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.support.StringEncoder;
import org.springframework.http.MediaType;
import static java.util.stream.Collectors.toList;
@ -33,9 +35,15 @@ import static org.junit.Assert.*;
/**
* @author Sebastien Deleuze
*/
public class StringEncoderTests {
@RunWith(Parameterized.class)
public class StringEncoderTests extends AbstractAllocatingTestCase {
private final StringEncoder encoder = new StringEncoder();
private StringEncoder encoder;
@Before
public void createEncoder() {
encoder = new StringEncoder(allocator);
}
@Test
public void canWrite() {
@ -47,8 +55,8 @@ public class StringEncoderTests {
@Test
public void write() throws InterruptedException {
Flux<String> output = Flux.from(encoder.encode(Flux.just("foo"), null, null)).map(chunk -> {
byte[] b = new byte[chunk.remaining()];
chunk.get(b);
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
});
List<String> results = StreamSupport.stream(output.toIterable().spliterator(), false).collect(toList());