Merge branch '5.1.x'
This commit is contained in:
commit
eb971690d2
|
@ -87,7 +87,7 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
|
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
|
||||||
Flux.from(input), this.jsonFactory, getObjectMapper().getDeserializationContext(), true);
|
Flux.from(input), this.jsonFactory, getObjectMapper(), true);
|
||||||
return decodeInternal(tokens, elementType, mimeType, hints);
|
return decodeInternal(tokens, elementType, mimeType, hints);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
|
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
|
||||||
Flux.from(input), this.jsonFactory, getObjectMapper().getDeserializationContext(), false);
|
Flux.from(input), this.jsonFactory, getObjectMapper(), false);
|
||||||
return decodeInternal(tokens, elementType, mimeType, hints).singleOrEmpty();
|
return decodeInternal(tokens, elementType, mimeType, hints).singleOrEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.core.JsonToken;
|
import com.fasterxml.jackson.core.JsonToken;
|
||||||
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
|
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
|
||||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
|
||||||
import com.fasterxml.jackson.databind.util.TokenBuffer;
|
import com.fasterxml.jackson.databind.util.TokenBuffer;
|
||||||
import reactor.core.Exceptions;
|
import reactor.core.Exceptions;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
@ -177,16 +179,22 @@ final class Jackson2Tokenizer {
|
||||||
* Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
|
* Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
|
||||||
* @param dataBuffers the source data buffers
|
* @param dataBuffers the source data buffers
|
||||||
* @param jsonFactory the factory to use
|
* @param jsonFactory the factory to use
|
||||||
|
* @param objectMapper the current mapper instance
|
||||||
* @param tokenizeArrayElements if {@code true} and the "top level" JSON object is
|
* @param tokenizeArrayElements if {@code true} and the "top level" JSON object is
|
||||||
* an array, each element is returned individually immediately after it is received
|
* an array, each element is returned individually immediately after it is received
|
||||||
* @return the resulting token buffers
|
* @return the resulting token buffers
|
||||||
*/
|
*/
|
||||||
public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
|
public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
|
||||||
DeserializationContext deserializationContext, boolean tokenizeArrayElements) {
|
ObjectMapper objectMapper, boolean tokenizeArrayElements) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
|
JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
|
||||||
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, deserializationContext, tokenizeArrayElements);
|
DeserializationContext context = objectMapper.getDeserializationContext();
|
||||||
|
if (context instanceof DefaultDeserializationContext) {
|
||||||
|
context = ((DefaultDeserializationContext) context).createInstance(
|
||||||
|
objectMapper.getDeserializationConfig(), parser, objectMapper.getInjectableValues());
|
||||||
|
}
|
||||||
|
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, context, tokenizeArrayElements);
|
||||||
return dataBuffers.concatMapIterable(tokenizer::tokenize).concatWith(tokenizer.endOfInput());
|
return dataBuffers.concatMapIterable(tokenizer::tokenize).concatWith(tokenizer.endOfInput());
|
||||||
}
|
}
|
||||||
catch (IOException ex) {
|
catch (IOException ex) {
|
||||||
|
|
|
@ -184,11 +184,8 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTestCase {
|
||||||
@Test
|
@Test
|
||||||
public void errorInStream() {
|
public void errorInStream() {
|
||||||
DataBuffer buffer = stringBuffer("{\"id\":1,\"name\":");
|
DataBuffer buffer = stringBuffer("{\"id\":1,\"name\":");
|
||||||
Flux<DataBuffer> source = Flux.just(buffer)
|
Flux<DataBuffer> source = Flux.just(buffer).concatWith(Flux.error(new RuntimeException()));
|
||||||
.concatWith(Flux.error(new RuntimeException()));
|
Flux<TokenBuffer> result = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, true);
|
||||||
|
|
||||||
Flux<TokenBuffer> result = Jackson2Tokenizer.tokenize(
|
|
||||||
source, this.jsonFactory, this.objectMapper.getDeserializationContext(), true);
|
|
||||||
|
|
||||||
StepVerifier.create(result)
|
StepVerifier.create(result)
|
||||||
.expectError(RuntimeException.class)
|
.expectError(RuntimeException.class)
|
||||||
|
@ -198,8 +195,7 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTestCase {
|
||||||
@Test // SPR-16521
|
@Test // SPR-16521
|
||||||
public void jsonEOFExceptionIsWrappedAsDecodingError() {
|
public void jsonEOFExceptionIsWrappedAsDecodingError() {
|
||||||
Flux<DataBuffer> source = Flux.just(stringBuffer("{\"status\": \"noClosingQuote}"));
|
Flux<DataBuffer> source = Flux.just(stringBuffer("{\"status\": \"noClosingQuote}"));
|
||||||
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
|
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, false);
|
||||||
source, this.jsonFactory, this.objectMapper.getDeserializationContext(), false);
|
|
||||||
|
|
||||||
StepVerifier.create(tokens)
|
StepVerifier.create(tokens)
|
||||||
.expectError(DecodingException.class)
|
.expectError(DecodingException.class)
|
||||||
|
@ -208,12 +204,11 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTestCase {
|
||||||
|
|
||||||
|
|
||||||
private void testTokenize(List<String> source, List<String> expected, boolean tokenizeArrayElements) {
|
private void testTokenize(List<String> source, List<String> expected, boolean tokenizeArrayElements) {
|
||||||
Flux<TokenBuffer> tokenBufferFlux = Jackson2Tokenizer.tokenize(
|
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
|
||||||
Flux.fromIterable(source).map(this::stringBuffer),
|
Flux.fromIterable(source).map(this::stringBuffer),
|
||||||
this.jsonFactory, this.objectMapper.getDeserializationContext(),
|
this.jsonFactory, this.objectMapper, tokenizeArrayElements);
|
||||||
tokenizeArrayElements);
|
|
||||||
|
|
||||||
Flux<String> result = tokenBufferFlux
|
Flux<String> result = tokens
|
||||||
.map(tokenBuffer -> {
|
.map(tokenBuffer -> {
|
||||||
try {
|
try {
|
||||||
TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser());
|
TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser());
|
||||||
|
|
Loading…
Reference in New Issue