Leverage Jackson non-blocking parser

This commit introduces the Jackson2Tokenizer as a replacement for the
JsonObjectDecoder. The latter was dropped because of its complexity and
its hard dependency on Netty's ByteBuf.

The new Jackson2Tokenizer leverages Jackson's new non-blocking JSON parser,
using it to parse incoming data buffers into TokenBuffers, each of which
represents a single JSON object. Like JsonObjectDecoder, it supports
streaming individual JSON array elements.

Issue: SPR-14528
Author: Arjen Poutsma
Date:   2017-06-28 16:05:35 +02:00
parent: b778f94a07
commit: 31e0e53750

6 changed files with 391 additions and 424 deletions
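For context, the feed-and-drain contract of Jackson 2.9's non-blocking parser that this commit builds on looks roughly as follows. This is a minimal standalone sketch, not part of the commit; the input chunks are arbitrary:

import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;

public class NonBlockingParseSketch {

	public static void main(String[] args) throws Exception {
		JsonParser parser = new JsonFactory().createNonBlockingByteArrayParser();
		ByteArrayFeeder feeder = (ByteArrayFeeder) parser.getNonBlockingInputFeeder();

		// Feed the JSON in two arbitrary chunks, as data buffers might arrive over the network.
		for (String chunk : new String[] {"{\"foo\": \"foo", "foo\"}"}) {
			byte[] bytes = chunk.getBytes(StandardCharsets.UTF_8);
			feeder.feedInput(bytes, 0, bytes.length);
			JsonToken token;
			// NOT_AVAILABLE means "no further tokens until more input is fed":
			// the parser returns control to the caller instead of blocking on I/O.
			while ((token = parser.nextToken()) != JsonToken.NOT_AVAILABLE) {
				System.out.println(token);
			}
		}
		feeder.endOfInput();
	}
}

The new Jackson2Tokenizer below drives exactly this kind of loop, copying each completed event into a TokenBuffer.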

build.gradle

@@ -59,6 +59,7 @@ configure(allprojects) { project ->
 	ext.httpclientVersion = "4.5.3"
 	ext.interceptorApiVersion = "1.2"
 	ext.jackson2Version = "2.9.0.pr4"
+	ext.jsonassertVersion = "1.5.0"
 	ext.javamailVersion = "1.6.0-rc2"
 	ext.jaxbVersion = "2.2.11"
 	ext.jaxwsVersion = "2.2.11"
@@ -772,6 +773,7 @@ project("spring-web") {
 	testCompile("com.squareup.okhttp3:mockwebserver:${okhttp3Version}")
 	testCompile("org.xmlunit:xmlunit-matchers:${xmlunitVersion}")
 	testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
+	testCompile("org.skyscreamer:jsonassert:${jsonassertVersion}")
 	testRuntime("com.sun.mail:javax.mail:${javamailVersion}")
 	testRuntime("com.sun.xml.bind:jaxb-core:${jaxbVersion}")
 	testRuntime("com.sun.xml.bind:jaxb-impl:${jaxbVersion}")
@@ -1037,7 +1039,7 @@ project("spring-test") {
 	optional("org.seleniumhq.selenium:selenium-java:3.4.0") {
 		exclude group: "io.netty", module: "netty"
 	}
-	optional("org.skyscreamer:jsonassert:1.5.0")
+	optional("org.skyscreamer:jsonassert:${jsonassertVersion}")
 	optional("com.jayway.jsonpath:json-path:2.2.0")
 	optional("org.reactivestreams:reactive-streams")
 	optional("io.projectreactor:reactor-core")
@@ -1286,6 +1288,22 @@ configure(project(':spring-core')) {
 	}
 }
 
+/*
+ * Copyright 2002-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 /*
  * Support publication of artifacts versioned by topic branch.
  * CI builds supply `-P BRANCH_NAME=<TOPIC>` to gradle at build time.

org/springframework/http/codec/json/Jackson2JsonDecoder.java

@@ -21,11 +21,15 @@ import java.lang.annotation.Annotation;
 import java.util.List;
 import java.util.Map;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
+import com.fasterxml.jackson.databind.util.TokenBuffer;
+import org.eclipse.jetty.io.RuntimeIOException;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -35,7 +39,6 @@ import org.springframework.core.ResolvableType;
 import org.springframework.core.codec.CodecException;
 import org.springframework.core.codec.DecodingException;
 import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.http.codec.HttpMessageDecoder;
 import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
 import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -54,11 +57,6 @@ import org.springframework.util.MimeType;
  */
 public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMessageDecoder<Object> {
 
-	private final JsonObjectDecoder fluxDecoder = new JsonObjectDecoder(true);
-
-	private final JsonObjectDecoder monoDecoder = new JsonObjectDecoder(false);
-
 	public Jackson2JsonDecoder() {
 		super(Jackson2ObjectMapperBuilder.json().build());
 	}
@@ -67,7 +65,6 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
super(mapper, mimeTypes);
}
@Override
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
JavaType javaType = objectMapper().getTypeFactory().constructType(elementType.getType());
@@ -76,7 +73,6 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
objectMapper().canDeserialize(javaType) && supportsMimeType(mimeType));
}
@Override
public List<MimeType> getDecodableMimeTypes() {
return JSON_MIME_TYPES;
@@ -86,20 +82,27 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
 	public Flux<Object> decode(Publisher<DataBuffer> input, ResolvableType elementType,
 			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
 
-		return decodeInternal(this.fluxDecoder, input, elementType, mimeType, hints);
+		Flux<TokenBuffer> tokens = Flux.from(input)
+				.flatMap(new Jackson2Tokenizer(nonBlockingParser(), true));
+		return decodeInternal(tokens, elementType, mimeType, hints);
 	}
 
 	@Override
 	public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
 			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
 
-		return decodeInternal(this.monoDecoder, input, elementType, mimeType, hints).singleOrEmpty();
+		Flux<TokenBuffer> tokens = Flux.from(input)
+				.flatMap(new Jackson2Tokenizer(nonBlockingParser(), false));
+		return decodeInternal(tokens, elementType, mimeType, hints).singleOrEmpty();
 	}
 
-	private Flux<Object> decodeInternal(JsonObjectDecoder objectDecoder, Publisher<DataBuffer> inputStream,
-			ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
+	private Flux<Object> decodeInternal(Flux<TokenBuffer> tokens,
+			ResolvableType elementType, @Nullable MimeType mimeType,
+			@Nullable Map<String, Object> hints) {
 
-		Assert.notNull(inputStream, "'inputStream' must not be null");
+		Assert.notNull(tokens, "'tokens' must not be null");
 		Assert.notNull(elementType, "'elementType' must not be null");
 
 		Class<?> contextClass = getParameter(elementType).map(MethodParameter::getContainingClass).orElse(null);
@@ -110,26 +113,21 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
 				objectMapper().readerWithView(jsonView).forType(javaType) :
 				objectMapper().readerFor(javaType));
 
-		return objectDecoder.decode(inputStream, elementType, mimeType, hints)
-				.flatMap(dataBuffer -> {
-					if (dataBuffer.readableByteCount() == 0) {
-						return Mono.empty();
-					}
-					try {
-						Object value = reader.readValue(dataBuffer.asInputStream());
-						DataBufferUtils.release(dataBuffer);
-						return Mono.just(value);
-					}
-					catch (InvalidDefinitionException ex) {
-						return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex));
-					}
-					catch (JsonProcessingException ex) {
-						return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
-					}
-					catch (IOException ex) {
-						return Mono.error(new DecodingException("I/O error while parsing input stream", ex));
-					}
-				});
+		return tokens.flatMap(tokenBuffer -> {
+			try {
+				Object value = reader.readValue(tokenBuffer.asParser());
+				return Mono.just(value);
+			}
+			catch (InvalidDefinitionException ex) {
+				return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex));
+			}
+			catch (JsonProcessingException ex) {
+				return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
+			}
+			catch (IOException ex) {
+				return Mono.error(new DecodingException("I/O error while parsing input stream", ex));
+			}
+		});
 	}
@@ -147,4 +145,13 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
 		return parameter.getParameterAnnotation(annotType);
 	}
 
+	private JsonParser nonBlockingParser() {
+		try {
+			JsonFactory factory = this.objectMapper().getFactory();
+			return factory.createNonBlockingByteArrayParser();
+		}
+		catch (IOException ex) {
+			throw new RuntimeIOException(ex);
+		}
+	}
 }
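With the tokenizer wired in, the decoder handles arbitrarily chunked input end to end. A quick sketch (imports elided; stringBuffer is the String-to-DataBuffer helper used in the tests below, and the element type and hints are illustrative):

// Decoding a top-level JSON array, split across two buffers, into its elements.
Jackson2JsonDecoder decoder = new Jackson2JsonDecoder();
Flux<DataBuffer> source = Flux.just(
		stringBuffer("[{\"foo\": \"bar\"},"),
		stringBuffer("{\"foo\": \"baz\"}]"));
decoder.decode(source, ResolvableType.forClass(Map.class), null, Collections.emptyMap())
		.subscribe(System.out::println);  // prints {foo=bar}, then {foo=baz}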

org/springframework/http/codec/json/Jackson2Tokenizer.java (new file)

@@ -0,0 +1,151 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.json;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import reactor.core.publisher.Flux;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.util.Assert;
/**
* Function that transforms an arbitrarily split byte stream representing JSON objects into a
* {@code Flux<TokenBuffer>}, where each token buffer is a well-formed JSON object.
*
* @author Arjen Poutsma
* @since 5.0
*/
class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
private final JsonParser parser;
private final boolean tokenizeArrayElements;
private TokenBuffer tokenBuffer;
private int objectDepth;
private int arrayDepth;
// TODO: change to ByteBufferFeeder when supported by Jackson
private ByteArrayFeeder inputFeeder;
/**
* Create a new instance of the {@code Jackson2Tokenizer}.
* @param parser the non-blocking parser, obtained via
* {@link com.fasterxml.jackson.core.JsonFactory#createNonBlockingByteArrayParser}
* @param tokenizeArrayElements if {@code true} and the "top level" JSON object is an array,
* each of its elements is returned individually, as soon as it has been fully received
*/
public Jackson2Tokenizer(JsonParser parser, boolean tokenizeArrayElements) {
Assert.notNull(parser, "'parser' must not be null");
this.parser = parser;
this.tokenizeArrayElements = tokenizeArrayElements;
this.tokenBuffer = new TokenBuffer(parser);
this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder();
}
@Override
public Flux<TokenBuffer> apply(DataBuffer dataBuffer) {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
try {
this.inputFeeder.feedInput(bytes, 0, bytes.length);
List<TokenBuffer> result = new ArrayList<>();
while (true) {
JsonToken token = this.parser.nextToken();
if (token == JsonToken.NOT_AVAILABLE) {
break;
}
calculateDepth(token);
if (!this.tokenizeArrayElements) {
processTokenNormal(token, result);
}
else {
processTokenArray(token, result);
}
}
return Flux.fromIterable(result);
}
catch (JsonProcessingException ex) {
return Flux.error(new DecodingException(
"JSON decoding error: " + ex.getOriginalMessage(), ex));
}
catch (Exception ex) {
return Flux.error(ex);
}
}
private void calculateDepth(JsonToken token) {
switch (token) {
case START_OBJECT:
this.objectDepth++;
break;
case END_OBJECT:
this.objectDepth--;
break;
case START_ARRAY:
this.arrayDepth++;
break;
case END_ARRAY:
this.arrayDepth--;
break;
}
}
private void processTokenNormal(JsonToken token, List<TokenBuffer> result) throws IOException {
this.tokenBuffer.copyCurrentEvent(this.parser);
if (token == JsonToken.END_OBJECT || token == JsonToken.END_ARRAY) {
if (this.objectDepth == 0 && this.arrayDepth == 0) {
result.add(this.tokenBuffer);
this.tokenBuffer = new TokenBuffer(this.parser);
}
}
}
private void processTokenArray(JsonToken token, List<TokenBuffer> result) throws IOException {
if (token != JsonToken.START_ARRAY && token != JsonToken.END_ARRAY) {
this.tokenBuffer.copyCurrentEvent(this.parser);
}
if (token == JsonToken.END_OBJECT && this.objectDepth == 0 &&
(this.arrayDepth == 1 || this.arrayDepth == 0)) {
result.add(this.tokenBuffer);
this.tokenBuffer = new TokenBuffer(this.parser);
}
}
}
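Used on its own, the tokenizer turns arbitrarily split buffers into complete TokenBuffers, one per array element when tokenizeArrayElements is true. A sketch along the lines of the tests below (stringBuffer again being the tests' String-to-DataBuffer helper):

JsonParser parser = new JsonFactory().createNonBlockingByteArrayParser();
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, true);
ObjectMapper mapper = new ObjectMapper();

Flux.just(stringBuffer("[{\"foo\": \"bar\"},"), stringBuffer("{\"foo\": \"baz\"}]"))
		.flatMap(tokenizer)
		.map(tokenBuffer -> {
			try {
				// Replay the buffered events through a fresh parser to materialize the JSON.
				return mapper.readTree(tokenBuffer.asParser()).toString();
			}
			catch (IOException ex) {
				throw new UncheckedIOException(ex);
			}
		})
		.subscribe(System.out::println);  // {"foo":"bar"} then {"foo":"baz"}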

org/springframework/http/codec/json/JsonObjectDecoder.java (deleted)

@@ -1,259 +0,0 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.json;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.AbstractDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.MimeType;
/**
* Decode an arbitrarily split byte stream representing JSON objects into a byte
* stream where each chunk is a well-formed JSON object.
*
* <p>This class does not do any real parsing or validation. A sequence of bytes
* is considered a JSON object/array if it contains a matching number of opening
* and closing braces/brackets.
*
* <p>Based on <a href="https://github.com/netty/netty/blob/master/codec/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java">Netty JsonObjectDecoder</a>
*
* @author Sebastien Deleuze
* @since 5.0
*/
class JsonObjectDecoder extends AbstractDecoder<DataBuffer> {
private static final int ST_CORRUPTED = -1;
private static final int ST_INIT = 0;
private static final int ST_DECODING_NORMAL = 1;
private static final int ST_DECODING_ARRAY_STREAM = 2;
private final int maxObjectLength;
private final boolean streamArrayElements;
public JsonObjectDecoder() {
// 1 MB
this(1024 * 1024);
}
public JsonObjectDecoder(int maxObjectLength) {
this(maxObjectLength, true);
}
public JsonObjectDecoder(boolean streamArrayElements) {
this(1024 * 1024, streamArrayElements);
}
/**
* @param maxObjectLength maximum number of bytes a JSON object/array may
* use (including braces and all). Objects exceeding this length are dropped
* and an {@link IllegalStateException} is thrown.
* @param streamArrayElements if set to true and the "top level" JSON object
* is an array, each of its entries is passed through the pipeline individually
* and immediately after it was fully received, allowing for arrays with
*/
public JsonObjectDecoder(int maxObjectLength,
boolean streamArrayElements) {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
if (maxObjectLength < 1) {
throw new IllegalArgumentException("maxObjectLength must be a positive int");
}
this.maxObjectLength = maxObjectLength;
this.streamArrayElements = streamArrayElements;
}
@Override
public Flux<DataBuffer> decode(Publisher<DataBuffer> inputStream, @Nullable ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.from(inputStream)
.flatMap(new Function<DataBuffer, Publisher<? extends DataBuffer>>() {
int openBraces;
int index;
int state;
boolean insideString;
ByteBuf input;
Integer writerIndex;
@Override
public Publisher<? extends DataBuffer> apply(DataBuffer buffer) {
List<DataBuffer> chunks = new ArrayList<>();
if (this.input == null) {
this.input = Unpooled.copiedBuffer(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
this.writerIndex = this.input.writerIndex();
}
else {
this.index = this.index - this.input.readerIndex();
this.input = Unpooled.copiedBuffer(this.input,
Unpooled.copiedBuffer(buffer.asByteBuffer()));
DataBufferUtils.release(buffer);
this.writerIndex = this.input.writerIndex();
}
if (this.state == ST_CORRUPTED) {
this.input.skipBytes(this.input.readableBytes());
return Flux.error(new IllegalStateException("Corrupted stream"));
}
if (this.writerIndex > maxObjectLength) {
// buffer size exceeded maxObjectLength; discarding the complete buffer.
this.input.skipBytes(this.input.readableBytes());
reset();
return Flux.error(new IllegalStateException("object length exceeds " +
maxObjectLength + ": " + this.writerIndex + " bytes discarded"));
}
DataBufferFactory dataBufferFactory = buffer.factory();
for (/* use current index */; this.index < this.writerIndex; this.index++) {
byte c = this.input.getByte(this.index);
if (this.state == ST_DECODING_NORMAL) {
decodeByte(c, this.input, this.index);
// All opening braces/brackets have been closed. That's enough to conclude
// that the JSON object/array is complete.
if (this.openBraces == 0) {
ByteBuf json = extractObject(this.input, this.input.readerIndex(),
this.index + 1 - this.input.readerIndex());
chunks.add(dataBufferFactory.wrap(json.nioBuffer()));
// The JSON object/array was extracted => discard the bytes from
// the input buffer.
this.input.readerIndex(this.index + 1);
// Reset the object state to get ready for the next JSON object/text
// coming along the byte stream.
reset();
}
}
else if (this.state == ST_DECODING_ARRAY_STREAM) {
decodeByte(c, this.input, this.index);
if (!this.insideString && (this.openBraces == 1 && c == ',' ||
this.openBraces == 0 && c == ']')) {
// skip leading spaces. No range check is needed and the loop will terminate
// because the byte at position index is not a whitespace.
for (int i = this.input.readerIndex(); Character.isWhitespace(this.input.getByte(i)); i++) {
this.input.skipBytes(1);
}
// skip trailing spaces.
int idxNoSpaces = this.index - 1;
while (idxNoSpaces >= this.input.readerIndex() &&
Character.isWhitespace(this.input.getByte(idxNoSpaces))) {
idxNoSpaces--;
}
ByteBuf json = extractObject(this.input, this.input.readerIndex(),
idxNoSpaces + 1 - this.input.readerIndex());
chunks.add(dataBufferFactory.wrap(json.nioBuffer()));
this.input.readerIndex(this.index + 1);
if (c == ']') {
reset();
}
}
// JSON object/array detected. Accumulate bytes until all braces/brackets are closed.
}
else if (c == '{' || c == '[') {
initDecoding(c, streamArrayElements);
if (this.state == ST_DECODING_ARRAY_STREAM) {
// Discard the array bracket
this.input.skipBytes(1);
}
// Discard leading spaces in front of a JSON object/array.
}
else if (Character.isWhitespace(c)) {
this.input.skipBytes(1);
}
else {
this.state = ST_CORRUPTED;
return Flux.error(new IllegalStateException(
"invalid JSON received at byte position " + this.index + ": " +
ByteBufUtil.hexDump(this.input)));
}
}
return Flux.fromIterable(chunks);
}
/**
* Override this method if you want to filter the json objects/arrays that
* get passed through the pipeline.
*/
protected ByteBuf extractObject(ByteBuf buffer, int index, int length) {
return buffer.slice(index, length).retain();
}
private void decodeByte(byte c, ByteBuf input, int index) {
if ((c == '{' || c == '[') && !this.insideString) {
this.openBraces++;
}
else if ((c == '}' || c == ']') && !this.insideString) {
this.openBraces--;
}
else if (c == '"') {
// start of a new JSON string. It's necessary to detect strings as they may
// also contain braces/brackets and that could lead to incorrect results.
if (!this.insideString) {
this.insideString = true;
// If the double quote wasn't escaped then this is the end of a string.
}
else if (input.getByte(index - 1) != '\\') {
this.insideString = false;
}
}
}
private void initDecoding(byte openingBrace, boolean streamArrayElements) {
this.openBraces = 1;
if (openingBrace == '[' && streamArrayElements) {
this.state = ST_DECODING_ARRAY_STREAM;
}
else {
this.state = ST_DECODING_NORMAL;
}
}
private void reset() {
this.insideString = false;
this.state = ST_INIT;
this.openBraces = 0;
}
});
}
}

org/springframework/http/codec/json/Jackson2TokenizerTests.java (new file)

@@ -0,0 +1,181 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.json;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.jetty.io.RuntimeIOException;
import org.json.JSONException;
import org.junit.Before;
import org.junit.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
/**
* @author Arjen Poutsma
*/
public class Jackson2TokenizerTests extends AbstractDataBufferAllocatingTestCase {
private JsonParser jsonParser;
private Jackson2Tokenizer tokenizer;
private ObjectMapper objectMapper;
@Before
public void createParser() throws IOException {
JsonFactory factory = new JsonFactory();
this.jsonParser = factory.createNonBlockingByteArrayParser();
this.objectMapper = new ObjectMapper(factory);
}
@Test
public void noTokenizeArrayElements() {
this.tokenizer = new Jackson2Tokenizer(this.jsonParser, false);
testTokenize(
singletonList("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"),
singletonList("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
testTokenize(
asList("{\"foo\": \"foofoo\"",
", \"bar\": \"barbar\"}"),
singletonList("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"));
testTokenize(
singletonList("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"),
singletonList("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"));
testTokenize(
singletonList("[{\"foo\": \"bar\"},{\"foo\": \"baz\"}]"),
singletonList("[{\"foo\": \"bar\"},{\"foo\": \"baz\"}]"));
testTokenize(
asList("[{\"foo\": \"foofoo\", \"bar\"",
": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"),
singletonList("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"));
testTokenize(
asList("[",
"{\"id\":1,\"name\":\"Robert\"}",
",",
"{\"id\":2,\"name\":\"Raide\"}",
",",
"{\"id\":3,\"name\":\"Ford\"}",
"]"),
singletonList("[{\"id\":1,\"name\":\"Robert\"},{\"id\":2,\"name\":\"Raide\"},{\"id\":3,\"name\":\"Ford\"}]"));
}
@Test
public void tokenizeArrayElements() {
this.tokenizer = new Jackson2Tokenizer(this.jsonParser, true);
testTokenize(
singletonList("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"),
singletonList("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
testTokenize(
asList("{\"foo\": \"foofoo\"",
", \"bar\": \"barbar\"}"),
singletonList("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"));
testTokenize(
singletonList("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"),
asList("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}",
"{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}"));
testTokenize(
singletonList("[{\"foo\": \"bar\"},{\"foo\": \"baz\"}]"),
asList("{\"foo\": \"bar\"}",
"{\"foo\": \"baz\"}"));
testTokenize(
asList("[{\"foo\": \"foofoo\", \"bar\"",
": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"),
asList("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}",
"{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}"));
testTokenize(
asList("[",
"{\"id\":1,\"name\":\"Robert\"}",
",",
"{\"id\":2,\"name\":\"Raide\"}",
",",
"{\"id\":3,\"name\":\"Ford\"}",
"]"),
asList("{\"id\":1,\"name\":\"Robert\"}",
"{\"id\":2,\"name\":\"Raide\"}",
"{\"id\":3,\"name\":\"Ford\"}"));
}
private void testTokenize(List<String> source, List<String> expected) {
Flux<DataBuffer> sourceFlux = Flux.fromIterable(source)
.map(this::stringBuffer);
Flux<String> result = sourceFlux
.flatMap(this.tokenizer)
.map(tokenBuffer -> {
try {
TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser());
return this.objectMapper.writeValueAsString(root);
}
catch (IOException ex) {
throw new RuntimeIOException(ex);
}
});
StepVerifier.FirstStep<String> builder = StepVerifier.create(result);
for (String s : expected) {
builder.assertNext(new JSONAssertConsumer(s));
}
builder.verifyComplete();
}
private static class JSONAssertConsumer implements Consumer<String> {
private final String expected;
public JSONAssertConsumer(String expected) {
this.expected = expected;
}
@Override
public void accept(String s) {
try {
JSONAssert.assertEquals(this.expected, s, true);
}
catch (JSONException ex) {
throw new RuntimeException(ex);
}
}
}
}

org/springframework/http/codec/json/JsonObjectDecoderTests.java (deleted)

@@ -1,131 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.json;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;
/**
* @author Sebastien Deleuze
*/
public class JsonObjectDecoderTests extends AbstractDataBufferAllocatingTestCase {
@Test
public void decodeSingleChunkToJsonObject() throws Exception {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
Flux<String> output =
decoder.decode(source, null, null, Collections.emptyMap()).map(JsonObjectDecoderTests::toString);
StepVerifier.create(output)
.expectNext("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}")
.expectComplete()
.verify();
}
@Test
public void decodeMultipleChunksToJsonObject() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Flux<DataBuffer> source = Flux.just(stringBuffer("{\"foo\": \"foofoo\""),
stringBuffer(", \"bar\": \"barbar\"}"));
Flux<String> output =
decoder.decode(source, null, null, Collections.emptyMap()).map(JsonObjectDecoderTests::toString);
StepVerifier.create(output)
.expectNext("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}")
.expectComplete()
.verify();
}
@Test
public void decodeSingleChunkToArray() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Flux<DataBuffer> source = Flux.just(stringBuffer(
"[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"));
Flux<String> output =
decoder.decode(source, null, null, Collections.emptyMap()).map(JsonObjectDecoderTests::toString);
StepVerifier.create(output)
.expectNext("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}")
.expectNext("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}")
.expectComplete()
.verify();
source = Flux.just(stringBuffer("[{\"foo\": \"bar\"},{\"foo\": \"baz\"}]"));
output = decoder.decode(source, null, null, Collections.emptyMap()).map(JsonObjectDecoderTests::toString);
StepVerifier.create(output)
.expectNext("{\"foo\": \"bar\"}")
.expectNext("{\"foo\": \"baz\"}")
.expectComplete()
.verify();
}
@Test
public void decodeMultipleChunksToArray() throws InterruptedException {
JsonObjectDecoder decoder = new JsonObjectDecoder();
Flux<DataBuffer> source =
Flux.just(stringBuffer("[{\"foo\": \"foofoo\", \"bar\""), stringBuffer(
": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]"));
Flux<String> output =
decoder.decode(source, null, null, Collections.emptyMap()).map(JsonObjectDecoderTests::toString);
StepVerifier.create(output)
.expectNext("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}")
.expectNext("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}")
.expectComplete()
.verify();
source = Flux.just(
stringBuffer("[{\"foo\": \""),
stringBuffer("bar\"},{\"fo"),
stringBuffer("o\": \"baz\"}"),
stringBuffer("]"));
output = decoder.decode(source, null, null, Collections.emptyMap()).map(JsonObjectDecoderTests::toString);
StepVerifier.create(output)
.expectNext("{\"foo\": \"bar\"}")
.expectNext("{\"foo\": \"baz\"}")
.expectComplete()
.verify();
// SPR-15013
source = Flux.just(stringBuffer("["), stringBuffer("{\"id\":1,\"name\":\"Robert\"}"),
stringBuffer(","), stringBuffer("{\"id\":2,\"name\":\"Raide\"}"),
stringBuffer(","), stringBuffer("{\"id\":3,\"name\":\"Ford\"}"),
stringBuffer("]"));
output = decoder.decode(source, null, null, Collections.emptyMap()).map(JsonObjectDecoderTests::toString);
StepVerifier.create(output)
.expectNext("{\"id\":1,\"name\":\"Robert\"}")
.expectNext("{\"id\":2,\"name\":\"Raide\"}")
.expectNext("{\"id\":3,\"name\":\"Ford\"}")
.expectComplete()
.verify();
}
private static String toString(DataBuffer buffer) {
byte[] b = new byte[buffer.readableByteCount()];
buffer.read(b);
return new String(b, StandardCharsets.UTF_8);
}
}