diff --git a/docs/zk2kraft.html b/docs/zk2kraft.html
index 210feee581e..f85b59b3bdc 100644
--- a/docs/zk2kraft.html
+++ b/docs/zk2kraft.html
@@ -164,5 +164,17 @@
In Kraft mode, Zookeeper is not used, so the metrics is removed.
+
+
+ Remove the metrics which is monitoring the number of Zookeeper migrations.
+
+
+ kafka.controller:type=KafkaController,name=MigratingZkBrokerCount
+ kafka.controller:type=KafkaController,name=ZkMigrationState
+
+
+ Kafka remove all zookeeper dependencies, so the metrics is removed.
+
+
\ No newline at end of file
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
index 884bc709cf6..890fbb30e63 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
@@ -42,8 +42,6 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "FencedBrokerCount");
private static final MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
- private static final MetricName MIGRATING_ZK_BROKER_COUNT = getMetricName(
- "KafkaController", "MigratingZkBrokerCount");
private static final MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private static final MetricName GLOBAL_PARTITION_COUNT = getMetricName(
@@ -54,21 +52,17 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "PreferredReplicaImbalanceCount");
private static final MetricName METADATA_ERROR_COUNT = getMetricName(
"KafkaController", "MetadataErrorCount");
- private static final MetricName ZK_MIGRATION_STATE = getMetricName(
- "KafkaController", "ZkMigrationState");
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec");
private final Optional registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
- private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0);
private final AtomicInteger globalTopicCount = new AtomicInteger(0);
private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
private final AtomicInteger preferredReplicaImbalanceCount = new AtomicInteger(0);
private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
- private final AtomicInteger zkMigrationState = new AtomicInteger(-1);
private Optional uncleanLeaderElectionMeter = Optional.empty();
@@ -121,20 +115,6 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
return metadataErrorCount();
}
}));
- registry.ifPresent(r -> r.newGauge(ZK_MIGRATION_STATE, new Gauge() {
- @Override
- public Integer value() {
- return (int) zkMigrationState();
- }
- }));
-
- registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge() {
- @Override
- public Integer value() {
- return migratingZkBrokerCount();
- }
- }));
-
registry.ifPresent(r -> uncleanLeaderElectionMeter =
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS)));
}
@@ -162,19 +142,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
public int activeBrokerCount() {
return this.activeBrokerCount.get();
}
-
- public void setMigratingZkBrokerCount(int brokerCount) {
- this.migratingZkBrokerCount.set(brokerCount);
- }
-
- public void addToMigratingZkBrokerCount(int brokerCountDelta) {
- this.migratingZkBrokerCount.addAndGet(brokerCountDelta);
- }
-
- public int migratingZkBrokerCount() {
- return this.migratingZkBrokerCount.get();
- }
-
+
public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount.set(topicCount);
}
@@ -230,15 +198,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
public int metadataErrorCount() {
return this.metadataErrorCount.get();
}
-
- public void setZkMigrationState(byte migrationStateValue) {
- this.zkMigrationState.set(migrationStateValue);
- }
-
- public byte zkMigrationState() {
- return zkMigrationState.byteValue();
- }
-
+
public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}
@@ -248,13 +208,11 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
registry.ifPresent(r -> Arrays.asList(
FENCED_BROKER_COUNT,
ACTIVE_BROKER_COUNT,
- MIGRATING_ZK_BROKER_COUNT,
GLOBAL_TOPIC_COUNT,
GLOBAL_PARTITION_COUNT,
OFFLINE_PARTITION_COUNT,
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT,
- ZK_MIGRATION_STATE,
UNCLEAN_LEADER_ELECTIONS_PER_SEC
).forEach(r::removeMetric));
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
index c4aec110793..565de3c1a9b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
@@ -39,10 +39,6 @@ import java.util.Optional;
* behind the latest in-memory state which has not yet been fully persisted to the log. This is
* reasonable for metrics, which don't need up-to-the-millisecond update latency.
*
- * NOTE: the ZK controller has some special rules for calculating preferredReplicaImbalanceCount
- * which we haven't implemented here. Specifically, the ZK controller considers reassigning
- * partitions to always have their preferred leader, even if they don't.
- * All other metrics should be the same, as far as is possible.
*/
public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
private final ControllerMetadataMetrics metrics;
@@ -121,20 +117,15 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
metrics.setGlobalTopicCount(newImage.topics().topicsById().size());
int fencedBrokers = 0;
int activeBrokers = 0;
- int zkBrokers = 0;
for (BrokerRegistration broker : newImage.cluster().brokers().values()) {
if (broker.fenced()) {
fencedBrokers++;
} else {
activeBrokers++;
}
- if (broker.isMigratingZkBroker()) {
- zkBrokers++;
- }
}
metrics.setFencedBrokerCount(fencedBrokers);
metrics.setActiveBrokerCount(activeBrokers);
- metrics.setMigratingZkBrokerCount(zkBrokers);
int totalPartitions = 0;
int offlinePartitions = 0;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
index a4b30a9f9f3..dc5b5cf5b93 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
@@ -43,7 +43,6 @@ class ControllerMetricsChanges {
private int fencedBrokersChange = 0;
private int activeBrokersChange = 0;
- private int migratingZkBrokersChange = 0;
private int globalTopicsChange = 0;
private int globalPartitionsChange = 0;
private int offlinePartitionsChange = 0;
@@ -58,10 +57,6 @@ class ControllerMetricsChanges {
return activeBrokersChange;
}
- public int migratingZkBrokersChange() {
- return migratingZkBrokersChange;
- }
-
public int globalTopicsChange() {
return globalTopicsChange;
}
@@ -81,23 +76,18 @@ class ControllerMetricsChanges {
void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) {
boolean wasFenced = false;
boolean wasActive = false;
- boolean wasZk = false;
if (prev != null) {
wasFenced = prev.fenced();
wasActive = !prev.fenced();
- wasZk = prev.isMigratingZkBroker();
}
boolean isFenced = false;
boolean isActive = false;
- boolean isZk = false;
if (next != null) {
isFenced = next.fenced();
isActive = !next.fenced();
- isZk = next.isMigratingZkBroker();
}
fencedBrokersChange += delta(wasFenced, isFenced);
activeBrokersChange += delta(wasActive, isActive);
- migratingZkBrokersChange += delta(wasZk, isZk);
}
void handleDeletedTopic(TopicImage deletedTopic) {
@@ -157,9 +147,6 @@ class ControllerMetricsChanges {
if (activeBrokersChange != 0) {
metrics.addToActiveBrokerCount(activeBrokersChange);
}
- if (migratingZkBrokersChange != 0) {
- metrics.addToMigratingZkBrokerCount(migratingZkBrokersChange);
- }
if (globalTopicsChange != 0) {
metrics.addToGlobalTopicCount(globalTopicsChange);
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
index 6ebb582c158..f6dd72edb2f 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
@@ -43,13 +43,11 @@ public class ControllerMetadataMetricsTest {
new HashSet<>(Arrays.asList(
"kafka.controller:type=KafkaController,name=ActiveBrokerCount",
"kafka.controller:type=KafkaController,name=FencedBrokerCount",
- "kafka.controller:type=KafkaController,name=MigratingZkBrokerCount",
"kafka.controller:type=KafkaController,name=GlobalPartitionCount",
"kafka.controller:type=KafkaController,name=GlobalTopicCount",
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
- "kafka.controller:type=KafkaController,name=ZkMigrationState",
"kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec"
)));
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
index 0cfea7b4ce5..80f54daf27b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
@@ -60,18 +60,6 @@ public class ControllerMetricsChangesTest {
setInControlledShutdown(false).build();
}
- private static BrokerRegistration zkBrokerRegistration(
- int brokerId
- ) {
- return new BrokerRegistration.Builder().
- setId(brokerId).
- setEpoch(100L).
- setIncarnationId(Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ")).
- setFenced(false).
- setInControlledShutdown(false).
- setIsMigratingZkBroker(true).build();
- }
-
@Test
public void testInitialValues() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
@@ -115,20 +103,6 @@ public class ControllerMetricsChangesTest {
assertEquals(1, changes.activeBrokersChange());
}
- @Test
- public void testHandleZkBroker() {
- ControllerMetricsChanges changes = new ControllerMetricsChanges();
- changes.handleBrokerChange(null, zkBrokerRegistration(1));
- assertEquals(1, changes.migratingZkBrokersChange());
- changes.handleBrokerChange(null, zkBrokerRegistration(2));
- changes.handleBrokerChange(null, zkBrokerRegistration(3));
- assertEquals(3, changes.migratingZkBrokersChange());
-
- changes.handleBrokerChange(zkBrokerRegistration(3), brokerRegistration(3, true));
- changes.handleBrokerChange(brokerRegistration(3, true), brokerRegistration(3, false));
- assertEquals(2, changes.migratingZkBrokersChange());
- }
-
@Test
public void testHandleDeletedTopic() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();