diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 63137cf6c17..20e5537fccd 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -226,6 +226,7 @@ + @@ -292,6 +293,20 @@ + + + + + + + + + + + + + + diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index d900a7ccea9..d20c10421ba 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -49,7 +49,7 @@ object ControllerChannelManager { val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs" } -class ControllerChannelManager(controllerContext: ControllerContext, +class ControllerChannelManager(controllerEpoch: () => Int, config: KafkaConfig, time: Time, metrics: Metrics, @@ -67,8 +67,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, } ) - def startup() = { - controllerContext.liveOrShuttingDownBrokers.foreach(addNewBroker) + def startup(initialBrokers: Set[Broker]) = { + initialBrokers.foreach(addNewBroker) brokerLock synchronized { brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1)) @@ -173,7 +173,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, brokerMetricTags(broker.id) ) - val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient, + val requestThread = new RequestSendThread(config.brokerId, controllerEpoch, messageQueue, networkClient, brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName) requestThread.setDaemon(false) @@ -214,7 +214,7 @@ case class QueueItem(apiKey: ApiKeys, request: AbstractControlRequest.Builder[_ callback: AbstractResponse => Unit, enqueueTimeMs: Long) class RequestSendThread(val controllerId: Int, - val controllerContext: ControllerContext, + val controllerEpoch: () => Int, val queue: BlockingQueue[QueueItem], val networkClient: NetworkClient, val brokerNode: Node, @@ -255,7 +255,7 @@ class RequestSendThread(val controllerId: Int, } } catch { case e: Throwable => // if the send was not successful, reconnect to broker and resend the message - warn(s"Controller $controllerId epoch ${controllerContext.epoch} fails to send request $requestBuilder " + + warn(s"Controller $controllerId epoch ${controllerEpoch.apply()} fails to send request $requestBuilder " + s"to broker $brokerNode. 
Reconnecting to broker.", e) networkClient.close(brokerNode.idString) isSendSuccessful = false @@ -270,7 +270,7 @@ class RequestSendThread(val controllerId: Int, val response = clientResponse.responseBody - stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace(s"Received response " + + stateChangeLogger.withControllerEpoch(controllerEpoch.apply()).trace(s"Received response " + s"$response for request $api with correlation id " + s"${requestHeader.correlationId} sent to broker $brokerNode") diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 00851d6ca24..83dfbe3daa2 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -88,7 +88,7 @@ class KafkaController(val config: KafkaConfig, private val isAlterPartitionEnabled = config.interBrokerProtocolVersion.isAlterPartitionSupported private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None) val controllerContext = new ControllerContext - var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics, + var controllerChannelManager = new ControllerChannelManager(() => controllerContext.epoch, config, time, metrics, stateChangeLogger, threadNamePrefix) // have a separate scheduler for the controller to be able to start and stop independently of the kafka server @@ -899,7 +899,7 @@ class KafkaController(val config: KafkaConfig, private def initializeControllerContext(): Unit = { // update controller cache with delete topic information - val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster + val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster() val (compatibleBrokerAndEpochs, incompatibleBrokerAndEpochs) = partitionOnFeatureCompatibility(curBrokerAndEpochs) if (incompatibleBrokerAndEpochs.nonEmpty) { warn("Ignoring registration of new brokers due to incompatibilities with finalized features: " + @@ -926,7 +926,7 @@ class KafkaController(val config: KafkaConfig, // update the leader and isr cache for all existing partitions from Zookeeper updateLeaderAndIsrCache() // start the channel manager - controllerChannelManager.startup() + controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers) info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}") info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}") info(s"Current list of topics in the cluster: ${controllerContext.allTopics}") @@ -1556,7 +1556,7 @@ class KafkaController(val config: KafkaConfig, private def processBrokerChange(): Unit = { if (!isActive) return - val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster + val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster() val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) } val curBrokerIds = curBrokerIdAndEpochs.keySet val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds diff --git a/core/src/main/scala/kafka/migration/ZkMigrationClient.scala b/core/src/main/scala/kafka/migration/ZkMigrationClient.scala new file mode 100644 index 00000000000..7d373b52727 --- /dev/null +++ b/core/src/main/scala/kafka/migration/ZkMigrationClient.scala @@ -0,0 +1,285 @@ +package kafka.migration + +import kafka.api.LeaderAndIsr +import kafka.controller.{ControllerChannelManager, 
LeaderIsrAndControllerEpoch, ReplicaAssignment} +import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager} +import kafka.utils.Logging +import kafka.zk.TopicZNode.TopicIdReplicaAssignment +import kafka.zk._ +import kafka.zookeeper._ +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.errors.ControllerMovedException +import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData +import org.apache.kafka.common.metadata._ +import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse} +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.metadata.PartitionRegistration +import org.apache.kafka.migration._ +import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} +import org.apache.zookeeper.CreateMode + +import java.util +import java.util.function.Consumer +import scala.collection.{Seq, mutable} +import scala.jdk.CollectionConverters._ + + +class ZkMigrationClient(zkClient: KafkaZkClient, + controllerChannelManager: ControllerChannelManager) extends MigrationClient with Logging { + + def claimControllerLeadership(kraftControllerId: Int, kraftControllerEpoch: Int): ZkControllerState = { + val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController(kraftControllerId, kraftControllerEpoch) + if (epochZkVersionOpt.isDefined) { + new ZkControllerState(kraftControllerId, kraftControllerEpoch, epochZkVersionOpt.get) + } else { + throw new ControllerMovedException("Cannot claim controller leadership, the controller has moved.") + } + } + + def migrateTopics(metadataVersion: MetadataVersion, + recordConsumer: Consumer[util.List[ApiMessageAndVersion]], + brokerIdConsumer: Consumer[Integer]): Unit = { + val topics = zkClient.getAllTopicsInCluster() + val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics) + val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics) + replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) => + val partitions = assignments.keys.toSeq + val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions) + val topicBatch = new util.ArrayList[ApiMessageAndVersion]() + topicBatch.add(new ApiMessageAndVersion(new TopicRecord() + .setName(topic) + .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION)) + + assignments.foreach { case (topicPartition, replicaAssignment) => + replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_)) + replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_)) + + val leaderIsrAndEpoch = leaderIsrAndControllerEpochs(topicPartition) + topicBatch.add(new ApiMessageAndVersion(new PartitionRecord() + .setTopicId(topicIdOpt.get) + .setPartitionId(topicPartition.partition) + .setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava) + .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava) + .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava) + .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader) + .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch) + .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch) + .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value()), PartitionRecord.HIGHEST_SUPPORTED_VERSION)) + } + + val props = topicConfigs(topic) + props.forEach { case (key: Object, value: Object) => + 
topicBatch.add(new ApiMessageAndVersion(new ConfigRecord() + .setResourceType(ConfigResource.Type.TOPIC.id) + .setResourceName(topic) + .setName(key.toString) + .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION)) + } + + recordConsumer.accept(topicBatch) + } + } + + def migrateBrokerConfigs(metadataVersion: MetadataVersion, + recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = { + val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker) + val batch = new util.ArrayList[ApiMessageAndVersion]() + zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) => + val brokerResource = if (broker == ConfigEntityName.Default) { + "" + } else { + broker + } + props.forEach { case (key: Object, value: Object) => + batch.add(new ApiMessageAndVersion(new ConfigRecord() + .setResourceType(ConfigResource.Type.BROKER.id) + .setResourceName(brokerResource) + .setName(key.toString) + .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION)) + } + } + recordConsumer.accept(batch) + } + + def migrateClientQuotas(metadataVersion: MetadataVersion, + recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = { + val adminZkClient = new AdminZkClient(zkClient) + + def migrateEntityType(entityType: String): Unit = { + adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) => + val entity = new EntityData().setEntityType(entityType).setEntityName(name) + val batch = new util.ArrayList[ApiMessageAndVersion]() + ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) => + batch.add(new ApiMessageAndVersion(new ClientQuotaRecord() + .setEntity(List(entity).asJava) + .setKey(key) + .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION)) + } + recordConsumer.accept(batch) + } + } + + migrateEntityType(ConfigType.User) + migrateEntityType(ConfigType.Client) + adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) => + // Lifted from ZkAdminManager + val components = name.split("/") + if (components.size != 3 || components(1) != "clients") + throw new IllegalArgumentException(s"Unexpected config path: ${name}") + val entity = List( + new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)), + new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2)) + ) + + val batch = new util.ArrayList[ApiMessageAndVersion]() + ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) => + batch.add(new ApiMessageAndVersion(new ClientQuotaRecord() + .setEntity(entity.asJava) + .setKey(key) + .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION)) + } + recordConsumer.accept(batch) + } + + migrateEntityType(ConfigType.Ip) + } + + def migrateProducerId(metadataVersion: MetadataVersion, + recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = { + val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path) + dataOpt match { + case Some(data) => + val producerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data) + recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord() + .setBrokerEpoch(-1) + .setBrokerId(producerIdBlock.assignedBrokerId) + .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava) + case None => // Nothing to migrate + } + } + + override def readAllMetadata(batchConsumer: 
Consumer[util.List[ApiMessageAndVersion]], brokerIdConsumer: Consumer[Integer]): Unit = { + migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer) + migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer) + migrateClientQuotas(MetadataVersion.latest(), batchConsumer) + migrateProducerId(MetadataVersion.latest(), batchConsumer) + } + + override def readBrokerIds(): util.Set[Integer] = { + zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava + } + + override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] = { + val topics = zkClient.getAllTopicsInCluster() + val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics) + val brokersWithAssignments = mutable.HashSet[Int]() + replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) => + assignments.values.foreach { assignment => brokersWithAssignments.addAll(assignment.replicas) } + } + brokersWithAssignments.map(Integer.valueOf).asJava + } + + override def addZkBroker(brokerId: Int): Unit = { + val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId)) + controllerChannelManager.addBroker(brokerAndEpoch.head._1) + } + + override def removeZkBroker(brokerId: Int): Unit = { + controllerChannelManager.removeBroker(brokerId) + } + + override def getOrCreateMigrationRecoveryState(initialState: MigrationRecoveryState): MigrationRecoveryState = { + zkClient.getOrCreateMigrationState(initialState) + } + + override def setMigrationRecoveryState(state: MigrationRecoveryState): MigrationRecoveryState = { + zkClient.updateMigrationState(state) + } + + override def sendRequestToBroker(brokerId: Int, + request: AbstractControlRequest.Builder[_ <: AbstractControlRequest], + callback: Consumer[AbstractResponse]): Unit = { + controllerChannelManager.sendRequest(brokerId, request, callback.accept) + } + + override def createTopic(topicName: String, topicId: Uuid, partitions: util.Map[Integer, PartitionRegistration], state: MigrationRecoveryState): MigrationRecoveryState = { + val assignments = partitions.asScala.map { case (partitionId, partition) => + new TopicPartition(topicName, partitionId) -> ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas) + } + + val createTopicZNode = { + val path = TopicZNode.path(topicName) + CreateRequest( + path, + TopicZNode.encode(Some(topicId), assignments), + zkClient.defaultAcls(path), + CreateMode.PERSISTENT) + } + val createPartitionsZNode = { + val path = TopicPartitionsZNode.path(topicName) + CreateRequest( + path, + null, + zkClient.defaultAcls(path), + CreateMode.PERSISTENT) + } + + val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) => + val topicPartition = new TopicPartition(topicName, partitionId) + Seq( + createTopicPartition(topicPartition), + createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch()) + ) + } + + val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs + val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state.controllerZkVersion(), state) + responses.foreach(System.err.println) + state.withZkVersion(migrationZkVersion) + } + + private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = { + val path = TopicPartitionZNode.path(topicPartition) + CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition)) + } + + private def 
createTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): CreateRequest = { + val path = TopicPartitionStateZNode.path(topicPartition) + val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr( + partitionRegistration.leader, + partitionRegistration.leaderEpoch, + partitionRegistration.isr.toList, + partitionRegistration.leaderRecoveryState, + partitionRegistration.partitionEpoch), controllerEpoch)) + CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition)) + } + + private def updateTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): SetDataRequest = { + val path = TopicPartitionStateZNode.path(topicPartition) + val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr( + partitionRegistration.leader, + partitionRegistration.leaderEpoch, + partitionRegistration.isr.toList, + partitionRegistration.leaderRecoveryState, + partitionRegistration.partitionEpoch), controllerEpoch)) + SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition)) + } + + override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]], + state: MigrationRecoveryState): MigrationRecoveryState = { + val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) => + partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) => + val topicPartition = new TopicPartition(topicName, partitionId) + Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch())) + } + } + if (requests.isEmpty) { + state + } else { + val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state.controllerZkVersion(), state) + responses.foreach(System.err.println) + state.withZkVersion(migrationZkVersion) + } + } +} diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index f65367606da..280afe46147 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -54,6 +54,19 @@ import org.apache.kafka.common.utils.Sanitizer import scala.collection.{Map, mutable, _} import scala.jdk.CollectionConverters._ +object ZkAdminManager { + def clientQuotaPropsToDoubleMap(props: Map[String, String]): Map[String, Double] = { + props.map { case (key, value) => + val doubleValue = try value.toDouble catch { + case _: NumberFormatException => + throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value") + } + key -> doubleValue + } + } +} + + class ZkAdminManager(val config: KafkaConfig, val metrics: Metrics, val metadataCache: MetadataCache, @@ -636,16 +649,6 @@ class ZkAdminManager(val config: KafkaConfig, private def sanitized(name: Option[String]): String = name.map(n => sanitizeEntityName(n)).getOrElse("") - private def fromProps(props: Map[String, String]): Map[String, Double] = { - props.map { case (key, value) => - val doubleValue = try value.toDouble catch { - case _: NumberFormatException => - throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value") - } - key -> doubleValue - } - } - def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent], clientIdComponent: 
Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = { @@ -706,7 +709,7 @@ class ZkAdminManager(val config: KafkaConfig, (userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) => val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isClientOrUserQuotaConfig(key) } if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c)) - Some(userClientIdToEntity(u, c) -> fromProps(quotaProps)) + Some(userClientIdToEntity(u, c) -> ZkAdminManager.clientQuotaPropsToDoubleMap(quotaProps)) else None }.toMap @@ -732,7 +735,7 @@ class ZkAdminManager(val config: KafkaConfig, ipEntries.flatMap { case (ip, props) => val ipQuotaProps = props.asScala.filter { case (key, _) => DynamicConfig.Ip.names.contains(key) } if (ipQuotaProps.nonEmpty) - Some(ipToQuotaEntity(ip) -> fromProps(ipQuotaProps)) + Some(ipToQuotaEntity(ip) -> ZkAdminManager.clientQuotaPropsToDoubleMap(ipQuotaProps)) else None } diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index fa7ce00882a..e2287020647 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -34,12 +34,13 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} +import org.apache.kafka.migration.MigrationRecoveryState import org.apache.zookeeper.KeeperException.{Code, NodeExistsException} -import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult} +import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult, SetDataResult} import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.common.ZKConfig import org.apache.zookeeper.data.{ACL, Stat} -import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper} +import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper} import scala.collection.{Map, Seq, mutable} @@ -140,14 +141,14 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = { val response = retryRequestUntilConnected( - MultiRequest(Seq( - CreateOp(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.EPHEMERAL), - SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion))) - ) + MultiRequest(Seq( + SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion), + CreateOp(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.EPHEMERAL))) + ) response.resultCode match { case Code.NODEEXISTS | Code.BADVERSION => checkControllerAndEpoch() case Code.OK => - val setDataResult = response.zkOpResults(1).rawOpResult.asInstanceOf[SetDataResult] + val setDataResult = response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult] (newControllerEpoch, setDataResult.getStat.getVersion) case code => throw KeeperException.create(code) } @@ -156,6 +157,76 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo tryCreateControllerZNodeAndIncrementEpoch() } + /** + * Registers a given 
KRaft controller in zookeeper as the active controller. Unlike the ZK equivalent of this method, + * this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller + * leadership during a KRaft leadership failover. + * + * This method is called at the beginning of a KRaft migration and during subsequent KRaft leadership changes during + * the migration. + * + * To ensure that the KRaft controller epoch proceeds the current ZK controller epoch, this registration algorithm + * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method, + * the multi request transaction will fail and this method will return None. + * + * @param kraftControllerId ID of the KRaft controller node + * @param kraftControllerEpoch Epoch of the KRaft controller node + * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller. + */ + def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = { + val timestamp = time.milliseconds() + val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion)) + val controllerOpt = getControllerId + + curEpochOpt match { + case None => + throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " + + s"since there is no ZK controller epoch present.") + case Some((curEpoch: Int, curEpochZk: Int)) => + if (curEpoch >= kraftControllerEpoch) { + // TODO actually raise exception here + warn(s"Cannot register KRaft controller $kraftControllerId as the active controller " + + s"since epoch is not higher. Current ZK epoch is ${curEpoch}, KRaft epoch is $kraftControllerEpoch.") + } + + val response = if (controllerOpt.isDefined) { + info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active controller with epoch $kraftControllerEpoch") + retryRequestUntilConnected( + MultiRequest(Seq( + SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(kraftControllerEpoch), curEpochZk), + DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion), + CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT))) + ) + } else { + info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active controller with epoch $kraftControllerEpoch") + retryRequestUntilConnected( + MultiRequest(Seq( + SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(kraftControllerEpoch), curEpochZk), + CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT))) + ) + } + + val failureSuffix = s"while trying to register KRaft controller $kraftControllerId with epoch $kraftControllerEpoch. KRaft controller was not registered." 
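      // For clarity, the multi-op built above when /controller already exists is:
      //   SetDataOp(/controller_epoch, kraftControllerEpoch, curEpochZk)   conditional on the epoch zkVersion read earlier
      //   DeleteOp(/controller, MatchAnyVersion)                           removes the previous registration
      //   CreateOp(/controller, kraftControllerId, PERSISTENT)             persistent, so ZK brokers cannot reclaim leadership
      // If a ZK controller election bumps /controller_epoch in between, the conditional SetDataOp fails with
      // BADVERSION, the whole transaction aborts, and this method returns None.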
+ response.resultCode match { + case Code.OK => + info(s"Successfully registered KRaft controller $kraftControllerId with epoch $kraftControllerEpoch") + val setDataResult = response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult] + Some(setDataResult.getStat.getVersion) + case Code.BADVERSION => + info(s"Controller epoch changed $failureSuffix") + None + case Code.NONODE => + info(s"The ephemeral node at ${ControllerZNode.path} went away $failureSuffix") + None + case Code.NODEEXISTS => + info(s"The ephemeral node at ${ControllerZNode.path} appeared $failureSuffix") + None + case code => + throw KeeperException.create(code) + } + } + } + private def maybeCreateControllerEpochZNode(): (Int, Int) = { createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match { case Code.OK => @@ -340,6 +411,24 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } } + def getEntitiesConfigs(rootEntityType: String, sanitizedEntityNames: Set[String]): Map[String, Properties] = { + val getDataRequests: Seq[GetDataRequest] = sanitizedEntityNames.map { entityName => + GetDataRequest(ConfigEntityZNode.path(rootEntityType, entityName), Some(entityName)) + }.toSeq + + val getDataResponses = retryRequestsUntilConnected(getDataRequests) + getDataResponses.map { response => + val entityName = response.ctx.get.asInstanceOf[String] + response.resultCode match { + case Code.OK => + entityName -> ConfigEntityZNode.decode(response.data) + case Code.NONODE => + entityName -> new Properties() + case _ => throw response.resultException.get + } + }.toMap + } + /** * Sets or creates the entity znode path with the given configs depending * on whether it already exists or not. @@ -423,8 +512,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * Gets all brokers with broker epoch in the cluster. * @return map of broker to epoch in the cluster. 
*/ - def getAllBrokerAndEpochsInCluster: Map[Broker, Long] = { - val brokerIds = getSortedBrokerList + def getAllBrokerAndEpochsInCluster(filterBrokerIds: Seq[Int] = Seq.empty): Map[Broker, Long] = { + val brokerIds = if (filterBrokerIds.isEmpty) { + getSortedBrokerList + } else { + filterBrokerIds + } val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId))) val getDataResponses = retryRequestsUntilConnected(getDataRequests) getDataResponses.flatMap { getDataResponse => @@ -1554,6 +1647,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } } + def getOrCreateMigrationState(initialState: MigrationRecoveryState): MigrationRecoveryState = { + val getDataRequest = GetDataRequest(MigrationZNode.path) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + getDataResponse.resultCode match { + case Code.OK => + MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime) + case Code.NONODE => + createInitialMigrationState(initialState) + case _ => throw getDataResponse.resultException.get + } + } + + def createInitialMigrationState(initialState: MigrationRecoveryState): MigrationRecoveryState = { + val createRequest = CreateRequest( + MigrationZNode.path, + MigrationZNode.encode(initialState), + defaultAcls(MigrationZNode.path), + CreateMode.PERSISTENT) + val response = retryRequestUntilConnected(createRequest) + response.maybeThrow() + initialState + } + + def updateMigrationState(migrationState: MigrationRecoveryState): MigrationRecoveryState = { + val req = SetDataRequest(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion()) + val resp = retryRequestUntilConnected(req) + resp.maybeThrow() + migrationState.withZkVersion(resp.stat.getVersion) + } + /** * Return the ACLs of the node of the given path * @param path the given path for the node @@ -1772,6 +1895,106 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } } + // Perform a sequence of updates to ZooKeeper as part of a KRaft dual write. In addition to adding a CheckOp on the + // controller epoch ZNode, we also include CheckOp/SetDataOp on the migration ZNode. This ensure proper fencing + // from errant ZK controllers as well as fencing from new KRaft controllers. 
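  // A sketch of the wrapping performed by retryMigrationRequestsUntilConnected (illustrative values only;
  // assumes `expectedControllerZkVersion: Int` and `migrationState: MigrationRecoveryState` in scope, as in
  // the method parameters below, plus the znode helpers already used in this file):
  //
  //   val partition  = new TopicPartition("foo", 0)                // illustrative partition
  //   val payload: Array[Byte] = Array.emptyByteArray              // placeholder partition-state bytes
  //   val plainWrite = SetDataRequest(TopicPartitionStateZNode.path(partition), payload, ZkVersion.MatchAnyVersion)
  //
  //   // What is actually sent: a three-op transaction that commits or fails atomically.
  //   val fencedWrite = MultiRequest(Seq(
  //     CheckOp(ControllerEpochZNode.path, expectedControllerZkVersion),          // fences stale ZK controllers
  //     SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState),
  //       migrationState.migrationZkVersion()),                                   // fences other KRaft controllers, bumps /migration
  //     SetDataOp(plainWrite.path, plainWrite.data, plainWrite.version)))         // the original write
  //
  // Only the last request in a batch carries the SetDataOp on /migration; earlier requests use a plain
  // CheckOp(MigrationZNode.path, ...) so the migration zkVersion is bumped exactly once per batch.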
+ def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req], + expectedControllerZkVersion: Int, + migrationState: MigrationRecoveryState): (Int, Seq[Req#Response]) = { + + if (requests.isEmpty) { + throw new IllegalArgumentException("Must specify at least one ZK request for a migration operation.") + } + def wrapMigrationRequest(request: Req, updateMigrationNode: Boolean): MultiRequest = { + val checkOp = CheckOp(ControllerEpochZNode.path, expectedControllerZkVersion) + val migrationOp = if (updateMigrationNode) { + SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion()) + } else { + CheckOp(MigrationZNode.path, migrationState.migrationZkVersion()) + } + + request match { + case CreateRequest(path, data, acl, createMode, ctx) => + MultiRequest(Seq(checkOp, migrationOp, CreateOp(path, data, acl, createMode)), ctx) + case DeleteRequest(path, version, ctx) => + MultiRequest(Seq(checkOp, migrationOp, DeleteOp(path, version)), ctx) + case SetDataRequest(path, data, version, ctx) => + MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, version)), ctx) + case _ => throw new IllegalStateException(s"$request does not need controller epoch check") + } + } + + def handleUnwrappedMigrationResult(migrationOp: ZkOp, migrationResult: OpResult): Int = { + val (path: String, data: Option[Array[Byte]], version: Int) = migrationOp match { + case CheckOp(path, version) => (path, None, version) + case SetDataOp(path, data, version) => (path, Some(data), version) + case _ => throw new IllegalStateException("Unexpected result on /migration znode") + } + + migrationResult match { + case _: CheckResult => version + case setDataResult: SetDataResult => setDataResult.getStat.getVersion + case errorResult: ErrorResult => + if (path.equals(MigrationZNode.path)) { + val errorCode = Code.get(errorResult.getErr) + if (errorCode == Code.BADVERSION) { + data match { + case Some(value) => + val failedPayload = MigrationZNode.decode(value, version, -1) + throw new RuntimeException(s"Conditional update on KRaft Migration znode failed. Expected zkVersion = ${version}. " + + s"The failed write was: ${failedPayload}. This indicates that another KRaft controller is making writes to ZooKeeper.") + case None => + throw new RuntimeException(s"Check op on KRaft Migration znode failed. Expected zkVersion = ${version}. " + + s"This indicates that another KRaft controller is making writes to ZooKeeper.") + } + } else if (errorCode == Code.OK) { + // what? 
+ version + } else { + throw KeeperException.create(errorCode, path) + } + } else { + throw new RuntimeException(s"Got migration result for incorrect path $path") + } + case _ => throw new RuntimeException(s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw ${migrationResult}") + } + } + + def unwrapMigrationRequest(response: AsyncResponse): (AsyncResponse, Int) = { + response match { + case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) => + zkOpResults match { + case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: CheckOp, migrationResult), zkOpResult) => + // During the ZK migration, we have the a CheckOp, a conditional update on the migration znode, and the op we're performing + handleUnwrappedCheckOp(checkOp, checkOpResult) + val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult) + (handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion) + case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: SetDataOp, migrationResult), zkOpResult) => + // During the ZK migration, we have the a CheckOp, a conditional update on the migration znode, and the op we're performing + handleUnwrappedCheckOp(checkOp, checkOpResult) + val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult) + (handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion) + case null => throw KeeperException.create(resultCode) + case _ => throw new IllegalStateException(s"Cannot unwrap $response because it does not contain the expected operations for a migration operation.") + } + case _ => throw new IllegalStateException(s"Cannot unwrap $response because it is not a MultiResponse") + } + } + + expectedControllerZkVersion match { + case ZkVersion.MatchAnyVersion => throw new IllegalArgumentException(s"Expected a controller epoch zkVersion when making migration writes") + case version if version >= 0 => + logger.trace(s"Performing ${requests.size} migration update(s) with controllerEpochZkVersion=$expectedControllerZkVersion and migrationState=$migrationState") + val wrappedRequests = requests.map(req => wrapMigrationRequest(req, req == requests.last)) + val results = retryRequestsUntilConnected(wrappedRequests) + val unwrappedResults = results.map(unwrapMigrationRequest(_)) + val migrationZkVersion = unwrappedResults.last._2 + (migrationZkVersion, unwrappedResults.map(_._1.asInstanceOf[Req#Response])) + case invalidVersion => + throw new IllegalArgumentException(s"Expected controller epoch zkVersion $invalidVersion should be non-negative or equal to ${ZkVersion.MatchAnyVersion}") + } + } + private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = { val remainingRequests = new mutable.ArrayBuffer(requests.size) ++= requests val responses = new mutable.ArrayBuffer[Req#Response] @@ -1997,6 +2220,42 @@ object KafkaZkClient { } } + private def handleUnwrappedCheckOp(checkOp: CheckOp, checkOpResult: OpResult): Unit = { + checkOpResult match { + case errorResult: ErrorResult => + if (checkOp.path.equals(ControllerEpochZNode.path)) { + val errorCode = Code.get(errorResult.getErr) + if (errorCode == Code.BADVERSION) + // Throw ControllerMovedException when the zkVersionCheck is performed on the controller epoch znode and the check fails + throw new ControllerMovedException(s"Controller epoch zkVersion check fails. 
Expected zkVersion = ${checkOp.version}") + else if (errorCode != Code.OK) + throw KeeperException.create(errorCode, checkOp.path) + } + case _ => + } + } + + private def handleUnwrappedZkOp(zkOpResult: ZkOpResult, resultCode: Code, ctx: Option[Any], responseMetadata: ResponseMetadata): AsyncResponse = { + val rawOpResult = zkOpResult.rawOpResult + zkOpResult.zkOp match { + case createOp: CreateOp => + val name = rawOpResult match { + case c: CreateResult => c.getPath + case _ => null + } + CreateResponse(resultCode, createOp.path, ctx, name, responseMetadata) + case deleteOp: DeleteOp => + DeleteResponse(resultCode, deleteOp.path, ctx, responseMetadata) + case setDataOp: SetDataOp => + val stat = rawOpResult match { + case s: SetDataResult => s.getStat + case _ => null + } + SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata) + case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp") + } + } + // A helper function to transform a MultiResponse with the check on // controller epoch znode zkVersion back into a regular response. // ControllerMovedException will be thrown if the controller epoch @@ -2006,37 +2265,10 @@ object KafkaZkClient { response match { case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) => zkOpResults match { + // In normal ZK writes, we just have a MultiOp with a CheckOp and the actual operation we're performing case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), zkOpResult) => - checkOpResult match { - case errorResult: ErrorResult => - if (checkOp.path.equals(ControllerEpochZNode.path)) { - val errorCode = Code.get(errorResult.getErr) - if (errorCode == Code.BADVERSION) - // Throw ControllerMovedException when the zkVersionCheck is performed on the controller epoch znode and the check fails - throw new ControllerMovedException(s"Controller epoch zkVersion check fails. 
Expected zkVersion = ${checkOp.version}") - else if (errorCode != Code.OK) - throw KeeperException.create(errorCode, checkOp.path) - } - case _ => - } - val rawOpResult = zkOpResult.rawOpResult - zkOpResult.zkOp match { - case createOp: CreateOp => - val name = rawOpResult match { - case c: CreateResult => c.getPath - case _ => null - } - CreateResponse(resultCode, createOp.path, ctx, name, responseMetadata) - case deleteOp: DeleteOp => - DeleteResponse(resultCode, deleteOp.path, ctx, responseMetadata) - case setDataOp: SetDataOp => - val stat = rawOpResult match { - case s: SetDataResult => s.getStat - case _ => null - } - SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata) - case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp") - } + handleUnwrappedCheckOp(checkOp, checkOpResult) + handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata) case null => throw KeeperException.create(resultCode) case _ => throw new IllegalStateException(s"Cannot unwrap $response because the first zookeeper op is not check op in original MultiRequest") } diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala index 250f87cbd73..2d506a4ac35 100755 --- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala @@ -68,7 +68,7 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness { @Test def testReplicaManagerBrokerEpochMatchesWithZk(): Unit = { - val brokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster + val brokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster() assertEquals(brokerAndEpochs.size, servers.size) brokerAndEpochs.foreach { case (broker, epoch) => @@ -131,9 +131,9 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness { val controllerContext = new ControllerContext controllerContext.setLiveBrokers(brokerAndEpochs) val metrics = new Metrics - val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, + val controllerChannelManager = new ControllerChannelManager(() => controllerContext.epoch, controllerConfig, Time.SYSTEM, metrics, new StateChangeLogger(controllerId, inControllerContext = true, None)) - controllerChannelManager.startup() + controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers) val broker2 = servers(brokerId2) val epochInRequest = broker2.kafkaController.brokerEpoch + epochInRequestDiffFromCurrentEpoch @@ -249,7 +249,7 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness { } private def checkControllerBrokerEpochsCacheMatchesWithZk(controllerContext: ControllerContext): Unit = { - val brokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster + val brokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster() TestUtils.waitUntilTrue(() => { val brokerEpochsInControllerContext = controllerContext.liveBrokerIdAndEpochs if (brokerAndEpochs.size != brokerEpochsInControllerContext.size) false diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index f0dea91335e..14890a2b79c 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -138,9 +138,9 @@ class LeaderElectionTest extends QuorumTestHarness { val controllerContext = new ControllerContext 
controllerContext.setLiveBrokers(brokerAndEpochs) val metrics = new Metrics - val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, + val controllerChannelManager = new ControllerChannelManager(() => controllerContext.epoch, controllerConfig, Time.SYSTEM, metrics, new StateChangeLogger(controllerId, inControllerContext = true, None)) - controllerChannelManager.startup() + controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers) try { val staleControllerEpoch = 0 val partitionStates = Seq( diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 70554d9427c..6a63ce98715 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -280,9 +280,9 @@ class ServerShutdownTest extends KafkaServerTestHarness { val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect)) val controllerContext = new ControllerContext controllerContext.setLiveBrokers(brokerAndEpochs) - controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, + controllerChannelManager = new ControllerChannelManager(() => controllerContext.epoch, controllerConfig, Time.SYSTEM, metrics, new StateChangeLogger(controllerId, inControllerContext = true, None)) - controllerChannelManager.startup() + controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers) // Initiate a sendRequest and wait until connection is established and one byte is received by the peer val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, diff --git a/metadata/src/main/java/org/apache/kafka/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/migration/MigrationClient.java new file mode 100644 index 00000000000..27d875d8191 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/migration/MigrationClient.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.migration; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.requests.AbstractControlRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +public interface MigrationClient { + ZkControllerState claimControllerLeadership(int kraftControllerId, int kraftControllerEpoch); + + void readAllMetadata(Consumer> batchConsumer, Consumer brokerIdConsumer); + + void addZkBroker(int brokerId); + + void removeZkBroker(int brokerId); + + Set readBrokerIds(); + + Set readBrokerIdsFromTopicAssignments(); + + MigrationRecoveryState getOrCreateMigrationRecoveryState(MigrationRecoveryState initialState); + + MigrationRecoveryState setMigrationRecoveryState(MigrationRecoveryState initialState); + + MigrationRecoveryState createTopic(String topicName, Uuid topicId, Map topicPartitions, MigrationRecoveryState state); + + MigrationRecoveryState updateTopicPartitions(Map> topicPartitions, MigrationRecoveryState state); + + void sendRequestToBroker(int brokerId, + AbstractControlRequest.Builder request, + Consumer callback); +}