MINOR: fix Kafka Streams "smoke test" pass criteria (#18835)

Reviewers: Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-02-19 14:33:31 -08:00 committed by GitHub
parent 538a60e1b3
commit 9f23b25f6e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 46 additions and 28 deletions

View File

@ -77,7 +77,7 @@ public class SmokeTestDriverIntegrationTest {
try {
final Map<String, Set<Integer>> allData =
generate(bootstrapServers, numKeys, maxRecordsPerKey, Duration.ofSeconds(20));
result = verify(bootstrapServers, allData, maxRecordsPerKey);
result = verify(bootstrapServers, allData, maxRecordsPerKey, false);
} catch (final Exception ex) {
this.exception = ex;

View File

@ -372,7 +372,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
public static VerificationResult verify(final String kafka,
final Map<String, Set<Integer>> inputs,
final int maxRecordsPerKey) {
final int maxRecordsPerKey,
final boolean eosEnabled) {
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@ -399,7 +400,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
verificationResult = verifyAll(inputs, events, false);
verificationResult = verifyAll(inputs, events, false, eosEnabled);
if (verificationResult.passed()) {
break;
} else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
@ -465,7 +466,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
// give it one more try if it's not already passing.
if (!verificationResult.passed()) {
verificationResult = verifyAll(inputs, events, true);
verificationResult = verifyAll(inputs, events, true, eosEnabled);
}
success &= verificationResult.passed();
@ -508,7 +509,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final boolean printResults) {
final boolean printResults,
final boolean eosEnabled) {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
boolean pass;
try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
@ -517,14 +519,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
return getMin(unwindowedKey);
}, printResults);
}, printResults, eosEnabled);
pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults, eosEnabled);
pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults, eosEnabled);
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults, eosEnabled);
pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults, eosEnabled);
pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults, eosEnabled);
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults, eosEnabled);
}
return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
}
@ -534,20 +536,32 @@ public class SmokeTestDriver extends SmokeTestUtil {
final Map<String, Set<Integer>> inputData,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final Function<String, Number> keyToExpectation,
final boolean printResults) {
final boolean printResults,
final boolean eosEnabled) {
resultStream.printf("verifying topic '%s'%n", topic);
final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data");
final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
if (outputEvents.isEmpty()) {
resultStream.println(topic + " is empty");
resultStream.println("fail: missing result data; topic '" + topic + "' is empty, expected " + inputData.size() + " keys");
return false;
} else {
resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
if (outputEvents.size() != inputData.size()) {
resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
return false;
if (outputEvents.size() < inputData.size()) {
resultStream.println("fail: missing result data; got " + inputData.size() + " keys, expected: " + outputEvents.size() + " keys");
return false;
} else {
if (eosEnabled) {
resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
return false;
} else {
resultStream.printf("duplicated detected (ok for ALOS): resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
}
}
}
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
final String key = entry.getKey();
final Number expected = keyToExpectation.apply(key);
@ -577,7 +591,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
}
private static boolean verifySuppressed(final PrintStream resultStream,
@SuppressWarnings("SameParameterValue") final String topic,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
@ -630,14 +643,17 @@ public class SmokeTestDriver extends SmokeTestUtil {
final Map<String, Set<Integer>> allData,
final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
final boolean printResults) {
resultStream.println("verifying topic tagg");
if (taggEvents == null) {
resultStream.println("tagg is missing");
resultStream.println("fail: missing result data; tagg is missing, expected: " + allData.size() + " keys");
return false;
} else if (taggEvents.isEmpty()) {
resultStream.println("tagg is empty");
resultStream.println("fail: missing result data; tagg is empty, expected: " + allData.size() + " keys");
return false;
} else {
resultStream.println("verifying tagg");
if (taggEvents.size() < allData.size()) {
resultStream.println("fail: missing result data; got " + taggEvents.size() + " keys, expected: " + allData.size() + " keys");
}
// generate expected answer
final Map<String, Long> expected = new HashMap<>();

View File

@ -86,7 +86,12 @@ public class StreamsSmokeTest {
// do their bounces, etc.
final Map<String, Set<Integer>> allData =
generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(90));
SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
SmokeTestDriver.verify(
kafka,
allData,
maxRecordsPerKey,
StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)
);
}
break;
case "process":

View File

@ -107,7 +107,4 @@ class StreamsSmokeTest(KafkaTest):
processor3.stop()
if crash and processing_guarantee == 'at_least_once':
self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
else:
self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)