From c0b5fcf8437cd64467e49f84bf97591385df7bed Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 23 Jun 2025 20:16:52 +0200 Subject: [PATCH] KAFKA-19429: Deflake streams_smoke_test (#20019) It looks like we are checking for properties that are not guaranteed under at_least_once, for example, exact counting (not allowing for overcounting). This change relaxes the validation constraint to only check that we counted _at least_ N messages, and our sums come out as _at least_ the expected sum. Reviewers: Matthias J. Sax --- .../kafka/streams/tests/SmokeTestDriver.java | 43 ++++++++++++++----- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 68e6c27592c..ad3cead1763 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -48,11 +48,13 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiPredicate; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -512,21 +514,27 @@ public class SmokeTestDriver extends SmokeTestUtil { final boolean printResults, final boolean eosEnabled) { final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final BiPredicate validationPredicate; + if (eosEnabled) { + validationPredicate = Objects::equals; + } else { + validationPredicate = SmokeTestDriver::lessEquals; + } boolean pass; try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) { - pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults); + pass = verifyTAgg(resultStream, inputs, events.get("tagg"), validationPredicate, printResults); pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults); pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> { final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", ""); return getMin(unwindowedKey); - }, printResults, eosEnabled); + }, Object::equals, printResults, eosEnabled); pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults); - pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults, eosEnabled); - pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults, eosEnabled); - pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults, eosEnabled); - pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults, eosEnabled); - pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults, eosEnabled); - pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults, eosEnabled); + pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, Object::equals, printResults, eosEnabled); + pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, Object::equals, printResults, eosEnabled); + pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), Object::equals, printResults, eosEnabled); + pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, validationPredicate, printResults, eosEnabled); + pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, validationPredicate, printResults, eosEnabled); + pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, validationPredicate, printResults, eosEnabled); } return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8)); } @@ -536,6 +544,7 @@ public class SmokeTestDriver extends SmokeTestUtil { final Map> inputData, final Map>>> events, final Function keyToExpectation, + final BiPredicate validationPredicate, final boolean printResults, final boolean eosEnabled) { resultStream.printf("verifying topic '%s'%n", topic); @@ -561,12 +570,11 @@ public class SmokeTestDriver extends SmokeTestUtil { } } - for (final Map.Entry>> entry : outputEvents.entrySet()) { final String key = entry.getKey(); final Number expected = keyToExpectation.apply(key); final Number actual = entry.getValue().getLast().value(); - if (!expected.equals(actual)) { + if (!validationPredicate.test(expected, actual)) { resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected); if (printResults) { @@ -591,6 +599,18 @@ public class SmokeTestDriver extends SmokeTestUtil { } } + private static boolean lessEquals(final Number expected, final Number actual) { + if (actual instanceof Integer && expected instanceof Integer) { + return actual.intValue() >= expected.intValue(); + } else if (actual instanceof Long && expected instanceof Long) { + return actual.longValue() >= expected.longValue(); + } else if (actual instanceof Double && expected instanceof Double) { + return actual.doubleValue() >= expected.doubleValue(); + } else { + throw new IllegalArgumentException("Unexpected type: " + actual.getClass()); + } + } + private static boolean verifySuppressed(final PrintStream resultStream, @SuppressWarnings("SameParameterValue") final String topic, final Map>>> events, @@ -642,6 +662,7 @@ public class SmokeTestDriver extends SmokeTestUtil { private static boolean verifyTAgg(final PrintStream resultStream, final Map> allData, final Map>> taggEvents, + final BiPredicate validationPredicate, final boolean printResults) { resultStream.println("verifying topic tagg"); if (taggEvents == null) { @@ -673,7 +694,7 @@ public class SmokeTestDriver extends SmokeTestUtil { expectedCount = 0L; } - if (entry.getValue().getLast().value().longValue() != expectedCount) { + if (!validationPredicate.test(expectedCount, entry.getValue().getLast().value())) { resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount); if (printResults)