Process tokens after each feed in Jackson2Tokenizer
This commit ensures that we process after each fed buffer in Jackson2Tokenizer, instead of after all fed buffers. Closes gh-31747
This commit is contained in:
parent
1afea0b144
commit
0e6c17f518
|
@ -91,10 +91,12 @@ final class Jackson2Tokenizer {
|
|||
private List<TokenBuffer> tokenize(DataBuffer dataBuffer) {
|
||||
try {
|
||||
int bufferSize = dataBuffer.readableByteCount();
|
||||
List<TokenBuffer> tokens = new ArrayList<>();
|
||||
if (this.inputFeeder instanceof ByteBufferFeeder byteBufferFeeder) {
|
||||
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
|
||||
while (iterator.hasNext()) {
|
||||
byteBufferFeeder.feedInput(iterator.next());
|
||||
parseTokens(tokens);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -102,10 +104,10 @@ final class Jackson2Tokenizer {
|
|||
byte[] bytes = new byte[bufferSize];
|
||||
dataBuffer.read(bytes);
|
||||
byteArrayFeeder.feedInput(bytes, 0, bufferSize);
|
||||
parseTokens(tokens);
|
||||
}
|
||||
List<TokenBuffer> result = parseTokenBufferFlux();
|
||||
assertInMemorySize(bufferSize, result);
|
||||
return result;
|
||||
assertInMemorySize(bufferSize, tokens);
|
||||
return tokens;
|
||||
}
|
||||
catch (JsonProcessingException ex) {
|
||||
throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
|
||||
|
@ -122,7 +124,9 @@ final class Jackson2Tokenizer {
|
|||
return Flux.defer(() -> {
|
||||
this.inputFeeder.endOfInput();
|
||||
try {
|
||||
return Flux.fromIterable(parseTokenBufferFlux());
|
||||
List<TokenBuffer> tokens = new ArrayList<>();
|
||||
parseTokens(tokens);
|
||||
return Flux.fromIterable(tokens);
|
||||
}
|
||||
catch (JsonProcessingException ex) {
|
||||
throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
|
||||
|
@ -133,9 +137,7 @@ final class Jackson2Tokenizer {
|
|||
});
|
||||
}
|
||||
|
||||
private List<TokenBuffer> parseTokenBufferFlux() throws IOException {
|
||||
List<TokenBuffer> result = new ArrayList<>();
|
||||
|
||||
private void parseTokens(List<TokenBuffer> tokens) throws IOException {
|
||||
// SPR-16151: Smile data format uses null to separate documents
|
||||
boolean previousNull = false;
|
||||
while (!this.parser.isClosed()) {
|
||||
|
@ -153,13 +155,12 @@ final class Jackson2Tokenizer {
|
|||
}
|
||||
updateDepth(token);
|
||||
if (!this.tokenizeArrayElements) {
|
||||
processTokenNormal(token, result);
|
||||
processTokenNormal(token, tokens);
|
||||
}
|
||||
else {
|
||||
processTokenArray(token, result);
|
||||
processTokenArray(token, tokens);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void updateDepth(JsonToken token) {
|
||||
|
|
|
@ -27,6 +27,10 @@ import com.fasterxml.jackson.core.JsonToken;
|
|||
import com.fasterxml.jackson.core.TreeNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.util.TokenBuffer;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import org.json.JSONException;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -37,6 +41,7 @@ import reactor.test.StepVerifier;
|
|||
import org.springframework.core.codec.DecodingException;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferLimitException;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.core.testfixture.io.buffer.AbstractLeakCheckingTests;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
|
@ -337,22 +342,47 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests {
|
|||
.verifyComplete();
|
||||
}
|
||||
|
||||
// gh-31747
|
||||
@Test
|
||||
public void compositeNettyBuffer() {
|
||||
ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
|
||||
ByteBuf firstByteBuf = allocator.buffer();
|
||||
firstByteBuf.writeBytes("{\"foo\": \"foofoo\"".getBytes(StandardCharsets.UTF_8));
|
||||
ByteBuf secondBuf = allocator.buffer();
|
||||
secondBuf.writeBytes(", \"bar\": \"barbar\"}".getBytes(StandardCharsets.UTF_8));
|
||||
CompositeByteBuf composite = allocator.compositeBuffer();
|
||||
composite.addComponent(true, firstByteBuf);
|
||||
composite.addComponent(true, secondBuf);
|
||||
|
||||
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(allocator);
|
||||
Flux<DataBuffer> source = Flux.just(bufferFactory.wrap(composite));
|
||||
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, false, false, -1);
|
||||
|
||||
Flux<String> strings = tokens.map(this::tokenToString);
|
||||
|
||||
StepVerifier.create(strings)
|
||||
.assertNext(s -> assertThat(s).isEqualTo("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"))
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
|
||||
private Flux<String> decode(List<String> source, boolean tokenize, int maxInMemorySize) {
|
||||
|
||||
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
|
||||
Flux.fromIterable(source).map(this::stringBuffer),
|
||||
this.jsonFactory, this.objectMapper, tokenize, false, maxInMemorySize);
|
||||
|
||||
return tokens
|
||||
.map(tokenBuffer -> {
|
||||
try {
|
||||
TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser());
|
||||
return this.objectMapper.writeValueAsString(root);
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new UncheckedIOException(ex);
|
||||
}
|
||||
});
|
||||
return tokens.map(this::tokenToString);
|
||||
}
|
||||
|
||||
private String tokenToString(TokenBuffer tokenBuffer) {
|
||||
try {
|
||||
TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser());
|
||||
return this.objectMapper.writeValueAsString(root);
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new UncheckedIOException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private DataBuffer stringBuffer(String value) {
|
||||
|
|
Loading…
Reference in New Issue