From ee4f8f8c429dfcf0b22fa8917ed0171f3aaf16fb Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 27 Jan 2025 09:43:25 -0800 Subject: [PATCH] KAFKA-18541: fix flaky KafkaStreamsTelemetryIntegrationTest (#18569) Reviewers: Bill Bejeck --- .../KafkaStreamsTelemetryIntegrationTest.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index 18dbd2fa6d8..750b2381a1a 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -70,6 +70,7 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -172,9 +173,11 @@ public class KafkaStreamsTelemetryIntegrationTest { .map(metric -> metric.metricName().tags().get("process-id")) .findFirst().orElseThrow(); - TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).isEmpty(), - 30_000, - "Never received subscribed metrics"); + TestUtils.waitForCondition( + () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(mainConsumerInstanceId, Collections.emptyList()).isEmpty(), + 30_000, + "Never received subscribed metrics" + ); final List expectedMetrics = streams.metrics().values().stream().map(Metric::metricName) .filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> { final String name = mn.name().replace('-', '.'); @@ -185,9 +188,11 @@ public class KafkaStreamsTelemetryIntegrationTest { final List actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId)); assertEquals(expectedMetrics, actualMetrics); - TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(), - 30_000, - "Never received subscribed metrics"); + TestUtils.waitForCondition( + () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(adminInstanceId, Collections.emptyList()).isEmpty(), + 30_000, + "Never received subscribed metrics" + ); final List actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId); final List expectedInstanceMetrics = Arrays.asList( "org.apache.kafka.stream.alive.stream.threads", @@ -267,18 +272,25 @@ public class KafkaStreamsTelemetryIntegrationTest { assertEquals(streamsTaskMetricNames.size(), consumerPassedStreamTaskMetricNames.size()); assertEquals(consumerPassedTaskMetricCount, streamsTaskMetricNames.size()); - try (final KafkaStreams streamsTwo = new KafkaStreams(topology, streamsSecondApplicationProperties)) { streamsTwo.start(); - waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(), - IntegrationTestUtils.DEFAULT_TIMEOUT, - () -> "Kafka Streams one or two never transitioned to a RUNNING state."); /* Now with 2 instances, the tasks will get split amongst both Kafka Streams applications */ - final List streamOneTaskIds = getTaskIdsAsStrings(streamsOne); - final List streamTwoTasksIds = getTaskIdsAsStrings(streamsTwo); + final List streamOneTaskIds = new ArrayList<>(); + final List streamTwoTasksIds = new ArrayList<>(); + waitForCondition(() -> { + streamOneTaskIds.clear(); + streamTwoTasksIds.clear(); + + streamOneTaskIds.addAll(getTaskIdsAsStrings(streamsOne)); + streamTwoTasksIds.addAll(getTaskIdsAsStrings(streamsTwo)); + + return streamOneTaskIds.size() == 2 && streamTwoTasksIds.size() == 2; + }, + "Task assignment did not complete." + ); final List streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName) .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());