KAFKA-18541: fix flaky KafkaStreamsTelemetryIntegrationTest (#18569)

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-01-27 09:43:25 -08:00 committed by GitHub
parent d001b47093
commit ee4f8f8c42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 24 additions and 12 deletions

View File

@ -70,6 +70,7 @@ import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -172,9 +173,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
.map(metric -> metric.metricName().tags().get("process-id")) .map(metric -> metric.metricName().tags().get("process-id"))
.findFirst().orElseThrow(); .findFirst().orElseThrow();
TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).isEmpty(), TestUtils.waitForCondition(
30_000, () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(mainConsumerInstanceId, Collections.emptyList()).isEmpty(),
"Never received subscribed metrics"); 30_000,
"Never received subscribed metrics"
);
final List<String> expectedMetrics = streams.metrics().values().stream().map(Metric::metricName) final List<String> expectedMetrics = streams.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> { .filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> {
final String name = mn.name().replace('-', '.'); final String name = mn.name().replace('-', '.');
@ -185,9 +188,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId)); final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
assertEquals(expectedMetrics, actualMetrics); assertEquals(expectedMetrics, actualMetrics);
TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(), TestUtils.waitForCondition(
30_000, () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(adminInstanceId, Collections.emptyList()).isEmpty(),
"Never received subscribed metrics"); 30_000,
"Never received subscribed metrics"
);
final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId); final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
final List<String> expectedInstanceMetrics = Arrays.asList( final List<String> expectedInstanceMetrics = Arrays.asList(
"org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.alive.stream.threads",
@ -267,18 +272,25 @@ public class KafkaStreamsTelemetryIntegrationTest {
assertEquals(streamsTaskMetricNames.size(), consumerPassedStreamTaskMetricNames.size()); assertEquals(streamsTaskMetricNames.size(), consumerPassedStreamTaskMetricNames.size());
assertEquals(consumerPassedTaskMetricCount, streamsTaskMetricNames.size()); assertEquals(consumerPassedTaskMetricCount, streamsTaskMetricNames.size());
try (final KafkaStreams streamsTwo = new KafkaStreams(topology, streamsSecondApplicationProperties)) { try (final KafkaStreams streamsTwo = new KafkaStreams(topology, streamsSecondApplicationProperties)) {
streamsTwo.start(); streamsTwo.start();
waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(),
IntegrationTestUtils.DEFAULT_TIMEOUT,
() -> "Kafka Streams one or two never transitioned to a RUNNING state.");
/* /*
Now with 2 instances, the tasks will get split amongst both Kafka Streams applications Now with 2 instances, the tasks will get split amongst both Kafka Streams applications
*/ */
final List<String> streamOneTaskIds = getTaskIdsAsStrings(streamsOne); final List<String> streamOneTaskIds = new ArrayList<>();
final List<String> streamTwoTasksIds = getTaskIdsAsStrings(streamsTwo); final List<String> streamTwoTasksIds = new ArrayList<>();
waitForCondition(() -> {
streamOneTaskIds.clear();
streamTwoTasksIds.clear();
streamOneTaskIds.addAll(getTaskIdsAsStrings(streamsOne));
streamTwoTasksIds.addAll(getTaskIdsAsStrings(streamsTwo));
return streamOneTaskIds.size() == 2 && streamTwoTasksIds.size() == 2;
},
"Task assignment did not complete."
);
final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName) final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList()); .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());