Comply with Spring Framework code style

https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Code-Style
This commit is contained in:
Rossen Stoyanchev 2015-10-30 17:22:19 -04:00
parent 0989c8b3c2
commit c0dff3d2bb
33 changed files with 420 additions and 313 deletions

View File

@ -32,20 +32,22 @@ public class ReactiveStreamsToCompletableFutureConverter implements GenericConve
@Override
public Set<ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> convertibleTypes = new LinkedHashSet<>();
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, CompletableFuture.class));
convertibleTypes.add(new GenericConverter.ConvertiblePair(CompletableFuture.class, Publisher.class));
return convertibleTypes;
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, CompletableFuture.class));
pairs.add(new GenericConverter.ConvertiblePair(CompletableFuture.class, Publisher.class));
return pairs;
}
@Override
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
if (source != null) {
if (CompletableFuture.class.isAssignableFrom(source.getClass())) {
return reactor.core.publisher.convert.CompletableFutureConverter.from((CompletableFuture)source);
} else if (CompletableFuture.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return reactor.core.publisher.convert.CompletableFutureConverter.fromSingle((Publisher)source);
}
if (source == null) {
return null;
}
else if (CompletableFuture.class.isAssignableFrom(source.getClass())) {
return reactor.core.publisher.convert.CompletableFutureConverter.from((CompletableFuture) source);
}
else if (CompletableFuture.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return reactor.core.publisher.convert.CompletableFutureConverter.fromSingle((Publisher) source);
}
return null;
}

View File

@ -35,26 +35,30 @@ public final class ReactiveStreamsToReactorConverter implements GenericConverter
@Override
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> convertibleTypes = new LinkedHashSet<>();
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Stream.class));
convertibleTypes.add(new GenericConverter.ConvertiblePair(Stream.class, Publisher.class));
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Promise.class));
convertibleTypes.add(new GenericConverter.ConvertiblePair(Promise.class, Publisher.class));
return convertibleTypes;
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Stream.class));
pairs.add(new GenericConverter.ConvertiblePair(Stream.class, Publisher.class));
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Promise.class));
pairs.add(new GenericConverter.ConvertiblePair(Promise.class, Publisher.class));
return pairs;
}
@Override
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
if (source != null) {
if (Stream.class.isAssignableFrom(source.getClass())) {
return source;
} else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Streams.wrap((Publisher)source);
} else if (Promise.class.isAssignableFrom(source.getClass())) {
return ((Promise<?>)source);
} else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Streams.wrap((Publisher)source).next();
}
if (source == null) {
return null;
}
if (Stream.class.isAssignableFrom(source.getClass())) {
return source;
}
else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Streams.wrap((Publisher)source);
}
else if (Promise.class.isAssignableFrom(source.getClass())) {
return source;
}
else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Streams.wrap((Publisher)source).next();
}
return null;
}

View File

@ -36,28 +36,30 @@ public final class ReactiveStreamsToRxJava1Converter implements GenericConverter
@Override
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> convertibleTypes = new LinkedHashSet<>();
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Observable.class));
convertibleTypes.add(new GenericConverter.ConvertiblePair(Observable.class, Publisher.class));
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Single.class));
convertibleTypes.add(new GenericConverter.ConvertiblePair(Single.class, Publisher.class));
return convertibleTypes;
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Observable.class));
pairs.add(new GenericConverter.ConvertiblePair(Observable.class, Publisher.class));
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Single.class));
pairs.add(new GenericConverter.ConvertiblePair(Single.class, Publisher.class));
return pairs;
}
@Override
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
if (source != null) {
if (Observable.class.isAssignableFrom(source.getClass())) {
return RxJava1Converter.from((Observable) source);
}
else if (Observable.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return RxJava1Converter.from((Publisher)source);
}
else if (Single.class.isAssignableFrom(source.getClass())) {
return reactor.core.publisher.convert.RxJava1SingleConverter.from((Single) source);
} else if (Single.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return reactor.core.publisher.convert.RxJava1SingleConverter.from((Publisher)source);
}
if (source == null) {
return null;
}
if (Observable.class.isAssignableFrom(source.getClass())) {
return RxJava1Converter.from((Observable) source);
}
else if (Observable.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return RxJava1Converter.from((Publisher) source);
}
else if (Single.class.isAssignableFrom(source.getClass())) {
return reactor.core.publisher.convert.RxJava1SingleConverter.from((Single) source);
}
else if (Single.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return reactor.core.publisher.convert.RxJava1SingleConverter.from((Publisher) source);
}
return null;
}

View File

@ -21,10 +21,12 @@ import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
* Represents a "reactive" HTTP input message, consisting of {@linkplain #getHeaders() headers}
* and a readable {@linkplain #getBody() streaming body }.
* Represents a "reactive" HTTP input message, consisting of
* {@linkplain #getHeaders() headers} and a readable
* {@linkplain #getBody() streaming body }.
*
* <p>Typically implemented by an HTTP request on the server-side, or a response on the client-side.
* <p>Typically implemented by an HTTP request on the server-side, or a response
* on the client-side.
*
* @author Arjen Poutsma
*/

View File

@ -21,19 +21,21 @@ import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
* Represents a "reactive" HTTP output message, consisting of {@linkplain #getHeaders() headers}
* and the capability to add a {@linkplain #setBody(Publisher) body}.
* Represents a "reactive" HTTP output message, consisting of
* {@linkplain #getHeaders() headers} and the capability to add a
* {@linkplain #setBody(Publisher) body}.
*
* <p>Typically implemented by an HTTP request on the client-side, or a response on the server-side.
* <p>Typically implemented by an HTTP request on the client-side, or a response
* on the server-side.
*
* @author Arjen Poutsma
*/
public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
* Sets the body of this message to the given publisher of {@link ByteBuffer}s. The
* publisher will be used to write to the underlying HTTP layer with asynchronously,
* given pull demand by this layer.
* Sets the body of this message to the given publisher of {@link ByteBuffer}s.
* The publisher will be used to write to the underlying HTTP layer with
* asynchronously, given pull demand by this layer.
*
* @param body the body to use
* @return a publisher that indicates completion

View File

@ -34,7 +34,9 @@ public class ByteBufferDecoder implements ByteToMessageDecoder<ByteBuffer> {
}
@Override
public Publisher<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MediaType mediaType, Object... hints) {
public Publisher<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MediaType mediaType, Object... hints) {
return inputStream;
}
}

View File

@ -49,6 +49,7 @@ public interface ByteToMessageDecoder<T> {
* @param hints Additional information about how to do decode, optional.
* @return the decoded message stream
*/
Publisher<T> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MediaType mediaType, Object... hints);
Publisher<T> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MediaType mediaType, Object... hints);
}

View File

@ -40,6 +40,7 @@ public class JacksonJsonDecoder implements ByteToMessageDecoder<Object> {
private final ObjectMapper mapper;
public JacksonJsonDecoder() {
this(new ObjectMapper());
}
@ -48,18 +49,22 @@ public class JacksonJsonDecoder implements ByteToMessageDecoder<Object> {
this.mapper = mapper;
}
@Override
public boolean canDecode(ResolvableType type, MediaType mediaType, Object... hints) {
return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON);
}
@Override
public Publisher<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MediaType mediaType, Object... hints) {
ObjectReader reader = mapper.readerFor(type.getRawClass());
public Publisher<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MediaType mediaType, Object... hints) {
ObjectReader reader = this.mapper.readerFor(type.getRawClass());
return Publishers.map(inputStream, chunk -> {
try {
return reader.readValue(new ByteBufferInputStream(chunk));
} catch (IOException e) {
}
catch (IOException e) {
throw new CodecException("Error while reading the data", e);
}
});

View File

@ -51,15 +51,19 @@ import org.springframework.util.Assert;
*/
public class Jaxb2Decoder implements ByteToMessageDecoder<Object> {
private final ConcurrentMap<Class<?>, JAXBContext> jaxbContexts = new ConcurrentHashMap<Class<?>, JAXBContext>(64);
private final ConcurrentMap<Class<?>, JAXBContext> jaxbContexts = new ConcurrentHashMap<>(64);
@Override
public boolean canDecode(ResolvableType type, MediaType mediaType, Object... hints) {
return mediaType.isCompatibleWith(MediaType.APPLICATION_XML) || mediaType.isCompatibleWith(MediaType.TEXT_XML);
return (mediaType.isCompatibleWith(MediaType.APPLICATION_XML) ||
mediaType.isCompatibleWith(MediaType.TEXT_XML));
}
@Override
public Publisher<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MediaType mediaType, Object... hints) {
public Publisher<Object> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MediaType mediaType, Object... hints) {
Class<?> outputClass = type.getRawClass();
try {
Source source = processSource(new StreamSource(new ByteBufferPublisherInputStream(inputStream)));
@ -77,7 +81,8 @@ public class Jaxb2Decoder implements ByteToMessageDecoder<Object> {
new CodecException("Could not unmarshal to [" + outputClass + "]: " + ex.getMessage(), ex));
}
catch (JAXBException ex) {
return Publishers.error(new CodecException("Could not instantiate JAXBContext: " + ex.getMessage(), ex));
return Publishers.error(new CodecException("Could not instantiate JAXBContext: " +
ex.getMessage(), ex));
}
}
@ -101,12 +106,11 @@ public class Jaxb2Decoder implements ByteToMessageDecoder<Object> {
protected final Unmarshaller createUnmarshaller(Class<?> clazz) throws JAXBException {
try {
JAXBContext jaxbContext = getJaxbContext(clazz);
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
return unmarshaller;
return jaxbContext.createUnmarshaller();
}
catch (JAXBException ex) {
throw new CodecException(
"Could not create Unmarshaller for class [" + clazz + "]: " + ex.getMessage(), ex);
throw new CodecException("Could not create Unmarshaller for class " +
"[" + clazz + "]: " + ex.getMessage(), ex);
}
}
@ -119,8 +123,8 @@ public class Jaxb2Decoder implements ByteToMessageDecoder<Object> {
this.jaxbContexts.putIfAbsent(clazz, jaxbContext);
}
catch (JAXBException ex) {
throw new CodecException(
"Could not instantiate JAXBContext for class [" + clazz + "]: " + ex.getMessage(), ex);
throw new CodecException("Could not instantiate JAXBContext for class " +
"[" + clazz + "]: " + ex.getMessage(), ex);
}
}
return jaxbContext;

View File

@ -32,13 +32,15 @@ import org.springframework.http.MediaType;
import org.springframework.reactive.codec.encoder.JsonObjectEncoder;
/**
* Decode an arbitrary split byte stream representing JSON objects to a byte stream
* where each chunk is a well-formed JSON object.
* Decode an arbitrary split byte stream representing JSON objects to a byte
* stream where each chunk is a well-formed JSON object.
*
* This class does not do any real parsing or validation. A sequence of bytes is considered a JSON object/array
* if it contains a matching number of opening and closing braces/brackets.
* This class does not do any real parsing or validation. A sequence of byte
* is considered a JSON object/array if it contains a matching number of opening
* and closing braces/brackets.
*
* Based on <a href=https://github.com/netty/netty/blob/master/codec/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java">Netty {@code JsonObjectDecoder}</a>
* Based on <a href=https://github.com/netty/netty/blob/master/codec/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java">
* Netty {@code JsonObjectDecoder}</a>
*
* @author Sebastien Deleuze
* @see JsonObjectEncoder
@ -46,13 +48,19 @@ import org.springframework.reactive.codec.encoder.JsonObjectEncoder;
public class JsonObjectDecoder implements ByteToMessageDecoder<ByteBuffer> {
private static final int ST_CORRUPTED = -1;
private static final int ST_INIT = 0;
private static final int ST_DECODING_NORMAL = 1;
private static final int ST_DECODING_ARRAY_STREAM = 2;
private final int maxObjectLength;
private final boolean streamArrayElements;
public JsonObjectDecoder() {
// 1 MB
this(1024 * 1024);
@ -66,14 +74,15 @@ public class JsonObjectDecoder implements ByteToMessageDecoder<ByteBuffer> {
this(1024 * 1024, streamArrayElements);
}
/**
* @param maxObjectLength maximum number of bytes a JSON object/array may use (including braces and all).
* Objects exceeding this length are dropped and an {@link IllegalStateException}
* is thrown.
* @param streamArrayElements if set to true and the "top level" JSON object is an array, each of its entries
* is passed through the pipeline individually and immediately after it was fully
* received, allowing for arrays with "infinitely" many elements.
*
* @param maxObjectLength maximum number of bytes a JSON object/array may
* use (including braces and all). Objects exceeding this length are dropped
* and an {@link IllegalStateException} is thrown.
* @param streamArrayElements if set to true and the "top level" JSON object
* is an array, each of its entries is passed through the pipeline individually
* and immediately after it was fully received, allowing for arrays with
* "infinitely" many elements.
*/
public JsonObjectDecoder(int maxObjectLength, boolean streamArrayElements) {
if (maxObjectLength < 1) {
@ -90,91 +99,89 @@ public class JsonObjectDecoder implements ByteToMessageDecoder<ByteBuffer> {
}
@Override
public Publisher<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MediaType mediaType, Object... hints) {
public Publisher<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MediaType mediaType, Object... hints) {
return Publishers.flatMap(inputStream, new Function<ByteBuffer, Publisher<? extends ByteBuffer>>() {
int openBraces;
int idx;
int index;
int state;
boolean insideString;
ByteBuf in;
Integer wrtIdx;
ByteBuf input;
Integer writerIndex;
@Override
public Publisher<? extends ByteBuffer> apply(ByteBuffer b) {
List<ByteBuffer> chunks = new ArrayList<>();
if (in == null) {
in = Unpooled.copiedBuffer(b);
wrtIdx = in.writerIndex();
if (this.input == null) {
this.input = Unpooled.copiedBuffer(b);
this.writerIndex = this.input.writerIndex();
}
else {
in = Unpooled.copiedBuffer(in, Unpooled.copiedBuffer(b));
wrtIdx = in.writerIndex();
this.input = Unpooled.copiedBuffer(this.input, Unpooled.copiedBuffer(b));
this.writerIndex = this.input.writerIndex();
}
if (state == ST_CORRUPTED) {
in.skipBytes(in.readableBytes());
if (this.state == ST_CORRUPTED) {
this.input.skipBytes(this.input.readableBytes());
return Publishers.error(new IllegalStateException("Corrupted stream"));
}
if (wrtIdx > maxObjectLength) {
if (this.writerIndex > maxObjectLength) {
// buffer size exceeded maxObjectLength; discarding the complete buffer.
in.skipBytes(in.readableBytes());
this.input.skipBytes(this.input.readableBytes());
reset();
return Publishers.error(new IllegalStateException(
"object length exceeds " + maxObjectLength + ": " +
wrtIdx +
" bytes discarded"));
return Publishers.error(new IllegalStateException("object length exceeds " +
maxObjectLength + ": " + this.writerIndex + " bytes discarded"));
}
for (/* use current idx */; idx < wrtIdx; idx++) {
byte c = in.getByte(idx);
if (state == ST_DECODING_NORMAL) {
decodeByte(c, in, idx);
for (/* use current index */; this.index < this.writerIndex; this.index++) {
byte c = this.input.getByte(this.index);
if (this.state == ST_DECODING_NORMAL) {
decodeByte(c, this.input, this.index);
// All opening braces/brackets have been closed. That's enough to conclude
// that the JSON object/array is complete.
if (openBraces == 0) {
ByteBuf json = extractObject(in, in.readerIndex(),
idx + 1 - in.readerIndex());
if (this.openBraces == 0) {
ByteBuf json = extractObject(this.input, this.input.readerIndex(),
this.index + 1 - this.input.readerIndex());
if (json != null) {
chunks.add(json.nioBuffer());
}
// The JSON object/array was extracted => discard the bytes from
// the input buffer.
in.readerIndex(idx + 1);
this.input.readerIndex(this.index + 1);
// Reset the object state to get ready for the next JSON object/text
// coming along the byte stream.
reset();
}
}
else if (state == ST_DECODING_ARRAY_STREAM) {
decodeByte(c, in, idx);
else if (this.state == ST_DECODING_ARRAY_STREAM) {
decodeByte(c, this.input, this.index);
if (!insideString && (openBraces == 1 && c == ',' ||
openBraces == 0 && c == ']')) {
if (!this.insideString && (this.openBraces == 1 && c == ',' ||
this.openBraces == 0 && c == ']')) {
// skip leading spaces. No range check is needed and the loop will terminate
// because the byte at position idx is not a whitespace.
for (int i = in.readerIndex(); Character.isWhitespace(in.getByte(i)); i++) {
in.skipBytes(1);
// because the byte at position index is not a whitespace.
for (int i = this.input.readerIndex(); Character.isWhitespace(this.input.getByte(i)); i++) {
this.input.skipBytes(1);
}
// skip trailing spaces.
int idxNoSpaces = idx - 1;
while (idxNoSpaces >= in.readerIndex() &&
Character.isWhitespace(in.getByte(idxNoSpaces))) {
int idxNoSpaces = this.index - 1;
while (idxNoSpaces >= this.input.readerIndex() &&
Character.isWhitespace(this.input.getByte(idxNoSpaces))) {
idxNoSpaces--;
}
ByteBuf json = extractObject(in, in.readerIndex(),
idxNoSpaces + 1 - in.readerIndex());
ByteBuf json = extractObject(this.input, this.input.readerIndex(),
idxNoSpaces + 1 - this.input.readerIndex());
if (json != null) {
chunks.add(json.nioBuffer());
}
in.readerIndex(idx + 1);
this.input.readerIndex(this.index + 1);
if (c == ']') {
reset();
@ -185,74 +192,73 @@ public class JsonObjectDecoder implements ByteToMessageDecoder<ByteBuffer> {
else if (c == '{' || c == '[') {
initDecoding(c, streamArrayElements);
if (state == ST_DECODING_ARRAY_STREAM) {
if (this.state == ST_DECODING_ARRAY_STREAM) {
// Discard the array bracket
in.skipBytes(1);
this.input.skipBytes(1);
}
// Discard leading spaces in front of a JSON object/array.
}
else if (Character.isWhitespace(c)) {
in.skipBytes(1);
this.input.skipBytes(1);
}
else {
state = ST_CORRUPTED;
this.state = ST_CORRUPTED;
return Publishers.error(new IllegalStateException(
"invalid JSON received at byte position " + idx +
": " + ByteBufUtil.hexDump(in)));
"invalid JSON received at byte position " + this.index + ": " +
ByteBufUtil.hexDump(this.input)));
}
}
if (in.readableBytes() == 0) {
idx = 0;
if (this.input.readableBytes() == 0) {
this.index = 0;
}
return Publishers.from(chunks);
}
/**
* Override this method if you want to filter the json objects/arrays that get passed through the pipeline.
* Override this method if you want to filter the json objects/arrays that
* get passed through the pipeline.
*/
@SuppressWarnings("UnusedParameters")
protected ByteBuf extractObject(ByteBuf buffer, int index, int length) {
return buffer.slice(index, length).retain();
}
private void decodeByte(byte c, ByteBuf in, int idx) {
if ((c == '{' || c == '[') && !insideString) {
openBraces++;
private void decodeByte(byte c, ByteBuf input, int index) {
if ((c == '{' || c == '[') && !this.insideString) {
this.openBraces++;
}
else if ((c == '}' || c == ']') && !insideString) {
openBraces--;
else if ((c == '}' || c == ']') && !this.insideString) {
this.openBraces--;
}
else if (c == '"') {
// start of a new JSON string. It's necessary to detect strings as they may
// also contain braces/brackets and that could lead to incorrect results.
if (!insideString) {
insideString = true;
if (!this.insideString) {
this.insideString = true;
// If the double quote wasn't escaped then this is the end of a string.
}
else if (in.getByte(idx - 1) != '\\') {
insideString = false;
else if (input.getByte(index - 1) != '\\') {
this.insideString = false;
}
}
}
private void initDecoding(byte openingBrace, boolean streamArrayElements) {
openBraces = 1;
this.openBraces = 1;
if (openingBrace == '[' && streamArrayElements) {
state = ST_DECODING_ARRAY_STREAM;
this.state = ST_DECODING_ARRAY_STREAM;
}
else {
state = ST_DECODING_NORMAL;
this.state = ST_DECODING_NORMAL;
}
}
private void reset() {
insideString = false;
state = ST_INIT;
openBraces = 0;
this.insideString = false;
this.state = ST_INIT;
this.openBraces = 0;
}
});
}
}

View File

@ -47,7 +47,9 @@ public class StringDecoder implements ByteToMessageDecoder<String> {
}
@Override
public Publisher<String> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MediaType mediaType, Object... hints) {
public Publisher<String> decode(Publisher<ByteBuffer> inputStream, ResolvableType type,
MediaType mediaType, Object... hints) {
Charset charset = HintUtils.getHintByClass(Charset.class, hints, DEFAULT_CHARSET);
return Publishers.map(inputStream, chunk -> new String(new Buffer(chunk).asBytes(), charset));
}

View File

@ -34,8 +34,11 @@ public class ByteBufferEncoder implements MessageToByteEncoder<ByteBuffer> {
}
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream, ResolvableType type, MediaType mediaType, Object... hints) {
return (Publisher<ByteBuffer>)messageStream;
@SuppressWarnings("unchecked")
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream,
ResolvableType type, MediaType mediaType, Object... hints) {
return (Publisher<ByteBuffer>) messageStream;
}
}

View File

@ -54,7 +54,9 @@ public class JacksonJsonEncoder implements MessageToByteEncoder<Object> {
}
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends Object> messageStream, ResolvableType type, MediaType mediaType, Object... hints) {
public Publisher<ByteBuffer> encode(Publisher<? extends Object> messageStream,
ResolvableType type, MediaType mediaType, Object... hints) {
return Publishers.map(messageStream, value -> {
Buffer buffer = new Buffer();
BufferOutputStream outputStream = new BufferOutputStream(buffer);

View File

@ -45,15 +45,19 @@ import org.springframework.util.ClassUtils;
*/
public class Jaxb2Encoder implements MessageToByteEncoder<Object> {
private final ConcurrentMap<Class<?>, JAXBContext> jaxbContexts = new ConcurrentHashMap<Class<?>, JAXBContext>(64);
private final ConcurrentMap<Class<?>, JAXBContext> jaxbContexts = new ConcurrentHashMap<>(64);
@Override
public boolean canEncode(ResolvableType type, MediaType mediaType, Object... hints) {
return mediaType.isCompatibleWith(MediaType.APPLICATION_XML) || mediaType.isCompatibleWith(MediaType.TEXT_XML);
return (mediaType.isCompatibleWith(MediaType.APPLICATION_XML) ||
mediaType.isCompatibleWith(MediaType.TEXT_XML));
}
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends Object> messageStream, ResolvableType type, MediaType mediaType, Object... hints) {
public Publisher<ByteBuffer> encode(Publisher<? extends Object> messageStream, ResolvableType type,
MediaType mediaType, Object... hints) {
return Publishers.map(messageStream, value -> {
try {
Buffer buffer = new Buffer();
@ -64,12 +68,12 @@ public class Jaxb2Encoder implements MessageToByteEncoder<Object> {
marshaller.marshal(value, outputStream);
buffer.flip();
return buffer.byteBuffer();
} catch (MarshalException ex) {
throw new CodecException(
"Could not marshal [" + value + "]: " + ex.getMessage(), ex);
} catch (JAXBException ex) {
throw new CodecException(
"Could not instantiate JAXBContext: " + ex.getMessage(), ex);
}
catch (MarshalException ex) {
throw new CodecException("Could not marshal [" + value + "]: " + ex.getMessage(), ex);
}
catch (JAXBException ex) {
throw new CodecException("Could not instantiate JAXBContext: " + ex.getMessage(), ex);
}
});
}
@ -77,12 +81,11 @@ public class Jaxb2Encoder implements MessageToByteEncoder<Object> {
protected final Marshaller createMarshaller(Class<?> clazz) {
try {
JAXBContext jaxbContext = getJaxbContext(clazz);
Marshaller marshaller = jaxbContext.createMarshaller();
return marshaller;
return jaxbContext.createMarshaller();
}
catch (JAXBException ex) {
throw new CodecException(
"Could not create Marshaller for class [" + clazz + "]: " + ex.getMessage(), ex);
throw new CodecException("Could not create Marshaller for class " +
"[" + clazz + "]: " + ex.getMessage(), ex);
}
}
@ -95,8 +98,8 @@ public class Jaxb2Encoder implements MessageToByteEncoder<Object> {
this.jaxbContexts.putIfAbsent(clazz, jaxbContext);
}
catch (JAXBException ex) {
throw new CodecException(
"Could not instantiate JAXBContext for class [" + clazz + "]: " + ex.getMessage(), ex);
throw new CodecException("Could not instantiate JAXBContext for class " +
"[" + clazz + "]: " + ex.getMessage(), ex);
}
}
return jaxbContext;

View File

@ -33,8 +33,8 @@ import org.springframework.reactive.codec.decoder.JsonObjectDecoder;
import static reactor.Publishers.lift;
/**
* Encode a byte stream of individual JSON element to a byte stream representing a single
* JSON array when if it contains more than one element.
* Encode a byte stream of individual JSON element to a byte stream representing
* a single JSON array when if it contains more than one element.
*
* @author Sebastien Deleuze
* @author Stephane Maldini
@ -52,27 +52,36 @@ public class JsonObjectEncoder implements MessageToByteEncoder<ByteBuffer> {
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream,
ResolvableType type, MediaType mediaType, Object... hints) {
//noinspection Convert2MethodRef
return lift(messageStream, bbs -> new JsonEncoderBarrier(bbs));
}
private static class JsonEncoderBarrier extends SubscriberBarrier<ByteBuffer, ByteBuffer> {
private volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<JsonEncoderBarrier> REQUESTED =
AtomicLongFieldUpdater.newUpdater(JsonEncoderBarrier.class, "requested");
private volatile int terminated;
static final AtomicIntegerFieldUpdater<JsonEncoderBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(JsonEncoderBarrier.class, "terminated");
ByteBuffer prev = null;
long count = 0;
private ByteBuffer prev = null;
private long count = 0;
private volatile long requested;
private volatile int terminated;
public JsonEncoderBarrier(Subscriber<? super ByteBuffer> subscriber) {
super(subscriber);
}
@Override
protected void doRequest(long n) {
BackpressureUtils.getAndAdd(REQUESTED, this, n);
@ -86,17 +95,17 @@ public class JsonObjectEncoder implements MessageToByteEncoder<ByteBuffer> {
@Override
protected void doNext(ByteBuffer next) {
count++;
if (count == 1) {
prev = next;
this.count++;
if (this.count == 1) {
this.prev = next;
super.doRequest(1);
return;
}
ByteBuffer tmp = prev;
prev = next;
ByteBuffer tmp = this.prev;
this.prev = next;
Buffer buffer = new Buffer();
if (count == 2) {
if (this.count == 2) {
buffer.append("[");
}
buffer.append(tmp);
@ -104,25 +113,25 @@ public class JsonObjectEncoder implements MessageToByteEncoder<ByteBuffer> {
buffer.flip();
BackpressureUtils.getAndSub(REQUESTED, this, 1L);
subscriber.onNext(buffer.byteBuffer());
downstream().onNext(buffer.byteBuffer());
}
protected void drainLast(){
if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
Buffer buffer = new Buffer();
buffer.append(prev);
if (count > 1) {
buffer.append(this.prev);
if (this.count > 1) {
buffer.append("]");
}
buffer.flip();
subscriber.onNext(buffer.byteBuffer());
downstream().onNext(buffer.byteBuffer());
super.doComplete();
}
}
@Override
protected void doComplete() {
if(TERMINATED.compareAndSet(this, 0, 1)){
if(TERMINATED.compareAndSet(this, 0, 1)) {
drainLast();
}
}

View File

@ -51,6 +51,7 @@ public interface MessageToByteEncoder<T> {
* @param hints Additional information about how to encode, optional.
* @return the encoded bytes stream
*/
Publisher<ByteBuffer> encode(Publisher<? extends T> messageStream, ResolvableType type, MediaType mediaType, Object... hints);
Publisher<ByteBuffer> encode(Publisher<? extends T> messageStream, ResolvableType type,
MediaType mediaType, Object... hints);
}

View File

@ -46,7 +46,9 @@ public class StringEncoder implements MessageToByteEncoder<String> {
}
@Override
public Publisher<ByteBuffer> encode(Publisher<? extends String> elementStream, ResolvableType type, MediaType mediaType, Object... hints) {
public Publisher<ByteBuffer> encode(Publisher<? extends String> elementStream,
ResolvableType type, MediaType mediaType, Object... hints) {
final Charset charset = HintUtils.getHintByClass(Charset.class, hints, DEFAULT_CHARSET);
return Publishers.map(elementStream, s -> ByteBuffer.wrap(s.getBytes(charset)));
}

View File

@ -130,7 +130,8 @@ public class ByteBufferPublisherInputStream extends InputStream {
if (this.currentStream != null && this.currentStream.available() > 0) {
return this.currentStream;
} else {
// take() blocks until next or complete() then return null, but that's OK since this is a *blocking* InputStream
// take() blocks until next or complete() then return null,
// but that's OK since this is a *blocking* InputStream
ByteBuffer signal = this.queue.take();
if(signal == null){
this.completed = true;

View File

@ -72,20 +72,20 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
protected void initStrategies(ApplicationContext context) {
Map<String, HandlerMapping> mappingBeans =
BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerMapping.class, true, false);
Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false);
this.handlerMappings = new ArrayList<>(mappingBeans.values());
AnnotationAwareOrderComparator.sort(this.handlerMappings);
Map<String, HandlerAdapter> adapterBeans =
BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerAdapter.class, true, false);
Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerAdapter.class, true, false);
this.handlerAdapters = new ArrayList<>(adapterBeans.values());
AnnotationAwareOrderComparator.sort(this.handlerAdapters);
Map<String, HandlerResultHandler> beans =
BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerResultHandler.class, true, false);
Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerResultHandler.class, true, false);
this.resultHandlers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(this.resultHandlers);
@ -94,7 +94,6 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
@Override
public Publisher<Void> handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) {
if (logger.isDebugEnabled()) {
logger.debug("Processing " + request.getMethod() + " request for [" + request.getURI() + "]");
}

View File

@ -52,7 +52,6 @@ public interface HandlerAdapter {
* @param handler handler to use. This object must have previously been passed
* to the {@code supports} method of this interface, which must have
* returned {@code true}.
* @throws Exception in case of errors
* @return An {@link HandlerResult} instance
*/
Publisher<HandlerResult> handle(ReactiveServerHttpRequest request,

View File

@ -48,6 +48,7 @@ public interface HandlerResultHandler {
* when the handling is complete (success or error) including the flush of the data on the
* network.
*/
Publisher<Void> handleResult(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response, HandlerResult result);
Publisher<Void> handleResult(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response,
HandlerResult result);
}

View File

@ -34,6 +34,11 @@ public class SimpleHandlerResultHandler implements Ordered, HandlerResultHandler
private int order = Ordered.LOWEST_PRECEDENCE;
public void setOrder(int order) {
this.order = order;
}
@Override
public int getOrder() {
return this.order;
@ -46,8 +51,10 @@ public class SimpleHandlerResultHandler implements Ordered, HandlerResultHandler
}
@Override
public Publisher<Void> handleResult(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response, HandlerResult result) {
Publisher<Void> handleComplete = Publishers.completable((Publisher<?>)result.getValue());
return Publishers.concat(Publishers.from(Arrays.asList(handleComplete, response.writeHeaders())));
public Publisher<Void> handleResult(ReactiveServerHttpRequest request,
ReactiveServerHttpResponse response, HandlerResult result) {
Publisher<Void> completion = Publishers.completable((Publisher<?>)result.getValue());
return Publishers.concat(Publishers.from(Arrays.asList(completion, response.writeHeaders())));
}
}

View File

@ -64,7 +64,8 @@ public class InvocableHandlerMethod extends HandlerMethod {
List<Publisher<Object>> argPublishers = getMethodArguments(request, providedArgs);
Publisher<Object[]> argValues = (!argPublishers.isEmpty() ?
Streams.zip(argPublishers, this::unwrapOptionalArgValues) : Publishers.just(new Object[0]));
Streams.zip(argPublishers, this::unwrapOptionalArgValues) :
Publishers.just(new Object[0]));
return Publishers.map(argValues, args -> {
if (logger.isTraceEnabled()) {
@ -76,7 +77,8 @@ public class InvocableHandlerMethod extends HandlerMethod {
try {
returnValue = doInvoke(args);
if (logger.isTraceEnabled()) {
logger.trace("Method [" + getMethod().getName() + "] returned [" + returnValue + "]");
logger.trace("Method [" + getMethod().getName() + "] returned " +
"[" + returnValue + "]");
}
}
catch (Exception ex) {

View File

@ -43,24 +43,29 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
private static final Charset UTF_8 = Charset.forName("UTF-8");
private final List<ByteToMessageDecoder<?>> deserializers;
private final List<ByteToMessageDecoder<ByteBuffer>> preProcessors;
private final ConversionService conversionService;
public RequestBodyArgumentResolver(List<ByteToMessageDecoder<?>> deserializers,
public RequestBodyArgumentResolver(List<ByteToMessageDecoder<?>> decoders,
ConversionService conversionService) {
this(deserializers, conversionService, Collections.EMPTY_LIST);
this(decoders, conversionService, Collections.EMPTY_LIST);
}
public RequestBodyArgumentResolver(List<ByteToMessageDecoder<?>> deserializers,
ConversionService conversionService,
List<ByteToMessageDecoder<ByteBuffer>> preProcessors) {
this.deserializers = deserializers;
this.conversionService = conversionService;
public RequestBodyArgumentResolver(List<ByteToMessageDecoder<?>> decoders,
ConversionService service, List<ByteToMessageDecoder<ByteBuffer>> preProcessors) {
this.deserializers = decoders;
this.conversionService = service;
this.preProcessors = preProcessors;
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return parameter.hasParameterAnnotation(RequestBody.class);
@ -75,14 +80,15 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
Publisher<ByteBuffer> inputStream = request.getBody();
Publisher<?> elementStream = inputStream;
ResolvableType elementType = type.hasGenerics() ? type.getGeneric(0) : type;
ByteToMessageDecoder<?> deserializer = resolveDeserializers(request, elementType, mediaType, hints.toArray());
if (deserializer != null) {
List<ByteToMessageDecoder<ByteBuffer>> preProcessors =
resolvePreProcessors(request, elementType, mediaType,hints.toArray());
ByteToMessageDecoder<?> decoder = resolveDecoder(request, elementType, mediaType, hints.toArray());
if (decoder != null) {
List<ByteToMessageDecoder<ByteBuffer>> preProcessors = resolvePreProcessors(
request, elementType, mediaType,hints.toArray());
for (ByteToMessageDecoder<ByteBuffer> preProcessor : preProcessors) {
inputStream = preProcessor.decode(inputStream, elementType, mediaType, hints.toArray());
}
elementStream = deserializer.decode(inputStream, elementType, mediaType, hints.toArray());
elementStream = decoder.decode(inputStream, elementType, mediaType, hints.toArray());
}
if (this.conversionService.canConvert(Publisher.class, type.getRawClass())) {
return Publishers.just(this.conversionService.convert(elementStream, type.getRawClass()));
@ -97,7 +103,9 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
return ( mediaTypes.size() > 0 ? mediaTypes.get(0) : MediaType.TEXT_PLAIN);
}
private ByteToMessageDecoder<?> resolveDeserializers(ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
private ByteToMessageDecoder<?> resolveDecoder(ReactiveServerHttpRequest request,
ResolvableType type, MediaType mediaType, Object[] hints) {
for (ByteToMessageDecoder<?> deserializer : this.deserializers) {
if (deserializer.canDecode(type, mediaType, hints)) {
return deserializer;
@ -106,7 +114,10 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
return null;
}
private List<ByteToMessageDecoder<ByteBuffer>> resolvePreProcessors(ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
private List<ByteToMessageDecoder<ByteBuffer>> resolvePreProcessors(
ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType,
Object[] hints) {
List<ByteToMessageDecoder<ByteBuffer>> preProcessors = new ArrayList<>();
for (ByteToMessageDecoder<ByteBuffer> preProcessor : this.preProcessors) {
if (preProcessor.canDecode(type, mediaType, hints)) {
@ -115,4 +126,5 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
}
return preProcessors;
}
}

View File

@ -19,6 +19,7 @@ package org.springframework.reactive.web.dispatch.method.annotation;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
@ -57,12 +58,16 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
@Override
public void afterPropertiesSet() throws Exception {
if (this.argumentResolvers == null) {
List<ByteToMessageDecoder<?>> decoders = Arrays.asList(new ByteBufferDecoder(),
new StringDecoder(), new JacksonJsonDecoder());
List<ByteToMessageDecoder<ByteBuffer>> preProcessors = Collections.singletonList(
new JsonObjectDecoder());
this.argumentResolvers = new ArrayList<>();
this.argumentResolvers.add(new RequestParamArgumentResolver());
List<ByteToMessageDecoder<?>> deserializers = Arrays.asList(new ByteBufferDecoder(),
new StringDecoder(), new JacksonJsonDecoder());
List<ByteToMessageDecoder<ByteBuffer>> preProcessors = Arrays.asList(new JsonObjectDecoder());
this.argumentResolvers.add(new RequestBodyArgumentResolver(deserializers,
this.argumentResolvers.add(new RequestBodyArgumentResolver(decoders,
new DefaultConversionService(), preProcessors));
}
}

View File

@ -94,13 +94,12 @@ public class RequestMappingHandlerMapping implements HandlerMapping,
@Override
public Object getHandler(ReactiveServerHttpRequest request) {
String path = request.getURI().getPath();
HttpMethod method = request.getMethod();
for (Map.Entry<RequestMappingInfo, HandlerMethod> entry : this.methodMap.entrySet()) {
RequestMappingInfo info = entry.getKey();
if (path.equals(info.getPath()) && (info.getMethods().isEmpty() || info.getMethods().contains(RequestMethod.valueOf(method.name())))) {
if (info.matchesRequest(request)) {
if (logger.isDebugEnabled()) {
logger.debug("Mapped " + method + " " + path + " to [" + entry.getValue() + "]");
logger.debug("Mapped " + request.getMethod() + " " +
request.getURI().getPath() + " to [" + entry.getValue() + "]");
}
return entry.getValue();
}
@ -120,6 +119,11 @@ public class RequestMappingHandlerMapping implements HandlerMapping,
this(path, asList(methods));
}
private static List<RequestMethod> asList(RequestMethod... requestMethods) {
return (requestMethods != null ?
Arrays.asList(requestMethods) : Collections.<RequestMethod>emptyList());
}
public RequestMappingInfo(String path, Collection<RequestMethod> methods) {
this.path = path;
this.methods = new TreeSet<>(methods);
@ -127,20 +131,22 @@ public class RequestMappingHandlerMapping implements HandlerMapping,
public String getPath() {
return path;
return this.path;
}
public Set<RequestMethod> getMethods() {
return methods;
return this.methods;
}
private static List<RequestMethod> asList(RequestMethod... requestMethods) {
return (requestMethods != null ? Arrays.asList(requestMethods) : Collections.<RequestMethod>emptyList());
public boolean matchesRequest(ReactiveServerHttpRequest request) {
String httpMethod = request.getMethod().name();
return request.getURI().getPath().equals(getPath()) &&
(getMethods().isEmpty() || getMethods().contains(RequestMethod.valueOf(httpMethod)));
}
@Override
public int compareTo(Object o) {
RequestMappingInfo other = (RequestMappingInfo)o;
RequestMappingInfo other = (RequestMappingInfo) o;
if (!this.path.equals(other.getPath())) {
return -1;
}

View File

@ -16,6 +16,7 @@
package org.springframework.reactive.web.dispatch.method.annotation;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
@ -61,21 +62,26 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
private int order = 0;
public ResponseBodyResultHandler(List<MessageToByteEncoder<?>> serializers) {
this(serializers, Collections.EMPTY_LIST);
public ResponseBodyResultHandler(List<MessageToByteEncoder<?>> encoders) {
this(encoders, Collections.EMPTY_LIST);
}
public ResponseBodyResultHandler(List<MessageToByteEncoder<?>> serializers, List<MessageToByteEncoder<ByteBuffer>> postProcessors) {
this(serializers, postProcessors, new DefaultConversionService());
public ResponseBodyResultHandler(List<MessageToByteEncoder<?>> encoders,
List<MessageToByteEncoder<ByteBuffer>> postProcessors) {
this(encoders, postProcessors, new DefaultConversionService());
}
public ResponseBodyResultHandler(List<MessageToByteEncoder<?>> serializers, List<MessageToByteEncoder<ByteBuffer>>
postProcessors, ConversionService conversionService) {
this.serializers = serializers;
public ResponseBodyResultHandler(List<MessageToByteEncoder<?>> encoders,
List<MessageToByteEncoder<ByteBuffer>> postProcessors,
ConversionService conversionService) {
this.serializers = encoders;
this.postProcessors = postProcessors;
this.conversionService = conversionService;
}
public void setOrder(int order) {
this.order = order;
}
@ -90,8 +96,8 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
public boolean supports(HandlerResult result) {
Object handler = result.getHandler();
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
return AnnotatedElementUtils.isAnnotated(handlerMethod.getMethod(), ResponseBody.class.getName());
Method method = ((HandlerMethod) handler).getMethod();
return AnnotatedElementUtils.isAnnotated(method, ResponseBody.class.getName());
}
return false;
}
@ -99,8 +105,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> handleResult(ReactiveServerHttpRequest request,
ReactiveServerHttpResponse response,
HandlerResult result) {
ReactiveServerHttpResponse response, HandlerResult result) {
Object value = result.getValue();
HandlerMethod handlerMethod = (HandlerMethod) result.getHandler();
@ -125,19 +130,22 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
elementType = type;
}
MessageToByteEncoder<Object> serializer = (MessageToByteEncoder<Object>) resolveSerializer(request, elementType, mediaType, hints.toArray());
if (serializer != null) {
Publisher<ByteBuffer> outputStream = serializer.encode(elementStream, type, mediaType, hints.toArray());
List<MessageToByteEncoder<ByteBuffer>> postProcessors = resolvePostProcessors(request, elementType, mediaType, hints.toArray());
MessageToByteEncoder<Object> encoder = (MessageToByteEncoder<Object>) resolveEncoder(
request, elementType, mediaType, hints.toArray());
if (encoder != null) {
Publisher<ByteBuffer> outputStream = encoder.encode(elementStream, type, mediaType, hints.toArray());
List<MessageToByteEncoder<ByteBuffer>> postProcessors = resolvePostProcessors(request,
elementType, mediaType, hints.toArray());
for (MessageToByteEncoder<ByteBuffer> postProcessor : postProcessors) {
outputStream = postProcessor.encode(outputStream, elementType, mediaType, hints.toArray());
}
response.getHeaders().setContentType(mediaType);
return response.setBody(outputStream);
}
return Publishers.error(new IllegalStateException(
"Return value type '" + returnType.getParameterType().getName() +
"' with media type '" + mediaType + "' not supported"));
String returnTypeName = returnType.getParameterType().getName();
return Publishers.error(new IllegalStateException("Return value type '" + returnTypeName +
"' with media type '" + mediaType + "' not supported"));
}
private MediaType resolveMediaType(ReactiveServerHttpRequest request) {
@ -147,7 +155,9 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
return ( mediaTypes.size() > 0 ? mediaTypes.get(0) : MediaType.TEXT_PLAIN);
}
private MessageToByteEncoder<?> resolveSerializer(ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
private MessageToByteEncoder<?> resolveEncoder(ReactiveServerHttpRequest request,
ResolvableType type, MediaType mediaType, Object[] hints) {
for (MessageToByteEncoder<?> codec : this.serializers) {
if (codec.canEncode(type, mediaType, hints)) {
return codec;
@ -156,7 +166,10 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
return null;
}
private List<MessageToByteEncoder<ByteBuffer>> resolvePostProcessors(ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
private List<MessageToByteEncoder<ByteBuffer>> resolvePostProcessors(
ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType,
Object[] hints) {
List<MessageToByteEncoder<ByteBuffer>> postProcessors = new ArrayList<>();
for (MessageToByteEncoder<ByteBuffer> postProcessor : this.postProcessors) {
if (postProcessor.canEncode(type, mediaType, hints)) {

View File

@ -71,10 +71,11 @@ public class RxNettyServerHttpResponse implements ReactiveServerHttpResponse {
}
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> contentPublisher) {
public Publisher<Void> setBody(Publisher<ByteBuffer> publisher) {
applyHeaders();
Observable<byte[]> contentObservable = RxJava1Converter.from(contentPublisher).map(content -> new Buffer(content).asBytes());
return RxJava1Converter.from(this.response.writeBytes(contentObservable));
Observable<byte[]> observable = RxJava1Converter.from(publisher).map(
content -> new Buffer(content).asBytes());
return RxJava1Converter.from(this.response.writeBytes(observable));
}
private void applyHeaders() {

View File

@ -57,17 +57,17 @@ public class HttpHandlerServlet extends HttpServlet {
throws ServletException, IOException {
AsyncContext context = request.startAsync();
AsyncContextSynchronizer contextSynchronizer = new AsyncContextSynchronizer(context);
AsyncContextSynchronizer synchronizer = new AsyncContextSynchronizer(context);
RequestBodyPublisher requestPublisher = new RequestBodyPublisher(contextSynchronizer, BUFFER_SIZE);
RequestBodyPublisher requestPublisher = new RequestBodyPublisher(synchronizer, BUFFER_SIZE);
request.getInputStream().setReadListener(requestPublisher);
ServletServerHttpRequest httpRequest = new ServletServerHttpRequest(request, requestPublisher);
ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(contextSynchronizer);
ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(synchronizer);
response.getOutputStream().setWriteListener(responseSubscriber);
ServletServerHttpResponse httpResponse = new ServletServerHttpResponse(response, responseSubscriber);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(contextSynchronizer, httpResponse);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(synchronizer, httpResponse);
this.handler.handle(httpRequest, httpResponse).subscribe(resultSubscriber);
}
@ -79,7 +79,9 @@ public class HttpHandlerServlet extends HttpServlet {
private final ServletServerHttpResponse response;
public HandlerResultSubscriber(AsyncContextSynchronizer synchronizer, ServletServerHttpResponse response) {
public HandlerResultSubscriber(AsyncContextSynchronizer synchronizer,
ServletServerHttpResponse response) {
this.synchronizer = synchronizer;
this.response = response;
}

View File

@ -46,7 +46,9 @@ public class ServletServerHttpRequest implements ReactiveServerHttpRequest {
private HttpHeaders headers;
public ServletServerHttpRequest(HttpServletRequest servletRequest, Publisher<ByteBuffer> requestBodyPublisher) {
public ServletServerHttpRequest(HttpServletRequest servletRequest,
Publisher<ByteBuffer> requestBodyPublisher) {
Assert.notNull(servletRequest, "HttpServletRequest must not be null");
this.servletRequest = servletRequest;
this.requestBodyPublisher = requestBodyPublisher;
@ -74,8 +76,8 @@ public class ServletServerHttpRequest implements ReactiveServerHttpRequest {
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (Enumeration<?> headerNames = this.servletRequest.getHeaderNames(); headerNames.hasMoreElements(); ) {
String headerName = (String) headerNames.nextElement();
for (Enumeration<?> names = this.servletRequest.getHeaderNames(); names.hasMoreElements(); ) {
String headerName = (String) names.nextElement();
for (Enumeration<?> headerValues = this.servletRequest.getHeaders(headerName);
headerValues.hasMoreElements(); ) {
String headerValue = (String) headerValues.nextElement();

View File

@ -17,6 +17,7 @@
package org.springframework.reactive.web.http.servlet;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
@ -26,6 +27,7 @@ import reactor.Publishers;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.util.Assert;
@ -34,27 +36,29 @@ import org.springframework.util.Assert;
*/
public class ServletServerHttpResponse implements ReactiveServerHttpResponse {
private final HttpServletResponse servletResponse;
private final HttpServletResponse response;
private final ResponseBodySubscriber responseSubscriber;
private final ResponseBodySubscriber subscriber;
private final HttpHeaders headers;
private boolean headersWritten = false;
public ServletServerHttpResponse(HttpServletResponse servletResponse, ResponseBodySubscriber responseSubscriber) {
Assert.notNull(servletResponse, "'servletResponse' must not be null");
Assert.notNull(responseSubscriber, "'responseSubscriber' must not be null");
this.servletResponse = servletResponse;
this.responseSubscriber = responseSubscriber;
public ServletServerHttpResponse(HttpServletResponse response,
ResponseBodySubscriber subscriber) {
Assert.notNull(response, "'response' must not be null");
Assert.notNull(subscriber, "'subscriber' must not be null");
this.response = response;
this.subscriber = subscriber;
this.headers = new HttpHeaders();
}
@Override
public void setStatusCode(HttpStatus status) {
this.servletResponse.setStatus(status.value());
this.response.setStatus(status.value());
}
@Override
@ -71,7 +75,7 @@ public class ServletServerHttpResponse implements ReactiveServerHttpResponse {
@Override
public Publisher<Void> setBody(final Publisher<ByteBuffer> contentPublisher) {
applyHeaders();
return (s -> contentPublisher.subscribe(responseSubscriber));
return (s -> contentPublisher.subscribe(subscriber));
}
private void applyHeaders() {
@ -79,16 +83,16 @@ public class ServletServerHttpResponse implements ReactiveServerHttpResponse {
for (Map.Entry<String, List<String>> entry : this.headers.entrySet()) {
String headerName = entry.getKey();
for (String headerValue : entry.getValue()) {
this.servletResponse.addHeader(headerName, headerValue);
this.response.addHeader(headerName, headerValue);
}
}
// HttpServletResponse exposes some headers as properties: we should include those if not already present
if (this.servletResponse.getContentType() == null && this.headers.getContentType() != null) {
this.servletResponse.setContentType(this.headers.getContentType().toString());
MediaType contentType = this.headers.getContentType();
if (this.response.getContentType() == null && contentType != null) {
this.response.setContentType(contentType.toString());
}
if (this.servletResponse.getCharacterEncoding() == null && this.headers.getContentType() != null &&
this.headers.getContentType().getCharSet() != null) {
this.servletResponse.setCharacterEncoding(this.headers.getContentType().getCharSet().name());
Charset charset = (contentType != null ? contentType.getCharSet() : null);
if (this.response.getCharacterEncoding() == null && charset != null) {
this.response.setCharacterEncoding(charset.name());
}
this.headersWritten = true;
}

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -69,6 +70,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
private TestController controller;
@Override
protected HttpHandler createHttpHandler() {
@ -76,9 +78,9 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
DefaultListableBeanFactory factory = wac.getDefaultListableBeanFactory();
wac.registerSingleton("handlerMapping", RequestMappingHandlerMapping.class);
wac.registerSingleton("handlerAdapter", RequestMappingHandlerAdapter.class);
factory.registerSingleton("responseBodyResultHandler",
new ResponseBodyResultHandler(Arrays.asList(new ByteBufferEncoder(), new StringEncoder(), new JacksonJsonEncoder()), Arrays.asList
(new JsonObjectEncoder())));
factory.registerSingleton("responseBodyResultHandler", new ResponseBodyResultHandler(
Arrays.asList(new ByteBufferEncoder(), new StringEncoder(),new JacksonJsonEncoder()),
Collections.singletonList(new JsonObjectEncoder())));
wac.registerSingleton("simpleResultHandler", SimpleHandlerResultHandler.class);
this.controller = new TestController();
factory.registerSingleton("controller", this.controller);
@ -197,16 +199,16 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@Test
public void promiseCapitalize() throws Exception {
capitalizePojo("http://localhost:" + port + "/promise-capitalize");
capitalizePojo("http://localhost:" + this.port + "/promise-capitalize");
}
@Test
public void create() throws Exception {
RestTemplate restTemplate = new RestTemplate();
URI url = new URI("http://localhost:" + port + "/create");
List<Person> persons = Arrays.asList(new Person("Robert"), new Person("Marie"));
RequestEntity<List<Person>> request = RequestEntity.post(url).contentType(MediaType.APPLICATION_JSON).body(persons);
URI url = new URI("http://localhost:" + this.port + "/create");
RequestEntity<List<Person>> request = RequestEntity.post(url)
.contentType(MediaType.APPLICATION_JSON)
.body(Arrays.asList(new Person("Robert"), new Person("Marie")));
ResponseEntity<Void> response = restTemplate.exchange(request, Void.class);
assertEquals(HttpStatus.OK, response.getStatusCode());
@ -216,9 +218,9 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
public void serializeAsPojo(String requestUrl) throws Exception {
RestTemplate restTemplate = new RestTemplate();
URI url = new URI(requestUrl);
RequestEntity<Void> request = RequestEntity.get(url).accept(MediaType.APPLICATION_JSON).build();
RequestEntity<Void> request = RequestEntity.get(new URI(requestUrl))
.accept(MediaType.APPLICATION_JSON)
.build();
ResponseEntity<Person> response = restTemplate.exchange(request, Person.class);
assertEquals(new Person("Robert"), response.getBody());
@ -226,10 +228,9 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
public void postAsPojo(String requestUrl) throws Exception {
RestTemplate restTemplate = new RestTemplate();
URI url = new URI(requestUrl);
RequestEntity<Person> request = RequestEntity.post(url).accept(MediaType.APPLICATION_JSON).body(new Person
("Robert"));
RequestEntity<Person> request = RequestEntity.post(new URI(requestUrl))
.accept(MediaType.APPLICATION_JSON)
.body(new Person("Robert"));
ResponseEntity<Person> response = restTemplate.exchange(request, Person.class);
assertEquals(new Person("Robert"), response.getBody());
@ -237,10 +238,11 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
public void serializeAsCollection(String requestUrl) throws Exception {
RestTemplate restTemplate = new RestTemplate();
URI url = new URI(requestUrl);
RequestEntity<Void> request = RequestEntity.get(url).accept(MediaType.APPLICATION_JSON).build();
List<Person> results = restTemplate.exchange(request, new ParameterizedTypeReference<List<Person>>(){}).getBody();
RequestEntity<Void> request = RequestEntity.get(new URI(requestUrl))
.accept(MediaType.APPLICATION_JSON)
.build();
List<Person> results = restTemplate.exchange(request,
new ParameterizedTypeReference<List<Person>>(){}).getBody();
assertEquals(2, results.size());
assertEquals(new Person("Robert"), results.get(0));
@ -250,10 +252,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
public void capitalizePojo(String requestUrl) throws Exception {
RestTemplate restTemplate = new RestTemplate();
URI url = new URI(requestUrl);
RequestEntity<Person> request = RequestEntity
.post(url)
RequestEntity<Person> request = RequestEntity.post(new URI(requestUrl))
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(new Person("Robert"));
@ -265,15 +264,12 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
public void capitalizeCollection(String requestUrl) throws Exception {
RestTemplate restTemplate = new RestTemplate();
URI url = new URI(requestUrl);
List<Person> persons = Arrays.asList(new Person("Robert"), new Person("Marie"));
RequestEntity<List<Person>> request = RequestEntity
.post(url)
RequestEntity<List<Person>> request = RequestEntity.post(new URI(requestUrl))
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(persons);
List<Person> results = restTemplate.exchange(request, new ParameterizedTypeReference<List<Person>>(){}).getBody();
.body(Arrays.asList(new Person("Robert"), new Person("Marie")));
List<Person> results = restTemplate.exchange(request,
new ParameterizedTypeReference<List<Person>>(){}).getBody();
assertEquals(2, results.size());
assertEquals("ROBERT", results.get(0).getName());
@ -309,7 +305,8 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@ResponseBody
public Publisher<ByteBuffer> rawResponseBody() {
JacksonJsonEncoder encoder = new JacksonJsonEncoder();
return encoder.encode(Streams.just(new Person("Robert")), ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON);
return encoder.encode(Streams.just(new Person("Robert")),
ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON);
}
@RequestMapping("/raw-observable")
@ -390,7 +387,9 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/completable-future-capitalize")
@ResponseBody
public CompletableFuture<Person> completableFutureCapitalize(@RequestBody CompletableFuture<Person> personFuture) {
public CompletableFuture<Person> completableFutureCapitalize(
@RequestBody CompletableFuture<Person> personFuture) {
return personFuture.thenApply(person -> {
person.setName(person.getName().toUpperCase());
return person;
@ -417,7 +416,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/create")
public Publisher<Void> create(@RequestBody Stream<Person> personStream) {
return personStream.toList().onSuccess(personList -> persons.addAll(personList)).after();
return personStream.toList().onSuccess(persons::addAll).after();
}
//TODO add mixed and T request mappings tests

View File

@ -32,22 +32,24 @@ import org.springframework.web.method.HandlerMethod;
*/
public class ResponseBodyResultHandlerTests {
@Test
public void supports() throws NoSuchMethodException {
ResponseBodyResultHandler resultHandler = new ResponseBodyResultHandler(Collections.emptyList());
TestController controller = new TestController();
HandlerMethod notAnnotatedMethod = new HandlerMethod(controller, TestController.class.getMethod("notAnnotated"));
assertFalse(resultHandler.supports(new HandlerResult(notAnnotatedMethod, null)));
HandlerMethod hm = new HandlerMethod(controller,TestController.class.getMethod("notAnnotated"));
assertFalse(resultHandler.supports(new HandlerResult(hm, null)));
HandlerMethod publisherStringMethod = new HandlerMethod(controller, TestController.class.getMethod("publisherString"));
assertTrue(resultHandler.supports(new HandlerResult(publisherStringMethod, null)));
hm = new HandlerMethod(controller, TestController.class.getMethod("publisherString"));
assertTrue(resultHandler.supports(new HandlerResult(hm, null)));
HandlerMethod publisherVoidMethod = new HandlerMethod(controller, TestController.class.getMethod("publisherVoid"));
assertTrue(resultHandler.supports(new HandlerResult(publisherVoidMethod, null)));
hm = new HandlerMethod(controller, TestController.class.getMethod("publisherVoid"));
assertTrue(resultHandler.supports(new HandlerResult(hm, null)));
}
@SuppressWarnings("unused")
private static class TestController {
public Publisher<String> notAnnotated() {