refactor to reduce complexity
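
Splits the monolithic verify() loop into focused helpers (createConsumerProperties, consumeAndProcessRecords, handleEmptyRecords, processRecords, validateAndReportResults) that share progress through a small RecordProcessingState holder, and moves the consumer into try-with-resources instead of a manual close(). Below is a minimal, self-contained sketch of the extract-method-plus-state-holder pattern; the names and logic are simplified stand-ins (no Kafka dependencies), not the code in this diff:

import java.util.Iterator;
import java.util.List;

final class RefactorShapeSketch {

    // Shared progress holder, analogous to RecordProcessingState.
    private static final class State {
        final int expected;   // records the driver plans to see
        int processed = 0;    // records seen so far
        int retry = 0;        // consecutive empty polls
        State(final int expected) {
            this.expected = expected;
        }
    }

    // Top-level flow: the former monolithic loop now just wires helpers together.
    static boolean verify(final int expected, final Iterator<Integer> source) {
        final State state = new State(expected);
        consumeAndProcess(source, state);
        return report(state);
    }

    // Extracted consume loop, analogous to consumeAndProcessRecords.
    private static void consumeAndProcess(final Iterator<Integer> source, final State state) {
        while (source.hasNext()) {
            process(source.next(), state);
        }
    }

    // Extracted per-record handling: any progress resets the retry budget.
    private static void process(final int record, final State state) {
        state.retry = 0;
        state.processed++;
    }

    // Extracted reporting, analogous to validateAndReportResults.
    private static boolean report(final State state) {
        System.out.println("processed=" + state.processed + " expected=" + state.expected);
        return state.processed == state.expected;
    }

    public static void main(final String[] args) {
        System.out.println(verify(3, List.of(1, 2, 3).iterator()) ? "SUCCESS" : "FAILURE");
    }
}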

Jinhe Zhang 2025-08-25 14:48:26 -04:00
parent 9ef493983c
commit d56ccebafc
1 changed file with 142 additions and 84 deletions


@@ -16,6 +16,32 @@
*/
package org.apache.kafka.streams.tests;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import static java.util.Collections.emptyMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -36,33 +62,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
@@ -381,89 +380,150 @@ public class SmokeTestDriver extends SmokeTestUtil {
final Map<String, Set<Integer>> inputs,
final int maxRecordsPerKey,
final boolean eosEnabled) {
final Properties props = createConsumerProperties(kafka);
try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
final RecordProcessingState state = new RecordProcessingState(recordsGenerated);
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
final long start = System.currentTimeMillis();
final VerificationResult verificationResult = consumeAndProcessRecords(consumer, inputs, events, state, start, eosEnabled);
final VerificationResult eosResult = performEosVerification(eosEnabled, kafka);
if (!eosResult.passed()) {
return eosResult;
}
return validateAndReportResults(inputs, events, state, verificationResult, start, eosEnabled);
}
}
private static Properties createConsumerProperties(final String kafka) {
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return props;
}
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
private static class RecordProcessingState {
final int recordsGenerated;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
VerificationResult verificationResult = new VerificationResult(false, "no results yet");
int retry = 0;
final long start = System.currentTimeMillis();
final Map<String, AtomicInteger> processed;
RecordProcessingState(final int recordsGenerated) {
this.recordsGenerated = recordsGenerated;
this.processed = Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
}
}
private static VerificationResult consumeAndProcessRecords(
final KafkaConsumer<String, Number> consumer,
final Map<String, Set<Integer>> inputs,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final RecordProcessingState state,
final long start,
final boolean eosEnabled) {
VerificationResult verificationResult = new VerificationResult(false, "no results yet");
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
verificationResult = verifyAll(inputs, events, false, eosEnabled);
if (verificationResult.passed()) {
if (records.isEmpty() && state.recordsProcessed >= state.recordsGenerated) {
verificationResult = handleEmptyRecords(inputs, events, state, eosEnabled);
if (verificationResult.passed() || state.retry > MAX_RECORD_EMPTY_RETRIES) {
break;
} else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
break;
} else {
System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
}
} else {
System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
retry = 0;
for (final ConsumerRecord<String, Number> record : records) {
final String key = record.key();
final String topic = record.topic();
processed.get(topic).incrementAndGet();
if (topic.equals("echo")) {
recordsProcessed++;
if (recordsProcessed % 100 == 0) {
System.out.println("Echo records processed = " + recordsProcessed);
}
}
events.computeIfAbsent(topic, t -> new HashMap<>())
.computeIfAbsent(key, k -> new LinkedList<>())
.add(record);
}
System.out.println(processed);
processRecords(records, events, state);
}
}
consumer.close();
return verificationResult;
}
final VerificationResult eosResult = performEosVerification(eosEnabled, kafka);
if (!eosResult.passed()) {
return eosResult;
private static VerificationResult handleEmptyRecords(
final Map<String, Set<Integer>> inputs,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final RecordProcessingState state,
final boolean eosEnabled) {
final VerificationResult result = verifyAll(inputs, events, false, eosEnabled);
if (result.passed()) {
return result;
}
state.retry++;
if (state.retry > MAX_RECORD_EMPTY_RETRIES) {
System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
} else {
System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + state.retry);
}
return result;
}
private static void processRecords(
final ConsumerRecords<String, Number> records,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final RecordProcessingState state) {
System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
state.retry = 0;
for (final ConsumerRecord<String, Number> record : records) {
final String key = record.key();
final String topic = record.topic();
state.processed.get(topic).incrementAndGet();
if (topic.equals("echo")) {
state.recordsProcessed++;
if (state.recordsProcessed % 100 == 0) {
System.out.println("Echo records processed = " + state.recordsProcessed);
}
}
events.computeIfAbsent(topic, t -> new HashMap<>())
.computeIfAbsent(key, k -> new LinkedList<>())
.add(record);
}
System.out.println(state.processed);
}
private static VerificationResult validateAndReportResults(
final Map<String, Set<Integer>> inputs,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final RecordProcessingState state,
VerificationResult verificationResult,
final long start,
final boolean eosEnabled) {
final long finished = System.currentTimeMillis() - start;
System.out.println("Verification time=" + finished);
System.out.println("-------------------");
System.out.println("Result Verification");
System.out.println("-------------------");
System.out.println("recordGenerated=" + recordsGenerated);
System.out.println("recordProcessed=" + recordsProcessed);
System.out.println("recordGenerated=" + state.recordsGenerated);
System.out.println("recordProcessed=" + state.recordsProcessed);
if (recordsProcessed > recordsGenerated) {
if (state.recordsProcessed > state.recordsGenerated) {
System.out.println("PROCESSED-MORE-THAN-GENERATED");
} else if (recordsProcessed < recordsGenerated) {
} else if (state.recordsProcessed < state.recordsGenerated) {
System.out.println("PROCESSED-LESS-THAN-GENERATED");
}
final Map<String, Set<Number>> received = parseRecordsForEchoTopic(events);
boolean success = inputs.equals(received);
if (success) {
@@ -471,19 +531,17 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
int missedCount = 0;
for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
missedCount += received.get(entry.getKey()).size();
missedCount += received.getOrDefault(entry.getKey(), Collections.emptySet()).size();
}
System.out.println("missedRecords=" + missedCount);
}
// give it one more try if it's not already passing.
if (!verificationResult.passed()) {
verificationResult = verifyAll(inputs, events, true, eosEnabled);
}
success &= verificationResult.passed();
System.out.println(verificationResult.result());
System.out.println(success ? "SUCCESS" : "FAILURE");
return verificationResult;
}
@@ -573,7 +631,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
if (outputEvents.size() != inputData.size()) {
if (outputEvents.size() < inputData.size()) {
resultStream.println("fail: missing result data; got " + inputData.size() + " keys, expected: " + outputEvents.size() + " keys");
resultStream.println("fail: missing result data; got " + outputEvents.size() + " keys, expected: " + inputData.size() + " keys");
return false;
} else {
if (eosEnabled) {
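
Beyond the decomposition, the hunks above carry two small correctness fixes: the missing-result message now reports got/expected in the right order, and the missedRecords tally uses getOrDefault so a key with no received output no longer triggers a NullPointerException. A standalone illustration of the getOrDefault fix (hypothetical example, not part of the commit):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GetOrDefaultSketch {
    public static void main(final String[] args) {
        // One key produced output, one did not (simulates a key missing from 'received').
        final Map<String, Set<Integer>> received = Map.of("a", Set.of(1, 2));
        int missedCount = 0;
        for (final String key : List.of("a", "b")) {
            // received.get("b") returns null, so calling .size() on it would throw
            // an NPE; getOrDefault substitutes an empty set for keys with no output.
            missedCount += received.getOrDefault(key, Collections.emptySet()).size();
        }
        System.out.println("missedCount=" + missedCount); // prints missedCount=2
    }
}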