diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java index 259a844c3c..a52ab96d91 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.async.ByteArrayFeeder; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.util.TokenBuffer; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import org.springframework.core.codec.DecodingException; @@ -74,7 +75,7 @@ final class Jackson2Tokenizer { } - private Flux tokenize(DataBuffer dataBuffer) { + private List tokenize(DataBuffer dataBuffer) { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); @@ -84,27 +85,29 @@ final class Jackson2Tokenizer { return parseTokenBufferFlux(); } catch (JsonProcessingException ex) { - return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex)); + throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex); } catch (IOException ex) { - return Flux.error(ex); + throw Exceptions.propagate(ex); } } private Flux endOfInput() { - this.inputFeeder.endOfInput(); - try { - return parseTokenBufferFlux(); - } - catch (JsonProcessingException ex) { - return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex)); - } - catch (IOException ex) { - return Flux.error(ex); - } + return Flux.defer(() -> { + this.inputFeeder.endOfInput(); + try { + return Flux.fromIterable(parseTokenBufferFlux()); + } + catch (JsonProcessingException ex) { + throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex); + } + catch (IOException ex) { + throw Exceptions.propagate(ex); + } + }); } - private Flux parseTokenBufferFlux() throws IOException { + private List parseTokenBufferFlux() throws IOException { List result = new ArrayList<>(); while (true) { @@ -122,7 +125,7 @@ final class Jackson2Tokenizer { processTokenArray(token, result); } } - return Flux.fromIterable(result); + return result; } private void updateDepth(JsonToken token) { @@ -184,7 +187,7 @@ final class Jackson2Tokenizer { try { JsonParser parser = jsonFactory.createNonBlockingByteArrayParser(); Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, deserializationContext, tokenizeArrayElements); - return dataBuffers.flatMap(tokenizer::tokenize, Flux::error, tokenizer::endOfInput); + return dataBuffers.concatMapIterable(tokenizer::tokenize).concatWith(tokenizer.endOfInput()); } catch (IOException ex) { return Flux.error(ex);