mirror of https://github.com/apache/kafka.git
KAFKA-17506 KRaftMigrationDriver initialization race (#17147)
There is a race condition between KRaftMigrationDriver running its first poll() and being notified by Raft about a leader change. If onControllerChange is called before RecoverMigrationStateFromZKEvent is run, we will end up getting stuck in the INACTIVE state. This patch fixes the race by enqueuing a RecoverMigrationStateFromZKEvent from onControllerChange if the driver has not yet initialized. If another RecoverMigrationStateFromZKEvent was already enqueued, the second one to run will just be ignored. Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
d04f534892
commit
0e30209f01
|
@ -359,6 +359,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
|
|||
@Override
|
||||
public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
|
||||
curLeaderAndEpoch = newLeaderAndEpoch;
|
||||
if (migrationState.equals(MigrationDriverState.UNINITIALIZED)) {
|
||||
eventQueue.append(new RecoverMigrationStateFromZKEvent());
|
||||
}
|
||||
eventQueue.append(new KRaftLeaderEvent(newLeaderAndEpoch));
|
||||
}
|
||||
|
||||
|
@ -519,8 +522,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
|
|||
KRaftMigrationDriver.this.image = image;
|
||||
String metadataType = isSnapshot ? "snapshot" : "delta";
|
||||
|
||||
if (migrationState.equals(MigrationDriverState.INACTIVE)) {
|
||||
// No need to log anything if this node is not the active controller
|
||||
if (EnumSet.of(MigrationDriverState.UNINITIALIZED, MigrationDriverState.INACTIVE).contains(migrationState)) {
|
||||
// No need to log anything if this node is not the active controller or the driver has not initialized
|
||||
completionHandler.accept(null);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -232,6 +232,32 @@ public class KRaftMigrationDriverTest {
|
|||
return future;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnControllerChangeWhenUninitialized() throws InterruptedException {
|
||||
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
|
||||
CapturingMigrationClient.newBuilder().build();
|
||||
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
|
||||
MockFaultHandler faultHandler = new MockFaultHandler("testBecomeLeaderUninitialized");
|
||||
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
|
||||
.setZkMigrationClient(migrationClient)
|
||||
.setPropagator(metadataPropagator)
|
||||
.setFaultHandler(faultHandler);
|
||||
try (KRaftMigrationDriver driver = builder.build()) {
|
||||
// Fake a complete migration with ZK client
|
||||
migrationClient.setMigrationRecoveryState(
|
||||
ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));
|
||||
|
||||
// simulate the Raft layer running before the driver has fully started.
|
||||
driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
|
||||
|
||||
// start up the driver. this will enqueue a poll event. once run, this will enqueue a recovery event
|
||||
driver.start();
|
||||
|
||||
// Even though we contrived a race above, the driver still makes it past initialization.
|
||||
TestUtils.waitForCondition(() -> driver.migrationState().get(30, TimeUnit.SECONDS).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
|
||||
"Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Don't send RPCs to brokers for every metadata change, only when brokers or topics change.
|
||||
* This is a regression test for KAFKA-14668
|
||||
|
|
Loading…
Reference in New Issue