KAFKA-18598: Remove ControllerMetadataMetrics ZK-related Metrics (#18629)

Reviewers: Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Ken Huang 2025-01-21 23:23:39 +08:00 committed by GitHub
parent 7cbfd22bde
commit 3d49159c84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 14 additions and 94 deletions

View File

@ -164,5 +164,17 @@
In Kraft mode, Zookeeper is not used, so the metrics is removed.
</p>
</li>
<li>
<p>
Remove the metrics which is monitoring the number of Zookeeper migrations.
</p>
<ul>
<li><code>kafka.controller:type=KafkaController,name=MigratingZkBrokerCount</code></li>
<li><code>kafka.controller:type=KafkaController,name=ZkMigrationState</code></li>
</ul>
<p>
Kafka remove all zookeeper dependencies, so the metrics is removed.
</p>
</li>
</ul>
</div>

View File

@ -42,8 +42,6 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "FencedBrokerCount");
private static final MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
private static final MetricName MIGRATING_ZK_BROKER_COUNT = getMetricName(
"KafkaController", "MigratingZkBrokerCount");
private static final MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private static final MetricName GLOBAL_PARTITION_COUNT = getMetricName(
@ -54,21 +52,17 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "PreferredReplicaImbalanceCount");
private static final MetricName METADATA_ERROR_COUNT = getMetricName(
"KafkaController", "MetadataErrorCount");
private static final MetricName ZK_MIGRATION_STATE = getMetricName(
"KafkaController", "ZkMigrationState");
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec");
private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0);
private final AtomicInteger globalTopicCount = new AtomicInteger(0);
private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
private final AtomicInteger preferredReplicaImbalanceCount = new AtomicInteger(0);
private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
private final AtomicInteger zkMigrationState = new AtomicInteger(-1);
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();
@ -121,20 +115,6 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
return metadataErrorCount();
}
}));
registry.ifPresent(r -> r.newGauge(ZK_MIGRATION_STATE, new Gauge<Integer>() {
@Override
public Integer value() {
return (int) zkMigrationState();
}
}));
registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return migratingZkBrokerCount();
}
}));
registry.ifPresent(r -> uncleanLeaderElectionMeter =
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS)));
}
@ -162,19 +142,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
public int activeBrokerCount() {
return this.activeBrokerCount.get();
}
public void setMigratingZkBrokerCount(int brokerCount) {
this.migratingZkBrokerCount.set(brokerCount);
}
public void addToMigratingZkBrokerCount(int brokerCountDelta) {
this.migratingZkBrokerCount.addAndGet(brokerCountDelta);
}
public int migratingZkBrokerCount() {
return this.migratingZkBrokerCount.get();
}
public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount.set(topicCount);
}
@ -230,15 +198,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
public int metadataErrorCount() {
return this.metadataErrorCount.get();
}
public void setZkMigrationState(byte migrationStateValue) {
this.zkMigrationState.set(migrationStateValue);
}
public byte zkMigrationState() {
return zkMigrationState.byteValue();
}
public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}
@ -248,13 +208,11 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
registry.ifPresent(r -> Arrays.asList(
FENCED_BROKER_COUNT,
ACTIVE_BROKER_COUNT,
MIGRATING_ZK_BROKER_COUNT,
GLOBAL_TOPIC_COUNT,
GLOBAL_PARTITION_COUNT,
OFFLINE_PARTITION_COUNT,
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT,
ZK_MIGRATION_STATE,
UNCLEAN_LEADER_ELECTIONS_PER_SEC
).forEach(r::removeMetric));
}

View File

@ -39,10 +39,6 @@ import java.util.Optional;
* behind the latest in-memory state which has not yet been fully persisted to the log. This is
* reasonable for metrics, which don't need up-to-the-millisecond update latency.
*
* NOTE: the ZK controller has some special rules for calculating preferredReplicaImbalanceCount
* which we haven't implemented here. Specifically, the ZK controller considers reassigning
* partitions to always have their preferred leader, even if they don't.
* All other metrics should be the same, as far as is possible.
*/
public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
private final ControllerMetadataMetrics metrics;
@ -121,20 +117,15 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
metrics.setGlobalTopicCount(newImage.topics().topicsById().size());
int fencedBrokers = 0;
int activeBrokers = 0;
int zkBrokers = 0;
for (BrokerRegistration broker : newImage.cluster().brokers().values()) {
if (broker.fenced()) {
fencedBrokers++;
} else {
activeBrokers++;
}
if (broker.isMigratingZkBroker()) {
zkBrokers++;
}
}
metrics.setFencedBrokerCount(fencedBrokers);
metrics.setActiveBrokerCount(activeBrokers);
metrics.setMigratingZkBrokerCount(zkBrokers);
int totalPartitions = 0;
int offlinePartitions = 0;

View File

@ -43,7 +43,6 @@ class ControllerMetricsChanges {
private int fencedBrokersChange = 0;
private int activeBrokersChange = 0;
private int migratingZkBrokersChange = 0;
private int globalTopicsChange = 0;
private int globalPartitionsChange = 0;
private int offlinePartitionsChange = 0;
@ -58,10 +57,6 @@ class ControllerMetricsChanges {
return activeBrokersChange;
}
public int migratingZkBrokersChange() {
return migratingZkBrokersChange;
}
public int globalTopicsChange() {
return globalTopicsChange;
}
@ -81,23 +76,18 @@ class ControllerMetricsChanges {
void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) {
boolean wasFenced = false;
boolean wasActive = false;
boolean wasZk = false;
if (prev != null) {
wasFenced = prev.fenced();
wasActive = !prev.fenced();
wasZk = prev.isMigratingZkBroker();
}
boolean isFenced = false;
boolean isActive = false;
boolean isZk = false;
if (next != null) {
isFenced = next.fenced();
isActive = !next.fenced();
isZk = next.isMigratingZkBroker();
}
fencedBrokersChange += delta(wasFenced, isFenced);
activeBrokersChange += delta(wasActive, isActive);
migratingZkBrokersChange += delta(wasZk, isZk);
}
void handleDeletedTopic(TopicImage deletedTopic) {
@ -157,9 +147,6 @@ class ControllerMetricsChanges {
if (activeBrokersChange != 0) {
metrics.addToActiveBrokerCount(activeBrokersChange);
}
if (migratingZkBrokersChange != 0) {
metrics.addToMigratingZkBrokerCount(migratingZkBrokersChange);
}
if (globalTopicsChange != 0) {
metrics.addToGlobalTopicCount(globalTopicsChange);
}

View File

@ -43,13 +43,11 @@ public class ControllerMetadataMetricsTest {
new HashSet<>(Arrays.asList(
"kafka.controller:type=KafkaController,name=ActiveBrokerCount",
"kafka.controller:type=KafkaController,name=FencedBrokerCount",
"kafka.controller:type=KafkaController,name=MigratingZkBrokerCount",
"kafka.controller:type=KafkaController,name=GlobalPartitionCount",
"kafka.controller:type=KafkaController,name=GlobalTopicCount",
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
"kafka.controller:type=KafkaController,name=ZkMigrationState",
"kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec"
)));
}

View File

@ -60,18 +60,6 @@ public class ControllerMetricsChangesTest {
setInControlledShutdown(false).build();
}
private static BrokerRegistration zkBrokerRegistration(
int brokerId
) {
return new BrokerRegistration.Builder().
setId(brokerId).
setEpoch(100L).
setIncarnationId(Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ")).
setFenced(false).
setInControlledShutdown(false).
setIsMigratingZkBroker(true).build();
}
@Test
public void testInitialValues() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
@ -115,20 +103,6 @@ public class ControllerMetricsChangesTest {
assertEquals(1, changes.activeBrokersChange());
}
@Test
public void testHandleZkBroker() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(null, zkBrokerRegistration(1));
assertEquals(1, changes.migratingZkBrokersChange());
changes.handleBrokerChange(null, zkBrokerRegistration(2));
changes.handleBrokerChange(null, zkBrokerRegistration(3));
assertEquals(3, changes.migratingZkBrokersChange());
changes.handleBrokerChange(zkBrokerRegistration(3), brokerRegistration(3, true));
changes.handleBrokerChange(brokerRegistration(3, true), brokerRegistration(3, false));
assertEquals(2, changes.migratingZkBrokersChange());
}
@Test
public void testHandleDeletedTopic() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();