KAFKA-12506: Strengthen AdjustStreamThreadCountTest with stateful counting and higher throughput (#20540)

Add count store and output topic; produce 1,000 records across 50 keys
to better exercise concurrency.

Reviewers: Matthias J. Sax <matthias@confluent.io>
Deep Golani authored on 2025-09-29 11:23:05 -04:00, committed by GitHub
parent 92169b8f08
commit 71c5a426b8
1 changed file with 41 additions and 2 deletions
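
For context on the behavior the commit relies on: the counting topology added below emits one changelog update to the output topic per input record, so the 1,000 produced records drive a steady stream of stateful updates while threads are added and removed. The following is a minimal standalone sketch of that topology, not part of the commit; it assumes kafka-streams-test-utils is on the classpath, and the class name and the "input"/"output" topic names are illustrative placeholders.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TestOutputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Named;

    public class CountsTopologySketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Same shape as the topology added in this commit: count records per key
            // into a named store, then emit the running counts as strings.
            builder.<String, String>stream("input")
                .groupByKey()
                .count(Named.as("counts"), Materialized.as("counts-store"))
                .toStream()
                .mapValues(Object::toString)
                .to("output");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "counts-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                final TestInputTopic<String, String> in =
                    driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
                final TestOutputTopic<String, String> out =
                    driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
                in.pipeInput("key-0", "a");
                in.pipeInput("key-0", "b");
                // Each input record advances the count, so key-0 emits "1" then "2".
                System.out.println(out.readKeyValuesToList());
            }
        }
    }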


@@ -17,6 +17,9 @@
 package org.apache.kafka.streams.integration;

 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogCaptureAppender;
@@ -28,6 +31,9 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -93,6 +99,7 @@ public class AdjustStreamThreadCountTest {
     private final List<KafkaStreams.State> stateTransitionHistory = new ArrayList<>();

     private static String inputTopic;
+    private static String outputTopic;
     private static StreamsBuilder builder;
     private static Properties properties;
     private static String appId = "";
@@ -103,10 +110,21 @@ public class AdjustStreamThreadCountTest {
         final String testId = safeUniqueTestName(testInfo);
         appId = "appId_" + testId;
         inputTopic = "input" + testId;
+        outputTopic = "output" + testId;

         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
         builder = new StreamsBuilder();
-        builder.stream(inputTopic);
+        // Build a simple stateful topology to exercise concurrency with state stores
+        final KStream<String, String> source = builder.stream(inputTopic);
+        final KTable<String, Long> counts = source
+            .groupByKey()
+            .count(Named.as("counts"), Materialized.as("counts-store"));
+        counts
+            .toStream()
+            .mapValues(Object::toString)
+            .to(outputTopic);
+
+        produceTestRecords(inputTopic, CLUSTER);

         properties = mkObjectProperties(
             mkMap(
@@ -121,6 +139,21 @@ public class AdjustStreamThreadCountTest {
         );
     }

+    private void produceTestRecords(final String inputTopic, final EmbeddedKafkaCluster cluster) {
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-client");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+            for (int i = 0; i < 1000; i++) {
+                final String key = "key-" + (i % 50);
+                final String value = "value-" + i;
+                producer.send(new ProducerRecord<>(inputTopic, key, value));
+            }
+        }
+    }
+
     private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException {
         kafkaStreams.start();
         waitForRunning();
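
With 1,000 records spread round-robin over 50 keys, every key receives exactly 20 records, so a fully processed run leaves each key's final count at 20. As a hedged sketch of how the output topic could be checked against that expectation, not part of the commit (the class name, group id, and fixed poll budget are illustrative assumptions), a plain consumer can keep the latest count seen per key:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class OutputTopicCheckSketch {
        // Reads the output topic from the beginning and keeps the latest count per key.
        public static Map<String, String> latestCounts(final String bootstrapServers, final String outputTopic) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "output-check"); // illustrative group id
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

            final Map<String, String> latest = new HashMap<>();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of(outputTopic));
                // Poll a fixed number of times; a real test would poll until all 50 keys are seen.
                for (int i = 0; i < 10; i++) {
                    for (final ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                        latest.put(record.key(), record.value()); // later counts overwrite earlier ones
                    }
                }
            }
            return latest; // a complete run maps all 50 keys to "20"
        }
    }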
@@ -251,6 +284,12 @@
         assertTrue(latch.await(30, TimeUnit.SECONDS));
         one.join();
         two.join();

+        waitForCondition(
+            () -> kafkaStreams.metadataForLocalThreads().size() == oldThreadCount &&
+                kafkaStreams.state() == KafkaStreams.State.RUNNING,
+            DEFAULT_DURATION.toMillis(),
+            "Kafka Streams did not stabilize at the expected thread count and RUNNING state."
+        );
         threadMetadata = kafkaStreams.metadataForLocalThreads();
         assertThat(threadMetadata.size(), equalTo(oldThreadCount));
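
The waitForCondition used above is Kafka's TestUtils helper: a bounded poll that re-checks a predicate until it holds or a timeout fails the test, which is what lets the assertion tolerate the rebalance that follows the thread-count change. A standalone sketch of the same pattern using only the JDK (class and method names are illustrative, and the poll interval is an assumption, not TestUtils' exact default):

    import java.util.function.BooleanSupplier;

    public final class WaitUtil {

        // Polls until the condition holds or the timeout elapses, then fails with
        // the given message, mirroring the waitForCondition pattern in the diff.
        public static void waitForCondition(final BooleanSupplier condition,
                                            final long timeoutMs,
                                            final String message) throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new AssertionError(message);
                }
                Thread.sleep(100L); // poll interval; illustrative choice
            }
        }
    }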