diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index e63170ca5bf..2762d36f487 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -75,8 +75,7 @@ public class BrokerHeartbeatManager { /** * The offset at which the broker should complete its controlled shutdown, or -1 - * if the broker is not performing a controlled shutdown. When this field is - * updated, we also have to update the broker's position in the shuttingDown set. + * if the broker is not performing a controlled shutdown. */ private long controlledShutdownOffset; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index c583906d4ee..580967d97a5 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -70,7 +70,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -327,14 +326,6 @@ public class ClusterControlManager { return brokerRegistrations; } - Set fencedBrokerIds() { - return brokerRegistrations.values() - .stream() - .filter(BrokerRegistration::fenced) - .map(BrokerRegistration::id) - .collect(Collectors.toSet()); - } - /** * Process an incoming broker registration request. */ diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index a541be68f35..71e43b55b25 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1431,35 +1431,14 @@ public final class QuorumController implements Controller { */ private volatile int curClaimEpoch; - /** - * How long to delay partition leader balancing operations. - */ - private final OptionalLong leaderImbalanceCheckIntervalNs; - - private enum ImbalanceSchedule { - // The leader balancing operation has been scheduled - SCHEDULED, - // If the leader balancing operation should be scheduled, schedule it with a delay - DEFERRED, - // If the leader balancing operation should be scheduled, schedule it immediately - IMMEDIATELY - } - - /** - * Tracks the scheduling state for partition leader balancing operations. - */ - private final ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.DEFERRED; - - /** - * Tracks the scheduling state for unclean leader election operations. - */ - private final ImbalanceSchedule uncleanScheduled = ImbalanceSchedule.DEFERRED; - /** * The bootstrap metadata to use for initialization if needed. */ private final BootstrapMetadata bootstrapMetadata; + /** + * True if the KIP-966 eligible leader replicas feature is enabled. + */ private final boolean eligibleLeaderReplicasEnabled; /** @@ -1515,12 +1494,6 @@ public final class QuorumController implements Controller { this.controllerMetrics = controllerMetrics; this.snapshotRegistry = new SnapshotRegistry(logContext); this.deferredEventQueue = new DeferredEventQueue(logContext); - this.offsetControl = new OffsetControlManager.Builder(). - setLogContext(logContext). - setSnapshotRegistry(snapshotRegistry). - setMetrics(controllerMetrics). - setTime(time). - build(); this.resourceExists = new ConfigResourceExistenceChecker(); this.configurationControl = new ConfigurationControlManager.Builder(). setLogContext(logContext). @@ -1571,7 +1544,6 @@ public final class QuorumController implements Controller { setSnapshotRegistry(snapshotRegistry). setClusterControlManager(clusterControl). build(); - this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs; this.replicationControl = new ReplicationControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setLogContext(logContext). @@ -1619,6 +1591,15 @@ public final class QuorumController implements Controller { registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs)); registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs)); + // OffsetControlManager must be initialized last, because its constructor will take the + // initial in-memory snapshot of all extant timeline data structures. + this.offsetControl = new OffsetControlManager.Builder(). + setLogContext(logContext). + setSnapshotRegistry(snapshotRegistry). + setMetrics(controllerMetrics). + setTime(time). + build(); + log.info("Creating new QuorumController with clusterId {}.{}", clusterId, eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : ""); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 194c295de28..c53f068e9d6 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -501,7 +501,7 @@ public class ReplicationControlManagerTest { replay(fenceResult.records()); } while (fenceResult.response().booleanValue()); - assertEquals(brokerIds, clusterControl.fencedBrokerIds()); + assertEquals(brokerIds, fencedBrokerIds()); } long currentBrokerEpoch(int brokerId) { @@ -525,6 +525,15 @@ public class ReplicationControlManagerTest { replay(result.records()); return result; } + + Set fencedBrokerIds() { + return clusterControl.brokerRegistrations().values() + .stream() + .filter(BrokerRegistration::fenced) + .map(BrokerRegistration::id) + .collect(Collectors.toSet()); + } + } static CreateTopicsResponseData withoutConfigs(CreateTopicsResponseData data) { @@ -2443,7 +2452,7 @@ public class ReplicationControlManagerTest { Uuid fooId = ctx.createTestTopic("foo", new int[][]{ new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId(); - assertTrue(ctx.clusterControl.fencedBrokerIds().isEmpty()); + assertTrue(ctx.fencedBrokerIds().isEmpty()); ctx.fenceBrokers(Set.of(2, 3)); PartitionRegistration partition0 = replication.getPartition(fooId, 0);