diff --git a/docs/upgrade.html b/docs/upgrade.html
index 81af2d65261..d28898590f8 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -170,6 +170,18 @@
For further details, please refer to KIP-1120.
+
+ The metrics org.apache.kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments
,
+ org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize
, and
+ org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent
+ have been deprecated and will be removed in Kafka 5.0.
+
+ As replacements, the following metrics have been introduced, which report the same information:
+ kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments
,
+ kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize
, and
+ kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent
.
+ For further details, please refer to KIP-1100.
+
diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 5b20e1475a2..34a0584e394 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -71,11 +71,15 @@ public final class AssignmentsManager {
*/
static final long MIN_NOISY_FAILURE_INTERVAL_NS = TimeUnit.MINUTES.toNanos(2);
+ @Deprecated(since = "4.2")
+ static final MetricName DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
+ KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", "QueuedReplicaToDirAssignments");
+
/**
* The metric reflecting the number of pending assignments.
*/
static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
- metricName("QueuedReplicaToDirAssignments");
+ KafkaYammerMetrics.getMetricName("kafka.server", "AssignmentsManager", "QueuedReplicaToDirAssignments");
/**
* The event at which we send assignments, if appropriate.
@@ -142,10 +146,6 @@ public final class AssignmentsManager {
*/
private final KafkaEventQueue eventQueue;
- static MetricName metricName(String name) {
- return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name);
- }
-
public AssignmentsManager(
Time time,
NodeToControllerChannelManager channelManager,
@@ -182,12 +182,18 @@ public final class AssignmentsManager {
this.ready = new ConcurrentHashMap<>();
this.inflight = Map.of();
this.metricsRegistry = metricsRegistry;
+ this.metricsRegistry.newGauge(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge() {
+ @Override
+ public Integer value() {
+ return numPending();
+ }
+ });
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge() {
- @Override
- public Integer value() {
- return numPending();
- }
- });
+ @Override
+ public Integer value() {
+ return numPending();
+ }
+ });
this.previousGlobalFailures = 0;
this.eventQueue = new KafkaEventQueue(time,
new LogContext("[AssignmentsManager id=" + nodeId + "]"),
@@ -248,6 +254,7 @@ public final class AssignmentsManager {
log.error("Unexpected exception shutting down NodeToControllerChannelManager", e);
}
try {
+ metricsRegistry.removeMetric(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
} catch (Exception e) {
log.error("Unexpected exception removing metrics.", e);
diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 3397a7488ff..4c533dd5737 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -250,6 +250,13 @@ public class AssignmentsManagerTest {
return queuedReplicaToDirAssignments.value();
}
+ @SuppressWarnings("unchecked") // do not warn about Gauge typecast.
+ int deprecatedQueuedReplicaToDirAssignments() {
+ Gauge queuedReplicaToDirAssignments =
+ (Gauge) findMetric(AssignmentsManager.DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
+ return queuedReplicaToDirAssignments.value();
+ }
+
@Override
public void close() throws Exception {
try {
@@ -279,10 +286,12 @@ public class AssignmentsManagerTest {
public void testSuccessfulAssignment() throws Exception {
try (TestEnv testEnv = new TestEnv()) {
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
+ assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
assertEquals(1, testEnv.assignmentsManager.numPending());
assertEquals(1, testEnv.queuedReplicaToDirAssignments());
+ assertEquals(1, testEnv.deprecatedQueuedReplicaToDirAssignments());
});
assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
assertEquals(1, testEnv.assignmentsManager.numInFlight());
@@ -290,6 +299,7 @@ public class AssignmentsManagerTest {
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
assertEquals(0, testEnv.assignmentsManager.numPending());
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
+ assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
});
assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
index 7922d88d831..8e47a674681 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -90,10 +90,16 @@ public class RemoteStorageMetrics {
"kafka.server", "BrokerTopicMetrics", REMOTE_DELETE_LAG_SEGMENTS);
public static final MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
- public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
+ @Deprecated(since = "4.2")
+ public static final MetricName DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
"org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
- public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
+ @Deprecated(since = "4.2")
+ public static final MetricName DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
"org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
+ public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
+ "kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
+ public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
+ "kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
public static final MetricName REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);
@@ -115,6 +121,8 @@ public class RemoteStorageMetrics {
metrics.add(REMOTE_DELETE_LAG_BYTES_METRIC);
metrics.add(REMOTE_DELETE_LAG_SEGMENTS_METRIC);
metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+ metrics.add(DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
+ metrics.add(DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index a09b558b124..692afbddaf2 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -17,6 +17,7 @@
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
@@ -32,8 +33,12 @@ import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStorageThreadPool.class);
- private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
+ @Deprecated(since = "4.2")
+ // This metrics group is used to register deprecated metrics. It will be removed in Kafka 5.0
+ private final KafkaMetricsGroup deprecatedLogMetricsGroup = new KafkaMetricsGroup("org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool");
+ private final KafkaMetricsGroup logRemoteMetricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool");
+ @SuppressWarnings("deprecation")
public RemoteStorageThreadPool(String threadNamePattern,
int numThreads,
int maxPendingTasks) {
@@ -45,9 +50,13 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
ThreadUtils.createThreadFactory(threadNamePattern, false,
(t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e))
);
- metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
+ deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
() -> getQueue().size());
- metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
+ deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
+ () -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
+ logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
+ () -> getQueue().size());
+ logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
() -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
}
@@ -59,6 +68,7 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
}
public void removeMetrics() {
- REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
+ REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(deprecatedLogMetricsGroup::removeMetric);
+ REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(logRemoteMetricsGroup::removeMetric);
}
}