MINOR: some minor cleanups in the quorum controller. (#17819)

BrokerHeartbeatManager.java: fix an outdated comment.

Move an inefficient test method that is O(num_brokers) from ClusterControlManager.java into ReplicationControlManagerTest.java, so that it doesn't accidentally get used in production code.

Remove QuorumController.ImbalanceSchedule, etc. since it is no longer used.

Move the initialization of OffsetControlManager later in the QuorumController constructor and add a comment explaining why it should come last. This doesn't fix any bugs currently, but it's a good practice for the future.

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2024-11-18 11:15:38 -08:00 committed by GitHub
parent 50c15b94c9
commit 130bf1054b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 24 additions and 44 deletions

View File

@ -75,8 +75,7 @@ public class BrokerHeartbeatManager {
/** /**
* The offset at which the broker should complete its controlled shutdown, or -1 * The offset at which the broker should complete its controlled shutdown, or -1
* if the broker is not performing a controlled shutdown. When this field is * if the broker is not performing a controlled shutdown.
* updated, we also have to update the broker's position in the shuttingDown set.
*/ */
private long controlledShutdownOffset; private long controlledShutdownOffset;

View File

@ -70,7 +70,6 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS;
@ -327,14 +326,6 @@ public class ClusterControlManager {
return brokerRegistrations; return brokerRegistrations;
} }
Set<Integer> fencedBrokerIds() {
return brokerRegistrations.values()
.stream()
.filter(BrokerRegistration::fenced)
.map(BrokerRegistration::id)
.collect(Collectors.toSet());
}
/** /**
* Process an incoming broker registration request. * Process an incoming broker registration request.
*/ */

View File

@ -1431,35 +1431,14 @@ public final class QuorumController implements Controller {
*/ */
private volatile int curClaimEpoch; private volatile int curClaimEpoch;
/**
* How long to delay partition leader balancing operations.
*/
private final OptionalLong leaderImbalanceCheckIntervalNs;
private enum ImbalanceSchedule {
// The leader balancing operation has been scheduled
SCHEDULED,
// If the leader balancing operation should be scheduled, schedule it with a delay
DEFERRED,
// If the leader balancing operation should be scheduled, schedule it immediately
IMMEDIATELY
}
/**
* Tracks the scheduling state for partition leader balancing operations.
*/
private final ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.DEFERRED;
/**
* Tracks the scheduling state for unclean leader election operations.
*/
private final ImbalanceSchedule uncleanScheduled = ImbalanceSchedule.DEFERRED;
/** /**
* The bootstrap metadata to use for initialization if needed. * The bootstrap metadata to use for initialization if needed.
*/ */
private final BootstrapMetadata bootstrapMetadata; private final BootstrapMetadata bootstrapMetadata;
/**
* True if the KIP-966 eligible leader replicas feature is enabled.
*/
private final boolean eligibleLeaderReplicasEnabled; private final boolean eligibleLeaderReplicasEnabled;
/** /**
@ -1515,12 +1494,6 @@ public final class QuorumController implements Controller {
this.controllerMetrics = controllerMetrics; this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext); this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext); this.deferredEventQueue = new DeferredEventQueue(logContext);
this.offsetControl = new OffsetControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setMetrics(controllerMetrics).
setTime(time).
build();
this.resourceExists = new ConfigResourceExistenceChecker(); this.resourceExists = new ConfigResourceExistenceChecker();
this.configurationControl = new ConfigurationControlManager.Builder(). this.configurationControl = new ConfigurationControlManager.Builder().
setLogContext(logContext). setLogContext(logContext).
@ -1571,7 +1544,6 @@ public final class QuorumController implements Controller {
setSnapshotRegistry(snapshotRegistry). setSnapshotRegistry(snapshotRegistry).
setClusterControlManager(clusterControl). setClusterControlManager(clusterControl).
build(); build();
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
this.replicationControl = new ReplicationControlManager.Builder(). this.replicationControl = new ReplicationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry). setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext). setLogContext(logContext).
@ -1619,6 +1591,15 @@ public final class QuorumController implements Controller {
registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs)); registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs));
registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs)); registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs));
// OffsetControlManager must be initialized last, because its constructor will take the
// initial in-memory snapshot of all extant timeline data structures.
this.offsetControl = new OffsetControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setMetrics(controllerMetrics).
setTime(time).
build();
log.info("Creating new QuorumController with clusterId {}.{}", log.info("Creating new QuorumController with clusterId {}.{}",
clusterId, clusterId,
eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : ""); eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : "");

View File

@ -501,7 +501,7 @@ public class ReplicationControlManagerTest {
replay(fenceResult.records()); replay(fenceResult.records());
} while (fenceResult.response().booleanValue()); } while (fenceResult.response().booleanValue());
assertEquals(brokerIds, clusterControl.fencedBrokerIds()); assertEquals(brokerIds, fencedBrokerIds());
} }
long currentBrokerEpoch(int brokerId) { long currentBrokerEpoch(int brokerId) {
@ -525,6 +525,15 @@ public class ReplicationControlManagerTest {
replay(result.records()); replay(result.records());
return result; return result;
} }
Set<Integer> fencedBrokerIds() {
return clusterControl.brokerRegistrations().values()
.stream()
.filter(BrokerRegistration::fenced)
.map(BrokerRegistration::id)
.collect(Collectors.toSet());
}
} }
static CreateTopicsResponseData withoutConfigs(CreateTopicsResponseData data) { static CreateTopicsResponseData withoutConfigs(CreateTopicsResponseData data) {
@ -2443,7 +2452,7 @@ public class ReplicationControlManagerTest {
Uuid fooId = ctx.createTestTopic("foo", new int[][]{ Uuid fooId = ctx.createTestTopic("foo", new int[][]{
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId(); new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
assertTrue(ctx.clusterControl.fencedBrokerIds().isEmpty()); assertTrue(ctx.fencedBrokerIds().isEmpty());
ctx.fenceBrokers(Set.of(2, 3)); ctx.fenceBrokers(Set.of(2, 3));
PartitionRegistration partition0 = replication.getPartition(fooId, 0); PartitionRegistration partition0 = replication.getPartition(fooId, 0);