KAFKA-17876/ KAFKA-19150 Rename AssignmentsManager and RemoteStorageThreadPool metrics (#20265)

Rename org.apache.kafka.server:type=AssignmentsManager and
org.apache.kafka.storage.internals.log.RemoteStorageThreadPool metrics
for the consist, these metrics should be

- `kafka.log.remote:type=...`
- `kafka.server:type=...`

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-09-29 01:24:38 +08:00 committed by GitHub
parent 60ad638a35
commit 7d098cfbbd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 63 additions and 16 deletions

View File

@ -170,6 +170,18 @@
</ul>
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/3gn0Ew">KIP-1120</a>.
</li>
<li>
The metrics <code>org.apache.kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>, and
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>
have been deprecated and will be removed in Kafka 5.0.
As replacements, the following metrics have been introduced, which report the same information:
<code>kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>, and
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
</li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

View File

@ -71,11 +71,15 @@ public final class AssignmentsManager {
*/
static final long MIN_NOISY_FAILURE_INTERVAL_NS = TimeUnit.MINUTES.toNanos(2);
@Deprecated(since = "4.2")
static final MetricName DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", "QueuedReplicaToDirAssignments");
/**
* The metric reflecting the number of pending assignments.
*/
static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
metricName("QueuedReplicaToDirAssignments");
KafkaYammerMetrics.getMetricName("kafka.server", "AssignmentsManager", "QueuedReplicaToDirAssignments");
/**
* The event at which we send assignments, if appropriate.
@ -142,10 +146,6 @@ public final class AssignmentsManager {
*/
private final KafkaEventQueue eventQueue;
static MetricName metricName(String name) {
return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name);
}
public AssignmentsManager(
Time time,
NodeToControllerChannelManager channelManager,
@ -182,12 +182,18 @@ public final class AssignmentsManager {
this.ready = new ConcurrentHashMap<>();
this.inflight = Map.of();
this.metricsRegistry = metricsRegistry;
this.metricsRegistry.newGauge(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
@Override
public Integer value() {
return numPending();
}
});
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
@Override
public Integer value() {
return numPending();
}
});
@Override
public Integer value() {
return numPending();
}
});
this.previousGlobalFailures = 0;
this.eventQueue = new KafkaEventQueue(time,
new LogContext("[AssignmentsManager id=" + nodeId + "]"),
@ -248,6 +254,7 @@ public final class AssignmentsManager {
log.error("Unexpected exception shutting down NodeToControllerChannelManager", e);
}
try {
metricsRegistry.removeMetric(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
} catch (Exception e) {
log.error("Unexpected exception removing metrics.", e);

View File

@ -250,6 +250,13 @@ public class AssignmentsManagerTest {
return queuedReplicaToDirAssignments.value();
}
@SuppressWarnings("unchecked") // do not warn about Gauge typecast.
int deprecatedQueuedReplicaToDirAssignments() {
Gauge<Integer> queuedReplicaToDirAssignments =
(Gauge<Integer>) findMetric(AssignmentsManager.DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
return queuedReplicaToDirAssignments.value();
}
@Override
public void close() throws Exception {
try {
@ -279,10 +286,12 @@ public class AssignmentsManagerTest {
public void testSuccessfulAssignment() throws Exception {
try (TestEnv testEnv = new TestEnv()) {
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
assertEquals(1, testEnv.assignmentsManager.numPending());
assertEquals(1, testEnv.queuedReplicaToDirAssignments());
assertEquals(1, testEnv.deprecatedQueuedReplicaToDirAssignments());
});
assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
assertEquals(1, testEnv.assignmentsManager.numInFlight());
@ -290,6 +299,7 @@ public class AssignmentsManagerTest {
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
assertEquals(0, testEnv.assignmentsManager.numPending());
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
});
assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());

View File

@ -90,10 +90,16 @@ public class RemoteStorageMetrics {
"kafka.server", "BrokerTopicMetrics", REMOTE_DELETE_LAG_SEGMENTS);
public static final MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
@Deprecated(since = "4.2")
public static final MetricName DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
"org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
@Deprecated(since = "4.2")
public static final MetricName DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
"org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
"kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
public static final MetricName REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);
@ -115,6 +121,8 @@ public class RemoteStorageMetrics {
metrics.add(REMOTE_DELETE_LAG_BYTES_METRIC);
metrics.add(REMOTE_DELETE_LAG_SEGMENTS_METRIC);
metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
metrics.add(DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
@ -32,8 +33,12 @@ import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStorageThreadPool.class);
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
@Deprecated(since = "4.2")
// This metrics group is used to register deprecated metrics. It will be removed in Kafka 5.0
private final KafkaMetricsGroup deprecatedLogMetricsGroup = new KafkaMetricsGroup("org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool");
private final KafkaMetricsGroup logRemoteMetricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool");
@SuppressWarnings("deprecation")
public RemoteStorageThreadPool(String threadNamePattern,
int numThreads,
int maxPendingTasks) {
@ -45,9 +50,13 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
ThreadUtils.createThreadFactory(threadNamePattern, false,
(t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e))
);
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
() -> getQueue().size());
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
() -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
() -> getQueue().size());
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
() -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
}
@ -59,6 +68,7 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
}
public void removeMetrics() {
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(deprecatedLogMetricsGroup::removeMetric);
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(logRemoteMetricsGroup::removeMetric);
}
}