diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index d0599f3d7a3..bf2d339da8c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -143,9 +143,15 @@
+
+
+
+
+
+
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index ec48cbc57a6..5f72a109736 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
-import org.apache.kafka.timeline.TimelineInteger;
import java.util.Arrays;
import java.util.Collections;
@@ -105,12 +104,9 @@ public class BrokersToIsrs {
*/
private final TimelineHashMap> isrMembers;
- private final TimelineInteger offlinePartitionCount;
-
BrokersToIsrs(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
- this.offlinePartitionCount = new TimelineInteger(snapshotRegistry);
}
/**
@@ -131,9 +127,6 @@ public class BrokersToIsrs {
} else {
if (prevLeader == NO_LEADER) {
prev = Replicas.copyWith(prevIsr, NO_LEADER);
- if (nextLeader != NO_LEADER) {
- offlinePartitionCount.decrement();
- }
} else {
prev = Replicas.clone(prevIsr);
}
@@ -145,9 +138,6 @@ public class BrokersToIsrs {
} else {
if (nextLeader == NO_LEADER) {
next = Replicas.copyWith(nextIsr, NO_LEADER);
- if (prevLeader != NO_LEADER) {
- offlinePartitionCount.increment();
- }
} else {
next = Replicas.clone(nextIsr);
}
@@ -191,9 +181,6 @@ public class BrokersToIsrs {
void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
Map topicMap = isrMembers.get(brokerId);
if (topicMap != null) {
- if (brokerId == NO_LEADER && topicMap.containsKey(topicId)) {
- offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);
- }
topicMap.remove(topicId);
}
}
@@ -303,8 +290,4 @@ public class BrokersToIsrs {
boolean hasLeaderships(int brokerId) {
return iterator(brokerId, true).hasNext();
}
-
- int offlinePartitionCount() {
- return offlinePartitionCount.get();
- }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 5bc5bd2c3af..920e7ef0c65 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -86,7 +86,6 @@ public class ClusterControlManager {
private SnapshotRegistry snapshotRegistry = null;
private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
private ReplicaPlacer replicaPlacer = null;
- private ControllerMetrics controllerMetrics = null;
private FeatureControlManager featureControl = null;
Builder setLogContext(LogContext logContext) {
@@ -119,11 +118,6 @@ public class ClusterControlManager {
return this;
}
- Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
- this.controllerMetrics = controllerMetrics;
- return this;
- }
-
Builder setFeatureControlManager(FeatureControlManager featureControl) {
this.featureControl = featureControl;
return this;
@@ -142,9 +136,6 @@ public class ClusterControlManager {
if (replicaPlacer == null) {
replicaPlacer = new StripedReplicaPlacer(new Random());
}
- if (controllerMetrics == null) {
- throw new RuntimeException("You must specify ControllerMetrics");
- }
if (featureControl == null) {
throw new RuntimeException("You must specify FeatureControlManager");
}
@@ -154,7 +145,6 @@ public class ClusterControlManager {
snapshotRegistry,
sessionTimeoutNs,
replicaPlacer,
- controllerMetrics,
featureControl
);
}
@@ -226,11 +216,6 @@ public class ClusterControlManager {
*/
private final TimelineHashMap registerBrokerRecordOffsets;
- /**
- * A reference to the controller's metrics registry.
- */
- private final ControllerMetrics controllerMetrics;
-
/**
* The broker heartbeat manager, or null if this controller is on standby.
*/
@@ -254,7 +239,6 @@ public class ClusterControlManager {
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer,
- ControllerMetrics metrics,
FeatureControlManager featureControl
) {
this.logContext = logContext;
@@ -267,7 +251,6 @@ public class ClusterControlManager {
this.registerBrokerRecordOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
- this.controllerMetrics = metrics;
this.featureControl = featureControl;
}
@@ -397,8 +380,9 @@ public class ClusterControlManager {
}
public OptionalLong registerBrokerRecordOffset(int brokerId) {
- if (registerBrokerRecordOffsets.containsKey(brokerId)) {
- return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
+ Long registrationOffset = registerBrokerRecordOffsets.get(brokerId);
+ if (registrationOffset != null) {
+ return OptionalLong.of(registrationOffset);
}
return OptionalLong.empty();
}
@@ -424,7 +408,6 @@ public class ClusterControlManager {
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced(),
record.inControlledShutdown()));
- updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
if (heartbeatManager != null) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
@@ -451,7 +434,6 @@ public class ClusterControlManager {
} else {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
brokerRegistrations.remove(brokerId);
- updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unregistered broker: {}", record);
}
}
@@ -480,11 +462,11 @@ public class ClusterControlManager {
BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
- "value for fenced field: %d", record, record.fenced())));
+ "value for fenced field: %x", record, record.fenced())));
BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
- "value for inControlledShutdown field: %d", record, record.inControlledShutdown())));
+ "value for inControlledShutdown field: %x", record, record.inControlledShutdown())));
replayRegistrationChange(
record,
record.brokerId(),
@@ -515,7 +497,6 @@ public class ClusterControlManager {
);
if (!curRegistration.equals(nextRegistration)) {
brokerRegistrations.put(brokerId, nextRegistration);
- updateMetrics(curRegistration, nextRegistration);
} else {
log.info("Ignoring no-op registration change for {}", curRegistration);
}
@@ -529,35 +510,6 @@ public class ClusterControlManager {
}
}
- private void updateMetrics(BrokerRegistration prevRegistration, BrokerRegistration registration) {
- if (registration == null) {
- if (prevRegistration.fenced()) {
- controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
- } else {
- controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
- }
- log.info("Removed broker: {}", prevRegistration.id());
- } else if (prevRegistration == null) {
- if (registration.fenced()) {
- controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
- log.info("Added new fenced broker: {}", registration.id());
- } else {
- controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
- log.info("Added new unfenced broker: {}", registration.id());
- }
- } else {
- if (prevRegistration.fenced() && !registration.fenced()) {
- controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
- controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
- log.info("Unfenced broker: {}", registration.id());
- } else if (!prevRegistration.fenced() && registration.fenced()) {
- controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
- controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
- log.info("Fenced broker: {}", registration.id());
- }
- }
- }
-
Iterator usableBrokers() {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
@@ -570,7 +522,7 @@ public class ClusterControlManager {
* Returns true if the broker is unfenced; Returns false if it is
* not or if it does not exist.
*/
- public boolean unfenced(int brokerId) {
+ public boolean isUnfenced(int brokerId) {
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return !registration.fenced();
@@ -600,7 +552,7 @@ public class ClusterControlManager {
* Returns true if the broker is active. Active means not fenced nor in controlled
* shutdown; Returns false if it is not active or if it does not exist.
*/
- public boolean active(int brokerId) {
+ public boolean isActive(int brokerId) {
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return !registration.inControlledShutdown() && !registration.fenced();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index ff243aebfcb..baab5854004 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -35,9 +35,9 @@ public interface ControllerMetrics extends AutoCloseable {
int activeBrokerCount();
- void setGlobalTopicsCount(int topicCount);
+ void setGlobalTopicCount(int topicCount);
- int globalTopicsCount();
+ int globalTopicCount();
void setGlobalPartitionCount(int partitionCount);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
new file mode 100644
index 00000000000..d034f42cdd4
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+ private final static class PartitionState {
+ final int leader;
+ final int preferredReplica;
+
+ PartitionState(int leader, int preferredReplica) {
+ this.leader = leader;
+ this.preferredReplica = preferredReplica;
+ }
+
+ int leader() {
+ return leader;
+ }
+
+ int preferredReplica() {
+ return preferredReplica;
+ }
+ }
+
+ private final Set registeredBrokers = new HashSet<>();
+
+ private final Set fencedBrokers = new HashSet<>();
+
+ private int topicCount = 0;
+
+ private final Map topicPartitions = new HashMap<>();
+
+ private final Set offlineTopicPartitions = new HashSet<>();
+
+ private final Set imbalancedTopicPartitions = new HashSet<>();
+
+ private final ControllerMetrics controllerMetrics;
+
+ ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+ this.controllerMetrics = controllerMetrics;
+ }
+
+ void replayBatch(long baseOffset, List messages) {
+ int i = 1;
+ for (ApiMessageAndVersion message : messages) {
+ try {
+ replay(message.message());
+ } catch (Exception e) {
+ String failureMessage = String.format(
+ "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+ "in the batch with baseOffset %d.",
+ message.message().getClass().getSimpleName(),
+ i,
+ messages.size(),
+ baseOffset
+ );
+ throw new IllegalArgumentException(failureMessage, e);
+ }
+ i++;
+ }
+ }
+
+ /**
+ * Update controller metrics by replaying a metadata record.
+ *
+ * This method assumes that the provided ApiMessage is one of the type covered by MetadataRecordType.
+ *
+ * @param message a metadata record
+ */
+ @SuppressWarnings("checkstyle:cyclomaticComplexity")
+ void replay(ApiMessage message) {
+ MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+ switch (type) {
+ case REGISTER_BROKER_RECORD:
+ replay((RegisterBrokerRecord) message);
+ break;
+ case UNREGISTER_BROKER_RECORD:
+ replay((UnregisterBrokerRecord) message);
+ break;
+ case FENCE_BROKER_RECORD:
+ replay((FenceBrokerRecord) message);
+ break;
+ case UNFENCE_BROKER_RECORD:
+ replay((UnfenceBrokerRecord) message);
+ break;
+ case BROKER_REGISTRATION_CHANGE_RECORD:
+ replay((BrokerRegistrationChangeRecord) message);
+ break;
+ case TOPIC_RECORD:
+ replay((TopicRecord) message);
+ break;
+ case PARTITION_RECORD:
+ replay((PartitionRecord) message);
+ break;
+ case PARTITION_CHANGE_RECORD:
+ replay((PartitionChangeRecord) message);
+ break;
+ case REMOVE_TOPIC_RECORD:
+ replay((RemoveTopicRecord) message);
+ break;
+ case CONFIG_RECORD:
+ case FEATURE_LEVEL_RECORD:
+ case CLIENT_QUOTA_RECORD:
+ case PRODUCER_IDS_RECORD:
+ case ACCESS_CONTROL_ENTRY_RECORD:
+ case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+ case NO_OP_RECORD:
+ // These record types do not affect metrics
+ break;
+ default:
+ throw new RuntimeException("Unhandled record type " + type);
+ }
+ }
+
+ private void replay(RegisterBrokerRecord record) {
+ Integer brokerId = record.brokerId();
+ registeredBrokers.add(brokerId);
+ if (record.fenced()) {
+ fencedBrokers.add(brokerId);
+ } else {
+ fencedBrokers.remove(brokerId);
+ }
+
+ updateBrokerStateMetrics();
+ }
+
+ private void replay(UnregisterBrokerRecord record) {
+ Integer brokerId = record.brokerId();
+ registeredBrokers.remove(brokerId);
+ fencedBrokers.remove(brokerId);
+
+ updateBrokerStateMetrics();
+ }
+
+ private void replay(FenceBrokerRecord record) {
+ handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+ }
+
+ private void replay(UnfenceBrokerRecord record) {
+ handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+ }
+
+ private void replay(BrokerRegistrationChangeRecord record) {
+ BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+ .fromValue(record.fenced())
+ .orElseThrow(() -> {
+ return new IllegalArgumentException(
+ String.format(
+ "Registration change record for %d has unknown value for fenced field: %x",
+ record.brokerId(),
+ record.fenced()
+ )
+ );
+ });
+
+ handleFencingChange(record.brokerId(), fencingChange);
+ }
+
+ private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+ if (!registeredBrokers.contains(brokerId)) {
+ throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+ }
+
+ if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+ fencedBrokers.add(brokerId);
+ updateBrokerStateMetrics();
+ } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+ fencedBrokers.remove(brokerId);
+ updateBrokerStateMetrics();
+ } else {
+ // The fencingChange value is NONE. In this case the controller doesn't need to update the broker
+ // state metrics.
+ }
+ }
+
+ private void updateBrokerStateMetrics() {
+ controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+ Set activeBrokers = new HashSet<>(registeredBrokers);
+ activeBrokers.removeAll(fencedBrokers);
+ controllerMetrics.setActiveBrokerCount(activeBrokers.size());
+ }
+
+ private void replay(TopicRecord record) {
+ topicCount++;
+
+ controllerMetrics.setGlobalTopicCount(topicCount);
+ }
+
+ private void replay(PartitionRecord record) {
+ TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+
+ PartitionState partitionState = new PartitionState(record.leader(), record.replicas().get(0));
+ topicPartitions.put(tp, partitionState);
+
+ updateBasedOnPartitionState(tp, partitionState);
+
+ updateTopicAndPartitionMetrics();
+ }
+
+ private void replay(PartitionChangeRecord record) {
+ TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+ if (!topicPartitions.containsKey(tp)) {
+ throw new IllegalArgumentException(String.format("Unknown topic partitions %s", tp));
+ }
+
+ PartitionState partitionState = topicPartitions.computeIfPresent(
+ tp,
+ (key, oldValue) -> {
+ PartitionState newValue = oldValue;
+ // Update replicas
+ if (record.replicas() != null) {
+ newValue = new PartitionState(newValue.leader(), record.replicas().get(0));
+ }
+
+ if (record.leader() != NO_LEADER_CHANGE) {
+ newValue = new PartitionState(record.leader(), newValue.preferredReplica());
+ }
+
+ return newValue;
+ }
+ );
+
+ updateBasedOnPartitionState(tp, partitionState);
+
+ updateTopicAndPartitionMetrics();
+ }
+
+ private void replay(RemoveTopicRecord record) {
+ Uuid topicId = record.topicId();
+ Predicate matchesTopic = tp -> tp.topicId() == topicId;
+
+ topicCount--;
+ topicPartitions.keySet().removeIf(matchesTopic);
+ offlineTopicPartitions.removeIf(matchesTopic);
+ imbalancedTopicPartitions.removeIf(matchesTopic);
+
+ updateTopicAndPartitionMetrics();
+ }
+
+ private void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {
+ if (partitionState.leader() == NO_LEADER) {
+ offlineTopicPartitions.add(tp);
+ } else {
+ offlineTopicPartitions.remove(tp);
+ }
+
+ if (partitionState.leader() == partitionState.preferredReplica()) {
+ imbalancedTopicPartitions.remove(tp);
+ } else {
+ imbalancedTopicPartitions.add(tp);
+ }
+ }
+
+ private void updateTopicAndPartitionMetrics() {
+ controllerMetrics.setGlobalTopicCount(topicCount);
+ controllerMetrics.setGlobalPartitionCount(topicPartitions.size());
+ controllerMetrics.setOfflinePartitionCount(offlineTopicPartitions.size());
+ controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedTopicPartitions.size());
+ }
+
+ /**
+ * Resets the value of all of the metrics.
+ *
+ * Resets all of the state tracked by this type and resets all of the related controller metrics.
+ */
+ void reset() {
+ registeredBrokers.clear();
+ fencedBrokers.clear();
+ topicCount = 0;
+ topicPartitions.clear();
+ offlineTopicPartitions.clear();
+ imbalancedTopicPartitions.clear();
+
+ updateBrokerStateMetrics();
+ updateTopicAndPartitionMetrics();
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index cdd6d4416f1..e35bf7ef62b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -30,7 +30,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import java.util.function.Function;
+import java.util.function.IntPredicate;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@@ -73,7 +73,7 @@ public class PartitionChangeBuilder {
private final PartitionRegistration partition;
private final Uuid topicId;
private final int partitionId;
- private final Function isAcceptableLeader;
+ private final IntPredicate isAcceptableLeader;
private final boolean isLeaderRecoverySupported;
private List targetIsr;
private List targetReplicas;
@@ -85,7 +85,7 @@ public class PartitionChangeBuilder {
public PartitionChangeBuilder(PartitionRegistration partition,
Uuid topicId,
int partitionId,
- Function isAcceptableLeader,
+ IntPredicate isAcceptableLeader,
boolean isLeaderRecoverySupported) {
this.partition = partition;
this.topicId = topicId;
@@ -198,7 +198,7 @@ public class PartitionChangeBuilder {
if (election == Election.UNCLEAN) {
// Attempt unclean leader election
Optional uncleanLeader = targetReplicas.stream()
- .filter(replica -> isAcceptableLeader.apply(replica))
+ .filter(replica -> isAcceptableLeader.test(replica))
.findFirst();
if (uncleanLeader.isPresent()) {
return new ElectionResult(uncleanLeader.get(), true);
@@ -209,7 +209,7 @@ public class PartitionChangeBuilder {
}
private boolean isValidNewLeader(int replica) {
- return targetIsr.contains(replica) && isAcceptableLeader.apply(replica);
+ return targetIsr.contains(replica) && isAcceptableLeader.test(replica);
}
private void tryElection(PartitionChangeRecord record) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ef758c71598..55e80687219 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -979,6 +979,9 @@ public final class QuorumController implements Controller {
i++;
}
}
+
+ controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
+
updateLastCommittedState(
offset,
epoch,
@@ -1028,6 +1031,7 @@ public final class QuorumController implements Controller {
for (ApiMessageAndVersion message : messages) {
try {
replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
+ controllerMetricsManager.replay(message.message());
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record " +
"from snapshot %s on standby controller, which was %d of " +
@@ -1039,6 +1043,7 @@ public final class QuorumController implements Controller {
i++;
}
}
+
updateLastCommittedState(
reader.lastContainedLogOffset(),
reader.lastContainedLogEpoch(),
@@ -1482,6 +1487,7 @@ public final class QuorumController implements Controller {
private void resetToEmptyState() {
snapshotGeneratorManager.cancel();
snapshotRegistry.reset();
+ controllerMetricsManager.reset();
updateLastCommittedState(-1, -1, -1, 0);
}
@@ -1527,6 +1533,12 @@ public final class QuorumController implements Controller {
*/
private final ControllerMetrics controllerMetrics;
+
+ /**
+ * Manager for updating controller metrics based on the committed metadata.
+ */
+ private final ControllerMetricsManager controllerMetricsManager;
+
/**
* A registry for snapshot data. This must be accessed only by the event queue thread.
*/
@@ -1731,6 +1743,7 @@ public final class QuorumController implements Controller {
this.queue = queue;
this.time = time;
this.controllerMetrics = controllerMetrics;
+ this.controllerMetricsManager = new ControllerMetricsManager(controllerMetrics);
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.purgatory = new ControllerPurgatory();
this.resourceExists = new ConfigResourceExistenceChecker();
@@ -1763,7 +1776,6 @@ public final class QuorumController implements Controller {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(sessionTimeoutNs).
setReplicaPlacer(replicaPlacer).
- setControllerMetrics(controllerMetrics).
setFeatureControlManager(featureControl).
build();
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
@@ -1778,7 +1790,6 @@ public final class QuorumController implements Controller {
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
- setControllerMetrics(controllerMetrics).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
build();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index b96a687b0f3..00413c4bd53 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -49,7 +49,7 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
"KafkaController", "PreferredReplicaImbalanceCount");
private final static MetricName METADATA_ERROR_COUNT = getMetricName(
- "KafkaController", "MetadataErrorCount");
+ "KafkaController", "MetadataErrorCount");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -67,7 +67,7 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
private volatile int globalPartitionCount;
private volatile int offlinePartitionCount;
private volatile int preferredReplicaImbalanceCount;
- private volatile AtomicInteger metadataErrorCount;
+ private final AtomicInteger metadataErrorCount;
private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
@@ -215,12 +215,12 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
}
@Override
- public void setGlobalTopicsCount(int topicCount) {
+ public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount = topicCount;
}
@Override
- public int globalTopicsCount() {
+ public int globalTopicCount() {
return this.globalTopicCount;
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 1fc713c207f..b84be096c99 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -92,7 +92,6 @@ import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
-import org.apache.kafka.timeline.TimelineInteger;
import org.slf4j.Logger;
import java.util.AbstractMap.SimpleImmutableEntry;
@@ -110,7 +109,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
-import java.util.function.Function;
+import java.util.function.IntPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -151,7 +150,6 @@ public class ReplicationControlManager {
private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
private ConfigurationControlManager configurationControl = null;
private ClusterControlManager clusterControl = null;
- private ControllerMetrics controllerMetrics = null;
private Optional createTopicPolicy = Optional.empty();
private FeatureControlManager featureControl = null;
@@ -190,11 +188,6 @@ public class ReplicationControlManager {
return this;
}
- Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
- this.controllerMetrics = controllerMetrics;
- return this;
- }
-
Builder setCreateTopicPolicy(Optional createTopicPolicy) {
this.createTopicPolicy = createTopicPolicy;
return this;
@@ -210,8 +203,6 @@ public class ReplicationControlManager {
throw new IllegalStateException("Configuration control must be set before building");
} else if (clusterControl == null) {
throw new IllegalStateException("Cluster controller must be set before building");
- } else if (controllerMetrics == null) {
- throw new IllegalStateException("Metrics must be set before building");
}
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
@@ -232,7 +223,6 @@ public class ReplicationControlManager {
maxElectionsPerImbalance,
configurationControl,
clusterControl,
- controllerMetrics,
createTopicPolicy,
featureControl);
}
@@ -294,11 +284,6 @@ public class ReplicationControlManager {
*/
private final int maxElectionsPerImbalance;
- /**
- * A count of the total number of partitions in the cluster.
- */
- private final TimelineInteger globalPartitionCount;
-
/**
* A reference to the controller's configuration control manager.
*/
@@ -309,11 +294,6 @@ public class ReplicationControlManager {
*/
private final ClusterControlManager clusterControl;
- /**
- * A reference to the controller's metrics registry.
- */
- private final ControllerMetrics controllerMetrics;
-
/**
* The policy to use to validate that topic assignments are valid, if one is present.
*/
@@ -379,7 +359,6 @@ public class ReplicationControlManager {
int maxElectionsPerImbalance,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
- ControllerMetrics controllerMetrics,
Optional createTopicPolicy,
FeatureControlManager featureControl
) {
@@ -389,11 +368,9 @@ public class ReplicationControlManager {
this.defaultNumPartitions = defaultNumPartitions;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
this.configurationControl = configurationControl;
- this.controllerMetrics = controllerMetrics;
this.createTopicPolicy = createTopicPolicy;
this.featureControl = featureControl;
this.clusterControl = clusterControl;
- this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
this.topicsWithCollisionChars = new TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -415,7 +392,6 @@ public class ReplicationControlManager {
}
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
- controllerMetrics.setGlobalTopicsCount(topics.size());
log.info("Created topic {} with topic ID {}.", record.name(), record.topicId());
}
@@ -434,8 +410,6 @@ public class ReplicationControlManager {
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
- globalPartitionCount.increment();
- controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
false, newPartInfo.isReassigning());
} else if (!newPartInfo.equals(prevPartInfo)) {
@@ -452,9 +426,6 @@ public class ReplicationControlManager {
} else {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
}
-
- controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
- controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
}
private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId,
@@ -503,9 +474,6 @@ public class ReplicationControlManager {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
}
- controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
- controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
-
if (record.removingReplicas() != null || record.addingReplicas() != null) {
log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
} else if (log.isTraceEnabled()) {
@@ -546,15 +514,9 @@ public class ReplicationControlManager {
}
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), partitionId));
-
- globalPartitionCount.decrement();
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
- controllerMetrics.setGlobalTopicsCount(topics.size());
- controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
- controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
- controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
}
@@ -657,7 +619,7 @@ public class ReplicationControlManager {
validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
List isr = assignment.brokerIds().stream().
- filter(clusterControl::active).collect(Collectors.toList());
+ filter(clusterControl::isActive).collect(Collectors.toList());
if (isr.isEmpty()) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"All brokers specified in the manual partition assignment for " +
@@ -701,7 +663,7 @@ public class ReplicationControlManager {
for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
List replicas = partitions.get(partitionId);
List isr = replicas.stream().
- filter(clusterControl::active).collect(Collectors.toList());
+ filter(clusterControl::isActive).collect(Collectors.toList());
// If the ISR is empty, it means that all brokers are fenced or
// in controlled shutdown. To be consistent with the replica placer,
// we reject the create topic request with INVALID_REPLICATION_FACTOR.
@@ -980,7 +942,7 @@ public class ReplicationControlManager {
partition,
topic.id,
partitionId,
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
@@ -1356,7 +1318,7 @@ public class ReplicationControlManager {
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId,
partitionId,
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
builder.setElection(election);
Optional record = builder.build();
@@ -1471,7 +1433,7 @@ public class ReplicationControlManager {
partition,
topicPartition.topicId(),
topicPartition.partitionId(),
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported()
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
@@ -1554,7 +1516,7 @@ public class ReplicationControlManager {
OptionalInt.of(replicationFactor));
placements.add(assignment.brokerIds());
List isr = assignment.brokerIds().stream().
- filter(clusterControl::active).collect(Collectors.toList());
+ filter(clusterControl::isActive).collect(Collectors.toList());
if (isr.isEmpty()) {
throw new InvalidReplicaAssignmentException(
"All brokers specified in the manual partition assignment for " +
@@ -1574,7 +1536,7 @@ public class ReplicationControlManager {
for (int i = 0; i < placements.size(); i++) {
List replicas = placements.get(i);
List isr = isrs.get(i).stream().
- filter(clusterControl::active).collect(Collectors.toList());
+ filter(clusterControl::isActive).collect(Collectors.toList());
// If the ISR is empty, it means that all brokers are fenced or
// in controlled shutdown. To be consistent with the replica placer,
// we reject the create topic request with INVALID_REPLICATION_FACTOR.
@@ -1660,8 +1622,8 @@ public class ReplicationControlManager {
// from the target ISR, but we need to exclude it here too, to handle the case
// where there is an unclean leader election which chooses a leader from outside
// the ISR.
- Function isAcceptableLeader =
- r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.active(r));
+ IntPredicate isAcceptableLeader =
+ r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
while (iterator.hasNext()) {
TopicIdPartition topicIdPart = iterator.next();
@@ -1788,7 +1750,7 @@ public class ReplicationControlManager {
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
tp.topicId(),
tp.partitionId(),
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
@@ -1840,7 +1802,7 @@ public class ReplicationControlManager {
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
tp.topicId(),
tp.partitionId(),
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
if (!reassignment.merged().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.merged());
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 0df9c8b8f56..25177d45d71 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -264,7 +264,7 @@ public class SnapshotRegistry {
}
/**
- * Associate with this registry.
+ * Associate a revertable with this registry.
*/
public void register(Revertable revertable) {
revertables.add(revertable);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index ceb12cc6254..55bb6df9568 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -86,11 +86,10 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1);
brokerRecord.endPoints().add(new BrokerEndpoint().
@@ -104,8 +103,8 @@ public class ClusterControlManagerTest {
() -> clusterControl.checkBrokerEpoch(1, 101));
assertThrows(StaleBrokerEpochException.class,
() -> clusterControl.checkBrokerEpoch(2, 100));
- assertFalse(clusterControl.unfenced(0));
- assertFalse(clusterControl.unfenced(1));
+ assertFalse(clusterControl.isUnfenced(0));
+ assertFalse(clusterControl.isUnfenced(1));
if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
UnfenceBrokerRecord unfenceBrokerRecord =
@@ -116,8 +115,8 @@ public class ClusterControlManagerTest {
new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
clusterControl.replay(changeRecord);
}
- assertFalse(clusterControl.unfenced(0));
- assertTrue(clusterControl.unfenced(1));
+ assertFalse(clusterControl.isUnfenced(0));
+ assertTrue(clusterControl.isUnfenced(1));
if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
FenceBrokerRecord fenceBrokerRecord =
@@ -128,8 +127,8 @@ public class ClusterControlManagerTest {
new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced(BrokerRegistrationFencingChange.FENCE.value());
clusterControl.replay(changeRecord);
}
- assertFalse(clusterControl.unfenced(0));
- assertFalse(clusterControl.unfenced(1));
+ assertFalse(clusterControl.isUnfenced(0));
+ assertFalse(clusterControl.isUnfenced(1));
}
@Test
@@ -149,11 +148,10 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
@@ -169,20 +167,20 @@ public class ClusterControlManagerTest {
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
brokerRecord.setInControlledShutdown(false);
clusterControl.replay(brokerRecord, 100L);
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
brokerRecord.setFenced(false);
clusterControl.replay(brokerRecord, 100L);
- assertTrue(clusterControl.unfenced(0));
+ assertTrue(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
}
@@ -203,11 +201,10 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
@@ -222,7 +219,7 @@ public class ClusterControlManagerTest {
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
- assertTrue(clusterControl.unfenced(0));
+ assertTrue(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
BrokerRegistrationChangeRecord registrationChangeRecord = new BrokerRegistrationChangeRecord()
@@ -231,7 +228,7 @@ public class ClusterControlManagerTest {
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
clusterControl.replay(registrationChangeRecord);
- assertTrue(clusterControl.unfenced(0));
+ assertTrue(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
registrationChangeRecord = new BrokerRegistrationChangeRecord()
@@ -240,7 +237,7 @@ public class ClusterControlManagerTest {
.setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
clusterControl.replay(registrationChangeRecord);
- assertTrue(clusterControl.unfenced(0));
+ assertTrue(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
}
@@ -259,7 +256,6 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
@@ -289,7 +285,6 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
@@ -345,7 +340,6 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
@@ -380,7 +374,6 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
@@ -399,7 +392,7 @@ public class ClusterControlManagerTest {
clusterControl.heartbeatManager().touch(i, false, 0);
}
for (int i = 0; i < numUsableBrokers; i++) {
- assertTrue(clusterControl.unfenced(i),
+ assertTrue(clusterControl.isUnfenced(i),
String.format("broker %d was not unfenced.", i));
}
for (int i = 0; i < 100; i++) {
@@ -439,11 +432,10 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
for (int i = 0; i < 3; i++) {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerEpoch(100).setBrokerId(i).setRack(null);
@@ -513,7 +505,6 @@ public class ClusterControlManagerTest {
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java
new file mode 100644
index 00000000000..8cfeea417be
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+final class ControllerMetricsManagerTest {
+ @Test
+ public void testActiveBrokerRegistration() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, false));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testFenceBrokerRegistration() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerChangedToActive() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.UNFENCE));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerLegacyChangedToActive() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ manager.replay(brokerUnfence(1, 1));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerChangedToFence() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, false));
+ manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.FENCE));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ }
+
+
+ @Test
+ public void testBrokerLegacyChangedToFence() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, false));
+ manager.replay(brokerFence(1, 1));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerUnchanged() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.NONE));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerUnregister() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ manager.replay(brokerRegistration(2, 1, false));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ manager.replay(brokerUnregistration(1, 1));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ manager.replay(brokerUnregistration(2, 1));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testReplayBatch() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replayBatch(
+ 0,
+ Arrays.asList(
+ new ApiMessageAndVersion(brokerRegistration(1, 1, true), (short) 0),
+ new ApiMessageAndVersion(brokerChange(1, 1, BrokerRegistrationFencingChange.UNFENCE), (short) 0)
+ )
+ );
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testTopicCountIncreased() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(topicRecord("test"));
+ assertEquals(1, metrics.globalTopicCount());
+ }
+
+ @Test
+ public void testTopicCountDecreased() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(removeTopicRecord(id));
+ assertEquals(0, metrics.globalTopicCount());
+ }
+
+ @Test
+ public void testPartitionCountIncreased() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ assertEquals(0, metrics.globalPartitionCount());
+ manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+ assertEquals(1, metrics.globalPartitionCount());
+ manager.replay(partitionRecord(id, 1, 0, Arrays.asList(0, 1, 2)));
+ assertEquals(2, metrics.globalPartitionCount());
+ }
+
+ @Test
+ public void testPartitionCountDecreased() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+ manager.replay(partitionRecord(id, 1, 0, Arrays.asList(0, 1, 2)));
+ manager.replay(removeTopicRecord(id));
+ assertEquals(0, metrics.globalPartitionCount());
+ }
+
+ @Test
+ public void testOfflinePartition() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, NO_LEADER, Arrays.asList(0, 1, 2)));
+ assertEquals(1, metrics.offlinePartitionCount());
+ }
+
+ @Test
+ public void testImbalancedPartition() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, 1, Arrays.asList(0, 1, 2)));
+ assertEquals(1, metrics.preferredReplicaImbalanceCount());
+ }
+
+ @Test
+ public void testPartitionChange() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(NO_LEADER), Optional.empty()));
+ assertEquals(1, metrics.offlinePartitionCount());
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(1), Optional.empty()));
+ assertEquals(0, metrics.offlinePartitionCount());
+ assertEquals(1, metrics.preferredReplicaImbalanceCount());
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(0), Optional.empty()));
+ assertEquals(0, metrics.preferredReplicaImbalanceCount());
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.empty(), Optional.of(Arrays.asList(1, 2, 0))));
+ assertEquals(1, metrics.preferredReplicaImbalanceCount());
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(2), Optional.of(Arrays.asList(2, 0, 1))));
+ assertEquals(0, metrics.preferredReplicaImbalanceCount());
+ }
+
+ @Test
+ public void testStartingMetrics() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ assertEquals(0, metrics.globalTopicCount());
+ assertEquals(0, metrics.globalPartitionCount());
+ assertEquals(0, metrics.offlinePartitionCount());
+ assertEquals(0, metrics.preferredReplicaImbalanceCount());
+ }
+
+ @Test
+ public void testReset() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+
+ manager.reset();
+
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ assertEquals(0, metrics.globalTopicCount());
+ assertEquals(0, metrics.globalPartitionCount());
+ assertEquals(0, metrics.offlinePartitionCount());
+ assertEquals(0, metrics.preferredReplicaImbalanceCount());
+ }
+
+ private static RegisterBrokerRecord brokerRegistration(
+ int brokerId,
+ long epoch,
+ boolean fenced
+ ) {
+ return new RegisterBrokerRecord()
+ .setBrokerId(brokerId)
+ .setIncarnationId(Uuid.randomUuid())
+ .setBrokerEpoch(epoch)
+ .setFenced(fenced);
+ }
+
+ private static UnregisterBrokerRecord brokerUnregistration(
+ int brokerId,
+ long epoch
+ ) {
+ return new UnregisterBrokerRecord()
+ .setBrokerId(brokerId)
+ .setBrokerEpoch(epoch);
+ }
+
+ private static BrokerRegistrationChangeRecord brokerChange(
+ int brokerId,
+ long epoch,
+ BrokerRegistrationFencingChange fencing
+ ) {
+ return new BrokerRegistrationChangeRecord()
+ .setBrokerId(brokerId)
+ .setBrokerEpoch(epoch)
+ .setFenced(fencing.value());
+ }
+
+ private static UnfenceBrokerRecord brokerUnfence(int brokerId, long epoch) {
+ return new UnfenceBrokerRecord()
+ .setId(brokerId)
+ .setEpoch(epoch);
+ }
+
+ private static FenceBrokerRecord brokerFence(int brokerId, long epoch) {
+ return new FenceBrokerRecord()
+ .setId(brokerId)
+ .setEpoch(epoch);
+ }
+
+ private static TopicRecord topicRecord(String name) {
+ return new TopicRecord().setName(name).setTopicId(Uuid.randomUuid());
+ }
+
+ private static TopicRecord topicRecord(String name, Uuid id) {
+ return new TopicRecord().setName(name).setTopicId(id);
+ }
+
+ private static RemoveTopicRecord removeTopicRecord(Uuid id) {
+ return new RemoveTopicRecord().setTopicId(id);
+ }
+
+ private static PartitionRecord partitionRecord(
+ Uuid id,
+ int partition,
+ int leader,
+ List replicas
+ ) {
+ return new PartitionRecord()
+ .setPartitionId(partition)
+ .setTopicId(id)
+ .setReplicas(replicas)
+ .setIsr(replicas)
+ .setLeader(leader);
+ }
+
+ private static PartitionChangeRecord partitionChangeRecord(
+ Uuid id,
+ int partition,
+ OptionalInt leader,
+ Optional> replicas
+ ) {
+ PartitionChangeRecord record = new PartitionChangeRecord();
+ leader.ifPresent(record::setLeader);
+ replicas.ifPresent(record::setReplicas);
+ replicas.ifPresent(record::setIsr);
+
+ return record
+ .setPartitionId(partition)
+ .setTopicId(id);
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index ca13d90ddea..4a0155ed1bd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -75,12 +75,12 @@ public final class MockControllerMetrics implements ControllerMetrics {
}
@Override
- public void setGlobalTopicsCount(int topicCount) {
+ public void setGlobalTopicCount(int topicCount) {
this.topics = topicCount;
}
@Override
- public int globalTopicsCount() {
+ public int globalTopicCount() {
return this.topics;
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 80c5c505ae0..8c6b1281e23 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -64,7 +64,6 @@ public class ProducerIdControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 90807a2e2a7..9c427d17da1 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -277,7 +277,7 @@ public class QuorumControllerTest {
// Brokers are only registered and should still be fenced
allBrokers.forEach(brokerId -> {
- assertFalse(active.clusterControl().unfenced(brokerId),
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
@@ -297,7 +297,7 @@ public class QuorumControllerTest {
TestUtils.waitForCondition(() -> {
sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
- if (active.clusterControl().unfenced(brokerId)) {
+ if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
@@ -311,11 +311,11 @@ public class QuorumControllerTest {
// At this point only the brokers we want fenced should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
- assertTrue(active.clusterControl().unfenced(brokerId),
+ assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
- assertFalse(active.clusterControl().unfenced(brokerId),
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
@@ -376,7 +376,7 @@ public class QuorumControllerTest {
// Brokers are only registered and should still be fenced
allBrokers.forEach(brokerId -> {
- assertFalse(active.clusterControl().unfenced(brokerId),
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
@@ -396,7 +396,7 @@ public class QuorumControllerTest {
() -> {
sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
- if (active.clusterControl().unfenced(brokerId)) {
+ if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
@@ -411,11 +411,11 @@ public class QuorumControllerTest {
// At this point only the brokers we want fenced should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
- assertTrue(active.clusterControl().unfenced(brokerId),
+ assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
- assertFalse(active.clusterControl().unfenced(brokerId),
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 335fd52a07d..a9e25d57144 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -151,7 +151,6 @@ public class ReplicationControlManagerTest {
final LogContext logContext = new LogContext();
final MockTime time = new MockTime();
final MockRandom random = new MockRandom();
- final ControllerMetrics metrics = new MockControllerMetrics();
final FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
@@ -165,7 +164,6 @@ public class ReplicationControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS)).
setReplicaPlacer(new StripedReplicaPlacer(random)).
- setControllerMetrics(metrics).
setFeatureControlManager(featureControl).
build();
final ConfigurationControlManager configurationControl = new ConfigurationControlManager.Builder().
@@ -206,7 +204,6 @@ public class ReplicationControlManagerTest {
setMaxElectionsPerImbalance(Integer.MAX_VALUE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
- setControllerMetrics(metrics).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
build();
@@ -605,41 +602,6 @@ public class ReplicationControlManagerTest {
);
}
- @Test
- public void testBrokerCountMetrics() throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext();
- ReplicationControlManager replicationControl = ctx.replicationControl;
-
- ctx.registerBrokers(0);
-
- assertEquals(1, ctx.metrics.fencedBrokerCount());
- assertEquals(0, ctx.metrics.activeBrokerCount());
-
- ctx.unfenceBrokers(0);
-
- assertEquals(0, ctx.metrics.fencedBrokerCount());
- assertEquals(1, ctx.metrics.activeBrokerCount());
-
- ctx.registerBrokers(1);
- ctx.unfenceBrokers(1);
-
- assertEquals(2, ctx.metrics.activeBrokerCount());
-
- ctx.registerBrokers(2);
- ctx.unfenceBrokers(2);
-
- assertEquals(0, ctx.metrics.fencedBrokerCount());
- assertEquals(3, ctx.metrics.activeBrokerCount());
-
- ControllerResult result = replicationControl.unregisterBroker(0);
- ctx.replay(result.records());
- result = replicationControl.unregisterBroker(2);
- ctx.replay(result.records());
-
- assertEquals(0, ctx.metrics.fencedBrokerCount());
- assertEquals(1, ctx.metrics.activeBrokerCount());
- }
-
@Test
public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -698,107 +660,6 @@ public class ReplicationControlManagerTest {
ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}}, POLICY_VIOLATION.code());
}
- @Test
- public void testGlobalTopicAndPartitionMetrics() throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext();
- ReplicationControlManager replicationControl = ctx.replicationControl;
- CreateTopicsRequestData request = new CreateTopicsRequestData();
- request.topics().add(new CreatableTopic().setName("foo").
- setNumPartitions(1).setReplicationFactor((short) -1));
-
- ctx.registerBrokers(0, 1, 2);
- ctx.unfenceBrokers(0, 1, 2);
-
- List topicsToDelete = new ArrayList<>();
-
- ControllerResult result =
- replicationControl.createTopics(request, Collections.singleton("foo"));
- topicsToDelete.add(result.response().topics().find("foo").topicId());
-
- RecordTestUtils.replayAll(replicationControl, result.records());
- assertEquals(1, ctx.metrics.globalTopicsCount());
-
- request = new CreateTopicsRequestData();
- request.topics().add(new CreatableTopic().setName("bar").
- setNumPartitions(1).setReplicationFactor((short) -1));
- request.topics().add(new CreatableTopic().setName("baz").
- setNumPartitions(2).setReplicationFactor((short) -1));
- result = replicationControl.createTopics(request,
- new HashSet<>(Arrays.asList("bar", "baz")));
- RecordTestUtils.replayAll(replicationControl, result.records());
- assertEquals(3, ctx.metrics.globalTopicsCount());
- assertEquals(4, ctx.metrics.globalPartitionCount());
-
- topicsToDelete.add(result.response().topics().find("baz").topicId());
- ControllerResult