KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors (#15732)

When running the ZK-to-KRaft migration process, we encountered an issue where the migration hangs and the ZkMigrationState cannot move to the MIGRATION state. The cause is that the pollEvent did not retry on a retriable MigrationClientException (ZK client retriable errors) when it should have. This PR fixes it and adds a test. Because the exception was not retried, the poll event would not poll anymore, which caused the KRaftMigrationDriver to hang.

Reviewers: Luke Chen <showuon@gmail.com>, Igor Soarez<soarez@apple.com>, Akhilesh C <akhileshchg@users.noreply.github.com>
This commit is contained in:
Luke Chen 2024-04-29 18:44:47 +09:00 committed by GitHub
parent a9b4b88e54
commit ec151c8278
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 100 additions and 30 deletions

View File

@ -165,19 +165,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
return stateFuture;
}
/**
 * Recovers (or creates) the migration recovery state through the ZK migration client, logs
 * whether the initial ZK metadata migration has completed, hands this driver to the
 * {@code initialZkLoadHandler}, and finally moves the driver to the INACTIVE state.
 */
private void recoverMigrationStateFromZK() {
    applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);

    final String completionStatus;
    if (migrationLeadershipState.initialZkMigrationComplete()) {
        completionStatus = "done";
    } else {
        completionStatus = "not done";
    }
    log.info("Initial migration of ZK metadata is {}.", completionStatus);

    // The migration state is now recovered from ZK, so pass this driver to the
    // initialZkLoadHandler, which installs it as a metadata publisher.
    initialZkLoadHandler.accept(this);

    // Become INACTIVE and wait for leadership events.
    transitionTo(MigrationDriverState.INACTIVE);
}
private boolean isControllerQuorumReadyForMigration() {
Optional<String> notReadyMsg = this.quorumFeatures.reasonAllControllersZkMigrationNotReady(
image.features().metadataVersion(), image.cluster().controllers());
@ -414,7 +401,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
/**
* An event generated by a call to {@link MetadataPublisher#onControllerChange}. This will not be called until
* this class is registered with {@link org.apache.kafka.image.loader.MetadataLoader}. The registration happens
* after the migration state is loaded from ZooKeeper in {@link #recoverMigrationStateFromZK}.
* after the migration state is loaded from ZooKeeper in {@link RecoverMigrationStateFromZKEvent}.
*/
class KRaftLeaderEvent extends MigrationEvent {
private final LeaderAndEpoch leaderAndEpoch;
@ -786,12 +773,31 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
}
/**
 * Event that recovers the migration state from ZooKeeper. It does nothing unless
 * {@code checkDriverState} confirms the driver is in the UNINITIALIZED state. On success it
 * hands the driver to the {@code initialZkLoadHandler} and transitions to INACTIVE.
 */
class RecoverMigrationStateFromZKEvent extends MigrationEvent {
    @Override
    public void run() throws Exception {
        // Only recover while the driver is still UNINITIALIZED.
        if (!checkDriverState(MigrationDriverState.UNINITIALIZED, this)) {
            return;
        }

        applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);

        final String completionStatus;
        if (migrationLeadershipState.initialZkMigrationComplete()) {
            completionStatus = "done";
        } else {
            completionStatus = "not done";
        }
        log.info("Initial migration of ZK metadata is {}.", completionStatus);

        // The migration state is now recovered from ZK, so pass the enclosing driver to the
        // initialZkLoadHandler, which installs it as a metadata publisher.
        initialZkLoadHandler.accept(KRaftMigrationDriver.this);

        // Become INACTIVE and wait for leadership events.
        transitionTo(MigrationDriverState.INACTIVE);
    }
}
class PollEvent extends MigrationEvent {
@Override
public void run() throws Exception {
switch (migrationState) {
case UNINITIALIZED:
recoverMigrationStateFromZK();
eventQueue.append(new RecoverMigrationStateFromZKEvent());
break;
case INACTIVE:
// Nothing to do when the driver is inactive. We must wait until a KRaftLeaderEvent

View File

@ -253,7 +253,7 @@ public class KRaftMigrationDriverTest {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, registerControllers);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
@ -338,7 +338,7 @@ public class KRaftMigrationDriverTest {
MetadataDelta delta = new MetadataDelta(image);
setupDeltaForMigration(delta, true);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
@ -363,6 +363,62 @@ public class KRaftMigrationDriverTest {
}
}
/**
 * Verifies that the driver keeps retrying when recovering the migration state fails with a
 * {@link MigrationClientException}. The migration client below throws on the first three calls
 * to {@code getOrCreateMigrationRecoveryState} and succeeds afterwards; the driver must still
 * reach INACTIVE, then DUAL_WRITE, without reporting anything to the fault handler.
 */
@Test
public void testMigrationWithClientExceptionWhileMigratingZnodeCreation() throws Exception {
    CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
    // suppose the ZNode creation failed 3 times
    CountDownLatch createZnodeAttempts = new CountDownLatch(3);
    CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)),
            new CapturingTopicMigrationClient(),
            new CapturingConfigMigrationClient(),
            new CapturingAclMigrationClient(),
            new CapturingDelegationTokenMigrationClient(),
            CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {
        @Override
        public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
            if (createZnodeAttempts.getCount() == 0) {
                // All simulated failures have been consumed: behave like a successful call.
                this.setMigrationRecoveryState(initialState);
                return initialState;
            } else {
                // Consume one simulated failure and throw the client exception.
                createZnodeAttempts.countDown();
                throw new MigrationClientException("Some kind of ZK error!");
            }
        }
    };
    // Name the fault handler after this test so any reported fault points at the right test.
    MockFaultHandler faultHandler = new MockFaultHandler("testMigrationWithClientExceptionWhileMigratingZnodeCreation");
    KRaftMigrationDriver.Builder builder = defaultTestBuilder()
        .setZkMigrationClient(migrationClient)
        .setFaultHandler(faultHandler)
        .setPropagator(metadataPropagator);
    try (KRaftMigrationDriver driver = builder.build()) {
        MetadataImage image = MetadataImage.EMPTY;
        MetadataDelta delta = new MetadataDelta(image);

        setupDeltaForMigration(delta, true);
        // Starts the driver and waits for INACTIVE, which requires the recovery to succeed eventually.
        startAndWaitForRecoveringMigrationStateFromZK(driver);
        delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
        delta.replay(zkBrokerRecord(1));
        delta.replay(zkBrokerRecord(2));
        delta.replay(zkBrokerRecord(3));
        MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
        image = delta.apply(provenance);

        // Before leadership claiming, the getOrCreateMigrationRecoveryState should be able to get correct state
        assertTrue(createZnodeAttempts.await(1, TimeUnit.MINUTES));

        // Notify the driver that it is the leader
        driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
        // Publish metadata of all the ZK brokers being ready
        driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
            new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
        TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
            "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
        // The retried client exceptions must not have been escalated to the fault handler.
        Assertions.assertNull(faultHandler.firstException());
    }
}
private void setupDeltaForMigration(
MetadataDelta delta,
boolean registerControllers
@ -413,7 +469,7 @@ public class KRaftMigrationDriverTest {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
if (allNodePresent) {
setupDeltaWithControllerRegistrations(delta, Arrays.asList(4, 5, 6), Arrays.asList());
} else {
@ -469,7 +525,7 @@ public class KRaftMigrationDriverTest {
migrationClient.setMigrationRecoveryState(
ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
@ -483,7 +539,7 @@ public class KRaftMigrationDriverTest {
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
}
}
@ -546,7 +602,7 @@ public class KRaftMigrationDriverTest {
DelegationTokenImage.EMPTY);
MetadataDelta delta = new MetadataDelta(image);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(0));
@ -565,7 +621,7 @@ public class KRaftMigrationDriverTest {
// Wait for migration
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
// Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz, add new foo, add bam, delete bam
provenance = new MetadataProvenance(200, 1, 1);
@ -601,7 +657,7 @@ public class KRaftMigrationDriverTest {
DelegationTokenImage.EMPTY);
MetadataDelta delta = new MetadataDelta(image);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(0));
@ -656,7 +712,7 @@ public class KRaftMigrationDriverTest {
DelegationTokenImage.EMPTY);
MetadataDelta delta = new MetadataDelta(image);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(0));
@ -673,7 +729,7 @@ public class KRaftMigrationDriverTest {
driver.onControllerChange(newLeader);
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
"Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
@ -711,7 +767,7 @@ public class KRaftMigrationDriverTest {
DelegationTokenImage.EMPTY);
MetadataDelta delta = new MetadataDelta(image);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(0));
@ -778,7 +834,7 @@ public class KRaftMigrationDriverTest {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
@ -798,7 +854,7 @@ public class KRaftMigrationDriverTest {
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
assertEquals(1, migrationBeginCalls.get());
}
}
@ -864,7 +920,7 @@ public class KRaftMigrationDriverTest {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
@ -881,10 +937,18 @@ public class KRaftMigrationDriverTest {
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
assertEquals(expectedBatchCount, batchesPassedToController.size());
assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
}
}
/**
 * Starts the driver and blocks until it has recovered the migration state from ZK, i.e. until it
 * reports the INACTIVE state. This simulates the driver being installed as the metadata publisher
 * so that it can receive onControllerChange (KRaftLeaderEvent) and onMetadataUpdate
 * (MetadataChangeEvent) events.
 */
private void startAndWaitForRecoveringMigrationStateFromZK(KRaftMigrationDriver driver) throws InterruptedException {
    driver.start();
    TestUtils.waitForCondition(() -> {
        MigrationDriverState currentState = driver.migrationState().get(1, TimeUnit.MINUTES);
        return currentState.equals(MigrationDriverState.INACTIVE);
    }, "Waiting for KRaftMigrationDriver to enter INACTIVE state");
}
}