diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 4c796f9eade..7cf82b8762c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -165,19 +165,6 @@ public class KRaftMigrationDriver implements MetadataPublisher { return stateFuture; } - private void recoverMigrationStateFromZK() { - applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState); - String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done"; - log.info("Initial migration of ZK metadata is {}.", maybeDone); - - // Once we've recovered the migration state from ZK, install this class as a metadata publisher - // by calling the initialZkLoadHandler. - initialZkLoadHandler.accept(this); - - // Transition to INACTIVE state and wait for leadership events. - transitionTo(MigrationDriverState.INACTIVE); - } - private boolean isControllerQuorumReadyForMigration() { Optional notReadyMsg = this.quorumFeatures.reasonAllControllersZkMigrationNotReady( image.features().metadataVersion(), image.cluster().controllers()); @@ -414,7 +401,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { /** * An event generated by a call to {@link MetadataPublisher#onControllerChange}. This will not be called until * this class is registered with {@link org.apache.kafka.image.loader.MetadataLoader}. The registration happens - * after the migration state is loaded from ZooKeeper in {@link #recoverMigrationStateFromZK}. + * after the migration state is loaded from ZooKeeper in {@link RecoverMigrationStateFromZKEvent}. */ class KRaftLeaderEvent extends MigrationEvent { private final LeaderAndEpoch leaderAndEpoch; @@ -786,12 +773,31 @@ public class KRaftMigrationDriver implements MetadataPublisher { } } + class RecoverMigrationStateFromZKEvent extends MigrationEvent { + @Override + public void run() throws Exception { + if (checkDriverState(MigrationDriverState.UNINITIALIZED, this)) { + applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState); + String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done"; + log.info("Initial migration of ZK metadata is {}.", maybeDone); + + // Once we've recovered the migration state from ZK, install this class as a metadata publisher + // by calling the initialZkLoadHandler. + initialZkLoadHandler.accept(KRaftMigrationDriver.this); + + // Transition to INACTIVE state and wait for leadership events. + transitionTo(MigrationDriverState.INACTIVE); + } + } + } + class PollEvent extends MigrationEvent { + @Override public void run() throws Exception { switch (migrationState) { case UNINITIALIZED: - recoverMigrationStateFromZK(); + eventQueue.append(new RecoverMigrationStateFromZKEvent()); break; case INACTIVE: // Nothing to do when the driver is inactive. We must wait until a KRaftLeaderEvent diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index dea5e62db96..14f347ca1e1 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -253,7 +253,7 @@ public class KRaftMigrationDriverTest { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); setupDeltaForMigration(delta, registerControllers); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(1)); @@ -338,7 +338,7 @@ public class KRaftMigrationDriverTest { MetadataDelta delta = new MetadataDelta(image); setupDeltaForMigration(delta, true); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(1)); delta.replay(zkBrokerRecord(2)); @@ -363,6 +363,62 @@ public class KRaftMigrationDriverTest { } } + @Test + public void testMigrationWithClientExceptionWhileMigratingZnodeCreation() throws Exception { + CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator(); + // suppose the ZNode creation failed 3 times + CountDownLatch createZnodeAttempts = new CountDownLatch(3); + CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)), + new CapturingTopicMigrationClient(), + new CapturingConfigMigrationClient(), + new CapturingAclMigrationClient(), + new CapturingDelegationTokenMigrationClient(), + CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) { + @Override + public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) { + if (createZnodeAttempts.getCount() == 0) { + this.setMigrationRecoveryState(initialState); + return initialState; + } else { + createZnodeAttempts.countDown(); + throw new MigrationClientException("Some kind of ZK error!"); + } + } + }; + MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration"); + KRaftMigrationDriver.Builder builder = defaultTestBuilder() + .setZkMigrationClient(migrationClient) + .setFaultHandler(faultHandler) + .setPropagator(metadataPropagator); + try (KRaftMigrationDriver driver = builder.build()) { + MetadataImage image = MetadataImage.EMPTY; + MetadataDelta delta = new MetadataDelta(image); + setupDeltaForMigration(delta, true); + + startAndWaitForRecoveringMigrationStateFromZK(driver); + + delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); + delta.replay(zkBrokerRecord(1)); + delta.replay(zkBrokerRecord(2)); + delta.replay(zkBrokerRecord(3)); + MetadataProvenance provenance = new MetadataProvenance(100, 1, 1); + image = delta.apply(provenance); + // Before leadership claiming, the getOrCreateMigrationRecoveryState should be able to get correct state + assertTrue(createZnodeAttempts.await(1, TimeUnit.MINUTES)); + + // Notify the driver that it is the leader + driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1)); + // Publish metadata of all the ZK brokers being ready + driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, + new LeaderAndEpoch(OptionalInt.of(3000), 1)).build()); + + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); + + Assertions.assertNull(faultHandler.firstException()); + } + } + private void setupDeltaForMigration( MetadataDelta delta, boolean registerControllers @@ -413,7 +469,7 @@ public class KRaftMigrationDriverTest { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); if (allNodePresent) { setupDeltaWithControllerRegistrations(delta, Arrays.asList(4, 5, 6), Arrays.asList()); } else { @@ -469,7 +525,7 @@ public class KRaftMigrationDriverTest { migrationClient.setMigrationRecoveryState( ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1)); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(1)); delta.replay(zkBrokerRecord(2)); @@ -483,7 +539,7 @@ public class KRaftMigrationDriverTest { new LeaderAndEpoch(OptionalInt.of(3000), 1)).build()); TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), - "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state"); + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); } } @@ -546,7 +602,7 @@ public class KRaftMigrationDriverTest { DelegationTokenImage.EMPTY); MetadataDelta delta = new MetadataDelta(image); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); setupDeltaForMigration(delta, true); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(0)); @@ -565,7 +621,7 @@ public class KRaftMigrationDriverTest { // Wait for migration TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), - "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state"); + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz, add new foo, add bam, delete bam provenance = new MetadataProvenance(200, 1, 1); @@ -601,7 +657,7 @@ public class KRaftMigrationDriverTest { DelegationTokenImage.EMPTY); MetadataDelta delta = new MetadataDelta(image); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); setupDeltaForMigration(delta, true); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(0)); @@ -656,7 +712,7 @@ public class KRaftMigrationDriverTest { DelegationTokenImage.EMPTY); MetadataDelta delta = new MetadataDelta(image); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); setupDeltaForMigration(delta, true); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(0)); @@ -673,7 +729,7 @@ public class KRaftMigrationDriverTest { driver.onControllerChange(newLeader); TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM), - "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); + "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state"); driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build()); @@ -711,7 +767,7 @@ public class KRaftMigrationDriverTest { DelegationTokenImage.EMPTY); MetadataDelta delta = new MetadataDelta(image); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); setupDeltaForMigration(delta, true); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(0)); @@ -778,7 +834,7 @@ public class KRaftMigrationDriverTest { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); setupDeltaForMigration(delta, true); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(1)); @@ -798,7 +854,7 @@ public class KRaftMigrationDriverTest { new LeaderAndEpoch(OptionalInt.of(3000), 1)).build()); TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), - "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state"); + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); assertEquals(1, migrationBeginCalls.get()); } } @@ -864,7 +920,7 @@ public class KRaftMigrationDriverTest { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); - driver.start(); + startAndWaitForRecoveringMigrationStateFromZK(driver); setupDeltaForMigration(delta, true); delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(1)); @@ -881,10 +937,18 @@ public class KRaftMigrationDriverTest { new LeaderAndEpoch(OptionalInt.of(3000), 1)).build()); TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), - "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state"); + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); assertEquals(expectedBatchCount, batchesPassedToController.size()); assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum()); } } + + // Wait until the driver has recovered MigrationState From ZK. This is to simulate the driver needs to be installed as the metadata publisher + // so that it can receive onControllerChange (KRaftLeaderEvent) and onMetadataUpdate (MetadataChangeEvent) events. + private void startAndWaitForRecoveringMigrationStateFromZK(KRaftMigrationDriver driver) throws InterruptedException { + driver.start(); + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.INACTIVE), + "Waiting for KRaftMigrationDriver to enter INACTIVE state"); + } }