From ffcfc974d90ed1b2039dcfd13787e5dd8c3e9137 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 14 Jul 2025 17:13:54 -0700 Subject: [PATCH] KAFKA-19842: Fix flaky KafkaStreamsTelemetryIntegrationTest (#20147) The new "streams" protocol behaves slightly different to the "classic" protocol, and thus we need to update the test to avoid race conditions. In particular, the first call to `poll()` won't "block" and return after task assignment completed if we need to create internal topics, but returns early without a task assignment, and only a consecutive rebalance will assign tasks. This implies, that KafkaStreams transits to RUNNING state even if the group is still in NOT_READY state broker side, but this NOT_READY state is not reflected in the client side state machine. Disabling the combination of "complex-topology + streams" for now, until this difference in behavior of the client state machine is fixed. Reviewers: Lucas Brutschy --- .../KafkaStreamsTelemetryIntegrationTest.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index b0eb3ebf0a5..eaacac0d240 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -279,7 +279,7 @@ public class KafkaStreamsTelemetryIntegrationTest { // Streams metrics should get passed to Admin and Consumer streamsApplicationProperties = props(stateUpdaterEnabled, groupProtocol); final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology(); - + try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); @@ -315,7 +315,7 @@ public class KafkaStreamsTelemetryIntegrationTest { streamsSecondApplicationProperties = props(stateUpdaterEnabled, groupProtocol); streamsSecondApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2"); streamsSecondApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2"); - + final Topology topology = complexTopology(); try (final KafkaStreams streamsOne = new KafkaStreams(topology, streamsApplicationProperties)) { @@ -451,18 +451,14 @@ public class KafkaStreamsTelemetryIntegrationTest { Arguments.of("complex", true, "classic"), Arguments.of("complex", false, "classic"), Arguments.of("simple", true, "streams"), - Arguments.of("simple", false, "streams"), - Arguments.of("complex", true, "streams"), - Arguments.of("complex", false, "streams") + Arguments.of("simple", false, "streams") ); } private static Stream multiTaskParameters() { return Stream.of( Arguments.of(true, "classic"), - Arguments.of(false, "classic"), - Arguments.of(true, "streams"), - Arguments.of(false, "streams") + Arguments.of(false, "classic") ); }