KAFKA-19445: Fix coordinator runtime metrics sharing sensors (#20062)

When sensors are shared between different metric groups, data from all
groups is combined and added to all metrics under each sensor. This
means that different metric groups will report the same values for their
metrics.

Prefix sensor names with metric group names to isolate metric groups.

Reviewers: Yung <yungyung7654321@gmail.com>, Sushant Mahajan
<smahajan@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>, TengYao Chi
<frankvicky@apache.org>
This commit is contained in:
Sean Quah 2025-06-30 08:14:39 +01:00 committed by GitHub
parent 975fe75d25
commit 08eda2ebed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 127 additions and 7 deletions

View File

@ -149,7 +149,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
metrics.addMetric(numPartitionsActive, (Gauge<Long>) (config, now) -> numPartitionsActiveCounter.get()); metrics.addMetric(numPartitionsActive, (Gauge<Long>) (config, now) -> numPartitionsActiveCounter.get());
metrics.addMetric(numPartitionsFailed, (Gauge<Long>) (config, now) -> numPartitionsFailedCounter.get()); metrics.addMetric(numPartitionsFailed, (Gauge<Long>) (config, now) -> numPartitionsFailedCounter.get());
this.partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime"); this.partitionLoadSensor = metrics.sensor(this.metricsGroup + "-PartitionLoadTime");
this.partitionLoadSensor.add( this.partitionLoadSensor.add(
metrics.metricName( metrics.metricName(
"partition-load-time-max", "partition-load-time-max",
@ -163,7 +163,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
"The average time it took to load the partitions in the last 30 sec." "The average time it took to load the partitions in the last 30 sec."
), new Avg()); ), new Avg());
this.threadIdleSensor = metrics.sensor("ThreadIdleRatio"); this.threadIdleSensor = metrics.sensor(this.metricsGroup + "-ThreadIdleRatio");
this.threadIdleSensor.add( this.threadIdleSensor.add(
metrics.metricName( metrics.metricName(
"thread-idle-ratio-avg", "thread-idle-ratio-avg",
@ -178,7 +178,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
"The " + suffix + " event queue time in milliseconds" "The " + suffix + " event queue time in milliseconds"
) )
); );
this.eventQueueTimeSensor = metrics.sensor("EventQueueTime"); this.eventQueueTimeSensor = metrics.sensor(this.metricsGroup + "-EventQueueTime");
this.eventQueueTimeSensor.add(eventQueueTimeHistogram); this.eventQueueTimeSensor.add(eventQueueTimeHistogram);
KafkaMetricHistogram eventProcessingTimeHistogram = KafkaMetricHistogram.newLatencyHistogram( KafkaMetricHistogram eventProcessingTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
@ -187,7 +187,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
"The " + suffix + " event processing time in milliseconds" "The " + suffix + " event processing time in milliseconds"
) )
); );
this.eventProcessingTimeSensor = metrics.sensor("EventProcessingTime"); this.eventProcessingTimeSensor = metrics.sensor(this.metricsGroup + "-EventProcessingTime");
this.eventProcessingTimeSensor.add(eventProcessingTimeHistogram); this.eventProcessingTimeSensor.add(eventProcessingTimeHistogram);
KafkaMetricHistogram eventPurgatoryTimeHistogram = KafkaMetricHistogram.newLatencyHistogram( KafkaMetricHistogram eventPurgatoryTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
@ -196,7 +196,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
"The " + suffix + " event purgatory time in milliseconds" "The " + suffix + " event purgatory time in milliseconds"
) )
); );
this.eventPurgatoryTimeSensor = metrics.sensor("EventPurgatoryTime"); this.eventPurgatoryTimeSensor = metrics.sensor(this.metricsGroup + "-EventPurgatoryTime");
this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram); this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram);
KafkaMetricHistogram flushTimeHistogram = KafkaMetricHistogram.newLatencyHistogram( KafkaMetricHistogram flushTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
@ -205,7 +205,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
"The " + suffix + " flush time in milliseconds" "The " + suffix + " flush time in milliseconds"
) )
); );
this.flushTimeSensor = metrics.sensor("FlushTime"); this.flushTimeSensor = metrics.sensor(this.metricsGroup + "-FlushTime");
this.flushTimeSensor.add(flushTimeHistogram); this.flushTimeSensor.add(flushTimeHistogram);
} }

View File

@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -38,11 +39,13 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetr
import static org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS; import static org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class CoordinatorRuntimeMetricsImplTest { public class CoordinatorRuntimeMetricsImplTest {
private static final String METRICS_GROUP = "test-runtime-metrics"; private static final String METRICS_GROUP = "test-runtime-metrics";
private static final String OTHER_METRICS_GROUP = "test-runtime-metrics-2";
@Test @Test
public void testMetricNames() { public void testMetricNames() {
@ -109,6 +112,26 @@ public class CoordinatorRuntimeMetricsImplTest {
} }
} }
@Test
public void testNumPartitionsMetricsGroupIsolation() {
Metrics metrics = new Metrics();
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
IntStream.range(0, 3)
.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING));
IntStream.range(0, 2)
.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE));
IntStream.range(0, 1)
.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, CoordinatorState.FAILED));
for (String state : List.of("loading", "active", "failed")) {
assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", state), 1);
assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", state), 0);
}
}
}
@Test @Test
public void testPartitionLoadSensorMetrics() { public void testPartitionLoadSensorMetrics() {
Time time = new MockTime(); Time time = new MockTime();
@ -130,6 +153,29 @@ public class CoordinatorRuntimeMetricsImplTest {
} }
} }
@ParameterizedTest
@ValueSource(strings = {
"partition-load-time-avg",
"partition-load-time-max"
})
public void testPartitionLoadSensorMetricsGroupIsolation(String name) {
Time time = new MockTime();
Metrics metrics = new Metrics(time);
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
long startTimeMs = time.milliseconds();
runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, name);
org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, name);
KafkaMetric metric = metrics.metrics().get(metricName);
KafkaMetric otherMetric = metrics.metrics().get(otherGroupMetricName);
assertNotEquals(Double.NaN, metric.metricValue());
assertEquals(Double.NaN, otherMetric.metricValue());
}
}
@Test @Test
public void testThreadIdleSensor() { public void testThreadIdleSensor() {
Time time = new MockTime(); Time time = new MockTime();
@ -143,6 +189,22 @@ public class CoordinatorRuntimeMetricsImplTest {
assertEquals(6 / 30.0, metric.metricValue()); // 'total_ms / window_ms' assertEquals(6 / 30.0, metric.metricValue()); // 'total_ms / window_ms'
} }
@Test
public void testThreadIdleSensorMetricsGroupIsolation() {
Time time = new MockTime();
Metrics metrics = new Metrics(time);
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
runtimeMetrics.recordThreadIdleTime(1000.0);
org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, "thread-idle-ratio-avg");
org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, "thread-idle-ratio-avg");
assertNotEquals(0.0, metrics.metrics().get(metricName).metricValue());
assertEquals(0.0, metrics.metrics().get(otherGroupMetricName).metricValue());
}
}
@Test @Test
public void testEventQueueSize() { public void testEventQueueSize() {
Time time = new MockTime(); Time time = new MockTime();
@ -154,6 +216,21 @@ public class CoordinatorRuntimeMetricsImplTest {
} }
} }
@Test
public void testEventQueueSizeMetricsGroupIsolation() {
Time time = new MockTime();
Metrics metrics = new Metrics(time);
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
runtimeMetrics.registerEventQueueSizeGauge(() -> 5);
otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0);
assertMetricGauge(metrics, kafkaMetricName(metrics, "event-queue-size"), 5);
assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, "event-queue-size"), 0);
}
}
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = { @ValueSource(strings = {
EVENT_QUEUE_TIME_METRIC_NAME, EVENT_QUEUE_TIME_METRIC_NAME,
@ -204,6 +281,45 @@ public class CoordinatorRuntimeMetricsImplTest {
assertEquals(999.0, metric.metricValue()); assertEquals(999.0, metric.metricValue());
} }
@ParameterizedTest
@ValueSource(strings = {
EVENT_QUEUE_TIME_METRIC_NAME,
EVENT_PROCESSING_TIME_METRIC_NAME,
EVENT_PURGATORY_TIME_METRIC_NAME,
BATCH_FLUSH_TIME_METRIC_NAME
})
public void testHistogramMetricsGroupIsolation(String metricNamePrefix) {
Time time = new MockTime();
Metrics metrics = new Metrics(time);
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
switch (metricNamePrefix) {
case EVENT_QUEUE_TIME_METRIC_NAME:
runtimeMetrics.recordEventQueueTime(1000);
break;
case EVENT_PROCESSING_TIME_METRIC_NAME:
runtimeMetrics.recordEventProcessingTime(1000);
break;
case EVENT_PURGATORY_TIME_METRIC_NAME:
runtimeMetrics.recordEventPurgatoryTime(1000);
break;
case BATCH_FLUSH_TIME_METRIC_NAME:
runtimeMetrics.recordFlushTime(1000);
}
// Check metric group isolation
for (String suffix : List.of("-max", "-p50", "-p95", "-p99", "-p999")) {
org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, metricNamePrefix + suffix);
org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, metricNamePrefix + suffix);
KafkaMetric metric = metrics.metrics().get(metricName);
KafkaMetric otherMetric = metrics.metrics().get(otherGroupMetricName);
assertNotEquals(0.0, metric.metricValue());
assertEquals(0.0, otherMetric.metricValue());
}
}
}
@Test @Test
public void testRecordEventPurgatoryTimeLimit() { public void testRecordEventPurgatoryTimeLimit() {
Time time = new MockTime(); Time time = new MockTime();
@ -228,4 +344,8 @@ public class CoordinatorRuntimeMetricsImplTest {
private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) { private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) {
return metrics.metricName(name, METRICS_GROUP, "", keyValue); return metrics.metricName(name, METRICS_GROUP, "", keyValue);
} }
private static MetricName otherGroupKafkaMetricName(Metrics metrics, String name, String... keyValue) {
return metrics.metricName(name, OTHER_METRICS_GROUP, "", keyValue);
}
} }