From 6b89672b5e5d527cf26207d3985a24025afedb1a Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Mon, 30 Jan 2023 15:10:19 -0800 Subject: [PATCH] MINOR: some ZK migration code cleanups. Some minor improvements to the JavaDoc for ZkMigrationState. Rename MigrationState to MigrationDriverState to avoid confusion with ZkMigrationState. Remove ClusterImage#zkBrokers. This costs O(num_brokers) time to calculate, but is only ever used when in migration state. It should just be calculated in the migration code. (Additionally, the function ClusterImage.zkBrokers() returns something other than ClusterImage#zkBrokers, which is confusing.) Also remove ClusterDelta#liveZkBrokerIdChanges. This is only used in one place, and it's easy to calculate it there. In general we should avoid providing expensive accessors unless absolutely necessary. Expensive code should look expensive: if people want to iterate over all brokers, they can write a loop to do that rather than hiding it inside an accessor. --- .../kafka/migration/MigrationPropagator.scala | 34 +++++++--- .../org/apache/kafka/image/ClusterDelta.java | 13 ---- .../org/apache/kafka/image/ClusterImage.java | 17 ----- .../org/apache/kafka/image/TopicsImage.java | 10 --- .../migration/KRaftMigrationDriver.java | 68 ++++++++++--------- ...onState.java => MigrationDriverState.java} | 8 ++- .../metadata/migration/ZkMigrationState.java | 21 ++++-- .../migration/KRaftMigrationDriverTest.java | 2 +- 8 files changed, 83 insertions(+), 90 deletions(-) rename metadata/src/main/java/org/apache/kafka/metadata/migration/{MigrationState.java => MigrationDriverState.java} (92%) diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala index 355831166ee..80f462bbd21 100644 --- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala +++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala @@ -23,12 +23,14 @@ import kafka.server.KafkaConfig import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time -import org.apache.kafka.image.{MetadataDelta, MetadataImage} +import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicsImage} +import org.apache.kafka.metadata.PartitionRegistration import org.apache.kafka.metadata.migration.LegacyPropagator import org.apache.kafka.server.common.MetadataVersion import java.util import scala.jdk.CollectionConverters._ +import scala.compat.java8.OptionConverters._ class MigrationPropagator( nodeId: Int, @@ -79,6 +81,18 @@ class MigrationPropagator( _image = image } + /** + * A very expensive function that creates a map with an entry for every partition that exists, from + * (topic name, partition index) to partition registration. + */ + def materializePartitions(topicsImage: TopicsImage): util.Map[TopicPartition, PartitionRegistration] = { + val result = new util.HashMap[TopicPartition, PartitionRegistration]() + topicsImage.topicsById().values().forEach(topic => { + topic.partitions().forEach((key, value) => result.put(new TopicPartition(topic.name(), key), value)); + }) + result + } + override def sendRPCsToBrokersFromMetadataDelta(delta: MetadataDelta, image: MetadataImage, zkControllerEpoch: Int): Unit = { publishMetadata(image) @@ -87,15 +101,19 @@ class MigrationPropagator( delta.getOrCreateTopicsDelta() delta.getOrCreateClusterDelta() - val changedZkBrokers = delta.clusterDelta().liveZkBrokerIdChanges().asScala.map(_.toInt).toSet - val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSet + val changedZkBrokers = delta.clusterDelta().changedBrokers().values().asScala.map(_.asScala).filter { + case None => false + case Some(registration) => registration.isMigratingZkBroker && !registration.fenced() + }.map(_.get.id()).toSet + + val zkBrokers = image.cluster().brokers().values().asScala.filter(_.isMigratingZkBroker).map(_.id()).toSet val oldZkBrokers = zkBrokers -- changedZkBrokers val brokersChanged = !delta.clusterDelta().changedBrokers().isEmpty // First send metadata about the live/dead brokers to all the zk brokers. if (changedZkBrokers.nonEmpty) { // Update new Zk brokers about all the metadata. - requestBatch.addUpdateMetadataRequestForBrokers(changedZkBrokers.toSeq, image.topics().partitions().keySet().asScala) + requestBatch.addUpdateMetadataRequestForBrokers(changedZkBrokers.toSeq, materializePartitions(image.topics()).asScala.keySet) } if (brokersChanged) { requestBatch.addUpdateMetadataRequestForBrokers(oldZkBrokers.toSeq) @@ -107,7 +125,7 @@ class MigrationPropagator( // brokers based on the topic changes. if (changedZkBrokers.nonEmpty) { // For new the brokers, check if there are partition assignments and add LISR appropriately. - image.topics().partitions().asScala.foreach { case (tp, partitionRegistration) => + materializePartitions(image.topics()).asScala.foreach { case (tp, partitionRegistration) => val replicas = partitionRegistration.replicas.toSet val leaderIsrAndControllerEpochOpt = MigrationControllerChannelContext.partitionLeadershipInfo(image, tp) val newBrokersWithReplicas = replicas.intersect(changedZkBrokers) @@ -181,12 +199,12 @@ class MigrationPropagator( override def sendRPCsToBrokersFromMetadataImage(image: MetadataImage, zkControllerEpoch: Int): Unit = { publishMetadata(image) - val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSeq - val partitions = image.topics().partitions() + val zkBrokers = image.cluster().brokers().values().asScala.filter(_.isMigratingZkBroker).map(_.id()).toSeq + val partitions = materializePartitions(image.topics()) // First send all the metadata before sending any other requests to make sure subsequent // requests are handled correctly. requestBatch.newBatch() - requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet().asScala) + requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet.asScala) requestBatch.sendRequestsToBrokers(zkControllerEpoch) requestBatch.newBatch() diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java index 523cdf9c43d..39d6fdb3d74 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java @@ -31,8 +31,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; /** @@ -58,17 +56,6 @@ public final class ClusterDelta { return image.broker(nodeId); } - public Set liveZkBrokerIdChanges() { - return changedBrokers - .values() - .stream() - .filter(Optional::isPresent) - .map(Optional::get) - .filter(registration -> registration.isMigratingZkBroker() && !registration.fenced()) - .map(BrokerRegistration::id) - .collect(Collectors.toSet()); - } - public void finishSnapshot() { for (Integer brokerId : image.brokers().keySet()) { if (!changedBrokers.containsKey(brokerId)) { diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java index 951cea2ea15..7563657e9a4 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java @@ -34,15 +34,9 @@ public final class ClusterImage { public static final ClusterImage EMPTY = new ClusterImage(Collections.emptyMap()); private final Map brokers; - private final Map zkBrokers; public ClusterImage(Map brokers) { this.brokers = Collections.unmodifiableMap(brokers); - this.zkBrokers = Collections.unmodifiableMap(brokers - .entrySet() - .stream() - .filter(entry -> entry.getValue().isMigratingZkBroker()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); } public boolean isEmpty() { @@ -53,21 +47,10 @@ public final class ClusterImage { return brokers; } - public Map zkBrokers() { - return Collections.unmodifiableMap( - brokers - .entrySet() - .stream() - .filter(x -> x.getValue().isMigratingZkBroker() && !x.getValue().fenced()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); - } - public BrokerRegistration broker(int nodeId) { return brokers.get(nodeId); } - - public boolean containsBroker(int brokerId) { return brokers.containsKey(brokerId); } diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java index 392bea4119b..5f7db112f0a 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java @@ -17,7 +17,6 @@ package org.apache.kafka.image; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; @@ -25,7 +24,6 @@ import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.util.TranslatedValueMapView; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -112,14 +110,6 @@ public final class TopicsImage { return new TranslatedValueMapView<>(topicsById, image -> image.name()); } - public Map partitions() { - Map partitions = new HashMap<>(); - topicsById.values().forEach(topic -> { - topic.partitions().forEach((key, value) -> partitions.put(new TopicPartition(topic.name(), key), value)); - }); - return partitions; - } - @Override public String toString() { return "TopicsImage(topicsById=" + topicsById.entrySet().stream(). diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 7b06982d0cc..3394741f278 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -26,6 +26,7 @@ import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.loader.LogDeltaManifest; import org.apache.kafka.image.loader.SnapshotManifest; import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; import org.apache.kafka.raft.LeaderAndEpoch; @@ -72,7 +73,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { */ private final Consumer initialZkLoadHandler; private volatile LeaderAndEpoch leaderAndEpoch; - private volatile MigrationState migrationState; + private volatile MigrationDriverState migrationState; private volatile ZkMigrationLeadershipState migrationLeadershipState; private volatile MetadataImage image; @@ -90,7 +91,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { this.propagator = propagator; this.time = Time.SYSTEM; this.log = LoggerFactory.getLogger(KRaftMigrationDriver.class); - this.migrationState = MigrationState.UNINITIALIZED; + this.migrationState = MigrationDriverState.UNINITIALIZED; this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY; this.eventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext("KRaftMigrationDriver"), "kraft-migration"); this.image = MetadataImage.EMPTY; @@ -110,8 +111,8 @@ public class KRaftMigrationDriver implements MetadataPublisher { } // Visible for testing - CompletableFuture migrationState() { - CompletableFuture stateFuture = new CompletableFuture<>(); + CompletableFuture migrationState() { + CompletableFuture stateFuture = new CompletableFuture<>(); eventQueue.append(() -> stateFuture.complete(migrationState)); return stateFuture; } @@ -123,7 +124,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone); initialZkLoadHandler.accept(this); // Let's transition to INACTIVE state and wait for leadership events. - transitionTo(MigrationState.INACTIVE); + transitionTo(MigrationDriverState.INACTIVE); } private boolean isControllerQuorumReadyForMigration() { @@ -137,9 +138,12 @@ public class KRaftMigrationDriver implements MetadataPublisher { log.info("Waiting for initial metadata publish before checking if Zk brokers are registered."); return false; } - Set kraftRegisteredZkBrokers = image.cluster().zkBrokers().keySet(); Set zkRegisteredZkBrokers = zkMigrationClient.readBrokerIdsFromTopicAssignments(); - zkRegisteredZkBrokers.removeAll(kraftRegisteredZkBrokers); + for (BrokerRegistration broker : image.cluster().brokers().values()) { + if (broker.isMigratingZkBroker()) { + zkRegisteredZkBrokers.remove(broker.id()); + } + } if (zkRegisteredZkBrokers.isEmpty()) { return true; } else { @@ -155,43 +159,43 @@ public class KRaftMigrationDriver implements MetadataPublisher { this.migrationLeadershipState = afterState; } - private boolean isValidStateChange(MigrationState newState) { + private boolean isValidStateChange(MigrationDriverState newState) { if (migrationState == newState) return true; switch (migrationState) { case UNINITIALIZED: case DUAL_WRITE: - return newState == MigrationState.INACTIVE; + return newState == MigrationDriverState.INACTIVE; case INACTIVE: - return newState == MigrationState.WAIT_FOR_CONTROLLER_QUORUM; + return newState == MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM; case WAIT_FOR_CONTROLLER_QUORUM: return - newState == MigrationState.INACTIVE || - newState == MigrationState.WAIT_FOR_BROKERS; + newState == MigrationDriverState.INACTIVE || + newState == MigrationDriverState.WAIT_FOR_BROKERS; case WAIT_FOR_BROKERS: return - newState == MigrationState.INACTIVE || - newState == MigrationState.BECOME_CONTROLLER; + newState == MigrationDriverState.INACTIVE || + newState == MigrationDriverState.BECOME_CONTROLLER; case BECOME_CONTROLLER: return - newState == MigrationState.INACTIVE || - newState == MigrationState.ZK_MIGRATION || - newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM; + newState == MigrationDriverState.INACTIVE || + newState == MigrationDriverState.ZK_MIGRATION || + newState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM; case ZK_MIGRATION: return - newState == MigrationState.INACTIVE || - newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM; + newState == MigrationDriverState.INACTIVE || + newState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM; case KRAFT_CONTROLLER_TO_BROKER_COMM: return - newState == MigrationState.INACTIVE || - newState == MigrationState.DUAL_WRITE; + newState == MigrationDriverState.INACTIVE || + newState == MigrationDriverState.DUAL_WRITE; default: log.error("Migration driver trying to transition from an unknown state {}", migrationState); return false; } } - private void transitionTo(MigrationState newState) { + private void transitionTo(MigrationDriverState newState) { if (!isValidStateChange(newState)) { log.error("Error transition in migration driver from {} to {}", migrationState, newState); return; @@ -336,13 +340,13 @@ public class KRaftMigrationDriver implements MetadataPublisher { default: if (!isActive) { apply("KRaftLeaderEvent is not active", state -> ZkMigrationLeadershipState.EMPTY); - transitionTo(MigrationState.INACTIVE); + transitionTo(MigrationDriverState.INACTIVE); } else { // Apply the new KRaft state apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch())); // Before becoming the controller fo ZkBrokers, we need to make sure the // Controller Quorum can handle migration. - transitionTo(MigrationState.WAIT_FOR_CONTROLLER_QUORUM); + transitionTo(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM); } break; } @@ -359,7 +363,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { log.debug("Controller Quorum is ready for Zk to KRaft migration"); // Note that leadership would not change here. Hence we do not need to // `apply` any leadership state change. - transitionTo(MigrationState.WAIT_FOR_BROKERS); + transitionTo(MigrationDriverState.WAIT_FOR_BROKERS); } break; default: @@ -380,9 +384,9 @@ public class KRaftMigrationDriver implements MetadataPublisher { // We could not claim leadership, stay in BECOME_CONTROLLER to retry } else { if (!migrationLeadershipState.zkMigrationComplete()) { - transitionTo(MigrationState.ZK_MIGRATION); + transitionTo(MigrationDriverState.ZK_MIGRATION); } else { - transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM); + transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); } } break; @@ -400,7 +404,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { case WAIT_FOR_BROKERS: if (areZkBrokersReadyForMigration()) { log.debug("Zk brokers are registered and ready for migration"); - transitionTo(MigrationState.BECOME_CONTROLLER); + transitionTo(MigrationDriverState.BECOME_CONTROLLER); } break; default: @@ -447,7 +451,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { offsetAndEpochAfterMigration.offset(), offsetAndEpochAfterMigration.epoch()); apply("Migrate metadata from Zk", state -> zkMigrationClient.setMigrationRecoveryState(newState)); - transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM); + transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); } catch (Throwable t) { zkRecordConsumer.abortMigration(); super.handleException(t); @@ -460,13 +464,13 @@ public class KRaftMigrationDriver implements MetadataPublisher { @Override public void run() throws Exception { // Ignore sending RPCs to the brokers since we're no longer in the state. - if (migrationState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM) { + if (migrationState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM) { if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) { log.trace("Sending RPCs to broker before moving to dual-write mode using " + "at offset and epoch {}", image.highestOffsetAndEpoch()); propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch()); // Migration leadership state doesn't change since we're not doing any Zk writes. - transitionTo(MigrationState.DUAL_WRITE); + transitionTo(MigrationDriverState.DUAL_WRITE); } else { log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}", migrationLeadershipState.offsetAndEpoch()); @@ -501,7 +505,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { KRaftMigrationDriver.this.image = image; String metadataType = isSnapshot ? "snapshot" : "delta"; - if (migrationState != MigrationState.DUAL_WRITE) { + if (migrationState != MigrationDriverState.DUAL_WRITE) { log.trace("Received metadata {}, but the controller is not in dual-write " + "mode. Ignoring the change to be replicated to Zookeeper", metadataType); completionHandler.accept(null); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java similarity index 92% rename from metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationState.java rename to metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java index 5a6d3df24a3..fa871123ddd 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java @@ -18,6 +18,10 @@ package org.apache.kafka.metadata.migration; /** + * This is the internal state of the KRaftMigrationDriver class on a particular controller node. + * Unlike the ZkMigrationState, which is persisted in the metadata log and image, this is soft + * state which is stored in memory only. + * * UNINITIALIZED───────────────►INACTIVE◄────────────────DUAL_WRITE◄────────────────────────┐ * │ ▲ │ * │ │ │ @@ -34,7 +38,7 @@ package org.apache.kafka.metadata.migration; * ▼ │ │ │ * BECOME_CONTROLLER───────────────────►└────────────────────►WAIT_FOR_BROKERS───────────────────┘ */ -public enum MigrationState { +public enum MigrationDriverState { UNINITIALIZED(false), // Initial state. INACTIVE(false), // State when not the active controller. WAIT_FOR_CONTROLLER_QUORUM(false), // Ensure all the quorum nodes are ready for migration. @@ -46,7 +50,7 @@ public enum MigrationState { private final boolean isActiveController; - MigrationState(boolean isActiveController) { + MigrationDriverState(boolean isActiveController) { this.isActiveController = isActiveController; } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java index 239ed0ab7d6..f8404cfe5d1 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java @@ -19,32 +19,39 @@ package org.apache.kafka.metadata.migration; import java.util.Optional; /** + * The cluster-wide ZooKeeper migration state. + * * An enumeration of the possible states of the ZkMigrationState field in ZkMigrationStateRecord. + * This information is persisted in the metadata log and image. * * @see org.apache.kafka.common.metadata.ZkMigrationStateRecord */ public enum ZkMigrationState { /** - * No migration has been started by the controller. The controller is in regular KRaft mode + * The cluster was created in KRaft mode. A cluster that was created in ZK mode can never attain + * this state; the endpoint of migration is POST_MIGRATION, instead. */ NONE((byte) 0), /** * A KRaft controller has been elected with "zookeeper.metadata.migration.enable" set to "true". - * The controller is now awaiting the pre-conditions for starting the migration. + * The controller is now awaiting the preconditions for starting the migration to KRaft. In this + * state, the metadata log does not yet contain the cluster's data. There is a metadata quorum, + * but it is not doing anything useful yet. */ PRE_MIGRATION((byte) 1), /** - * The ZK data has been migrated and the KRaft controller is now writing metadata to both ZK and the - * metadata log. The controller will remain in this state until all of the brokers have been restarted - * in KRaft mode + * The ZK data has been migrated, and the KRaft controller is now writing metadata to both ZK + * and the metadata log. The controller will remain in this state until all of the brokers have + * been restarted in KRaft mode. */ MIGRATION((byte) 2), /** - * The migration has been fully completed. The cluster is running in KRaft mode. This state will persist - * indefinitely after the migration. + * The migration from ZK has been fully completed. The cluster is running in KRaft mode. This state + * will persist indefinitely after the migration. In operational terms, this is the same as the NONE + * state. */ POST_MIGRATION((byte) 3); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 1bb1275fd32..b25f1a10a76 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -280,7 +280,7 @@ public class KRaftMigrationDriverTest { driver.publishLogDelta(delta, image, new LogDeltaManifest(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42)); - TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationState.DUAL_WRITE), + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); Assertions.assertEquals(1, metadataPropagator.images);