Join buffers in decodeToMono for Jackson and Jaxb2

Closes gh-22783
This commit is contained in:
Rossen Stoyanchev 2019-04-10 17:33:08 -04:00
parent 3d0ec509ab
commit 2aae81ef0c
3 changed files with 77 additions and 38 deletions

View File

@ -38,6 +38,7 @@ import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.DecodingException; import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints; import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.log.LogFormatUtils; import org.springframework.core.log.LogFormatUtils;
import org.springframework.http.codec.HttpMessageDecoder; import org.springframework.http.codec.HttpMessageDecoder;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
@ -88,56 +89,73 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize( Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
Flux.from(input), this.jsonFactory, getObjectMapper(), true); Flux.from(input), this.jsonFactory, getObjectMapper(), true);
return decodeInternal(tokens, elementType, mimeType, hints);
ObjectReader reader = getObjectReader(elementType, hints);
return tokens.handle((tokenBuffer, sink) -> {
try {
Object value = reader.readValue(tokenBuffer.asParser(getObjectMapper()));
logValue(value, hints);
if (value != null) {
sink.next(value);
}
}
catch (IOException ex) {
sink.error(processException(ex));
}
});
} }
@Override @Override
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType, public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize( return DataBufferUtils.join(input).map(dataBuffer -> {
Flux.from(input), this.jsonFactory, getObjectMapper(), false); try {
return decodeInternal(tokens, elementType, mimeType, hints).singleOrEmpty(); ObjectReader objectReader = getObjectReader(elementType, hints);
Object value = objectReader.readValue(dataBuffer.asInputStream());
logValue(value, hints);
return value;
}
catch (IOException ex) {
throw processException(ex);
}
finally {
DataBufferUtils.release(dataBuffer);
}
});
} }
private Flux<Object> decodeInternal(Flux<TokenBuffer> tokens, ResolvableType elementType, private ObjectReader getObjectReader(ResolvableType elementType, @Nullable Map<String, Object> hints) {
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Assert.notNull(tokens, "'tokens' must not be null");
Assert.notNull(elementType, "'elementType' must not be null"); Assert.notNull(elementType, "'elementType' must not be null");
MethodParameter param = getParameter(elementType); MethodParameter param = getParameter(elementType);
Class<?> contextClass = (param != null ? param.getContainingClass() : null); Class<?> contextClass = (param != null ? param.getContainingClass() : null);
JavaType javaType = getJavaType(elementType.getType(), contextClass); JavaType javaType = getJavaType(elementType.getType(), contextClass);
Class<?> jsonView = (hints != null ? (Class<?>) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null); Class<?> jsonView = (hints != null ? (Class<?>) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null);
return jsonView != null ?
ObjectReader reader = (jsonView != null ?
getObjectMapper().readerWithView(jsonView).forType(javaType) : getObjectMapper().readerWithView(jsonView).forType(javaType) :
getObjectMapper().readerFor(javaType)); getObjectMapper().readerFor(javaType);
}
return tokens.handle((tokenBuffer, sink) -> { private void logValue(@Nullable Object value, @Nullable Map<String, Object> hints) {
try { if (!Hints.isLoggingSuppressed(hints)) {
Object value = reader.readValue(tokenBuffer.asParser(getObjectMapper())); LogFormatUtils.traceDebug(logger, traceOn -> {
if (!Hints.isLoggingSuppressed(hints)) { String formatted = LogFormatUtils.formatValue(value, !traceOn);
LogFormatUtils.traceDebug(logger, traceOn -> { return Hints.getLogPrefix(hints) + "Decoded [" + formatted + "]";
String formatted = LogFormatUtils.formatValue(value, !traceOn); });
return Hints.getLogPrefix(hints) + "Decoded [" + formatted + "]"; }
}); }
}
if (value != null) { private CodecException processException(IOException ex) {
sink.next(value); if (ex instanceof InvalidDefinitionException) {
} JavaType type = ((InvalidDefinitionException) ex).getType();
} return new CodecException("Type definition error: " + type, ex);
catch (InvalidDefinitionException ex) { }
sink.error(new CodecException("Type definition error: " + ex.getType(), ex)); if (ex instanceof JsonProcessingException) {
} String originalMessage = ((JsonProcessingException) ex).getOriginalMessage();
catch (JsonProcessingException ex) { return new DecodingException("JSON decoding error: " + originalMessage, ex);
sink.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex)); }
} return new DecodingException("I/O error while parsing input stream", ex);
catch (IOException ex) {
sink.error(new DecodingException("I/O error while parsing input stream", ex));
}
});
} }

View File

@ -17,6 +17,7 @@
package org.springframework.http.codec.xml; package org.springframework.http.codec.xml;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
@ -31,9 +32,12 @@ import javax.xml.bind.annotation.XmlSchema;
import javax.xml.bind.annotation.XmlType; import javax.xml.bind.annotation.XmlType;
import javax.xml.namespace.QName; import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventReader; import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.XMLEvent; import javax.xml.stream.events.XMLEvent;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink; import reactor.core.publisher.SynchronousSink;
@ -44,6 +48,7 @@ import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.DecodingException; import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints; import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.log.LogFormatUtils; import org.springframework.core.log.LogFormatUtils;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -72,6 +77,8 @@ public class Jaxb2XmlDecoder extends AbstractDecoder<Object> {
*/ */
private static final String JAXB_DEFAULT_ANNOTATION_VALUE = "##default"; private static final String JAXB_DEFAULT_ANNOTATION_VALUE = "##default";
private static final XMLInputFactory inputFactory = StaxUtils.createDefensiveInputFactory();
private final XmlEventDecoder xmlEventDecoder = new XmlEventDecoder(); private final XmlEventDecoder xmlEventDecoder = new XmlEventDecoder();
@ -132,10 +139,24 @@ public class Jaxb2XmlDecoder extends AbstractDecoder<Object> {
} }
@Override @Override
public Mono<Object> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType, @SuppressWarnings({"rawtypes", "unchecked", "cast"}) // XMLEventReader is Iterator<Object> on JDK 9
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return decode(inputStream, elementType, mimeType, hints).singleOrEmpty(); return DataBufferUtils.join(input).map(dataBuffer -> {
try {
Iterator eventReader = inputFactory.createXMLEventReader(dataBuffer.asInputStream());
List<XMLEvent> events = new ArrayList<>();
eventReader.forEachRemaining(event -> events.add((XMLEvent) event));
return unmarshal(events, elementType.toClass());
}
catch (XMLStreamException ex) {
throw Exceptions.propagate(ex);
}
finally {
DataBufferUtils.release(dataBuffer);
}
});
} }
private Object unmarshal(List<XMLEvent> events, Class<?> outputClass) { private Object unmarshal(List<XMLEvent> events, Class<?> outputClass) {

View File

@ -95,7 +95,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
@Override @Override
@SuppressWarnings({"rawtypes", "unchecked", "cast"}) // on JDK 9 where XMLEventReader is Iterator<Object> instead of simply Iterator @SuppressWarnings({"rawtypes", "unchecked", "cast"}) // XMLEventReader is Iterator<Object> on JDK 9
public Flux<XMLEvent> decode(Publisher<DataBuffer> input, ResolvableType elementType, public Flux<XMLEvent> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {