From 71c5a426b8a02b80b5b7d2494ac278467d2c2f29 Mon Sep 17 00:00:00 2001
From: Deep Golani <54791570+deepgolani4@users.noreply.github.com>
Date: Mon, 29 Sep 2025 11:23:05 -0400
Subject: [PATCH] KAFKA-12506: Strengthen AdjustStreamThreadCountTest with stateful counting and higher throughput (#20540)

Add count store and output topic; produce 1,000 records across 50 keys
to better exercise concurrency.

Reviewers: Matthias J. Sax
---
 .../AdjustStreamThreadCountTest.java | 43 ++++++++++++++++++-
 1 file changed, 41 insertions(+), 2 deletions(-)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index df96837458c..d9779b5a9c5 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogCaptureAppender;
@@ -28,6 +31,9 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -93,6 +99,7 @@ public class AdjustStreamThreadCountTest {
     private final List<KafkaStreams.State> stateTransitionHistory = new ArrayList<>();
     private static String inputTopic;
+    private static String outputTopic;
     private static StreamsBuilder builder;
     private static Properties properties;
     private static String appId = "";
@@ -103,10 +110,21 @@ public class AdjustStreamThreadCountTest {
         final String testId = safeUniqueTestName(testInfo);
         appId = "appId_" + testId;
         inputTopic = "input" + testId;
+        outputTopic = "output" + testId;
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
 
         builder = new StreamsBuilder();
-        builder.stream(inputTopic);
+        // Build a simple stateful topology to exercise concurrency with state stores
+        final KStream<String, String> source = builder.stream(inputTopic);
+        final KTable<String, Long> counts = source
+            .groupByKey()
+            .count(Named.as("counts"), Materialized.as("counts-store"));
+        counts
+            .toStream()
+            .mapValues(Object::toString)
+            .to(outputTopic);
+
+        produceTestRecords(inputTopic, CLUSTER);
 
         properties = mkObjectProperties(
             mkMap(
@@ -121,6 +139,21 @@
         );
     }
 
+    private void produceTestRecords(final String inputTopic, final EmbeddedKafkaCluster cluster) {
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-client");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+            for (int i = 0; i < 1000; i++) {
+                final String key = "key-" + (i % 50);
+                final String value = "value-" + i;
+                producer.send(new ProducerRecord<>(inputTopic, key, value));
+            }
+        }
+    }
+
     private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException {
         kafkaStreams.start();
         waitForRunning();
@@ -251,7 +284,13 @@
                 assertTrue(latch.await(30, TimeUnit.SECONDS));
                 one.join();
                 two.join();
-
+                waitForCondition(
+                    () -> kafkaStreams.metadataForLocalThreads().size() == oldThreadCount &&
+                        kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                    DEFAULT_DURATION.toMillis(),
+                    "Kafka Streams did not stabilize at the expected thread count and RUNNING state."
+                );
+
                 threadMetadata = kafkaStreams.metadataForLocalThreads();
                 assertThat(threadMetadata.size(), equalTo(oldThreadCount));
             } catch (final AssertionError e) {