MINOR: some ZK migration code cleanups.

Some minor improvements to the JavaDoc for ZkMigrationState.

Rename MigrationState to MigrationDriverState to avoid confusion with ZkMigrationState.

Remove ClusterImage#zkBrokers. This map costs O(num_brokers) time to calculate, but is only ever
used during ZK migration, so it should just be calculated in the migration code. (Additionally, the
accessor ClusterImage.zkBrokers() filters out fenced brokers, so it returns something other than
the ClusterImage#zkBrokers field of the same name, which is confusing.)
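
A minimal sketch of the inline computation (hedged: migratingZkBrokerIds is a hypothetical helper,
not taken verbatim from this commit; it uses the ClusterImage and BrokerRegistration accessors
visible in the diff below):

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.metadata.BrokerRegistration;

// Sketch: compute the migrating-ZK broker IDs at the call site, making the
// O(num_brokers) scan visible instead of hiding it behind an accessor.
static Set<Integer> migratingZkBrokerIds(ClusterImage cluster) {
    Set<Integer> ids = new HashSet<>();
    for (BrokerRegistration broker : cluster.brokers().values()) {
        if (broker.isMigratingZkBroker()) {
            ids.add(broker.id());
        }
    }
    return ids;
}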

Also remove ClusterDelta#liveZkBrokerIdChanges. This is only used in one place, and it's easy to
calculate it there. In general we should avoid providing expensive accessors unless absolutely
necessary. Expensive code should look expensive: if people want to iterate over all brokers, they
can write a loop to do that rather than hiding it inside an accessor.
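
For reference, a sketch of that call-site computation (it mirrors the removed
liveZkBrokerIdChanges() body; the delta variable and the changedBrokers() accessor are the ones
shown in the diff below):

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.metadata.BrokerRegistration;

// Sketch: filter the delta's changed brokers down to live (unfenced) brokers
// that are migrating from ZK. changedBrokers() maps broker ID to an Optional
// that is empty when the broker was removed.
Set<Integer> changedZkBrokers = delta.clusterDelta().changedBrokers().values().stream()
    .filter(Optional::isPresent)
    .map(Optional::get)
    .filter(registration -> registration.isMigratingZkBroker() && !registration.fenced())
    .map(BrokerRegistration::id)
    .collect(Collectors.toSet());
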
author Colin P. McCabe
date   2023-01-30 15:10:19 -08:00
parent 7322f4cd55
commit 6b89672b5e
8 changed files with 83 additions and 90 deletions

File: MigrationPropagator.scala

@@ -23,12 +23,14 @@ import kafka.server.KafkaConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicsImage}
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.migration.LegacyPropagator
import org.apache.kafka.server.common.MetadataVersion
import java.util
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
class MigrationPropagator(
nodeId: Int,
@@ -79,6 +81,18 @@ class MigrationPropagator(
_image = image
}
/**
* A very expensive function that creates a map with an entry for every partition that exists, from
* (topic name, partition index) to partition registration.
*/
def materializePartitions(topicsImage: TopicsImage): util.Map[TopicPartition, PartitionRegistration] = {
val result = new util.HashMap[TopicPartition, PartitionRegistration]()
topicsImage.topicsById().values().forEach(topic => {
topic.partitions().forEach((key, value) => result.put(new TopicPartition(topic.name(), key), value));
})
result
}
override def sendRPCsToBrokersFromMetadataDelta(delta: MetadataDelta, image: MetadataImage,
zkControllerEpoch: Int): Unit = {
publishMetadata(image)
@@ -87,15 +101,19 @@ class MigrationPropagator(
delta.getOrCreateTopicsDelta()
delta.getOrCreateClusterDelta()
val changedZkBrokers = delta.clusterDelta().liveZkBrokerIdChanges().asScala.map(_.toInt).toSet
val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSet
val changedZkBrokers = delta.clusterDelta().changedBrokers().values().asScala.map(_.asScala).filter {
case None => false
case Some(registration) => registration.isMigratingZkBroker && !registration.fenced()
}.map(_.get.id()).toSet
val zkBrokers = image.cluster().brokers().values().asScala.filter(_.isMigratingZkBroker).map(_.id()).toSet
val oldZkBrokers = zkBrokers -- changedZkBrokers
val brokersChanged = !delta.clusterDelta().changedBrokers().isEmpty
// First send metadata about the live/dead brokers to all the zk brokers.
if (changedZkBrokers.nonEmpty) {
// Update new Zk brokers about all the metadata.
requestBatch.addUpdateMetadataRequestForBrokers(changedZkBrokers.toSeq, image.topics().partitions().keySet().asScala)
requestBatch.addUpdateMetadataRequestForBrokers(changedZkBrokers.toSeq, materializePartitions(image.topics()).asScala.keySet)
}
if (brokersChanged) {
requestBatch.addUpdateMetadataRequestForBrokers(oldZkBrokers.toSeq)
@@ -107,7 +125,7 @@ class MigrationPropagator(
// brokers based on the topic changes.
if (changedZkBrokers.nonEmpty) {
// For the new brokers, check if there are partition assignments and add LISR appropriately.
image.topics().partitions().asScala.foreach { case (tp, partitionRegistration) =>
materializePartitions(image.topics()).asScala.foreach { case (tp, partitionRegistration) =>
val replicas = partitionRegistration.replicas.toSet
val leaderIsrAndControllerEpochOpt = MigrationControllerChannelContext.partitionLeadershipInfo(image, tp)
val newBrokersWithReplicas = replicas.intersect(changedZkBrokers)
@@ -181,12 +199,12 @@ class MigrationPropagator(
override def sendRPCsToBrokersFromMetadataImage(image: MetadataImage, zkControllerEpoch: Int): Unit = {
publishMetadata(image)
val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSeq
val partitions = image.topics().partitions()
val zkBrokers = image.cluster().brokers().values().asScala.filter(_.isMigratingZkBroker).map(_.id()).toSeq
val partitions = materializePartitions(image.topics())
// First send all the metadata before sending any other requests to make sure subsequent
// requests are handled correctly.
requestBatch.newBatch()
requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet().asScala)
requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet.asScala)
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
requestBatch.newBatch()

File: ClusterDelta.java

@@ -31,8 +31,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -58,17 +56,6 @@ public final class ClusterDelta {
return image.broker(nodeId);
}
public Set<Integer> liveZkBrokerIdChanges() {
return changedBrokers
.values()
.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.filter(registration -> registration.isMigratingZkBroker() && !registration.fenced())
.map(BrokerRegistration::id)
.collect(Collectors.toSet());
}
public void finishSnapshot() {
for (Integer brokerId : image.brokers().keySet()) {
if (!changedBrokers.containsKey(brokerId)) {

File: ClusterImage.java

@@ -34,15 +34,9 @@ public final class ClusterImage {
public static final ClusterImage EMPTY = new ClusterImage(Collections.emptyMap());
private final Map<Integer, BrokerRegistration> brokers;
private final Map<Integer, BrokerRegistration> zkBrokers;
public ClusterImage(Map<Integer, BrokerRegistration> brokers) {
this.brokers = Collections.unmodifiableMap(brokers);
this.zkBrokers = Collections.unmodifiableMap(brokers
.entrySet()
.stream()
.filter(entry -> entry.getValue().isMigratingZkBroker())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
public boolean isEmpty() {
@@ -53,21 +47,10 @@ public final class ClusterImage {
return brokers;
}
public Map<Integer, BrokerRegistration> zkBrokers() {
return Collections.unmodifiableMap(
brokers
.entrySet()
.stream()
.filter(x -> x.getValue().isMigratingZkBroker() && !x.getValue().fenced())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
public BrokerRegistration broker(int nodeId) {
return brokers.get(nodeId);
}
public boolean containsBroker(int brokerId) {
return brokers.containsKey(brokerId);
}

File: TopicsImage.java

@@ -17,7 +17,6 @@
package org.apache.kafka.image;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
@@ -25,7 +24,6 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.util.TranslatedValueMapView;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -112,14 +110,6 @@ public final class TopicsImage {
return new TranslatedValueMapView<>(topicsById, image -> image.name());
}
public Map<TopicPartition, PartitionRegistration> partitions() {
Map<TopicPartition, PartitionRegistration> partitions = new HashMap<>();
topicsById.values().forEach(topic -> {
topic.partitions().forEach((key, value) -> partitions.put(new TopicPartition(topic.name(), key), value));
});
return partitions;
}
@Override
public String toString() {
return "TopicsImage(topicsById=" + topicsById.entrySet().stream().

File: KRaftMigrationDriver.java

@@ -26,6 +26,7 @@ import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
@@ -72,7 +73,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
*/
private final Consumer<MetadataPublisher> initialZkLoadHandler;
private volatile LeaderAndEpoch leaderAndEpoch;
private volatile MigrationState migrationState;
private volatile MigrationDriverState migrationState;
private volatile ZkMigrationLeadershipState migrationLeadershipState;
private volatile MetadataImage image;
@@ -90,7 +91,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
this.propagator = propagator;
this.time = Time.SYSTEM;
this.log = LoggerFactory.getLogger(KRaftMigrationDriver.class);
this.migrationState = MigrationState.UNINITIALIZED;
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext("KRaftMigrationDriver"), "kraft-migration");
this.image = MetadataImage.EMPTY;
@@ -110,8 +111,8 @@ }
}
// Visible for testing
CompletableFuture<MigrationState> migrationState() {
CompletableFuture<MigrationState> stateFuture = new CompletableFuture<>();
CompletableFuture<MigrationDriverState> migrationState() {
CompletableFuture<MigrationDriverState> stateFuture = new CompletableFuture<>();
eventQueue.append(() -> stateFuture.complete(migrationState));
return stateFuture;
}
@@ -123,7 +124,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
initialZkLoadHandler.accept(this);
// Let's transition to INACTIVE state and wait for leadership events.
transitionTo(MigrationState.INACTIVE);
transitionTo(MigrationDriverState.INACTIVE);
}
private boolean isControllerQuorumReadyForMigration() {
@@ -137,9 +138,12 @@ public class KRaftMigrationDriver implements MetadataPublisher {
log.info("Waiting for initial metadata publish before checking if Zk brokers are registered.");
return false;
}
Set<Integer> kraftRegisteredZkBrokers = image.cluster().zkBrokers().keySet();
Set<Integer> zkRegisteredZkBrokers = zkMigrationClient.readBrokerIdsFromTopicAssignments();
zkRegisteredZkBrokers.removeAll(kraftRegisteredZkBrokers);
for (BrokerRegistration broker : image.cluster().brokers().values()) {
if (broker.isMigratingZkBroker()) {
zkRegisteredZkBrokers.remove(broker.id());
}
}
if (zkRegisteredZkBrokers.isEmpty()) {
return true;
} else {
@@ -155,43 +159,43 @@ public class KRaftMigrationDriver implements MetadataPublisher {
this.migrationLeadershipState = afterState;
}
private boolean isValidStateChange(MigrationState newState) {
private boolean isValidStateChange(MigrationDriverState newState) {
if (migrationState == newState)
return true;
switch (migrationState) {
case UNINITIALIZED:
case DUAL_WRITE:
return newState == MigrationState.INACTIVE;
return newState == MigrationDriverState.INACTIVE;
case INACTIVE:
return newState == MigrationState.WAIT_FOR_CONTROLLER_QUORUM;
return newState == MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM;
case WAIT_FOR_CONTROLLER_QUORUM:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.WAIT_FOR_BROKERS;
newState == MigrationDriverState.INACTIVE ||
newState == MigrationDriverState.WAIT_FOR_BROKERS;
case WAIT_FOR_BROKERS:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.BECOME_CONTROLLER;
newState == MigrationDriverState.INACTIVE ||
newState == MigrationDriverState.BECOME_CONTROLLER;
case BECOME_CONTROLLER:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.ZK_MIGRATION ||
newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM;
newState == MigrationDriverState.INACTIVE ||
newState == MigrationDriverState.ZK_MIGRATION ||
newState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM;
case ZK_MIGRATION:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM;
newState == MigrationDriverState.INACTIVE ||
newState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM;
case KRAFT_CONTROLLER_TO_BROKER_COMM:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.DUAL_WRITE;
newState == MigrationDriverState.INACTIVE ||
newState == MigrationDriverState.DUAL_WRITE;
default:
log.error("Migration driver trying to transition from an unknown state {}", migrationState);
return false;
}
}
private void transitionTo(MigrationState newState) {
private void transitionTo(MigrationDriverState newState) {
if (!isValidStateChange(newState)) {
log.error("Error transition in migration driver from {} to {}", migrationState, newState);
return;
@@ -336,13 +340,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
default:
if (!isActive) {
apply("KRaftLeaderEvent is not active", state -> ZkMigrationLeadershipState.EMPTY);
transitionTo(MigrationState.INACTIVE);
transitionTo(MigrationDriverState.INACTIVE);
} else {
// Apply the new KRaft state
apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch()));
// Before becoming the controller for ZkBrokers, we need to make sure the
// Controller Quorum can handle migration.
transitionTo(MigrationState.WAIT_FOR_CONTROLLER_QUORUM);
transitionTo(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM);
}
break;
}
@@ -359,7 +363,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
log.debug("Controller Quorum is ready for Zk to KRaft migration");
// Note that leadership would not change here. Hence we do not need to
// `apply` any leadership state change.
transitionTo(MigrationState.WAIT_FOR_BROKERS);
transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
}
break;
default:
@@ -380,9 +384,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
// We could not claim leadership, stay in BECOME_CONTROLLER to retry
} else {
if (!migrationLeadershipState.zkMigrationComplete()) {
transitionTo(MigrationState.ZK_MIGRATION);
transitionTo(MigrationDriverState.ZK_MIGRATION);
} else {
transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM);
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
}
}
break;
@@ -400,7 +404,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
case WAIT_FOR_BROKERS:
if (areZkBrokersReadyForMigration()) {
log.debug("Zk brokers are registered and ready for migration");
transitionTo(MigrationState.BECOME_CONTROLLER);
transitionTo(MigrationDriverState.BECOME_CONTROLLER);
}
break;
default:
@@ -447,7 +451,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
offsetAndEpochAfterMigration.offset(),
offsetAndEpochAfterMigration.epoch());
apply("Migrate metadata from Zk", state -> zkMigrationClient.setMigrationRecoveryState(newState));
transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM);
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
} catch (Throwable t) {
zkRecordConsumer.abortMigration();
super.handleException(t);
@@ -460,13 +464,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
@Override
public void run() throws Exception {
// Ignore sending RPCs to the brokers since we're no longer in the state.
if (migrationState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
if (migrationState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
log.trace("Sending RPCs to broker before moving to dual-write mode using " +
"at offset and epoch {}", image.highestOffsetAndEpoch());
propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
// Migration leadership state doesn't change since we're not doing any Zk writes.
transitionTo(MigrationState.DUAL_WRITE);
transitionTo(MigrationDriverState.DUAL_WRITE);
} else {
log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}",
migrationLeadershipState.offsetAndEpoch());
@@ -501,7 +505,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
KRaftMigrationDriver.this.image = image;
String metadataType = isSnapshot ? "snapshot" : "delta";
if (migrationState != MigrationState.DUAL_WRITE) {
if (migrationState != MigrationDriverState.DUAL_WRITE) {
log.trace("Received metadata {}, but the controller is not in dual-write " +
"mode. Ignoring the change to be replicated to Zookeeper", metadataType);
completionHandler.accept(null);

File: MigrationState.java → MigrationDriverState.java (renamed)

@@ -18,6 +18,10 @@
package org.apache.kafka.metadata.migration;
/**
* This is the internal state of the KRaftMigrationDriver class on a particular controller node.
* Unlike the ZkMigrationState, which is persisted in the metadata log and image, this is soft
* state which is stored in memory only.
*
* (ASCII state-machine diagram, collapsed in this view: UNINITIALIZED, INACTIVE, DUAL_WRITE, ...)
*
@@ -34,7 +38,7 @@ package org.apache.kafka.metadata.migration;
*
* (ASCII state-machine diagram, continued: BECOME_CONTROLLER, WAIT_FOR_BROKERS)
*/
public enum MigrationState {
public enum MigrationDriverState {
UNINITIALIZED(false), // Initial state.
INACTIVE(false), // State when not the active controller.
WAIT_FOR_CONTROLLER_QUORUM(false), // Ensure all the quorum nodes are ready for migration.
@@ -46,7 +50,7 @@ public enum MigrationState {
private final boolean isActiveController;
MigrationState(boolean isActiveController) {
MigrationDriverState(boolean isActiveController) {
this.isActiveController = isActiveController;
}

File: ZkMigrationState.java

@@ -19,32 +19,39 @@ package org.apache.kafka.metadata.migration;
import java.util.Optional;
/**
* The cluster-wide ZooKeeper migration state.
*
* An enumeration of the possible states of the ZkMigrationState field in ZkMigrationStateRecord.
* This information is persisted in the metadata log and image.
*
* @see org.apache.kafka.common.metadata.ZkMigrationStateRecord
*/
public enum ZkMigrationState {
/**
* No migration has been started by the controller. The controller is in regular KRaft mode
* The cluster was created in KRaft mode. A cluster that was created in ZK mode can never attain
* this state; the endpoint of migration is POST_MIGRATION, instead.
*/
NONE((byte) 0),
/**
* A KRaft controller has been elected with "zookeeper.metadata.migration.enable" set to "true".
* The controller is now awaiting the pre-conditions for starting the migration.
* The controller is now awaiting the preconditions for starting the migration to KRaft. In this
* state, the metadata log does not yet contain the cluster's data. There is a metadata quorum,
* but it is not doing anything useful yet.
*/
PRE_MIGRATION((byte) 1),
/**
* The ZK data has been migrated and the KRaft controller is now writing metadata to both ZK and the
* metadata log. The controller will remain in this state until all of the brokers have been restarted
* in KRaft mode
* The ZK data has been migrated, and the KRaft controller is now writing metadata to both ZK
* and the metadata log. The controller will remain in this state until all of the brokers have
* been restarted in KRaft mode.
*/
MIGRATION((byte) 2),
/**
* The migration has been fully completed. The cluster is running in KRaft mode. This state will persist
* indefinitely after the migration.
* The migration from ZK has been fully completed. The cluster is running in KRaft mode. This state
* will persist indefinitely after the migration. In operational terms, this is the same as the NONE
* state.
*/
POST_MIGRATION((byte) 3);

File: KRaftMigrationDriverTest.java

@@ -280,7 +280,7 @@ public class KRaftMigrationDriverTest {
driver.publishLogDelta(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationState.DUAL_WRITE),
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
Assertions.assertEquals(1, metadataPropagator.images);