Polish "decodeOne" related changes

This commit is contained in:
Rossen Stoyanchev 2016-07-01 15:59:29 -04:00
parent 917a2fb9d0
commit a68ff94fbc
4 changed files with 74 additions and 90 deletions

View File

@ -52,8 +52,10 @@ public class StringDecoder extends AbstractDecoder<String> {
private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';
private final boolean splitOnNewline;
/**
* Create a {@code StringDecoder} that decodes a bytes stream to a String stream
*
@ -74,6 +76,7 @@ public class StringDecoder extends AbstractDecoder<String> {
this.splitOnNewline = splitOnNewline;
}
@Override
public boolean canDecode(ResolvableType elementType, MimeType mimeType, Object... hints) {
return super.canDecode(elementType, mimeType, hints) &&
@ -83,37 +86,27 @@ public class StringDecoder extends AbstractDecoder<String> {
@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Object... hints) {
Flux<DataBuffer> inputFlux = Flux.from(inputStream);
if (this.splitOnNewline) {
inputFlux = inputFlux.flatMap(StringDecoder::splitOnNewline);
inputFlux = Flux.from(inputStream).flatMap(StringDecoder::splitOnNewline);
}
Charset charset = getCharset(mimeType);
return inputFlux.map(dataBuffer -> {
CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer());
DataBufferUtils.release(dataBuffer);
return charBuffer.toString();
});
return decodeInternal(inputFlux, mimeType);
}
@Override
public Mono<String> decodeOne(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Object... hints) {
Charset charset = getCharset(mimeType);
return Flux.from(inputStream)
.map(dataBuffer -> {
CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer());
DataBufferUtils.release(dataBuffer);
return charBuffer.toString();
})
.collect(StringBuilder::new, StringBuilder::append)
.map(StringBuilder::toString);
return decodeInternal(Flux.from(inputStream), mimeType).
collect(StringBuilder::new, StringBuilder::append).
map(StringBuilder::toString);
}
private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
List<DataBuffer> results = new ArrayList<DataBuffer>();
List<DataBuffer> results = new ArrayList<>();
int startIdx = 0;
int endIdx = 0;
int endIdx;
final int limit = dataBuffer.readableByteCount();
do {
endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx);
@ -126,7 +119,15 @@ public class StringDecoder extends AbstractDecoder<String> {
DataBufferUtils.release(dataBuffer);
return Flux.fromIterable(results);
}
private Flux<String> decodeInternal(Flux<DataBuffer> inputFlux, MimeType mimeType) {
Charset charset = getCharset(mimeType);
return inputFlux.map(dataBuffer -> {
CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer());
DataBufferUtils.release(dataBuffer);
return charBuffer.toString();
});
}
private Charset getCharset(MimeType mimeType) {
if (mimeType != null && mimeType.getCharset() != null) {

View File

@ -29,7 +29,6 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.AbstractDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.support.DataBufferUtils;
@ -38,18 +37,26 @@ import org.springframework.util.MimeType;
/**
* Decode from a bytes stream of JSON objects to a stream of {@code Object} (POJO).
* Decode a byte stream into JSON and convert to Object's with Jackson.
*
* @author Sebastien Deleuze
* @author Rossen Stoyanchev
*
* @see JacksonJsonEncoder
*/
public class JacksonJsonDecoder extends AbstractDecoder<Object> {
private static final MimeType[] MIME_TYPES = new MimeType[] {
new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8)
};
private final ObjectMapper mapper;
private final Decoder<DataBuffer> fluxPreProcessor = new JsonObjectDecoder();
private final JsonObjectDecoder fluxObjectDecoder = new JsonObjectDecoder(true);
private final Decoder<DataBuffer> monoPreProcessor = new JsonObjectDecoder(false);
private final JsonObjectDecoder monoObjectDecoder = new JsonObjectDecoder(false);
public JacksonJsonDecoder() {
@ -57,46 +64,39 @@ public class JacksonJsonDecoder extends AbstractDecoder<Object> {
}
public JacksonJsonDecoder(ObjectMapper mapper) {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
super(MIME_TYPES);
this.mapper = mapper;
}
@Override
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Object... hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
TypeFactory typeFactory = this.mapper.getTypeFactory();
JavaType javaType = typeFactory.constructType(elementType.getType());
ObjectReader reader = this.mapper.readerFor(javaType);
return this.fluxPreProcessor.decode(inputStream, elementType, mimeType, hints)
.map(dataBuffer -> {
try {
Object value = reader.readValue(dataBuffer.asInputStream());
DataBufferUtils.release(dataBuffer);
return value;
}
catch (IOException e) {
return Flux.error(new CodecException("Error while reading the data", e));
}
});
JsonObjectDecoder objectDecoder = this.fluxObjectDecoder;
return decodeInternal(objectDecoder, inputStream, elementType, mimeType, hints);
}
@Override
public Mono<Object> decodeOne(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Object... hints) {
JsonObjectDecoder objectDecoder = this.monoObjectDecoder;
return decodeInternal(objectDecoder, inputStream, elementType, mimeType, hints).single();
}
private Flux<Object> decodeInternal(JsonObjectDecoder objectDecoder, Publisher<DataBuffer> inputStream,
ResolvableType elementType, MimeType mimeType, Object[] hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
TypeFactory typeFactory = this.mapper.getTypeFactory();
JavaType javaType = typeFactory.constructType(elementType.getType());
ObjectReader reader = this.mapper.readerFor(javaType);
return this.monoPreProcessor.decode(inputStream, elementType, mimeType, hints)
.single()
return objectDecoder.decode(inputStream, elementType, mimeType, hints)
.map(dataBuffer -> {
try {
Object value = reader.readValue(dataBuffer.asInputStream());
@ -106,7 +106,7 @@ public class JacksonJsonDecoder extends AbstractDecoder<Object> {
catch (IOException e) {
return Flux.error(new CodecException("Error while reading the data", e));
}
});
});
}
}

View File

@ -118,10 +118,7 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
if (this.decoder == null) {
return Flux.error(new IllegalStateException("No decoder set"));
}
MediaType contentType = inputMessage.getHeaders().getContentType();
if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM;
}
MediaType contentType = getContentType(inputMessage);
return this.decoder.decode(inputMessage.getBody(), type, contentType);
}
@ -130,13 +127,16 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
if (this.decoder == null) {
return Mono.error(new IllegalStateException("No decoder set"));
}
MediaType contentType = inputMessage.getHeaders().getContentType();
if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM;
}
MediaType contentType = getContentType(inputMessage);
return this.decoder.decodeOne(inputMessage.getBody(), type, contentType);
}
private MediaType getContentType(ReactiveHttpInputMessage inputMessage) {
MediaType contentType = inputMessage.getHeaders().getContentType();
return (contentType != null ? contentType : MediaType.APPLICATION_OCTET_STREAM);
}
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType type,
MediaType contentType, ReactiveHttpOutputMessage outputMessage) {
@ -181,6 +181,7 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
* @param elementType the type of element for encoding
* @return the content type, or {@code null}
*/
@SuppressWarnings("UnusedParameters")
protected MediaType getDefaultContentType(ResolvableType elementType) {
return (!this.writableMediaTypes.isEmpty() ? this.writableMediaTypes.get(0) : null);
}

View File

@ -23,7 +23,6 @@ import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import org.springframework.core.Conventions;
import org.springframework.core.MethodParameter;
@ -32,6 +31,7 @@ import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.ConversionService;
import org.springframework.http.MediaType;
import org.springframework.http.converter.reactive.HttpMessageConverter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.ui.ModelMap;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@ -122,44 +122,36 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
ServerWebExchange exchange) {
ResolvableType type = ResolvableType.forMethodParameter(parameter);
boolean isAsyncType = isAsyncType(type);
boolean isStreamableType = isStreamableType(type);
ResolvableType elementType = (isStreamableType || isAsyncType ? type.getGeneric(0) : type);
MediaType mediaType = exchange.getRequest().getHeaders().getContentType();
boolean convertFromMono = getConversionService().canConvert(Mono.class, type.getRawClass());
boolean convertFromFlux = getConversionService().canConvert(Flux.class, type.getRawClass());
ResolvableType elementType = convertFromMono || convertFromFlux ? type.getGeneric(0) : type;
ServerHttpRequest request = exchange.getRequest();
MediaType mediaType = request.getHeaders().getContentType();
if (mediaType == null) {
mediaType = MediaType.APPLICATION_OCTET_STREAM;
}
for (HttpMessageConverter<?> converter : getMessageConverters()) {
if (converter.canRead(elementType, mediaType)) {
if (isStreamableType) {
Publisher<?> elements = converter.read(elementType, exchange.getRequest());
if (convertFromFlux) {
Publisher<?> flux = converter.read(elementType, request);
if (this.validator != null) {
elements= applyValidationIfApplicable(elements, parameter);
}
if (Flux.class.equals(type.getRawClass())) {
return Mono.just(elements);
}
else if (isAsyncType && this.conversionService.canConvert(Flux.class, type.getRawClass())) {
return Mono.just(this.conversionService.convert(elements, type.getRawClass()));
flux= applyValidationIfApplicable(flux, parameter);
}
return Mono.just(this.conversionService.convert(flux, type.getRawClass()));
}
else {
Mono<?> element = converter.readOne(elementType, exchange.getRequest());
Mono<?> mono = converter.readOne(elementType, request);
if (this.validator != null) {
element = Mono.from(applyValidationIfApplicable(element, parameter));
mono = Mono.from(applyValidationIfApplicable(mono, parameter));
}
if (Mono.class.equals(type.getRawClass())) {
return Mono.just(element);
}
else if (isAsyncType && this.conversionService.canConvert(Mono.class, type.getRawClass())) {
return Mono.just(this.conversionService.convert(element, type.getRawClass()));
}
else {
return (Mono<Object>)element;
if (!convertFromMono) {
return mono.map(value-> value); // TODO: MonoToObjectConverter
}
return Mono.just(this.conversionService.convert(mono, type.getRawClass()));
}
}
}
@ -167,16 +159,6 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
return Mono.error(new UnsupportedMediaTypeStatusException(mediaType, this.supportedMediaTypes));
}
private boolean isAsyncType(ResolvableType type) {
return (Mono.class.equals(type.getRawClass()) || Flux.class.equals(type.getRawClass()) ||
getConversionService().canConvert(Mono.class, type.getRawClass()) ||
getConversionService().canConvert(Flux.class, type.getRawClass()));
}
private boolean isStreamableType(ResolvableType type) {
return this.conversionService.canConvert(Flux.class, type.getRawClass());
}
protected Publisher<?> applyValidationIfApplicable(Publisher<?> elements, MethodParameter methodParam) {
Annotation[] annotations = methodParam.getParameterAnnotations();
for (Annotation ann : annotations) {
@ -185,7 +167,7 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
Object hints = (validAnnot != null ? validAnnot.value() : AnnotationUtils.getValue(ann));
Object[] validationHints = (hints instanceof Object[] ? (Object[]) hints : new Object[] {hints});
return Flux.from(elements).map(element -> {
validate(element, validationHints, methodParam);
doValidate(element, validationHints, methodParam);
return element;
});
}
@ -196,7 +178,7 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
/**
* TODO: replace with use of DataBinder
*/
private void validate(Object target, Object[] validationHints, MethodParameter methodParam) {
private void doValidate(Object target, Object[] validationHints, MethodParameter methodParam) {
String name = Conventions.getVariableNameForParameter(methodParam);
Errors errors = new BeanPropertyBindingResult(target, name);
if (!ObjectUtils.isEmpty(validationHints) && this.validator instanceof SmartValidator) {