Add the ZK reading and writing logic for KIP-866 (#12871)

David Arthur 2022-11-28 10:50:07 -05:00 committed by GitHub
parent b2e71b8f83
commit ff4ac87df4
10 changed files with 659 additions and 70 deletions

View File

@@ -226,6 +226,7 @@
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.migration" />
<allow pkg="org.apache.kafka.metadata.authorizer" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
@@ -292,6 +293,20 @@
</subpackage>
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
</subpackage>
<subpackage name="metalog">
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />

View File

@@ -49,7 +49,7 @@ object ControllerChannelManager {
val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs"
}
-class ControllerChannelManager(controllerContext: ControllerContext,
+class ControllerChannelManager(controllerEpoch: () => Int,
config: KafkaConfig,
time: Time,
metrics: Metrics,
@@ -67,8 +67,8 @@ class ControllerChannelManager(controllerContext: ControllerContext,
}
)
-def startup() = {
-controllerContext.liveOrShuttingDownBrokers.foreach(addNewBroker)
+def startup(initialBrokers: Set[Broker]) = {
+initialBrokers.foreach(addNewBroker)
brokerLock synchronized {
brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
@@ -173,7 +173,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, brokerMetricTags(broker.id)
)
-val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
+val requestThread = new RequestSendThread(config.brokerId, controllerEpoch, messageQueue, networkClient,
brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)
requestThread.setDaemon(false)
@@ -214,7 +214,7 @@ case class QueueItem(apiKey: ApiKeys, request: AbstractControlRequest.Builder[_
callback: AbstractResponse => Unit, enqueueTimeMs: Long)
class RequestSendThread(val controllerId: Int,
-val controllerContext: ControllerContext,
+val controllerEpoch: () => Int,
val queue: BlockingQueue[QueueItem],
val networkClient: NetworkClient,
val brokerNode: Node,
@@ -255,7 +255,7 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
-warn(s"Controller $controllerId epoch ${controllerContext.epoch} fails to send request $requestBuilder " +
+warn(s"Controller $controllerId epoch ${controllerEpoch.apply()} fails to send request $requestBuilder " +
s"to broker $brokerNode. Reconnecting to broker.", e)
networkClient.close(brokerNode.idString)
isSendSuccessful = false
@@ -270,7 +270,7 @@ class RequestSendThread(val controllerId: Int,
val response = clientResponse.responseBody
-stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace(s"Received response " +
+stateChangeLogger.withControllerEpoch(controllerEpoch.apply()).trace(s"Received response " +
s"$response for request $api with correlation id " +
s"${requestHeader.correlationId} sent to broker $brokerNode")

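Taken together, these changes let the channel manager run without a ControllerContext: the epoch is supplied as a function, and startup() now takes an explicit broker set. A minimal sketch of the decoupled wiring this enables (the epoch supplier and the empty initial broker set are illustrative assumptions, not part of the patch):

    // Sketch: construct the channel manager with a lazily-evaluated epoch
    // instead of a ControllerContext, start it with no brokers, and add
    // brokers later (as ZkMigrationClient.addZkBroker does below).
    val channelManager = new ControllerChannelManager(
      () => kraftControllerEpoch,   // hypothetical epoch supplier for a KRaft caller
      config, time, metrics, stateChangeLogger, threadNamePrefix)
    channelManager.startup(Set.empty)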
View File

@@ -88,7 +88,7 @@ class KafkaController(val config: KafkaConfig,
private val isAlterPartitionEnabled = config.interBrokerProtocolVersion.isAlterPartitionSupported
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
val controllerContext = new ControllerContext
-var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
+var controllerChannelManager = new ControllerChannelManager(() => controllerContext.epoch, config, time, metrics,
stateChangeLogger, threadNamePrefix)
// have a separate scheduler for the controller to be able to start and stop independently of the kafka server
@@ -899,7 +899,7 @@ class KafkaController(val config: KafkaConfig,
private def initializeControllerContext(): Unit = {
// update controller cache with delete topic information
-val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
+val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster()
val (compatibleBrokerAndEpochs, incompatibleBrokerAndEpochs) = partitionOnFeatureCompatibility(curBrokerAndEpochs)
if (incompatibleBrokerAndEpochs.nonEmpty) {
warn("Ignoring registration of new brokers due to incompatibilities with finalized features: " +
@@ -926,7 +926,7 @@ class KafkaController(val config: KafkaConfig,
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
-controllerChannelManager.startup()
+controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")
@@ -1556,7 +1556,7 @@ class KafkaController(val config: KafkaConfig,
private def processBrokerChange(): Unit = {
if (!isActive) return
-val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
+val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster()
val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
val curBrokerIds = curBrokerIdAndEpochs.keySet
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds

View File

@@ -0,0 +1,285 @@
package kafka.migration
import kafka.api.LeaderAndIsr
import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment}
import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
import kafka.utils.Logging
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
import kafka.zk._
import kafka.zookeeper._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.migration._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.zookeeper.CreateMode
import java.util
import java.util.function.Consumer
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
class ZkMigrationClient(zkClient: KafkaZkClient,
controllerChannelManager: ControllerChannelManager) extends MigrationClient with Logging {
def claimControllerLeadership(kraftControllerId: Int, kraftControllerEpoch: Int): ZkControllerState = {
val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController(kraftControllerId, kraftControllerEpoch)
if (epochZkVersionOpt.isDefined) {
new ZkControllerState(kraftControllerId, kraftControllerEpoch, epochZkVersionOpt.get)
} else {
throw new ControllerMovedException("Cannot claim controller leadership, the controller has moved.")
}
}
def migrateTopics(metadataVersion: MetadataVersion,
recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
brokerIdConsumer: Consumer[Integer]): Unit = {
val topics = zkClient.getAllTopicsInCluster()
val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
val partitions = assignments.keys.toSeq
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
.setName(topic)
.setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
assignments.foreach { case (topicPartition, replicaAssignment) =>
replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
val leaderIsrAndEpoch = leaderIsrAndControllerEpochs(topicPartition)
topicBatch.add(new ApiMessageAndVersion(new PartitionRecord()
.setTopicId(topicIdOpt.get)
.setPartitionId(topicPartition.partition)
.setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
.setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
.setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
.setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
.setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
.setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
.setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
.setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value()), PartitionRecord.HIGHEST_SUPPORTED_VERSION))
}
val props = topicConfigs(topic)
props.forEach { case (key: Object, value: Object) =>
topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
.setResourceType(ConfigResource.Type.TOPIC.id)
.setResourceName(topic)
.setName(key.toString)
.setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
}
recordConsumer.accept(topicBatch)
}
}
def migrateBrokerConfigs(metadataVersion: MetadataVersion,
recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
val batch = new util.ArrayList[ApiMessageAndVersion]()
zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
val brokerResource = if (broker == ConfigEntityName.Default) {
""
} else {
broker
}
props.forEach { case (key: Object, value: Object) =>
batch.add(new ApiMessageAndVersion(new ConfigRecord()
.setResourceType(ConfigResource.Type.BROKER.id)
.setResourceName(brokerResource)
.setName(key.toString)
.setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
}
}
recordConsumer.accept(batch)
}
def migrateClientQuotas(metadataVersion: MetadataVersion,
recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
val adminZkClient = new AdminZkClient(zkClient)
def migrateEntityType(entityType: String): Unit = {
adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
val entity = new EntityData().setEntityType(entityType).setEntityName(name)
val batch = new util.ArrayList[ApiMessageAndVersion]()
ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
.setEntity(List(entity).asJava)
.setKey(key)
.setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
}
recordConsumer.accept(batch)
}
}
migrateEntityType(ConfigType.User)
migrateEntityType(ConfigType.Client)
adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
// Lifted from ZkAdminManager
val components = name.split("/")
if (components.size != 3 || components(1) != "clients")
throw new IllegalArgumentException(s"Unexpected config path: ${name}")
val entity = List(
new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
)
val batch = new util.ArrayList[ApiMessageAndVersion]()
ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
.setEntity(entity.asJava)
.setKey(key)
.setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
}
recordConsumer.accept(batch)
}
migrateEntityType(ConfigType.Ip)
}
def migrateProducerId(metadataVersion: MetadataVersion,
recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
dataOpt match {
case Some(data) =>
val producerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
.setBrokerEpoch(-1)
.setBrokerId(producerIdBlock.assignedBrokerId)
.setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
case None => // Nothing to migrate
}
}
override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]], brokerIdConsumer: Consumer[Integer]): Unit = {
migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
migrateProducerId(MetadataVersion.latest(), batchConsumer)
}
override def readBrokerIds(): util.Set[Integer] = {
zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
}
override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] = {
val topics = zkClient.getAllTopicsInCluster()
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
val brokersWithAssignments = mutable.HashSet[Int]()
replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>
assignments.values.foreach { assignment => brokersWithAssignments.addAll(assignment.replicas) }
}
brokersWithAssignments.map(Integer.valueOf).asJava
}
override def addZkBroker(brokerId: Int): Unit = {
val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
controllerChannelManager.addBroker(brokerAndEpoch.head._1)
}
override def removeZkBroker(brokerId: Int): Unit = {
controllerChannelManager.removeBroker(brokerId)
}
override def getOrCreateMigrationRecoveryState(initialState: MigrationRecoveryState): MigrationRecoveryState = {
zkClient.getOrCreateMigrationState(initialState)
}
override def setMigrationRecoveryState(state: MigrationRecoveryState): MigrationRecoveryState = {
zkClient.updateMigrationState(state)
}
override def sendRequestToBroker(brokerId: Int,
request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
callback: Consumer[AbstractResponse]): Unit = {
controllerChannelManager.sendRequest(brokerId, request, callback.accept)
}
override def createTopic(topicName: String, topicId: Uuid, partitions: util.Map[Integer, PartitionRegistration], state: MigrationRecoveryState): MigrationRecoveryState = {
val assignments = partitions.asScala.map { case (partitionId, partition) =>
new TopicPartition(topicName, partitionId) -> ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
}
val createTopicZNode = {
val path = TopicZNode.path(topicName)
CreateRequest(
path,
TopicZNode.encode(Some(topicId), assignments),
zkClient.defaultAcls(path),
CreateMode.PERSISTENT)
}
val createPartitionsZNode = {
val path = TopicPartitionsZNode.path(topicName)
CreateRequest(
path,
null,
zkClient.defaultAcls(path),
CreateMode.PERSISTENT)
}
val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
val topicPartition = new TopicPartition(topicName, partitionId)
Seq(
createTopicPartition(topicPartition),
createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
)
}
val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state.controllerZkVersion(), state)
responses.foreach(System.err.println)
state.withZkVersion(migrationZkVersion)
}
private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
val path = TopicPartitionZNode.path(topicPartition)
CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
}
private def createTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): CreateRequest = {
val path = TopicPartitionStateZNode.path(topicPartition)
val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
partitionRegistration.leader,
partitionRegistration.leaderEpoch,
partitionRegistration.isr.toList,
partitionRegistration.leaderRecoveryState,
partitionRegistration.partitionEpoch), controllerEpoch))
CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
}
private def updateTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): SetDataRequest = {
val path = TopicPartitionStateZNode.path(topicPartition)
val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
partitionRegistration.leader,
partitionRegistration.leaderEpoch,
partitionRegistration.isr.toList,
partitionRegistration.leaderRecoveryState,
partitionRegistration.partitionEpoch), controllerEpoch))
SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
}
override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
state: MigrationRecoveryState): MigrationRecoveryState = {
val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
val topicPartition = new TopicPartition(topicName, partitionId)
Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
}
}
if (requests.isEmpty) {
state
} else {
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state.controllerZkVersion(), state)
responses.foreach(System.err.println)
state.withZkVersion(migrationZkVersion)
}
}
}

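For reference, a minimal usage sketch of the client above, assuming an already-connected KafkaZkClient and a started ControllerChannelManager (that setup is outside this patch):

    // Sketch: stream every migrated metadata record and every observed broker id.
    val migrationClient = new ZkMigrationClient(zkClient, channelManager)
    migrationClient.readAllMetadata(
      batch => batch.forEach(record => println(record.message())),
      brokerId => println(s"Broker $brokerId referenced in ZK metadata"))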
View File

@@ -54,6 +54,19 @@ import org.apache.kafka.common.utils.Sanitizer
import scala.collection.{Map, mutable, _}
import scala.jdk.CollectionConverters._
object ZkAdminManager {
def clientQuotaPropsToDoubleMap(props: Map[String, String]): Map[String, Double] = {
props.map { case (key, value) =>
val doubleValue = try value.toDouble catch {
case _: NumberFormatException =>
throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
}
key -> doubleValue
}
}
}
class ZkAdminManager(val config: KafkaConfig,
val metrics: Metrics,
val metadataCache: MetadataCache,
@@ -636,16 +649,6 @@ class ZkAdminManager(val config: KafkaConfig,
private def sanitized(name: Option[String]): String = name.map(n => sanitizeEntityName(n)).getOrElse("")
-private def fromProps(props: Map[String, String]): Map[String, Double] = {
-props.map { case (key, value) =>
-val doubleValue = try value.toDouble catch {
-case _: NumberFormatException =>
-throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
-}
-key -> doubleValue
-}
-}
def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent],
clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = {
@@ -706,7 +709,7 @@ class ZkAdminManager(val config: KafkaConfig,
(userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) =>
val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isClientOrUserQuotaConfig(key) }
if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
-Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
+Some(userClientIdToEntity(u, c) -> ZkAdminManager.clientQuotaPropsToDoubleMap(quotaProps))
else
None
}.toMap
@@ -732,7 +735,7 @@ class ZkAdminManager(val config: KafkaConfig,
ipEntries.flatMap { case (ip, props) =>
val ipQuotaProps = props.asScala.filter { case (key, _) => DynamicConfig.Ip.names.contains(key) }
if (ipQuotaProps.nonEmpty)
-Some(ipToQuotaEntity(ip) -> fromProps(ipQuotaProps))
+Some(ipToQuotaEntity(ip) -> ZkAdminManager.clientQuotaPropsToDoubleMap(ipQuotaProps))
else
None
}

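The quota-parsing helper moves from a private instance method into the new companion object so ZkMigrationClient can reuse it. Its behavior, illustrated with assumed example values:

    // Client quota values stored in ZK are strings but must parse as doubles.
    val props = Map("consumer_byte_rate" -> "1024", "producer_byte_rate" -> "2048.5")
    val quotas = ZkAdminManager.clientQuotaPropsToDoubleMap(props)
    // quotas == Map("consumer_byte_rate" -> 1024.0, "producer_byte_rate" -> 2048.5)
    // A non-numeric value would raise IllegalStateException rather than be skipped.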
View File

@@ -34,12 +34,13 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.migration.MigrationRecoveryState
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
-import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
+import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
+import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper}
import scala.collection.{Map, Seq, mutable}
@@ -141,13 +142,13 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = {
val response = retryRequestUntilConnected(
MultiRequest(Seq(
-CreateOp(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.EPHEMERAL),
-SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion)))
+SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion),
+CreateOp(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.EPHEMERAL)))
)
response.resultCode match {
case Code.NODEEXISTS | Code.BADVERSION => checkControllerAndEpoch()
case Code.OK =>
-val setDataResult = response.zkOpResults(1).rawOpResult.asInstanceOf[SetDataResult]
+val setDataResult = response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult]
(newControllerEpoch, setDataResult.getStat.getVersion)
case code => throw KeeperException.create(code)
}
@@ -156,6 +157,76 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
tryCreateControllerZNodeAndIncrementEpoch()
}
/**
* Registers a given KRaft controller in zookeeper as the active controller. Unlike the ZK equivalent of this method,
* this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller
* leadership during a KRaft leadership failover.
*
* This method is called at the beginning of a KRaft migration and during subsequent KRaft leadership changes during
* the migration.
*
* To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
* uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method,
* the multi request transaction will fail and this method will return None.
*
* @param kraftControllerId ID of the KRaft controller node
* @param kraftControllerEpoch Epoch of the KRaft controller node
* @return An Option containing the new zkVersion of /controller_epoch, or None if the KRaft controller could not be registered.
*/
def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = {
val timestamp = time.milliseconds()
val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion))
val controllerOpt = getControllerId
curEpochOpt match {
case None =>
throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
s"since there is no ZK controller epoch present.")
case Some((curEpoch: Int, curEpochZk: Int)) =>
if (curEpoch >= kraftControllerEpoch) {
// TODO actually raise exception here
warn(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
s"since epoch is not higher. Current ZK epoch is ${curEpoch}, KRaft epoch is $kraftControllerEpoch.")
}
val response = if (controllerOpt.isDefined) {
info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active controller with epoch $kraftControllerEpoch")
retryRequestUntilConnected(
MultiRequest(Seq(
SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(kraftControllerEpoch), curEpochZk),
DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion),
CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
)
} else {
info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active controller with epoch $kraftControllerEpoch")
retryRequestUntilConnected(
MultiRequest(Seq(
SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(kraftControllerEpoch), curEpochZk),
CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
)
}
val failureSuffix = s"while trying to register KRaft controller $kraftControllerId with epoch $kraftControllerEpoch. KRaft controller was not registered."
response.resultCode match {
case Code.OK =>
info(s"Successfully registered KRaft controller $kraftControllerId with epoch $kraftControllerEpoch")
val setDataResult = response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult]
Some(setDataResult.getStat.getVersion)
case Code.BADVERSION =>
info(s"Controller epoch changed $failureSuffix")
None
case Code.NONODE =>
info(s"The ephemeral node at ${ControllerZNode.path} went away $failureSuffix")
None
case Code.NODEEXISTS =>
info(s"The ephemeral node at ${ControllerZNode.path} appeared $failureSuffix")
None
case code =>
throw KeeperException.create(code)
}
}
}
private def maybeCreateControllerEpochZNode(): (Int, Int) = {
createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match {
case Code.OK =>
@@ -340,6 +411,24 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
def getEntitiesConfigs(rootEntityType: String, sanitizedEntityNames: Set[String]): Map[String, Properties] = {
val getDataRequests: Seq[GetDataRequest] = sanitizedEntityNames.map { entityName =>
GetDataRequest(ConfigEntityZNode.path(rootEntityType, entityName), Some(entityName))
}.toSeq
val getDataResponses = retryRequestsUntilConnected(getDataRequests)
getDataResponses.map { response =>
val entityName = response.ctx.get.asInstanceOf[String]
response.resultCode match {
case Code.OK =>
entityName -> ConfigEntityZNode.decode(response.data)
case Code.NONODE =>
entityName -> new Properties()
case _ => throw response.resultException.get
}
}.toMap
}
/**
* Sets or creates the entity znode path with the given configs depending
* on whether it already exists or not.
@@ -423,8 +512,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* Gets all brokers with broker epoch in the cluster.
* @return map of broker to epoch in the cluster.
*/
-def getAllBrokerAndEpochsInCluster: Map[Broker, Long] = {
-val brokerIds = getSortedBrokerList
+def getAllBrokerAndEpochsInCluster(filterBrokerIds: Seq[Int] = Seq.empty): Map[Broker, Long] = {
+val brokerIds = if (filterBrokerIds.isEmpty) {
+getSortedBrokerList
+} else {
+filterBrokerIds
+}
val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests)
getDataResponses.flatMap { getDataResponse =>
@@ -1554,6 +1647,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
def getOrCreateMigrationState(initialState: MigrationRecoveryState): MigrationRecoveryState = {
val getDataRequest = GetDataRequest(MigrationZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK =>
MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
case Code.NONODE =>
createInitialMigrationState(initialState)
case _ => throw getDataResponse.resultException.get
}
}
def createInitialMigrationState(initialState: MigrationRecoveryState): MigrationRecoveryState = {
val createRequest = CreateRequest(
MigrationZNode.path,
MigrationZNode.encode(initialState),
defaultAcls(MigrationZNode.path),
CreateMode.PERSISTENT)
val response = retryRequestUntilConnected(createRequest)
response.maybeThrow()
initialState
}
def updateMigrationState(migrationState: MigrationRecoveryState): MigrationRecoveryState = {
val req = SetDataRequest(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
val resp = retryRequestUntilConnected(req)
resp.maybeThrow()
migrationState.withZkVersion(resp.stat.getVersion)
}
/**
* Return the ACLs of the node of the given path
* @param path the given path for the node
@@ -1772,6 +1895,106 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
// Perform a sequence of updates to ZooKeeper as part of a KRaft dual write. In addition to adding a CheckOp on the
// controller epoch ZNode, we also include CheckOp/SetDataOp on the migration ZNode. This ensures proper fencing
// from errant ZK controllers as well as fencing from new KRaft controllers.
def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req],
expectedControllerZkVersion: Int,
migrationState: MigrationRecoveryState): (Int, Seq[Req#Response]) = {
if (requests.isEmpty) {
throw new IllegalArgumentException("Must specify at least one ZK request for a migration operation.")
}
def wrapMigrationRequest(request: Req, updateMigrationNode: Boolean): MultiRequest = {
val checkOp = CheckOp(ControllerEpochZNode.path, expectedControllerZkVersion)
val migrationOp = if (updateMigrationNode) {
SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
} else {
CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
}
request match {
case CreateRequest(path, data, acl, createMode, ctx) =>
MultiRequest(Seq(checkOp, migrationOp, CreateOp(path, data, acl, createMode)), ctx)
case DeleteRequest(path, version, ctx) =>
MultiRequest(Seq(checkOp, migrationOp, DeleteOp(path, version)), ctx)
case SetDataRequest(path, data, version, ctx) =>
MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, version)), ctx)
case _ => throw new IllegalStateException(s"$request does not need controller epoch check")
}
}
def handleUnwrappedMigrationResult(migrationOp: ZkOp, migrationResult: OpResult): Int = {
val (path: String, data: Option[Array[Byte]], version: Int) = migrationOp match {
case CheckOp(path, version) => (path, None, version)
case SetDataOp(path, data, version) => (path, Some(data), version)
case _ => throw new IllegalStateException("Unexpected result on /migration znode")
}
migrationResult match {
case _: CheckResult => version
case setDataResult: SetDataResult => setDataResult.getStat.getVersion
case errorResult: ErrorResult =>
if (path.equals(MigrationZNode.path)) {
val errorCode = Code.get(errorResult.getErr)
if (errorCode == Code.BADVERSION) {
data match {
case Some(value) =>
val failedPayload = MigrationZNode.decode(value, version, -1)
throw new RuntimeException(s"Conditional update on KRaft Migration znode failed. Expected zkVersion = ${version}. " +
s"The failed write was: ${failedPayload}. This indicates that another KRaft controller is making writes to ZooKeeper.")
case None =>
throw new RuntimeException(s"Check op on KRaft Migration znode failed. Expected zkVersion = ${version}. " +
s"This indicates that another KRaft controller is making writes to ZooKeeper.")
}
} else if (errorCode == Code.OK) {
// Unexpected: an ErrorResult carrying Code.OK; treat the check as successful.
version
} else {
throw KeeperException.create(errorCode, path)
}
} else {
throw new RuntimeException(s"Got migration result for incorrect path $path")
}
case _ => throw new RuntimeException(s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw ${migrationResult}")
}
}
def unwrapMigrationRequest(response: AsyncResponse): (AsyncResponse, Int) = {
response match {
case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
zkOpResults match {
case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: CheckOp, migrationResult), zkOpResult) =>
// During the ZK migration, we have a CheckOp, a conditional update on the migration znode, and the op we're performing
handleUnwrappedCheckOp(checkOp, checkOpResult)
val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
(handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: SetDataOp, migrationResult), zkOpResult) =>
// During the ZK migration, we have a CheckOp, a conditional update on the migration znode, and the op we're performing
handleUnwrappedCheckOp(checkOp, checkOpResult)
val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
(handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
case null => throw KeeperException.create(resultCode)
case _ => throw new IllegalStateException(s"Cannot unwrap $response because it does not contain the expected operations for a migration operation.")
}
case _ => throw new IllegalStateException(s"Cannot unwrap $response because it is not a MultiResponse")
}
}
expectedControllerZkVersion match {
case ZkVersion.MatchAnyVersion => throw new IllegalArgumentException(s"Expected a controller epoch zkVersion when making migration writes")
case version if version >= 0 =>
logger.trace(s"Performing ${requests.size} migration update(s) with controllerEpochZkVersion=$expectedControllerZkVersion and migrationState=$migrationState")
val wrappedRequests = requests.map(req => wrapMigrationRequest(req, req == requests.last))
val results = retryRequestsUntilConnected(wrappedRequests)
val unwrappedResults = results.map(unwrapMigrationRequest(_))
val migrationZkVersion = unwrappedResults.last._2
(migrationZkVersion, unwrappedResults.map(_._1.asInstanceOf[Req#Response]))
case invalidVersion =>
throw new IllegalArgumentException(s"Expected controller epoch zkVersion $invalidVersion should be non-negative or equal to ${ZkVersion.MatchAnyVersion}")
}
}
private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
val remainingRequests = new mutable.ArrayBuffer(requests.size) ++= requests
val responses = new mutable.ArrayBuffer[Req#Response]
@@ -1997,16 +2220,7 @@ object KafkaZkClient {
}
}
-// A helper function to transform a MultiResponse with the check on
-// controller epoch znode zkVersion back into a regular response.
-// ControllerMovedException will be thrown if the controller epoch
-// znode zkVersion check fails. This is used for fencing zookeeper
-// updates in controller.
-private def unwrapResponseWithControllerEpochCheck(response: AsyncResponse): AsyncResponse = {
-response match {
-case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
-zkOpResults match {
-case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), zkOpResult) =>
+private def handleUnwrappedCheckOp(checkOp: CheckOp, checkOpResult: OpResult): Unit = {
checkOpResult match {
case errorResult: ErrorResult =>
if (checkOp.path.equals(ControllerEpochZNode.path)) {
@@ -2019,6 +2233,9 @@ object KafkaZkClient {
}
case _ =>
}
}
private def handleUnwrappedZkOp(zkOpResult: ZkOpResult, resultCode: Code, ctx: Option[Any], responseMetadata: ResponseMetadata): AsyncResponse = {
val rawOpResult = zkOpResult.rawOpResult
zkOpResult.zkOp match {
case createOp: CreateOp =>
@@ -2037,6 +2254,21 @@ object KafkaZkClient {
SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata)
case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp")
}
}
// A helper function to transform a MultiResponse with the check on
// controller epoch znode zkVersion back into a regular response.
// ControllerMovedException will be thrown if the controller epoch
// znode zkVersion check fails. This is used for fencing zookeeper
// updates in controller.
private def unwrapResponseWithControllerEpochCheck(response: AsyncResponse): AsyncResponse = {
response match {
case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
zkOpResults match {
// In normal ZK writes, we just have a MultiOp with a CheckOp and the actual operation we're performing
case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), zkOpResult) =>
handleUnwrappedCheckOp(checkOp, checkOpResult)
handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata)
case null => throw KeeperException.create(resultCode)
case _ => throw new IllegalStateException(s"Cannot unwrap $response because the first zookeeper op is not check op in original MultiRequest")
}

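Putting the new KafkaZkClient pieces together: a KRaft controller first claims leadership, then threads both the controller epoch zkVersion and the migration state through each dual write. A sketch under assumed names (kraftId, kraftEpoch, initialState, and someSetDataRequest are illustrative):

    // 1. Claim /controller and bump /controller_epoch; keep its zkVersion for fencing.
    val controllerZkVersion = zkClient
      .tryRegisterKRaftControllerAsActiveController(kraftId, kraftEpoch)
      .getOrElse(throw new ControllerMovedException("ZK controller moved"))
    // 2. Recover or create the /migration state znode.
    var state = zkClient.getOrCreateMigrationState(initialState)
    // 3. Each write becomes a MultiRequest fenced on both /controller_epoch and /migration.
    val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(
      Seq(someSetDataRequest), controllerZkVersion, state)
    state = state.withZkVersion(migrationZkVersion)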
View File

@@ -68,7 +68,7 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness {
@Test
def testReplicaManagerBrokerEpochMatchesWithZk(): Unit = {
-val brokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
+val brokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster()
assertEquals(brokerAndEpochs.size, servers.size)
brokerAndEpochs.foreach {
case (broker, epoch) =>
@@ -131,9 +131,9 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness {
val controllerContext = new ControllerContext
controllerContext.setLiveBrokers(brokerAndEpochs)
val metrics = new Metrics
-val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM,
+val controllerChannelManager = new ControllerChannelManager(() => controllerContext.epoch, controllerConfig, Time.SYSTEM,
metrics, new StateChangeLogger(controllerId, inControllerContext = true, None))
-controllerChannelManager.startup()
+controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
val broker2 = servers(brokerId2)
val epochInRequest = broker2.kafkaController.brokerEpoch + epochInRequestDiffFromCurrentEpoch
@@ -249,7 +249,7 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness {
}
private def checkControllerBrokerEpochsCacheMatchesWithZk(controllerContext: ControllerContext): Unit = {
-val brokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
+val brokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster()
TestUtils.waitUntilTrue(() => {
val brokerEpochsInControllerContext = controllerContext.liveBrokerIdAndEpochs
if (brokerAndEpochs.size != brokerEpochsInControllerContext.size) false

View File

@@ -138,9 +138,9 @@ class LeaderElectionTest extends QuorumTestHarness {
val controllerContext = new ControllerContext
controllerContext.setLiveBrokers(brokerAndEpochs)
val metrics = new Metrics
-val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM,
+val controllerChannelManager = new ControllerChannelManager(() => controllerContext.epoch, controllerConfig, Time.SYSTEM,
metrics, new StateChangeLogger(controllerId, inControllerContext = true, None))
-controllerChannelManager.startup()
+controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
try {
val staleControllerEpoch = 0
val partitionStates = Seq(

View File

@@ -280,9 +280,9 @@ class ServerShutdownTest extends KafkaServerTestHarness {
val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
val controllerContext = new ControllerContext
controllerContext.setLiveBrokers(brokerAndEpochs)
-controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM,
+controllerChannelManager = new ControllerChannelManager(() => controllerContext.epoch, controllerConfig, Time.SYSTEM,
metrics, new StateChangeLogger(controllerId, inControllerContext = true, None))
-controllerChannelManager.startup()
+controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
// Initiate a sendRequest and wait until connection is established and one byte is received by the peer
val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.migration;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.AbstractControlRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
public interface MigrationClient {
ZkControllerState claimControllerLeadership(int kraftControllerId, int kraftControllerEpoch);
void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
void addZkBroker(int brokerId);
void removeZkBroker(int brokerId);
Set<Integer> readBrokerIds();
Set<Integer> readBrokerIdsFromTopicAssignments();
MigrationRecoveryState getOrCreateMigrationRecoveryState(MigrationRecoveryState initialState);
MigrationRecoveryState setMigrationRecoveryState(MigrationRecoveryState initialState);
MigrationRecoveryState createTopic(String topicName, Uuid topicId, Map<Integer, PartitionRegistration> topicPartitions, MigrationRecoveryState state);
MigrationRecoveryState updateTopicPartitions(Map<String, Map<Integer, PartitionRegistration>> topicPartitions, MigrationRecoveryState state);
void sendRequestToBroker(int brokerId,
AbstractControlRequest.Builder<? extends AbstractControlRequest> request,
Consumer<AbstractResponse> callback);
}
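Every mutating method on this interface returns an updated MigrationRecoveryState, which the caller is expected to thread through subsequent calls so the /migration zkVersion fencing stays intact. A sketch of the intended pattern, in Scala to match the implementation above (variable names are illustrative):

    // Reuse the state returned by each write, or later writes will be fenced.
    var state = client.getOrCreateMigrationRecoveryState(initialState)
    state = client.createTopic(topicName, topicId, partitionRegistrations, state)
    state = client.updateTopicPartitions(partitionUpdates, state)
    state = client.setMigrationRecoveryState(state)   // persist the final state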