From 7d098cfbbd26ff60a3edb8f15e3068c8999a48b7 Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Mon, 29 Sep 2025 01:24:38 +0800 Subject: [PATCH] KAFKA-17876/ KAFKA-19150 Rename AssignmentsManager and RemoteStorageThreadPool metrics (#20265) Rename org.apache.kafka.server:type=AssignmentsManager and org.apache.kafka.storage.internals.log.RemoteStorageThreadPool metrics for the consist, these metrics should be - `kafka.log.remote:type=...` - `kafka.server:type=...` Reviewers: Chia-Ping Tsai --- docs/upgrade.html | 12 +++++++++ .../kafka/server/AssignmentsManager.java | 27 ++++++++++++------- .../kafka/server/AssignmentsManagerTest.java | 10 +++++++ .../remote/storage/RemoteStorageMetrics.java | 12 +++++++-- .../log/RemoteStorageThreadPool.java | 18 ++++++++++--- 5 files changed, 63 insertions(+), 16 deletions(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index 81af2d65261..d28898590f8 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -170,6 +170,18 @@ For further details, please refer to KIP-1120. +
  • + The metrics org.apache.kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments, + org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize, and + org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent + have been deprecated and will be removed in Kafka 5.0. + + As replacements, the following metrics have been introduced, which report the same information: + kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments, + kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize, and + kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent. + For further details, please refer to KIP-1100. +
  • Upgrading to 4.1.0

    diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java index 5b20e1475a2..34a0584e394 100644 --- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java +++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java @@ -71,11 +71,15 @@ public final class AssignmentsManager { */ static final long MIN_NOISY_FAILURE_INTERVAL_NS = TimeUnit.MINUTES.toNanos(2); + @Deprecated(since = "4.2") + static final MetricName DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC = + KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", "QueuedReplicaToDirAssignments"); + /** * The metric reflecting the number of pending assignments. */ static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC = - metricName("QueuedReplicaToDirAssignments"); + KafkaYammerMetrics.getMetricName("kafka.server", "AssignmentsManager", "QueuedReplicaToDirAssignments"); /** * The event at which we send assignments, if appropriate. @@ -142,10 +146,6 @@ public final class AssignmentsManager { */ private final KafkaEventQueue eventQueue; - static MetricName metricName(String name) { - return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name); - } - public AssignmentsManager( Time time, NodeToControllerChannelManager channelManager, @@ -182,12 +182,18 @@ public final class AssignmentsManager { this.ready = new ConcurrentHashMap<>(); this.inflight = Map.of(); this.metricsRegistry = metricsRegistry; + this.metricsRegistry.newGauge(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge() { + @Override + public Integer value() { + return numPending(); + } + }); this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge() { - @Override - public Integer value() { - return numPending(); - } - }); + @Override + public Integer value() { + return numPending(); + } + }); this.previousGlobalFailures = 0; this.eventQueue = new KafkaEventQueue(time, new LogContext("[AssignmentsManager id=" + nodeId + "]"), @@ -248,6 +254,7 @@ public final class AssignmentsManager { log.error("Unexpected exception shutting down NodeToControllerChannelManager", e); } try { + metricsRegistry.removeMetric(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC); metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC); } catch (Exception e) { log.error("Unexpected exception removing metrics.", e); diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java index 3397a7488ff..4c533dd5737 100644 --- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java @@ -250,6 +250,13 @@ public class AssignmentsManagerTest { return queuedReplicaToDirAssignments.value(); } + @SuppressWarnings("unchecked") // do not warn about Gauge typecast. + int deprecatedQueuedReplicaToDirAssignments() { + Gauge queuedReplicaToDirAssignments = + (Gauge) findMetric(AssignmentsManager.DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC); + return queuedReplicaToDirAssignments.value(); + } + @Override public void close() throws Exception { try { @@ -279,10 +286,12 @@ public class AssignmentsManagerTest { public void testSuccessfulAssignment() throws Exception { try (TestEnv testEnv = new TestEnv()) { assertEquals(0, testEnv.queuedReplicaToDirAssignments()); + assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments()); testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1); TestUtils.retryOnExceptionWithTimeout(60_000, () -> { assertEquals(1, testEnv.assignmentsManager.numPending()); assertEquals(1, testEnv.queuedReplicaToDirAssignments()); + assertEquals(1, testEnv.deprecatedQueuedReplicaToDirAssignments()); }); assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures()); assertEquals(1, testEnv.assignmentsManager.numInFlight()); @@ -290,6 +299,7 @@ public class AssignmentsManagerTest { TestUtils.retryOnExceptionWithTimeout(60_000, () -> { assertEquals(0, testEnv.assignmentsManager.numPending()); assertEquals(0, testEnv.queuedReplicaToDirAssignments()); + assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments()); assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0))); }); assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures()); diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java index 7922d88d831..8e47a674681 100644 --- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java @@ -90,10 +90,16 @@ public class RemoteStorageMetrics { "kafka.server", "BrokerTopicMetrics", REMOTE_DELETE_LAG_SEGMENTS); public static final MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName( "kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT); - public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName( + @Deprecated(since = "4.2") + public static final MetricName DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName( "org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE); - public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName( + @Deprecated(since = "4.2") + public static final MetricName DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName( "org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT); + public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName( + "kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE); + public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName( + "kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT); public static final MetricName REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName( "kafka.log.remote", "RemoteLogManager", REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS); @@ -115,6 +121,8 @@ public class RemoteStorageMetrics { metrics.add(REMOTE_DELETE_LAG_BYTES_METRIC); metrics.add(REMOTE_DELETE_LAG_SEGMENTS_METRIC); metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC); + metrics.add(DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC); + metrics.add(DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC); metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC); metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC); metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java index a09b558b124..692afbddaf2 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java @@ -17,6 +17,7 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.utils.ThreadUtils; +import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.slf4j.Logger; @@ -32,8 +33,12 @@ import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE public final class RemoteStorageThreadPool extends ThreadPoolExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStorageThreadPool.class); - private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass()); + @Deprecated(since = "4.2") + // This metrics group is used to register deprecated metrics. It will be removed in Kafka 5.0 + private final KafkaMetricsGroup deprecatedLogMetricsGroup = new KafkaMetricsGroup("org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool"); + private final KafkaMetricsGroup logRemoteMetricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool"); + @SuppressWarnings("deprecation") public RemoteStorageThreadPool(String threadNamePattern, int numThreads, int maxPendingTasks) { @@ -45,9 +50,13 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor { ThreadUtils.createThreadFactory(threadNamePattern, false, (t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e)) ); - metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), + deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), () -> getQueue().size()); - metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), + deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), + () -> 1 - (double) getActiveCount() / (double) getCorePoolSize()); + logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), + () -> getQueue().size()); + logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), () -> 1 - (double) getActiveCount() / (double) getCorePoolSize()); } @@ -59,6 +68,7 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor { } public void removeMetrics() { - REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric); + REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(deprecatedLogMetricsGroup::removeMetric); + REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(logRemoteMetricsGroup::removeMetric); } }