From 89d053d7f45fb1886b044be5e3276927d7a7798e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 28 Oct 2019 14:26:26 +0000 Subject: [PATCH] Limits on input stream in codecs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add maxInMemorySize property to Decoder and HttpMessageReader   implementations that aggregate input to trigger   DataBufferLimitException when reached. - For codecs that call DataBufferUtils#join, there is now an overloaded   variant with a maxInMemorySize extra argument. Internally, a custom   LimitedDataBufferList is used to count and enforce the limit. - Jackson2Tokenizer and XmlEventDecoder support those limits per   streamed JSON object. See gh-23884 --- .../core/codec/AbstractDataBufferDecoder.java | 29 +++- .../core/codec/StringDecoder.java | 39 +++++ .../io/buffer/DataBufferLimitException.java | 37 +++++ .../core/io/buffer/DataBufferUtils.java | 29 +++- .../core/io/buffer/LimitedDataBufferList.java | 157 ++++++++++++++++++ .../core/codec/StringDecoderTests.java | 15 ++ .../core/io/buffer/DataBufferUtilsTests.java | 20 ++- .../io/buffer/LimitedDataBufferListTests.java | 57 +++++++ .../http/codec/FormHttpMessageReader.java | 27 ++- .../codec/json/AbstractJackson2Decoder.java | 30 +++- .../http/codec/json/Jackson2Tokenizer.java | 46 ++++- .../http/codec/protobuf/ProtobufDecoder.java | 7 +- .../http/codec/xml/Jaxb2XmlDecoder.java | 27 ++- .../http/codec/xml/XmlEventDecoder.java | 78 ++++++++- .../codec/json/Jackson2TokenizerTests.java | 94 +++++++---- .../http/codec/xml/XmlEventDecoderTests.java | 48 ++++-- 16 files changed, 672 insertions(+), 68 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferLimitException.java create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/LimitedDataBufferList.java create mode 100644 spring-core/src/test/java/org/springframework/core/io/buffer/LimitedDataBufferListTests.java diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java index f6a29a747aa..ff01c0b47f4 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java @@ -48,12 +48,39 @@ import org.springframework.util.MimeType; @SuppressWarnings("deprecation") public abstract class AbstractDataBufferDecoder extends AbstractDecoder { + private int maxInMemorySize = 256 * 1024; + protected AbstractDataBufferDecoder(MimeType... supportedMimeTypes) { super(supportedMimeTypes); } + /** + * Configure a limit on the number of bytes that can be buffered whenever + * the input stream needs to be aggregated. This can be a result of + * decoding to a single {@code DataBuffer}, + * {@link java.nio.ByteBuffer ByteBuffer}, {@code byte[]}, + * {@link org.springframework.core.io.Resource Resource}, {@code String}, etc. + * It can also occur when splitting the input stream, e.g. delimited text, + * in which case the limit applies to data buffered between delimiters. + *

By default this is set to 256K. + * @param byteCount the max number of bytes to buffer, or -1 for unlimited + * @since 5.1.11 + */ + public void setMaxInMemorySize(int byteCount) { + this.maxInMemorySize = byteCount; + } + + /** + * Return the {@link #setMaxInMemorySize configured} byte count limit. + * @since 5.1.11 + */ + public int getMaxInMemorySize() { + return this.maxInMemorySize; + } + + @Override public Flux decode(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { @@ -65,7 +92,7 @@ public abstract class AbstractDataBufferDecoder extends AbstractDecoder { public Mono decodeToMono(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return DataBufferUtils.join(input) + return DataBufferUtils.join(input, this.maxInMemorySize) .map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints)); } diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index d6d0ccfc271..2fa3cf0a405 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -25,15 +25,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DataBufferWrapper; import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.LimitedDataBufferList; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.core.log.LogFormatUtils; import org.springframework.lang.Nullable; @@ -91,12 +94,18 @@ public final class StringDecoder extends AbstractDataBufferDecoder { byte[][] delimiterBytes = getDelimiterBytes(mimeType); + // TODO: Drop Consumer and use bufferUntil with Supplier (reactor-core#1925) + // TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924) + LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize()); + Flux inputFlux = Flux.defer(() -> { DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); return Flux.from(input) .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)) + .doOnNext(limiter) .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) .map(buffers -> joinAndStrip(buffers, this.stripDelimiter)) + .doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear) .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); }); @@ -279,4 +288,34 @@ public final class StringDecoder extends AbstractDataBufferDecoder { } + /** + * Temporary measure for reactor-core#1925. + * Consumer that adds to a {@link LimitedDataBufferList} to enforce limits. + */ + private static class LimitedDataBufferConsumer implements Consumer { + + private final LimitedDataBufferList bufferList; + + + public LimitedDataBufferConsumer(int maxInMemorySize) { + this.bufferList = new LimitedDataBufferList(maxInMemorySize); + } + + + @Override + public void accept(DataBuffer buffer) { + if (buffer instanceof EndFrameBuffer) { + this.bufferList.clear(); + } + else { + try { + this.bufferList.add(buffer); + } + catch (DataBufferLimitException ex) { + DataBufferUtils.release(buffer); + throw ex; + } + } + } + } } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferLimitException.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferLimitException.java new file mode 100644 index 00000000000..ee606aed57f --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferLimitException.java @@ -0,0 +1,37 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.core.io.buffer; + +/** + * Exception that indicates the cumulative number of bytes consumed from a + * stream of {@link DataBuffer DataBuffer}'s exceeded some pre-configured limit. + * This can be raised when data buffers are cached and aggregated, e.g. + * {@link DataBufferUtils#join}. Or it could also be raised when data buffers + * have been released but a parsed representation is being aggregated, e.g. async + * parsing with Jackson. + * + * @author Rossen Stoyanchev + * @since 5.1.11 + */ +@SuppressWarnings("serial") +public class DataBufferLimitException extends IllegalStateException { + + + public DataBufferLimitException(String message) { + super(message); + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 197e4762180..aaab44b9864 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -525,16 +525,35 @@ public abstract class DataBufferUtils { */ @SuppressWarnings("unchecked") public static Mono join(Publisher dataBuffers) { - Assert.notNull(dataBuffers, "'dataBuffers' must not be null"); + return join(dataBuffers, -1); + } - if (dataBuffers instanceof Mono) { - return (Mono) dataBuffers; + /** + * Variant of {@link #join(Publisher)} that behaves the same way up until + * the specified max number of bytes to buffer. Once the limit is exceeded, + * {@link DataBufferLimitException} is raised. + * @param buffers the data buffers that are to be composed + * @param maxByteCount the max number of bytes to buffer, or -1 for unlimited + * @return a buffer with the aggregated content, possibly an empty Mono if + * the max number of bytes to buffer is exceeded. + * @throws DataBufferLimitException if maxByteCount is exceeded + * @since 5.1.11 + */ + @SuppressWarnings("unchecked") + public static Mono join(Publisher buffers, int maxByteCount) { + Assert.notNull(buffers, "'dataBuffers' must not be null"); + + if (buffers instanceof Mono) { + return (Mono) buffers; } - return Flux.from(dataBuffers) - .collectList() + // TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924) + + return Flux.from(buffers) + .collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add) .filter(list -> !list.isEmpty()) .map(list -> list.get(0).factory().join(list)) + .doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear) .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/LimitedDataBufferList.java b/spring-core/src/main/java/org/springframework/core/io/buffer/LimitedDataBufferList.java new file mode 100644 index 00000000000..fb8c42aeeb0 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/LimitedDataBufferList.java @@ -0,0 +1,157 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.core.io.buffer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Predicate; + +import reactor.core.publisher.Flux; + +/** + * Custom {@link List} to collect data buffers with and enforce a + * limit on the total number of bytes buffered. For use with "collect" or + * other buffering operators in declarative APIs, e.g. {@link Flux}. + * + *

Adding elements increases the byte count and if the limit is exceeded, + * {@link DataBufferLimitException} is raised. {@link #clear()} resets the + * count. Remove and set are not supported. + * + *

Note: This class does not automatically release the + * buffers it contains. It is usually preferable to use hooks such as + * {@link Flux#doOnDiscard} that also take care of cancel and error signals, + * or otherwise {@link #releaseAndClear()} can be used. + * + * @author Rossen Stoyanchev + * @since 5.1.11 + */ +@SuppressWarnings("serial") +public class LimitedDataBufferList extends ArrayList { + + private final int maxByteCount; + + private int byteCount; + + + public LimitedDataBufferList(int maxByteCount) { + this.maxByteCount = maxByteCount; + } + + + @Override + public boolean add(DataBuffer buffer) { + boolean result = super.add(buffer); + if (result) { + updateCount(buffer.readableByteCount()); + } + return result; + } + + @Override + public void add(int index, DataBuffer buffer) { + super.add(index, buffer); + updateCount(buffer.readableByteCount()); + } + + @Override + public boolean addAll(Collection collection) { + boolean result = super.addAll(collection); + collection.forEach(buffer -> updateCount(buffer.readableByteCount())); + return result; + } + + @Override + public boolean addAll(int index, Collection collection) { + boolean result = super.addAll(index, collection); + collection.forEach(buffer -> updateCount(buffer.readableByteCount())); + return result; + } + + private void updateCount(int bytesToAdd) { + if (this.maxByteCount < 0) { + return; + } + if (bytesToAdd > Integer.MAX_VALUE - this.byteCount) { + raiseLimitException(); + } + else { + this.byteCount += bytesToAdd; + if (this.byteCount > this.maxByteCount) { + raiseLimitException(); + } + } + } + + private void raiseLimitException() { + // Do not release here, it's likely down via doOnDiscard.. + throw new DataBufferLimitException( + "Exceeded limit on max bytes to buffer : " + this.maxByteCount); + } + + @Override + public DataBuffer remove(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + protected void removeRange(int fromIndex, int toIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeIf(Predicate filter) { + throw new UnsupportedOperationException(); + } + + @Override + public DataBuffer set(int index, DataBuffer element) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + this.byteCount = 0; + super.clear(); + } + + /** + * Shortcut to {@link DataBufferUtils#release release} all data buffers and + * then {@link #clear()}. + */ + public void releaseAndClear() { + forEach(buf -> { + try { + DataBufferUtils.release(buf); + } + catch (Throwable ex) { + // Keep going.. + } + }); + clear(); + } + +} diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index 980f0a00721..a17ae084c2f 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -29,6 +29,7 @@ import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -127,6 +128,20 @@ class StringDecoderTests extends AbstractDecoderTests { .verify()); } + @Test + void decodeNewLineWithLimit() { + Flux input = Flux.just( + stringBuffer("abc\n"), + stringBuffer("defg\n"), + stringBuffer("hijkl\n") + ); + this.decoder.setMaxInMemorySize(5); + + testDecode(input, String.class, step -> + step.expectNext("abc", "defg") + .verifyError(DataBufferLimitException.class)); + } + @Test void decodeNewLineIncludeDelimiters() { this.decoder = StringDecoder.allMimeTypes(StringDecoder.DEFAULT_DELIMITERS, false); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 07671ba9c92..27067179cdc 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -813,13 +813,27 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { Mono result = DataBufferUtils.join(flux); StepVerifier.create(result) - .consumeNextWith(dataBuffer -> { - assertThat(DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8)).isEqualTo("foobarbaz"); - release(dataBuffer); + .consumeNextWith(buf -> { + assertThat(DataBufferTestUtils.dumpString(buf, StandardCharsets.UTF_8)).isEqualTo("foobarbaz"); + release(buf); }) .verifyComplete(); } + @ParameterizedDataBufferAllocatingTest + void joinWithLimit(String displayName, DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + Mono result = DataBufferUtils.join(flux, 8); + + StepVerifier.create(result) + .verifyError(DataBufferLimitException.class); + } + @ParameterizedDataBufferAllocatingTest void joinErrors(String displayName, DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LimitedDataBufferListTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LimitedDataBufferListTests.java new file mode 100644 index 00000000000..eeb816fe274 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LimitedDataBufferListTests.java @@ -0,0 +1,57 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.core.io.buffer; + +import java.nio.charset.StandardCharsets; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link LimitedDataBufferList}. + * @author Rossen Stoyanchev + * @since 5.1.11 + */ +public class LimitedDataBufferListTests { + + private final static DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + + + @Test + void limitEnforced() { + Assertions.assertThatThrownBy(() -> new LimitedDataBufferList(5).add(toDataBuffer("123456"))) + .isInstanceOf(DataBufferLimitException.class); + } + + @Test + void limitIgnored() { + new LimitedDataBufferList(-1).add(toDataBuffer("123456")); + } + + @Test + void clearResetsCount() { + LimitedDataBufferList list = new LimitedDataBufferList(5); + list.add(toDataBuffer("12345")); + list.clear(); + list.add(toDataBuffer("12345")); + } + + + private static DataBuffer toDataBuffer(String value) { + return bufferFactory.wrap(value.getBytes(StandardCharsets.UTF_8)); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java index 01c2d30b33b..63e4a10259f 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.codec.Hints; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.log.LogFormatUtils; import org.springframework.http.MediaType; @@ -62,6 +63,8 @@ public class FormHttpMessageReader extends LoggingCodecSupport private Charset defaultCharset = DEFAULT_CHARSET; + private int maxInMemorySize = 256 * 1024; + /** * Set the default character set to use for reading form data when the @@ -80,6 +83,26 @@ public class FormHttpMessageReader extends LoggingCodecSupport return this.defaultCharset; } + /** + * Set the max number of bytes for input form data. As form data is buffered + * before it is parsed, this helps to limit the amount of buffering. Once + * the limit is exceeded, {@link DataBufferLimitException} is raised. + *

By default this is set to 256K. + * @param byteCount the max number of bytes to buffer, or -1 for unlimited + * @since 5.1.11 + */ + public void setMaxInMemorySize(int byteCount) { + this.maxInMemorySize = byteCount; + } + + /** + * Return the {@link #setMaxInMemorySize configured} byte count limit. + * @since 5.1.11 + */ + public int getMaxInMemorySize() { + return this.maxInMemorySize; + } + @Override public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) { @@ -105,7 +128,7 @@ public class FormHttpMessageReader extends LoggingCodecSupport MediaType contentType = message.getHeaders().getContentType(); Charset charset = getMediaTypeCharset(contentType); - return DataBufferUtils.join(message.getBody()) + return DataBufferUtils.join(message.getBody(), this.maxInMemorySize) .map(buffer -> { CharBuffer charBuffer = charset.decode(buffer.asByteBuffer()); String body = charBuffer.toString(); diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java index 595ce6f2810..bf7f68d8b82 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Decoder.java @@ -37,6 +37,7 @@ import org.springframework.core.codec.CodecException; import org.springframework.core.codec.DecodingException; import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.log.LogFormatUtils; import org.springframework.http.codec.HttpMessageDecoder; @@ -59,6 +60,9 @@ import org.springframework.util.MimeType; */ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport implements HttpMessageDecoder { + private int maxInMemorySize = 256 * 1024; + + /** * Constructor with a Jackson {@link ObjectMapper} to use. */ @@ -67,6 +71,28 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple } + /** + * Set the max number of bytes that can be buffered by this decoder. This + * is either the size of the entire input when decoding as a whole, or the + * size of one top-level JSON object within a JSON stream. When the limit + * is exceeded, {@link DataBufferLimitException} is raised. + *

By default this is set to 256K. + * @param byteCount the max number of bytes to buffer, or -1 for unlimited + * @since 5.1.11 + */ + public void setMaxInMemorySize(int byteCount) { + this.maxInMemorySize = byteCount; + } + + /** + * Return the {@link #setMaxInMemorySize configured} byte count limit. + * @since 5.1.11 + */ + public int getMaxInMemorySize() { + return this.maxInMemorySize; + } + + @Override public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { JavaType javaType = getObjectMapper().getTypeFactory().constructType(elementType.getType()); @@ -81,7 +107,7 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple ObjectMapper mapper = getObjectMapper(); Flux tokens = Jackson2Tokenizer.tokenize( - Flux.from(input), mapper.getFactory(), mapper, true); + Flux.from(input), mapper.getFactory(), mapper, true, getMaxInMemorySize()); ObjectReader reader = getObjectReader(elementType, hints); @@ -103,7 +129,7 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple public Mono decodeToMono(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return DataBufferUtils.join(input) + return DataBufferUtils.join(input, this.maxInMemorySize) .map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints)); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java index 19a03677aa0..1846d3e3bf3 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java @@ -35,6 +35,7 @@ import reactor.core.publisher.Flux; import org.springframework.core.codec.DecodingException; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; /** @@ -61,30 +62,39 @@ final class Jackson2Tokenizer { private int arrayDepth; + private final int maxInMemorySize; + + private int byteCount; + + // TODO: change to ByteBufferFeeder when supported by Jackson // See https://github.com/FasterXML/jackson-core/issues/478 private final ByteArrayFeeder inputFeeder; - private Jackson2Tokenizer( - JsonParser parser, DeserializationContext deserializationContext, boolean tokenizeArrayElements) { + private Jackson2Tokenizer(JsonParser parser, DeserializationContext deserializationContext, + boolean tokenizeArrayElements, int maxInMemorySize) { this.parser = parser; this.deserializationContext = deserializationContext; this.tokenizeArrayElements = tokenizeArrayElements; this.tokenBuffer = new TokenBuffer(parser, deserializationContext); this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder(); + this.maxInMemorySize = maxInMemorySize; } private List tokenize(DataBuffer dataBuffer) { - byte[] bytes = new byte[dataBuffer.readableByteCount()]; + int bufferSize = dataBuffer.readableByteCount(); + byte[] bytes = new byte[bufferSize]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); try { this.inputFeeder.feedInput(bytes, 0, bytes.length); - return parseTokenBufferFlux(); + List result = parseTokenBufferFlux(); + assertInMemorySize(bufferSize, result); + return result; } catch (JsonProcessingException ex) { throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex); @@ -174,18 +184,40 @@ final class Jackson2Tokenizer { (token == JsonToken.END_ARRAY && this.arrayDepth == 0)); } + private void assertInMemorySize(int currentBufferSize, List result) { + if (this.maxInMemorySize >= 0) { + if (!result.isEmpty()) { + this.byteCount = 0; + } + else if (currentBufferSize > Integer.MAX_VALUE - this.byteCount) { + raiseLimitException(); + } + else { + this.byteCount += currentBufferSize; + if (this.byteCount > this.maxInMemorySize) { + raiseLimitException(); + } + } + } + } + + private void raiseLimitException() { + throw new DataBufferLimitException( + "Exceeded limit on max bytes per JSON object: " + this.maxInMemorySize); + } + /** * Tokenize the given {@code Flux} into {@code Flux}. * @param dataBuffers the source data buffers * @param jsonFactory the factory to use * @param objectMapper the current mapper instance - * @param tokenizeArrayElements if {@code true} and the "top level" JSON object is + * @param tokenizeArrays if {@code true} and the "top level" JSON object is * an array, each element is returned individually immediately after it is received * @return the resulting token buffers */ public static Flux tokenize(Flux dataBuffers, JsonFactory jsonFactory, - ObjectMapper objectMapper, boolean tokenizeArrayElements) { + ObjectMapper objectMapper, boolean tokenizeArrays, int maxInMemorySize) { try { JsonParser parser = jsonFactory.createNonBlockingByteArrayParser(); @@ -194,7 +226,7 @@ final class Jackson2Tokenizer { context = ((DefaultDeserializationContext) context).createInstance( objectMapper.getDeserializationConfig(), parser, objectMapper.getInjectableValues()); } - Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, context, tokenizeArrayElements); + Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, context, tokenizeArrays, maxInMemorySize); return dataBuffers.concatMapIterable(tokenizer::tokenize).concatWith(tokenizer.endOfInput()); } catch (IOException ex) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java index 37d7ae4d909..bbe43ac65d7 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java @@ -36,6 +36,7 @@ import org.springframework.core.ResolvableType; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.DecodingException; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -127,7 +128,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder decodeToMono(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return DataBufferUtils.join(inputStream) + return DataBufferUtils.join(inputStream, this.maxMessageSize) .map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints)); } @@ -205,8 +206,8 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder this.maxMessageSize) { - throw new DecodingException( - "The number of bytes to read from the incoming stream " + + throw new DataBufferLimitException( + "The number of bytes to read for message " + "(" + this.messageBytesToRead + ") exceeds " + "the configured limit (" + this.maxMessageSize + ")"); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java index 71a233eff71..36d2319f033 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlDecoder.java @@ -49,6 +49,7 @@ import org.springframework.core.codec.CodecException; import org.springframework.core.codec.DecodingException; import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.log.LogFormatUtils; import org.springframework.lang.Nullable; @@ -87,6 +88,8 @@ public class Jaxb2XmlDecoder extends AbstractDecoder { private Function unmarshallerProcessor = Function.identity(); + private int maxInMemorySize = 256 * 1024; + public Jaxb2XmlDecoder() { super(MimeTypeUtils.APPLICATION_XML, MimeTypeUtils.TEXT_XML); @@ -119,6 +122,28 @@ public class Jaxb2XmlDecoder extends AbstractDecoder { return this.unmarshallerProcessor; } + /** + * Set the max number of bytes that can be buffered by this decoder. + * This is either the size of the entire input when decoding as a whole, or when + * using async parsing with Aalto XML, it is the size of one top-level XML tree. + * When the limit is exceeded, {@link DataBufferLimitException} is raised. + *

By default this is set to 256K. + * @param byteCount the max number of bytes to buffer, or -1 for unlimited + * @since 5.1.11 + */ + public void setMaxInMemorySize(int byteCount) { + this.maxInMemorySize = byteCount; + this.xmlEventDecoder.setMaxInMemorySize(byteCount); + } + + /** + * Return the {@link #setMaxInMemorySize configured} byte count limit. + * @since 5.1.11 + */ + public int getMaxInMemorySize() { + return this.maxInMemorySize; + } + @Override public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { @@ -153,7 +178,7 @@ public class Jaxb2XmlDecoder extends AbstractDecoder { public Mono decodeToMono(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return DataBufferUtils.join(input) + return DataBufferUtils.join(input, this.maxInMemorySize) .map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints)); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java index cb39ba1307c..2305525a099 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java @@ -40,6 +40,7 @@ import reactor.core.publisher.Flux; import org.springframework.core.ResolvableType; import org.springframework.core.codec.AbstractDecoder; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; import org.springframework.util.ClassUtils; @@ -89,26 +90,50 @@ public class XmlEventDecoder extends AbstractDecoder { boolean useAalto = aaltoPresent; + private int maxInMemorySize = 256 * 1024; + public XmlEventDecoder() { super(MimeTypeUtils.APPLICATION_XML, MimeTypeUtils.TEXT_XML); } + /** + * Set the max number of bytes that can be buffered by this decoder. This + * is either the size the entire input when decoding as a whole, or when + * using async parsing via Aalto XML, it is size one top-level XML tree. + * When the limit is exceeded, {@link DataBufferLimitException} is raised. + *

By default this is set to 256K. + * @param byteCount the max number of bytes to buffer, or -1 for unlimited + * @since 5.1.11 + */ + public void setMaxInMemorySize(int byteCount) { + this.maxInMemorySize = byteCount; + } + + /** + * Return the {@link #setMaxInMemorySize configured} byte count limit. + * @since 5.1.11 + */ + public int getMaxInMemorySize() { + return this.maxInMemorySize; + } + + @Override @SuppressWarnings({"rawtypes", "unchecked", "cast"}) // XMLEventReader is Iterator on JDK 9 public Flux decode(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { if (this.useAalto) { - AaltoDataBufferToXmlEvent mapper = new AaltoDataBufferToXmlEvent(); + AaltoDataBufferToXmlEvent mapper = new AaltoDataBufferToXmlEvent(this.maxInMemorySize); return Flux.from(input) .flatMapIterable(mapper) .doFinally(signalType -> mapper.endOfInput()); } else { - return DataBufferUtils.join(input). - flatMapIterable(buffer -> { + return DataBufferUtils.join(input, this.maxInMemorySize) + .flatMapIterable(buffer -> { try { InputStream is = buffer.asInputStream(); Iterator eventReader = inputFactory.createXMLEventReader(is); @@ -140,10 +165,22 @@ public class XmlEventDecoder extends AbstractDecoder { private final XMLEventAllocator eventAllocator = EventAllocatorImpl.getDefaultInstance(); + private final int maxInMemorySize; + + private int byteCount; + + private int elementDepth; + + + public AaltoDataBufferToXmlEvent(int maxInMemorySize) { + this.maxInMemorySize = maxInMemorySize; + } + @Override public List apply(DataBuffer dataBuffer) { try { + increaseByteCount(dataBuffer); this.streamReader.getInputFeeder().feedInput(dataBuffer.asByteBuffer()); List events = new ArrayList<>(); while (true) { @@ -157,8 +194,12 @@ public class XmlEventDecoder extends AbstractDecoder { if (event.isEndDocument()) { break; } + checkDepthAndResetByteCount(event); } } + if (this.maxInMemorySize > 0 && this.byteCount > this.maxInMemorySize) { + raiseLimitException(); + } return events; } catch (XMLStreamException ex) { @@ -169,9 +210,40 @@ public class XmlEventDecoder extends AbstractDecoder { } } + private void increaseByteCount(DataBuffer dataBuffer) { + if (this.maxInMemorySize > 0) { + if (dataBuffer.readableByteCount() > Integer.MAX_VALUE - this.byteCount) { + raiseLimitException(); + } + else { + this.byteCount += dataBuffer.readableByteCount(); + } + } + } + + private void checkDepthAndResetByteCount(XMLEvent event) { + if (this.maxInMemorySize > 0) { + if (event.isStartElement()) { + this.byteCount = this.elementDepth == 1 ? 0 : this.byteCount; + this.elementDepth++; + } + else if (event.isEndElement()) { + this.elementDepth--; + this.byteCount = this.elementDepth == 1 ? 0 : this.byteCount; + } + } + } + + private void raiseLimitException() { + throw new DataBufferLimitException( + "Exceeded limit on max bytes per XML top-level node: " + this.maxInMemorySize); + } + public void endOfInput() { this.streamReader.getInputFeeder().endOfInput(); } } + + } diff --git a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java index dbedb4dcee7..aa92701eeb2 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2TokenizerTests.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.function.Consumer; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.TreeNode; @@ -36,6 +35,7 @@ import reactor.test.StepVerifier; import org.springframework.core.codec.DecodingException; import org.springframework.core.io.buffer.AbstractLeakCheckingTests; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -181,11 +181,68 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests { testTokenize(asList("[1", ",2,", "3]"), asList("1", "2", "3"), true); } + private void testTokenize(List input, List output, boolean tokenize) { + StepVerifier.FirstStep builder = StepVerifier.create(decode(input, tokenize, -1)); + output.forEach(expected -> builder.assertNext(actual -> { + try { + JSONAssert.assertEquals(expected, actual, true); + } + catch (JSONException ex) { + throw new RuntimeException(ex); + } + })); + builder.verifyComplete(); + } + + @Test + public void testLimit() { + + List source = asList("[", + "{", "\"id\":1,\"name\":\"Dan\"", "},", + "{", "\"id\":2,\"name\":\"Ron\"", "},", + "{", "\"id\":3,\"name\":\"Bartholomew\"", "}", + "]"); + + String expected = String.join("", source); + int maxInMemorySize = expected.length(); + + StepVerifier.create(decode(source, false, maxInMemorySize)) + .expectNext(expected) + .verifyComplete(); + + StepVerifier.create(decode(source, false, maxInMemorySize - 1)) + .expectError(DataBufferLimitException.class); + } + + @Test + public void testLimitTokenized() { + + List source = asList("[", + "{", "\"id\":1, \"name\":\"Dan\"", "},", + "{", "\"id\":2, \"name\":\"Ron\"", "},", + "{", "\"id\":3, \"name\":\"Bartholomew\"", "}", + "]"); + + String expected = "{\"id\":3,\"name\":\"Bartholomew\"}"; + int maxInMemorySize = expected.length(); + + StepVerifier.create(decode(source, true, maxInMemorySize)) + .expectNext("{\"id\":1,\"name\":\"Dan\"}") + .expectNext("{\"id\":2,\"name\":\"Ron\"}") + .expectNext(expected) + .verifyComplete(); + + StepVerifier.create(decode(source, true, maxInMemorySize - 1)) + .expectNext("{\"id\":1,\"name\":\"Dan\"}") + .expectNext("{\"id\":2,\"name\":\"Ron\"}") + .verifyError(DataBufferLimitException.class); + } + @Test public void errorInStream() { DataBuffer buffer = stringBuffer("{\"id\":1,\"name\":"); Flux source = Flux.just(buffer).concatWith(Flux.error(new RuntimeException())); - Flux result = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, true); + Flux result = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, true, -1); StepVerifier.create(result) .expectError(RuntimeException.class) @@ -195,7 +252,7 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests { @Test // SPR-16521 public void jsonEOFExceptionIsWrappedAsDecodingError() { Flux source = Flux.just(stringBuffer("{\"status\": \"noClosingQuote}")); - Flux tokens = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, false); + Flux tokens = Jackson2Tokenizer.tokenize(source, this.jsonFactory, this.objectMapper, false, -1); StepVerifier.create(tokens) .expectError(DecodingException.class) @@ -203,12 +260,13 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests { } - private void testTokenize(List source, List expected, boolean tokenizeArrayElements) { + private Flux decode(List source, boolean tokenize, int maxInMemorySize) { + Flux tokens = Jackson2Tokenizer.tokenize( Flux.fromIterable(source).map(this::stringBuffer), - this.jsonFactory, this.objectMapper, tokenizeArrayElements); + this.jsonFactory, this.objectMapper, tokenize, maxInMemorySize); - Flux result = tokens + return tokens .map(tokenBuffer -> { try { TreeNode root = this.objectMapper.readTree(tokenBuffer.asParser()); @@ -218,10 +276,6 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests { throw new UncheckedIOException(ex); } }); - - StepVerifier.FirstStep builder = StepVerifier.create(result); - expected.forEach(s -> builder.assertNext(new JSONAssertConsumer(s))); - builder.verifyComplete(); } private DataBuffer stringBuffer(String value) { @@ -231,24 +285,4 @@ public class Jackson2TokenizerTests extends AbstractLeakCheckingTests { return buffer; } - - private static class JSONAssertConsumer implements Consumer { - - private final String expected; - - JSONAssertConsumer(String expected) { - this.expected = expected; - } - - @Override - public void accept(String s) { - try { - JSONAssert.assertEquals(this.expected, s, true); - } - catch (JSONException ex) { - throw new RuntimeException(ex); - } - } - } - } diff --git a/spring-web/src/test/java/org/springframework/http/codec/xml/XmlEventDecoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/xml/XmlEventDecoderTests.java index 56a271d4577..eb1a7b38910 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/xml/XmlEventDecoderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/xml/XmlEventDecoderTests.java @@ -28,6 +28,7 @@ import reactor.test.StepVerifier; import org.springframework.core.io.buffer.AbstractLeakCheckingTests; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; import static org.assertj.core.api.Assertions.assertThat; @@ -44,11 +45,12 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests { private XmlEventDecoder decoder = new XmlEventDecoder(); + @Test public void toXMLEventsAalto() { Flux events = - this.decoder.decode(stringBuffer(XML), null, null, Collections.emptyMap()); + this.decoder.decode(stringBufferMono(XML), null, null, Collections.emptyMap()); StepVerifier.create(events) .consumeNextWith(e -> assertThat(e.isStartDocument()).isTrue()) @@ -69,7 +71,7 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests { decoder.useAalto = false; Flux events = - this.decoder.decode(stringBuffer(XML), null, null, Collections.emptyMap()); + this.decoder.decode(stringBufferMono(XML), null, null, Collections.emptyMap()); StepVerifier.create(events) .consumeNextWith(e -> assertThat(e.isStartDocument()).isTrue()) @@ -86,10 +88,32 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests { .verify(); } + @Test + public void toXMLEventsWithLimit() { + + this.decoder.setMaxInMemorySize(6); + + Flux source = Flux.just( + "", "", "foofoo", "", "", "barbarbar", "", ""); + + Flux events = this.decoder.decode( + source.map(this::stringBuffer), null, null, Collections.emptyMap()); + + StepVerifier.create(events) + .consumeNextWith(e -> assertThat(e.isStartDocument()).isTrue()) + .consumeNextWith(e -> assertStartElement(e, "pojo")) + .consumeNextWith(e -> assertStartElement(e, "foo")) + .consumeNextWith(e -> assertCharacters(e, "foofoo")) + .consumeNextWith(e -> assertEndElement(e, "foo")) + .consumeNextWith(e -> assertStartElement(e, "bar")) + .expectError(DataBufferLimitException.class) + .verify(); + } + @Test public void decodeErrorAalto() { Flux source = Flux.concat( - stringBuffer(""), + stringBufferMono(""), Flux.error(new RuntimeException())); Flux events = @@ -107,7 +131,7 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests { decoder.useAalto = false; Flux source = Flux.concat( - stringBuffer(""), + stringBufferMono(""), Flux.error(new RuntimeException())); Flux events = @@ -133,13 +157,15 @@ public class XmlEventDecoderTests extends AbstractLeakCheckingTests { assertThat(event.asCharacters().getData()).isEqualTo(expectedData); } - private Mono stringBuffer(String value) { - return Mono.defer(() -> { - byte[] bytes = value.getBytes(StandardCharsets.UTF_8); - DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length); - buffer.write(bytes); - return Mono.just(buffer); - }); + private DataBuffer stringBuffer(String value) { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length); + buffer.write(bytes); + return buffer; + } + + private Mono stringBufferMono(String value) { + return Mono.defer(() -> Mono.just(stringBuffer(value))); } }