Limits on input stream in codecs

- Add a maxInMemorySize property to Decoder and HttpMessageReader
  implementations that aggregate input, raising DataBufferLimitException
  when the limit is exceeded.

- For codecs that call DataBufferUtils#join, there is now an overloaded
  variant with an extra maxInMemorySize argument. Internally, a custom
  LimitedDataBufferList is used to count and enforce the limit.

- Jackson2Tokenizer and XmlEventDecoder apply the limit per streamed
  top-level JSON object or XML tree, respectively.

See gh-23884
Rossen Stoyanchev, 2019-10-28 14:26:26 +00:00
commit 89d053d7f4 (parent ce0b012f43)
16 changed files with 672 additions and 68 deletions
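
For orientation before the diffs: a minimal sketch of how the new limit surfaces to a caller (the decoder choice, the 5 KB figure, and the wrapper class are illustrative, not part of this commit):

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MaxInMemorySizeExample {

    static Mono<String> decodeWithLimit(Flux<DataBuffer> input) {
        StringDecoder decoder = StringDecoder.allMimeTypes();
        decoder.setMaxInMemorySize(5 * 1024); // hypothetical 5 KB cap

        // decodeToMono aggregates the full input, so exceeding the cap
        // terminates the Mono with DataBufferLimitException.
        return decoder.decodeToMono(input, ResolvableType.forClass(String.class), null, null)
                .onErrorMap(DataBufferLimitException.class,
                        ex -> new IllegalStateException("Input too large", ex));
    }
}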

File: AbstractDataBufferDecoder.java

@@ -48,12 +48,39 @@ import org.springframework.util.MimeType;
@SuppressWarnings("deprecation")
public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
private int maxInMemorySize = 256 * 1024;
protected AbstractDataBufferDecoder(MimeType... supportedMimeTypes) {
super(supportedMimeTypes);
}
/**
* Configure a limit on the number of bytes that can be buffered whenever
* the input stream needs to be aggregated. This can be a result of
* decoding to a single {@code DataBuffer},
* {@link java.nio.ByteBuffer ByteBuffer}, {@code byte[]},
* {@link org.springframework.core.io.Resource Resource}, {@code String}, etc.
* It can also occur when splitting the input stream, e.g. delimited text,
* in which case the limit applies to data buffered between delimiters.
* <p>By default this is set to 256K.
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
* @since 5.1.11
*/
public void setMaxInMemorySize(int byteCount) {
this.maxInMemorySize = byteCount;
}
/**
* Return the {@link #setMaxInMemorySize configured} byte count limit.
* @since 5.1.11
*/
public int getMaxInMemorySize() {
return this.maxInMemorySize;
}
@Override
public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
@@ -65,7 +92,7 @@ public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return DataBufferUtils.join(input)
return DataBufferUtils.join(input, this.maxInMemorySize)
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
}
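
One detail worth calling out in this hunk: only the aggregating decodeToMono path enforces the limit, while element-wise decode maps each buffer through individually. A sketch of the distinction (ByteArrayDecoder is one concrete subclass; the 1 KB cap is arbitrary):

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.ByteArrayDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DecodeVsDecodeToMono {

    static void sketch(Flux<DataBuffer> input) {
        ByteArrayDecoder decoder = new ByteArrayDecoder();
        decoder.setMaxInMemorySize(1024); // hypothetical 1 KB cap
        ResolvableType type = ResolvableType.forClass(byte[].class);

        // decode() emits one byte[] per incoming DataBuffer, no aggregation,
        // so the cap plays no role on this path.
        Flux<byte[]> perBuffer = decoder.decode(input, type, null, null);

        // decodeToMono() aggregates via DataBufferUtils.join(input, 1024), so
        // a stream totalling more than 1 KB errors with DataBufferLimitException.
        Mono<byte[]> aggregated = decoder.decodeToMono(input, type, null, null);
    }
}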

File: StringDecoder.java

@@ -25,15 +25,18 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DataBufferWrapper;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.LimitedDataBufferList;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.lang.Nullable;
@@ -91,12 +94,18 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
byte[][] delimiterBytes = getDelimiterBytes(mimeType);
// TODO: Drop Consumer and use bufferUntil with Supplier<LimitedDataBufferList> (reactor-core#1925)
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize());
Flux<DataBuffer> inputFlux = Flux.defer(() -> {
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
return Flux.from(input)
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
.doOnNext(limiter)
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
});
@@ -279,4 +288,34 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
}
/**
* Temporary measure for reactor-core#1925.
* Consumer that adds to a {@link LimitedDataBufferList} to enforce limits.
*/
private static class LimitedDataBufferConsumer implements Consumer<DataBuffer> {
private final LimitedDataBufferList bufferList;
public LimitedDataBufferConsumer(int maxInMemorySize) {
this.bufferList = new LimitedDataBufferList(maxInMemorySize);
}
@Override
public void accept(DataBuffer buffer) {
if (buffer instanceof EndFrameBuffer) {
this.bufferList.clear();
}
else {
try {
this.bufferList.add(buffer);
}
catch (DataBufferLimitException ex) {
DataBufferUtils.release(buffer);
throw ex;
}
}
}
}
}
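
The bufferUntil pattern above can be seen in isolation: a side-effecting limiter runs before the buffering operator collects each element, and the delimiter frame resets the count. A simplified sketch with plain strings standing in for data buffers (all names and the 10-character cap are illustrative):

import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class BufferUntilLimitSketch {

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        int max = 10; // cap on characters buffered between delimiters

        Flux.just("abc", "defg", "END", "hijklmnopqrstu", "END")
                // doOnNext sees every frame before bufferUntil collects it,
                // mirroring how StringDecoder applies LimitedDataBufferConsumer
                .doOnNext(s -> {
                    if (s.equals("END")) {
                        count.set(0); // delimiter frame resets the count
                    }
                    else if (count.addAndGet(s.length()) > max) {
                        throw new IllegalStateException("Exceeded limit: " + max);
                    }
                })
                .bufferUntil(s -> s.equals("END"))
                .subscribe(
                        buffered -> System.out.println("emitted: " + buffered),
                        err -> System.out.println("error: " + err.getMessage()));
    }
}

Running this emits the first group ([abc, defg, END]) and then errors on the oversized frame, which is the behavior the decodeNewLineWithLimit test below verifies against the real decoder.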

File: DataBufferLimitException.java

@@ -0,0 +1,37 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
/**
* Exception that indicates the cumulative number of bytes consumed from a
* stream of {@link DataBuffer DataBuffer}s exceeded some pre-configured limit.
* It can be raised when data buffers are cached and aggregated, e.g.
* {@link DataBufferUtils#join}, or when data buffers have been released but
* a parsed representation is still being aggregated, e.g. during async
* parsing with Jackson.
*
* @author Rossen Stoyanchev
* @since 5.1.11
*/
@SuppressWarnings("serial")
public class DataBufferLimitException extends IllegalStateException {
public DataBufferLimitException(String message) {
super(message);
}
}

File: DataBufferUtils.java

@@ -525,16 +525,35 @@ public abstract class DataBufferUtils {
*/
@SuppressWarnings("unchecked")
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
return join(dataBuffers, -1);
}
if (dataBuffers instanceof Mono) {
return (Mono<DataBuffer>) dataBuffers;
/**
* Variant of {@link #join(Publisher)} that behaves the same way up until
* the specified max number of bytes to buffer. Once the limit is exceeded,
* {@link DataBufferLimitException} is raised.
* @param buffers the data buffers that are to be composed
* @param maxByteCount the max number of bytes to buffer, or -1 for unlimited
* @return a buffer with the aggregated content, or an empty Mono if the
* input is empty
* @throws DataBufferLimitException if maxByteCount is exceeded
* @since 5.1.11
*/
@SuppressWarnings("unchecked")
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount) {
Assert.notNull(buffers, "'dataBuffers' must not be null");
if (buffers instanceof Mono) {
return (Mono<DataBuffer>) buffers;
}
return Flux.from(dataBuffers)
.collectList()
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
return Flux.from(buffers)
.collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add)
.filter(list -> !list.isEmpty())
.map(list -> list.get(0).factory().join(list))
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
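
Usage of the new overload, mirroring the joinWithLimit test further down (the factory choice and byte figures are illustrative):

import java.nio.charset.StandardCharsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;

public class JoinWithLimitExample {

    public static void main(String[] args) {
        DataBufferFactory factory = new DefaultDataBufferFactory();
        Flux<DataBuffer> parts = Flux.just("foo", "bar", "baz")
                .map(s -> factory.wrap(s.getBytes(StandardCharsets.UTF_8)));

        // 9 bytes of content against an 8-byte cap: the Mono errors with
        // DataBufferLimitException.
        DataBufferUtils.join(parts, 8).subscribe(
                buffer -> System.out.println("joined " + buffer.readableByteCount() + " bytes"),
                err -> System.out.println(err.getClass().getSimpleName()));
    }
}

Note the short-circuit visible in the diff: a Publisher that is already a Mono is returned as-is, on the assumption that it holds a single, already-aggregated buffer, so the limit is not re-applied on that path.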

File: LimitedDataBufferList.java

@@ -0,0 +1,157 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import reactor.core.publisher.Flux;
/**
* Custom {@link List} for collecting data buffers while enforcing a limit
* on the total number of bytes buffered. For use with "collect" or other
* buffering operators in declarative APIs, e.g. {@link Flux}.
*
* <p>Adding elements increases the byte count and if the limit is exceeded,
* {@link DataBufferLimitException} is raised. {@link #clear()} resets the
* count. Remove and set are not supported.
*
* <p><strong>Note:</strong> This class does not automatically release the
* buffers it contains. It is usually preferable to use hooks such as
* {@link Flux#doOnDiscard} that also take care of cancel and error signals,
* or otherwise {@link #releaseAndClear()} can be used.
*
* @author Rossen Stoyanchev
* @since 5.1.11
*/
@SuppressWarnings("serial")
public class LimitedDataBufferList extends ArrayList<DataBuffer> {
private final int maxByteCount;
private int byteCount;
public LimitedDataBufferList(int maxByteCount) {
this.maxByteCount = maxByteCount;
}
@Override
public boolean add(DataBuffer buffer) {
boolean result = super.add(buffer);
if (result) {
updateCount(buffer.readableByteCount());
}
return result;
}
@Override
public void add(int index, DataBuffer buffer) {
super.add(index, buffer);
updateCount(buffer.readableByteCount());
}
@Override
public boolean addAll(Collection<? extends DataBuffer> collection) {
boolean result = super.addAll(collection);
collection.forEach(buffer -> updateCount(buffer.readableByteCount()));
return result;
}
@Override
public boolean addAll(int index, Collection<? extends DataBuffer> collection) {
boolean result = super.addAll(index, collection);
collection.forEach(buffer -> updateCount(buffer.readableByteCount()));
return result;
}
private void updateCount(int bytesToAdd) {
if (this.maxByteCount < 0) {
return;
}
if (bytesToAdd > Integer.MAX_VALUE - this.byteCount) {
raiseLimitException();
}
else {
this.byteCount += bytesToAdd;
if (this.byteCount > this.maxByteCount) {
raiseLimitException();
}
}
}
private void raiseLimitException() {
// Do not release here; the buffers are most likely released via doOnDiscard.
throw new DataBufferLimitException(
"Exceeded limit on max bytes to buffer: " + this.maxByteCount);
}
@Override
public DataBuffer remove(int index) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
@Override
protected void removeRange(int fromIndex, int toIndex) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeIf(Predicate<? super DataBuffer> filter) {
throw new UnsupportedOperationException();
}
@Override
public DataBuffer set(int index, DataBuffer element) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
this.byteCount = 0;
super.clear();
}
/**
* Shortcut to {@link DataBufferUtils#release release} all data buffers and
* then {@link #clear()}.
*/
public void releaseAndClear() {
forEach(buf -> {
try {
DataBufferUtils.release(buf);
}
catch (Throwable ex) {
// Keep going..
}
});
clear();
}
}
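
The javadoc's intended usage, shown standalone: collect under a byte cap and rely on doOnDiscard to release buffers on cancel or error, the same pattern join(Publisher, int) uses above (the helper name is illustrative):

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.LimitedDataBufferList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LimitedCollectSketch {

    static Mono<LimitedDataBufferList> collectWithCap(Flux<DataBuffer> buffers, int maxBytes) {
        return buffers
                // add() raises DataBufferLimitException once maxBytes is exceeded
                .collect(() -> new LimitedDataBufferList(maxBytes), LimitedDataBufferList::add)
                // if the list is discarded mid-stream, release its buffers
                .doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear);
    }
}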

File: StringDecoderTests.java

@@ -29,6 +29,7 @@ import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@@ -127,6 +128,20 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> {
.verify());
}
@Test
void decodeNewLineWithLimit() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("abc\n"),
stringBuffer("defg\n"),
stringBuffer("hijkl\n")
);
this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step ->
step.expectNext("abc", "defg")
.verifyError(DataBufferLimitException.class));
}
@Test
void decodeNewLineIncludeDelimiters() {
this.decoder = StringDecoder.allMimeTypes(StringDecoder.DEFAULT_DELIMITERS, false);

File: DataBufferUtilsTests.java

@@ -813,13 +813,27 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
Mono<DataBuffer> result = DataBufferUtils.join(flux);
StepVerifier.create(result)
.consumeNextWith(dataBuffer -> {
assertThat(DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8)).isEqualTo("foobarbaz");
release(dataBuffer);
.consumeNextWith(buf -> {
assertThat(DataBufferTestUtils.dumpString(buf, StandardCharsets.UTF_8)).isEqualTo("foobarbaz");
release(buf);
})
.verifyComplete();
}
@ParameterizedDataBufferAllocatingTest
void joinWithLimit(String displayName, DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
Mono<DataBuffer> result = DataBufferUtils.join(flux, 8);
StepVerifier.create(result)
.verifyError(DataBufferLimitException.class);
}
@ParameterizedDataBufferAllocatingTest
void joinErrors(String displayName, DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;

File: LimitedDataBufferListTests.java

@@ -0,0 +1,57 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
/**
* Unit tests for {@link LimitedDataBufferList}.
* @author Rossen Stoyanchev
* @since 5.1.11
*/
public class LimitedDataBufferListTests {
private final static DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
@Test
void limitEnforced() {
Assertions.assertThatThrownBy(() -> new LimitedDataBufferList(5).add(toDataBuffer("123456")))
.isInstanceOf(DataBufferLimitException.class);
}
@Test
void limitIgnored() {
new LimitedDataBufferList(-1).add(toDataBuffer("123456"));
}
@Test
void clearResetsCount() {
LimitedDataBufferList list = new LimitedDataBufferList(5);
list.add(toDataBuffer("12345"));
list.clear();
list.add(toDataBuffer("12345"));
}
private static DataBuffer toDataBuffer(String value) {
return bufferFactory.wrap(value.getBytes(StandardCharsets.UTF_8));
}
}

File: FormHttpMessageReader.java

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.http.MediaType;
@@ -62,6 +63,8 @@ public class FormHttpMessageReader extends LoggingCodecSupport
private Charset defaultCharset = DEFAULT_CHARSET;
private int maxInMemorySize = 256 * 1024;
/**
* Set the default character set to use for reading form data when the
@@ -80,6 +83,26 @@ public class FormHttpMessageReader extends LoggingCodecSupport
return this.defaultCharset;
}
/**
* Set the max number of bytes for input form data. As form data is buffered
* before it is parsed, this helps to limit the amount of buffering. Once
* the limit is exceeded, {@link DataBufferLimitException} is raised.
* <p>By default this is set to 256K.
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
* @since 5.1.11
*/
public void setMaxInMemorySize(int byteCount) {
this.maxInMemorySize = byteCount;
}
/**
* Return the {@link #setMaxInMemorySize configured} byte count limit.
* @since 5.1.11
*/
public int getMaxInMemorySize() {
return this.maxInMemorySize;
}
@Override
public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
@@ -105,7 +128,7 @@ public class FormHttpMessageReader extends LoggingCodecSupport
MediaType contentType = message.getHeaders().getContentType();
Charset charset = getMediaTypeCharset(contentType);
return DataBufferUtils.join(message.getBody())
return DataBufferUtils.join(message.getBody(), this.maxInMemorySize)
.map(buffer -> {
CharBuffer charBuffer = charset.decode(buffer.asByteBuffer());
String body = charBuffer.toString();
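
Form data is buffered in full before parsing, so the reader's cap bounds that buffering directly. A minimal configuration sketch (the 64 KB figure and class name are illustrative):

import org.springframework.http.codec.FormHttpMessageReader;

public class FormReaderConfig {

    static FormHttpMessageReader limitedFormReader() {
        FormHttpMessageReader reader = new FormHttpMessageReader();
        // Caps the buffering that precedes parsing; exceeding it fails the
        // read with DataBufferLimitException.
        reader.setMaxInMemorySize(64 * 1024); // arbitrary 64 KB cap
        return reader;
    }
}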

File: AbstractJackson2Decoder.java

@@ -37,6 +37,7 @@ import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.http.codec.HttpMessageDecoder;
@@ -59,6 +60,9 @@ import org.springframework.util.MimeType;
*/
public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport implements HttpMessageDecoder<Object> {
private int maxInMemorySize = 256 * 1024;
/**
* Constructor with a Jackson {@link ObjectMapper} to use.
*/
@@ -67,6 +71,28 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple
}
/**
* Set the max number of bytes that can be buffered by this decoder. This
* is either the size of the entire input when decoding as a whole, or the
* size of one top-level JSON object within a JSON stream. When the limit
* is exceeded, {@link DataBufferLimitException} is raised.
* <p>By default this is set to 256K.
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
* @since 5.1.11
*/
public void setMaxInMemorySize(int byteCount) {
this.maxInMemorySize = byteCount;
}
/**
* Return the {@link #setMaxInMemorySize configured} byte count limit.
* @since 5.1.11
*/
public int getMaxInMemorySize() {
return this.maxInMemorySize;
}
@Override
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
JavaType javaType = getObjectMapper().getTypeFactory().constructType(elementType.getType());
@@ -81,7 +107,7 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple
ObjectMapper mapper = getObjectMapper();
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
Flux.from(input), mapper.getFactory(), mapper, true);
Flux.from(input), mapper.getFactory(), mapper, true, getMaxInMemorySize());
ObjectReader reader = getObjectReader(elementType, hints);
@@ -103,7 +129,7 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return DataBufferUtils.join(input)
return DataBufferUtils.join(input, this.maxInMemorySize)
.map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints));
}
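
For the JSON codec the interesting case is streaming: each top-level object is limited individually, so an unbounded stream of small objects still decodes. A sketch (decoder choice, element type, and the 16 KB figure are illustrative):

import java.util.Map;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import reactor.core.publisher.Flux;

public class JsonStreamLimitSketch {

    static Flux<Object> decodeStream(Publisher<DataBuffer> input) {
        Jackson2JsonDecoder decoder = new Jackson2JsonDecoder();
        decoder.setMaxInMemorySize(16 * 1024); // hypothetical 16 KB per object

        // With a top-level JSON array, elements are tokenized one at a time;
        // only an individual object larger than the cap fails, with
        // DataBufferLimitException.
        return decoder.decode(input, ResolvableType.forClass(Map.class),
                MediaType.APPLICATION_JSON, null);
    }
}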

File: Jackson2Tokenizer.java

@@ -35,6 +35,7 @@ import reactor.core.publisher.Flux;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
/**
@@ -61,30 +62,39 @@ final class Jackson2Tokenizer {
private int arrayDepth;
private final int maxInMemorySize;
private int byteCount;
// TODO: change to ByteBufferFeeder when supported by Jackson
// See https://github.com/FasterXML/jackson-core/issues/478
private final ByteArrayFeeder inputFeeder;
private Jackson2Tokenizer(
JsonParser parser, DeserializationContext deserializationContext, boolean tokenizeArrayElements) {
private Jackson2Tokenizer(JsonParser parser, DeserializationContext deserializationContext,
boolean tokenizeArrayElements, int maxInMemorySize) {
this.parser = parser;
this.deserializationContext = deserializationContext;
this.tokenizeArrayElements = tokenizeArrayElements;
this.tokenBuffer = new TokenBuffer(parser, deserializationContext);
this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder();
this.maxInMemorySize = maxInMemorySize;
}
private List<TokenBuffer> tokenize(DataBuffer dataBuffer) {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
int bufferSize = dataBuffer.readableByteCount();
byte[] bytes = new byte[bufferSize];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
try {
this.inputFeeder.feedInput(bytes, 0, bytes.length);
return parseTokenBufferFlux();
List<TokenBuffer> result = parseTokenBufferFlux();
assertInMemorySize(bufferSize, result);
return result;
}
catch (JsonProcessingException ex) {
throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
@@ -174,18 +184,40 @@ final class Jackson2Tokenizer {
(token == JsonToken.END_ARRAY && this.arrayDepth == 0));
}
private void assertInMemorySize(int currentBufferSize, List<TokenBuffer> result) {
if (this.maxInMemorySize >= 0) {
if (!result.isEmpty()) {
this.byteCount = 0;
}
else if (currentBufferSize > Integer.MAX_VALUE - this.byteCount) {
raiseLimitException();
}
else {
this.byteCount += currentBufferSize;
if (this.byteCount > this.maxInMemorySize) {
raiseLimitException();
}
}
}
}
private void raiseLimitException() {
throw new DataBufferLimitException(
"Exceeded limit on max bytes per JSON object: " + this.maxInMemorySize);
}
/**
* Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
* @param dataBuffers the source data buffers
* @param jsonFactory the factory to use
* @param objectMapper the current mapper instance
* @param tokenizeArrayElements if {@code true} and the "top level" JSON object is
* @param tokenizeArrays if {@code true} and the "top level" JSON object is
* an array, each element is returned individually immediately after it is received
* @return the resulting token buffers
*/
public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
ObjectMapper objectMapper, boolean tokenizeArrayElements) {
ObjectMapper objectMapper, boolean tokenizeArrays, int maxInMemorySize) {
try {
JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
@@ -194,7 +226,7 @@ final class Jackson2Tokenizer {
context = ((DefaultDeserializationContext) context).createInstance(
objectMapper.getDeserializationConfig(), parser, objectMapper.getInjectableValues());
}
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, context, tokenizeArrayElements);
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, context, tokenizeArrays, maxInMemorySize);
return dataBuffers.concatMapIterable(tokenizer::tokenize).concatWith(tokenizer.endOfInput());
}
catch (IOException ex) {
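
Two details of assertInMemorySize are worth spelling out. First, byteCount resets whenever token buffers are emitted (result non-empty), which is what makes the limit apply per JSON object rather than to the whole stream. Second, like updateCount in LimitedDataBufferList, it checks the remaining headroom before adding, since a plain 'count + bytesToAdd' can overflow int and wrap negative. The guard in isolation (class and method names are illustrative):

public class OverflowSafeCount {

    static int addWithinLimit(int count, int bytesToAdd, int max) {
        // Compare against the headroom first; adding first could overflow.
        if (bytesToAdd > Integer.MAX_VALUE - count) {
            throw new IllegalStateException("Exceeded limit: " + max);
        }
        count += bytesToAdd;
        if (count > max) {
            throw new IllegalStateException("Exceeded limit: " + max);
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(addWithinLimit(100, 50, 1024)); // prints 150
        addWithinLimit(Integer.MAX_VALUE - 1, 2, Integer.MAX_VALUE); // throws
    }
}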

File: ProtobufDecoder.java

@@ -36,6 +36,7 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -127,7 +128,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
public Mono<Message> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return DataBufferUtils.join(inputStream)
return DataBufferUtils.join(inputStream, this.maxMessageSize)
.map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints));
}
@@ -205,8 +206,8 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
return messages;
}
if (this.messageBytesToRead > this.maxMessageSize) {
throw new DecodingException(
"The number of bytes to read from the incoming stream " +
throw new DataBufferLimitException(
"The number of bytes to read for message " +
"(" + this.messageBytesToRead + ") exceeds " +
"the configured limit (" + this.maxMessageSize + ")");
}
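
ProtobufDecoder keeps its existing maxMessageSize property; what changes above is the exception type, so oversized messages now surface as DataBufferLimitException rather than DecodingException. A configuration sketch (the 1 MB figure and class name are illustrative):

import org.springframework.http.codec.protobuf.ProtobufDecoder;

public class ProtobufLimitConfig {

    static ProtobufDecoder limitedProtobufDecoder() {
        ProtobufDecoder decoder = new ProtobufDecoder();
        // Applies per message in a delimited stream; exceeding it now raises
        // DataBufferLimitException.
        decoder.setMaxMessageSize(1024 * 1024); // arbitrary 1 MB cap
        return decoder;
    }
}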

File: Jaxb2XmlDecoder.java

@@ -49,6 +49,7 @@ import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.lang.Nullable;
@@ -87,6 +88,8 @@ public class Jaxb2XmlDecoder extends AbstractDecoder<Object> {
private Function<Unmarshaller, Unmarshaller> unmarshallerProcessor = Function.identity();
private int maxInMemorySize = 256 * 1024;
public Jaxb2XmlDecoder() {
super(MimeTypeUtils.APPLICATION_XML, MimeTypeUtils.TEXT_XML);
@@ -119,6 +122,28 @@ public class Jaxb2XmlDecoder extends AbstractDecoder<Object> {
return this.unmarshallerProcessor;
}
/**
* Set the max number of bytes that can be buffered by this decoder.
* This is either the size of the entire input when decoding as a whole, or when
* using async parsing with Aalto XML, it is the size of one top-level XML tree.
* When the limit is exceeded, {@link DataBufferLimitException} is raised.
* <p>By default this is set to 256K.
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
* @since 5.1.11
*/
public void setMaxInMemorySize(int byteCount) {
this.maxInMemorySize = byteCount;
this.xmlEventDecoder.setMaxInMemorySize(byteCount);
}
/**
* Return the {@link #setMaxInMemorySize configured} byte count limit.
* @since 5.1.11
*/
public int getMaxInMemorySize() {
return this.maxInMemorySize;
}
@Override
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
@@ -153,7 +178,7 @@ public class Jaxb2XmlDecoder extends AbstractDecoder<Object> {
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return DataBufferUtils.join(input)
return DataBufferUtils.join(input, this.maxInMemorySize)
.map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints));
}
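
As the setter above shows, the Jaxb2XmlDecoder limit is also pushed down to its internal XmlEventDecoder. A configuration sketch (the 512 KB figure and class name are illustrative):

import org.springframework.http.codec.xml.Jaxb2XmlDecoder;

public class XmlLimitConfig {

    static Jaxb2XmlDecoder limitedXmlDecoder() {
        Jaxb2XmlDecoder decoder = new Jaxb2XmlDecoder();
        // Propagated to the internal XmlEventDecoder, so it covers both
        // whole-input decoding and per-tree limits with Aalto.
        decoder.setMaxInMemorySize(512 * 1024); // arbitrary 512 KB cap
        return decoder;
    }
}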

File: XmlEventDecoder.java

@@ -40,6 +40,7 @@ import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.AbstractDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
@@ -89,26 +90,50 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
boolean useAalto = aaltoPresent;
private int maxInMemorySize = 256 * 1024;
public XmlEventDecoder() {
super(MimeTypeUtils.APPLICATION_XML, MimeTypeUtils.TEXT_XML);
}
/**
* Set the max number of bytes that can be buffered by this decoder. This
* is either the size of the entire input when decoding as a whole, or,
* when using async parsing via Aalto XML, the size of one top-level XML tree.
* When the limit is exceeded, {@link DataBufferLimitException} is raised.
* <p>By default this is set to 256K.
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
* @since 5.1.11
*/
public void setMaxInMemorySize(int byteCount) {
this.maxInMemorySize = byteCount;
}
/**
* Return the {@link #setMaxInMemorySize configured} byte count limit.
* @since 5.1.11
*/
public int getMaxInMemorySize() {
return this.maxInMemorySize;
}
@Override
@SuppressWarnings({"rawtypes", "unchecked", "cast"}) // XMLEventReader is Iterator<Object> on JDK 9
public Flux<XMLEvent> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
if (this.useAalto) {
AaltoDataBufferToXmlEvent mapper = new AaltoDataBufferToXmlEvent();
AaltoDataBufferToXmlEvent mapper = new AaltoDataBufferToXmlEvent(this.maxInMemorySize);
return Flux.from(input)
.flatMapIterable(mapper)
.doFinally(signalType -> mapper.endOfInput());
}
else {
return DataBufferUtils.join(input).
flatMapIterable(buffer -> {
return DataBufferUtils.join(input, this.maxInMemorySize)
.flatMapIterable(buffer -> {
try {
InputStream is = buffer.asInputStream();
Iterator eventReader = inputFactory.createXMLEventReader(is);
@@ -140,10 +165,22 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
private final XMLEventAllocator eventAllocator = EventAllocatorImpl.getDefaultInstance();
private final int maxInMemorySize;
private int byteCount;
private int elementDepth;
public AaltoDataBufferToXmlEvent(int maxInMemorySize) {
this.maxInMemorySize = maxInMemorySize;
}
@Override
public List<? extends XMLEvent> apply(DataBuffer dataBuffer) {
try {
increaseByteCount(dataBuffer);
this.streamReader.getInputFeeder().feedInput(dataBuffer.asByteBuffer());
List<XMLEvent> events = new ArrayList<>();
while (true) {
@@ -157,8 +194,12 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
if (event.isEndDocument()) {
break;
}
checkDepthAndResetByteCount(event);
}
}
if (this.maxInMemorySize > 0 && this.byteCount > this.maxInMemorySize) {
raiseLimitException();
}
return events;
}
catch (XMLStreamException ex) {
@@ -169,9 +210,40 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
}
}
private void increaseByteCount(DataBuffer dataBuffer) {
if (this.maxInMemorySize > 0) {
if (dataBuffer.readableByteCount() > Integer.MAX_VALUE - this.byteCount) {
raiseLimitException();
}
else {
this.byteCount += dataBuffer.readableByteCount();
}
}
}
private void checkDepthAndResetByteCount(XMLEvent event) {
if (this.maxInMemorySize > 0) {
if (event.isStartElement()) {
this.byteCount = this.elementDepth == 1 ? 0 : this.byteCount;
this.elementDepth++;
}
else if (event.isEndElement()) {
this.elementDepth--;
this.byteCount = this.elementDepth == 1 ? 0 : this.byteCount;
}
}
}
private void raiseLimitException() {
throw new DataBufferLimitException(
"Exceeded limit on max bytes per XML top-level node: " + this.maxInMemorySize);
}
public void endOfInput() {
this.streamReader.getInputFeeder().endOfInput();
}
}
}
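
To make the Aalto depth bookkeeping concrete: increaseByteCount accumulates per fed buffer, and checkDepthAndResetByteCount zeroes the count each time parsing starts or finishes a direct child of the root, so the limit applies per top-level XML tree rather than to the whole document. A hypothetical walk-through:

// <pojo>    start: depth 0 -> 1 (no reset; this is the root)
// <foo>     start at depth 1: byteCount reset, depth 1 -> 2
// foofoo    bytes accumulate as buffers are fed
// </foo>    end: depth 2 -> 1, byteCount reset again
// <bar>     start at depth 1: reset, depth 1 -> 2
// </bar>    end: depth 2 -> 1, reset
// </pojo>   end: depth 1 -> 0

The toXMLEventsWithLimit test below exercises this with a 6-byte limit: "foofoo" fits, while "barbarbar" exceeds it.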

File: Jackson2TokenizerTests.java

@@ -20,7 +20,6 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.TreeNode;
@@ -36,6 +35,7 @@ import reactor.test.StepVerifier;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.AbstractLeakCheckingTests;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -181,11 +181,68 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests {
testTokenize(asList("[1", ",2,", "3]"), asList("1", "2", "3"), true);
}
private void testTokenize(List<String> input, List<String> output, boolean tokenize) {
StepVerifier.FirstStep<String> builder = StepVerifier.create(decode(input, tokenize, -1));
output.forEach(expected -> builder.assertNext(actual -> {
try {
JSONAssert.assertEquals(expected, actual, true);
}
catch (JSONException ex) {
throw new RuntimeException(ex);
}
}));
builder.verifyComplete();
}
@Test
public void testLimit() {
List<String> source = asList("[",
"{", "\"id\":1,\"name\":\"Dan\"", "},",
"{", "\"id\":2,\"name\":\"Ron\"", "},",
"{", "\"id\":3,\"name\":\"Bartholomew\"", "}",
"]");
String expected = String.join("", source);
int maxInMemorySize = expected.length();
StepVerifier.create(decode(source, false, maxInMemorySize))
.expectNext(expected)
.verifyComplete();
StepVerifier.create(decode(source, false, maxInMemorySize - 1))
.expectError(DataBufferLimitException.class);
}
@Test
public void testLimitTokenized() {
List<String> source = asList("[",
"{", "\"id\":1, \"name\":\"Dan\"", "},",
"{", "\"id\":2, \"name\":\"Ron\"", "},",
"{", "\"id\":3, \"name\":\"Bartholomew\"", "}",
"]");
String expected = "{\"id\":3,\"name\":\"Bartholomew\"}";
int maxInMemorySize = expected.length();
StepVerifier.create(decode(source, true, maxInMemorySize))
.expectNext("{\"id\":1,\"name\":\"Dan\"}")
.expectNext("{\"id\":2,\"name\":\"Ron\"}")
.expectNext(expected)
.verifyComplete();
StepVerifier.create(decode(source, true, maxInMemorySize - 1))
.expectNext("{\"id\":1,\"name\":\"Dan\"}")
.expectNext("{\"id\":2,\"name\":\"Ron\"}")
.verifyError(DataBufferLimitException.class);
}
@Test
public void errorInStream() {
DataBuffer buffer = stringBuffer("{\"id\":1,\"name\":");
Flux<DataBuffer> source = Flux.just(buffer).concatWith(Flux.error(new RuntimeException()));
Flux<TokenBuffer> result = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, true);
Flux<TokenBuffer> result = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, true, -1);
StepVerifier.create(result)
.expectError(RuntimeException.class)
@@ -195,7 +252,7 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests {
@Test // SPR-16521
public void jsonEOFExceptionIsWrappedAsDecodingError() {
Flux<DataBuffer> source = Flux.just(stringBuffer("{\"status\": \"noClosingQuote}"));
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, false);
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, false, -1);
StepVerifier.create(tokens)
.expectError(DecodingException.class)
@@ -203,12 +260,13 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests {
}
private void testTokenize(List<String> source, List<String> expected, boolean tokenizeArrayElements) {
private Flux<String> decode(List<String> source, boolean tokenize, int maxInMemorySize) {
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
Flux.fromIterable(source).map(this::stringBuffer),
this.jsonFactory, this.objectMapper, tokenizeArrayElements);
this.jsonFactory, this.objectMapper, tokenize, maxInMemorySize);
Flux<String> result = tokens
return tokens
.map(tokenBuffer -> {
try {
TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser());
@@ -218,10 +276,6 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests {
throw new UncheckedIOException(ex);
}
});
StepVerifier.FirstStep<String> builder = StepVerifier.create(result);
expected.forEach(s -> builder.assertNext(new JSONAssertConsumer(s)));
builder.verifyComplete();
}
private DataBuffer stringBuffer(String value) {
@@ -231,24 +285,4 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests {
return buffer;
}
private static class JSONAssertConsumer implements Consumer<String> {
private final String expected;
JSONAssertConsumer(String expected) {
this.expected = expected;
}
@Override
public void accept(String s) {
try {
JSONAssert.assertEquals(this.expected, s, true);
}
catch (JSONException ex) {
throw new RuntimeException(ex);
}
}
}
}

File: XmlEventDecoderTests.java

@@ -28,6 +28,7 @@ import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.AbstractLeakCheckingTests;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import static org.assertj.core.api.Assertions.assertThat;
@@ -44,11 +45,12 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests {
private XmlEventDecoder decoder = new XmlEventDecoder();
@Test
public void toXMLEventsAalto() {
Flux<XMLEvent> events =
this.decoder.decode(stringBuffer(XML), null, null, Collections.emptyMap());
this.decoder.decode(stringBufferMono(XML), null, null, Collections.emptyMap());
StepVerifier.create(events)
.consumeNextWith(e -> assertThat(e.isStartDocument()).isTrue())
@@ -69,7 +71,7 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests {
decoder.useAalto = false;
Flux<XMLEvent> events =
this.decoder.decode(stringBuffer(XML), null, null, Collections.emptyMap());
this.decoder.decode(stringBufferMono(XML), null, null, Collections.emptyMap());
StepVerifier.create(events)
.consumeNextWith(e -> assertThat(e.isStartDocument()).isTrue())
@@ -86,10 +88,32 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests {
.verify();
}
@Test
public void toXMLEventsWithLimit() {
this.decoder.setMaxInMemorySize(6);
Flux<String> source = Flux.just(
"<pojo>", "<foo>", "foofoo", "</foo>", "<bar>", "barbarbar", "</bar>", "</pojo>");
Flux<XMLEvent> events = this.decoder.decode(
source.map(this::stringBuffer), null, null, Collections.emptyMap());
StepVerifier.create(events)
.consumeNextWith(e -> assertThat(e.isStartDocument()).isTrue())
.consumeNextWith(e -> assertStartElement(e, "pojo"))
.consumeNextWith(e -> assertStartElement(e, "foo"))
.consumeNextWith(e -> assertCharacters(e, "foofoo"))
.consumeNextWith(e -> assertEndElement(e, "foo"))
.consumeNextWith(e -> assertStartElement(e, "bar"))
.expectError(DataBufferLimitException.class)
.verify();
}
@Test
public void decodeErrorAalto() {
Flux<DataBuffer> source = Flux.concat(
stringBuffer("<pojo>"),
stringBufferMono("<pojo>"),
Flux.error(new RuntimeException()));
Flux<XMLEvent> events =
@@ -107,7 +131,7 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests {
decoder.useAalto = false;
Flux<DataBuffer> source = Flux.concat(
stringBuffer("<pojo>"),
stringBufferMono("<pojo>"),
Flux.error(new RuntimeException()));
Flux<XMLEvent> events =
@@ -133,13 +157,15 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests {
assertThat(event.asCharacters().getData()).isEqualTo(expectedData);
}
private Mono<DataBuffer> stringBuffer(String value) {
return Mono.defer(() -> {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return Mono.just(buffer);
});
private DataBuffer stringBuffer(String value) {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return buffer;
}
private Mono<DataBuffer> stringBufferMono(String value) {
return Mono.defer(() -> Mono.just(stringBuffer(value)));
}
}