KAFKA-19429: Deflake streams_smoke_test (#20019)

It looks like we are checking for properties that are not guaranteed
under at_least_once, for example, exact counting (not allowing for
overcounting).

This change relaxes the validation constraint to only check that we
counted _at least_ N messages, and our sums come out as _at least_ the
expected sum.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-06-23 20:16:52 +02:00 committed by GitHub
parent 261e861340
commit 4fedffd282
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 32 additions and 11 deletions

View File

@ -48,11 +48,13 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -512,21 +514,27 @@ public class SmokeTestDriver extends SmokeTestUtil {
final boolean printResults,
final boolean eosEnabled) {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
final BiPredicate<Number, Number> validationPredicate;
if (eosEnabled) {
validationPredicate = Objects::equals;
} else {
validationPredicate = SmokeTestDriver::lessEquals;
}
boolean pass;
try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
pass = verifyTAgg(resultStream, inputs, events.get("tagg"), validationPredicate, printResults);
pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
return getMin(unwindowedKey);
}, printResults, eosEnabled);
}, Object::equals, printResults, eosEnabled);
pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults, eosEnabled);
pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults, eosEnabled);
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults, eosEnabled);
pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults, eosEnabled);
pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults, eosEnabled);
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults, eosEnabled);
pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, Object::equals, printResults, eosEnabled);
pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, Object::equals, printResults, eosEnabled);
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), Object::equals, printResults, eosEnabled);
pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, validationPredicate, printResults, eosEnabled);
pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, validationPredicate, printResults, eosEnabled);
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, validationPredicate, printResults, eosEnabled);
}
return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
}
@ -536,6 +544,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
final Map<String, Set<Integer>> inputData,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final Function<String, Number> keyToExpectation,
final BiPredicate<Number, Number> validationPredicate,
final boolean printResults,
final boolean eosEnabled) {
resultStream.printf("verifying topic '%s'%n", topic);
@ -561,12 +570,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
}
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
final String key = entry.getKey();
final Number expected = keyToExpectation.apply(key);
final Number actual = entry.getValue().getLast().value();
if (!expected.equals(actual)) {
if (!validationPredicate.test(expected, actual)) {
resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
if (printResults) {
@ -591,6 +599,18 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
}
private static boolean lessEquals(final Number expected, final Number actual) {
if (actual instanceof Integer && expected instanceof Integer) {
return actual.intValue() >= expected.intValue();
} else if (actual instanceof Long && expected instanceof Long) {
return actual.longValue() >= expected.longValue();
} else if (actual instanceof Double && expected instanceof Double) {
return actual.doubleValue() >= expected.doubleValue();
} else {
throw new IllegalArgumentException("Unexpected type: " + actual.getClass());
}
}
private static boolean verifySuppressed(final PrintStream resultStream,
@SuppressWarnings("SameParameterValue") final String topic,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
@ -642,6 +662,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
private static boolean verifyTAgg(final PrintStream resultStream,
final Map<String, Set<Integer>> allData,
final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
final BiPredicate<Number, Number> validationPredicate,
final boolean printResults) {
resultStream.println("verifying topic tagg");
if (taggEvents == null) {
@ -673,7 +694,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
expectedCount = 0L;
}
if (entry.getValue().getLast().value().longValue() != expectedCount) {
if (!validationPredicate.test(expectedCount, entry.getValue().getLast().value())) {
resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
if (printResults)