diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java index a95f590c5b2..af775c7c451 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java @@ -149,7 +149,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics metrics.addMetric(numPartitionsActive, (Gauge) (config, now) -> numPartitionsActiveCounter.get()); metrics.addMetric(numPartitionsFailed, (Gauge) (config, now) -> numPartitionsFailedCounter.get()); - this.partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime"); + this.partitionLoadSensor = metrics.sensor(this.metricsGroup + "-PartitionLoadTime"); this.partitionLoadSensor.add( metrics.metricName( "partition-load-time-max", @@ -163,7 +163,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics "The average time it took to load the partitions in the last 30 sec." ), new Avg()); - this.threadIdleSensor = metrics.sensor("ThreadIdleRatio"); + this.threadIdleSensor = metrics.sensor(this.metricsGroup + "-ThreadIdleRatio"); this.threadIdleSensor.add( metrics.metricName( "thread-idle-ratio-avg", @@ -178,7 +178,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics "The " + suffix + " event queue time in milliseconds" ) ); - this.eventQueueTimeSensor = metrics.sensor("EventQueueTime"); + this.eventQueueTimeSensor = metrics.sensor(this.metricsGroup + "-EventQueueTime"); this.eventQueueTimeSensor.add(eventQueueTimeHistogram); KafkaMetricHistogram eventProcessingTimeHistogram = KafkaMetricHistogram.newLatencyHistogram( @@ -187,7 +187,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics "The " + suffix + " event processing time in milliseconds" ) ); - this.eventProcessingTimeSensor = metrics.sensor("EventProcessingTime"); + this.eventProcessingTimeSensor = metrics.sensor(this.metricsGroup + "-EventProcessingTime"); this.eventProcessingTimeSensor.add(eventProcessingTimeHistogram); KafkaMetricHistogram eventPurgatoryTimeHistogram = KafkaMetricHistogram.newLatencyHistogram( @@ -196,7 +196,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics "The " + suffix + " event purgatory time in milliseconds" ) ); - this.eventPurgatoryTimeSensor = metrics.sensor("EventPurgatoryTime"); + this.eventPurgatoryTimeSensor = metrics.sensor(this.metricsGroup + "-EventPurgatoryTime"); this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram); KafkaMetricHistogram flushTimeHistogram = KafkaMetricHistogram.newLatencyHistogram( @@ -205,7 +205,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics "The " + suffix + " flush time in milliseconds" ) ); - this.flushTimeSensor = metrics.sensor("FlushTime"); + this.flushTimeSensor = metrics.sensor(this.metricsGroup + "-FlushTime"); this.flushTimeSensor.add(flushTimeHistogram); } diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java index 04ba1264ee5..68f152f2bea 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.util.List; import java.util.Set; import java.util.stream.IntStream; @@ -38,12 +39,14 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetr import static org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; public class CoordinatorRuntimeMetricsImplTest { private static final String METRICS_GROUP = "test-runtime-metrics"; - + private static final String OTHER_METRICS_GROUP = "test-runtime-metrics-2"; + @Test public void testMetricNames() { Metrics metrics = new Metrics(); @@ -109,6 +112,26 @@ public class CoordinatorRuntimeMetricsImplTest { } } + @Test + public void testNumPartitionsMetricsGroupIsolation() { + Metrics metrics = new Metrics(); + + try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP); + CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) { + IntStream.range(0, 3) + .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING)); + IntStream.range(0, 2) + .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE)); + IntStream.range(0, 1) + .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, CoordinatorState.FAILED)); + + for (String state : List.of("loading", "active", "failed")) { + assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", state), 1); + assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", state), 0); + } + } + } + @Test public void testPartitionLoadSensorMetrics() { Time time = new MockTime(); @@ -130,6 +153,29 @@ public class CoordinatorRuntimeMetricsImplTest { } } + @ParameterizedTest + @ValueSource(strings = { + "partition-load-time-avg", + "partition-load-time-max" + }) + public void testPartitionLoadSensorMetricsGroupIsolation(String name) { + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + + try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP); + CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) { + long startTimeMs = time.milliseconds(); + runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000); + + org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, name); + org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, name); + KafkaMetric metric = metrics.metrics().get(metricName); + KafkaMetric otherMetric = metrics.metrics().get(otherGroupMetricName); + assertNotEquals(Double.NaN, metric.metricValue()); + assertEquals(Double.NaN, otherMetric.metricValue()); + } + } + @Test public void testThreadIdleSensor() { Time time = new MockTime(); @@ -143,6 +189,22 @@ public class CoordinatorRuntimeMetricsImplTest { assertEquals(6 / 30.0, metric.metricValue()); // 'total_ms / window_ms' } + @Test + public void testThreadIdleSensorMetricsGroupIsolation() { + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + + try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP); + CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) { + runtimeMetrics.recordThreadIdleTime(1000.0); + + org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, "thread-idle-ratio-avg"); + org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, "thread-idle-ratio-avg"); + assertNotEquals(0.0, metrics.metrics().get(metricName).metricValue()); + assertEquals(0.0, metrics.metrics().get(otherGroupMetricName).metricValue()); + } + } + @Test public void testEventQueueSize() { Time time = new MockTime(); @@ -154,6 +216,21 @@ public class CoordinatorRuntimeMetricsImplTest { } } + @Test + public void testEventQueueSizeMetricsGroupIsolation() { + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + + try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP); + CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) { + runtimeMetrics.registerEventQueueSizeGauge(() -> 5); + otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0); + + assertMetricGauge(metrics, kafkaMetricName(metrics, "event-queue-size"), 5); + assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, "event-queue-size"), 0); + } + } + @ParameterizedTest @ValueSource(strings = { EVENT_QUEUE_TIME_METRIC_NAME, @@ -204,6 +281,45 @@ public class CoordinatorRuntimeMetricsImplTest { assertEquals(999.0, metric.metricValue()); } + @ParameterizedTest + @ValueSource(strings = { + EVENT_QUEUE_TIME_METRIC_NAME, + EVENT_PROCESSING_TIME_METRIC_NAME, + EVENT_PURGATORY_TIME_METRIC_NAME, + BATCH_FLUSH_TIME_METRIC_NAME + }) + public void testHistogramMetricsGroupIsolation(String metricNamePrefix) { + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + + try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP); + CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) { + switch (metricNamePrefix) { + case EVENT_QUEUE_TIME_METRIC_NAME: + runtimeMetrics.recordEventQueueTime(1000); + break; + case EVENT_PROCESSING_TIME_METRIC_NAME: + runtimeMetrics.recordEventProcessingTime(1000); + break; + case EVENT_PURGATORY_TIME_METRIC_NAME: + runtimeMetrics.recordEventPurgatoryTime(1000); + break; + case BATCH_FLUSH_TIME_METRIC_NAME: + runtimeMetrics.recordFlushTime(1000); + } + + // Check metric group isolation + for (String suffix : List.of("-max", "-p50", "-p95", "-p99", "-p999")) { + org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, metricNamePrefix + suffix); + org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, metricNamePrefix + suffix); + KafkaMetric metric = metrics.metrics().get(metricName); + KafkaMetric otherMetric = metrics.metrics().get(otherGroupMetricName); + assertNotEquals(0.0, metric.metricValue()); + assertEquals(0.0, otherMetric.metricValue()); + } + } + } + @Test public void testRecordEventPurgatoryTimeLimit() { Time time = new MockTime(); @@ -228,4 +344,8 @@ public class CoordinatorRuntimeMetricsImplTest { private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) { return metrics.metricName(name, METRICS_GROUP, "", keyValue); } + + private static MetricName otherGroupKafkaMetricName(Metrics metrics, String name, String... keyValue) { + return metrics.metricName(name, OTHER_METRICS_GROUP, "", keyValue); + } }