diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index a528271d1ab..df389fa5652 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.math.BigDecimal; import java.math.BigInteger; -import java.math.RoundingMode; import java.nio.ByteBuffer; import java.text.CharacterIterator; import java.text.DateFormat; @@ -1020,6 +1019,7 @@ public class Values { return parseAsTemporal(token); } + private static SchemaAndValue parseAsNumber(String token) { // Try to parse as a number ... BigDecimal decimal = new BigDecimal(token); @@ -1040,12 +1040,20 @@ public class Values { } } + private static boolean isWholeNumber(BigDecimal bd) { + return bd.signum() == 0 || bd.scale() <= 0 || bd.stripTrailingZeros().scale() <= 0; + } + + private static final BigDecimal BIGGER_THAN_LONG = new BigDecimal("1e19"); + private static SchemaAndValue parseAsExactDecimal(BigDecimal decimal) { - BigDecimal ceil = decimal.setScale(0, RoundingMode.CEILING); - BigDecimal floor = decimal.setScale(0, RoundingMode.FLOOR); - if (ceil.equals(floor)) { - BigInteger num = ceil.toBigIntegerExact(); - if (ceil.precision() >= 19 && (num.compareTo(LONG_MIN) < 0 || num.compareTo(LONG_MAX) > 0)) { + BigDecimal abs = decimal.abs(); + if (abs.compareTo(BIGGER_THAN_LONG) > 0 || (abs.compareTo(BigDecimal.ONE) < 0 && abs.compareTo(BigDecimal.ZERO) != 0)) { + return null; + } + if (isWholeNumber(decimal)) { + BigInteger num = decimal.toBigIntegerExact(); + if (num.compareTo(LONG_MIN) < 0 || num.compareTo(LONG_MAX) > 0) { return null; } long integral = num.longValue(); diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java index c9b6ef047e4..e552e6f4de0 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java @@ -1155,6 +1155,17 @@ public class ValuesTest { assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 66000.0008f), Values.parseString("66000.0008")); } + @Test + public void avoidCpuAndMemoryIssuesConvertingExtremeBigDecimals() { + String parsingBig = "1e+100000000"; // new BigDecimal().setScale(0, RoundingMode.FLOOR) takes around two minutes and uses 3GB; + BigDecimal valueBig = new BigDecimal(parsingBig); + assertEquals(new SchemaAndValue(Decimal.schema(-100000000), valueBig), Values.parseString(parsingBig), "parsing number that's too big"); + + String parsingSmall = "1e-100000000"; + BigDecimal valueSmall = new BigDecimal(parsingSmall); + assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, (float) valueSmall.doubleValue()), Values.parseString(parsingSmall), "parsing number that's too big, strictly this should return a bigdecimal"); + } + protected void assertParsed(String input) { assertParsed(input, input); }