KAFKA-19842: Fix flaky KafkaStreamsTelemetryIntegrationTest (#20147)

The new "streams" protocol behaves slightly different to the "classic"
protocol, and thus we need to update the test to avoid race conditions.
In particular, the first call to `poll()` won't "block" and return after
task assignment completed if we need to create internal topics,  but
returns early without a task assignment, and only a consecutive
rebalance will assign tasks.

This implies, that KafkaStreams transits to RUNNING state even if the
group is still in NOT_READY state broker side, but this NOT_READY  state
is not reflected in the client side state machine.

Disabling the combination of "complex-topology + streams" for now,
until this difference in behavior of the client state machine is fixed.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-07-14 17:13:54 -07:00 committed by GitHub
parent c6cf5175f6
commit ffcfc974d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 4 additions and 8 deletions

View File

@ -279,7 +279,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
// Streams metrics should get passed to Admin and Consumer // Streams metrics should get passed to Admin and Consumer
streamsApplicationProperties = props(stateUpdaterEnabled, groupProtocol); streamsApplicationProperties = props(stateUpdaterEnabled, groupProtocol);
final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology(); final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
@ -315,7 +315,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
streamsSecondApplicationProperties = props(stateUpdaterEnabled, groupProtocol); streamsSecondApplicationProperties = props(stateUpdaterEnabled, groupProtocol);
streamsSecondApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2"); streamsSecondApplicationProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2");
streamsSecondApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2"); streamsSecondApplicationProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2");
final Topology topology = complexTopology(); final Topology topology = complexTopology();
try (final KafkaStreams streamsOne = new KafkaStreams(topology, streamsApplicationProperties)) { try (final KafkaStreams streamsOne = new KafkaStreams(topology, streamsApplicationProperties)) {
@ -451,18 +451,14 @@ public class KafkaStreamsTelemetryIntegrationTest {
Arguments.of("complex", true, "classic"), Arguments.of("complex", true, "classic"),
Arguments.of("complex", false, "classic"), Arguments.of("complex", false, "classic"),
Arguments.of("simple", true, "streams"), Arguments.of("simple", true, "streams"),
Arguments.of("simple", false, "streams"), Arguments.of("simple", false, "streams")
Arguments.of("complex", true, "streams"),
Arguments.of("complex", false, "streams")
); );
} }
private static Stream<Arguments> multiTaskParameters() { private static Stream<Arguments> multiTaskParameters() {
return Stream.of( return Stream.of(
Arguments.of(true, "classic"), Arguments.of(true, "classic"),
Arguments.of(false, "classic"), Arguments.of(false, "classic")
Arguments.of(true, "streams"),
Arguments.of(false, "streams")
); );
} }