KAFKA-18435 Remove zookeeper dependencies in build.gradle (#18450)

Remove Apache ZooKeeper from the Apache Kafka build. Also remove Commons IO, Commons CLI, and Netty, which were dependencies we took on only because of ZooKeeper.

To keep the size of this PR manageable, I did not remove all of the classes that formerly interfaced with ZK; I only removed the ZK types. Fortunately, Kafka generally wrapped ZK data structures rather than using them directly.
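As a hypothetical illustration of that wrapping (this is not code from this commit, and the exact fields are an assumption), a small Kafka-owned type like kafka.zookeeper.Stat — which this diff imports in place of org.apache.zookeeper.data.Stat — only needs to expose the metadata the surrounding code actually reads, such as getVersion and getCtime:

```scala
package kafka.zookeeper

// Sketch of a minimal stand-in for org.apache.zookeeper.data.Stat: only the
// accessors used by callers in this diff are modeled here.
class Stat(version: Int, ctime: Long) {
  def getVersion: Int = version
  def getCtime: Long = ctime
}
```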

Some classes were so entangled with ZK that it was easier to stub them out. For ZkNodeChangeNotificationListener.scala, PartitionStateMachine.scala, ReplicaStateMachine.scala, KafkaZkClient.scala, and ZookeeperClient.scala, I replaced the function bodies with "throw new UnsupportedOperationException()". Since the tests for these classes, as well as the ZK-based broker code, have already been removed, this should be OK as an incremental step.
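A minimal sketch of that stubbing pattern (the class name and the methods shown appear in this diff, but the signatures here are simplified for illustration):

```scala
package kafka.zk

// Sketch only: keep the method signatures so dependent code still compiles,
// while every body now fails fast if any remaining caller reaches it.
class KafkaZkClient {
  def getChildren(path: String): Seq[String] =
    throw new UnsupportedOperationException()

  def deletePath(path: String): Unit =
    throw new UnsupportedOperationException()
}
```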

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Author: Colin Patrick McCabe, 2025-01-08 18:25:17 -08:00 (committed by GitHub)
Parent: d1b1d9d47d
Commit: c28d9a3486
16 changed files with 181 additions and 2771 deletions

View File

@ -208,10 +208,8 @@ License Version 2.0:
audience-annotations-0.12.0
caffeine-3.1.1
commons-beanutils-1.9.4
commons-cli-1.4
commons-collections-3.2.2
commons-digester-2.1
commons-io-2.14.0
commons-lang3-3.12.0
commons-logging-1.3.2
commons-validator-1.9.0
@ -257,15 +255,6 @@ lz4-java-1.8.0
maven-artifact-3.9.6
metrics-core-4.1.12.1
metrics-core-2.2.0
netty-buffer-4.1.115.Final
netty-codec-4.1.115.Final
netty-common-4.1.115.Final
netty-handler-4.1.115.Final
netty-resolver-4.1.115.Final
netty-transport-4.1.115.Final
netty-transport-classes-epoll-4.1.115.Final
netty-transport-native-epoll-4.1.115.Final
netty-transport-native-unix-common-4.1.115.Final
opentelemetry-proto-1.0.0-alpha
plexus-utils-3.5.1
rocksdbjni-7.9.2
@ -275,8 +264,6 @@ scala-reflect-2.13.15
snappy-java-1.1.10.5
snakeyaml-2.2
swagger-annotations-2.2.25
zookeeper-3.8.4
zookeeper-jute-3.8.4
===============================================================================
This product bundles various third-party components under other open source

View File

@ -596,45 +596,6 @@ limitations under the License.
This software includes projects with other licenses -- see `doc/LICENSE.md`.
Apache ZooKeeper - Server
Copyright 2008-2021 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Apache ZooKeeper - Jute
Copyright 2008-2021 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
The Netty Project
=================
Please visit the Netty web site for more information:
* https://netty.io/
Copyright 2014 The Netty Project
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:

View File

@ -190,10 +190,6 @@ allprojects {
libs.scalaReflect,
libs.jacksonAnnotations,
libs.jacksonDatabindYaml,
// be explicit about the Netty dependency version instead of relying on the version set by
// ZooKeeper (potentially older and containing CVEs)
libs.nettyHandler,
libs.nettyTransportNativeEpoll,
libs.log4j2Api,
libs.log4j2Core,
libs.log4j1Bridge2Api
@ -1107,20 +1103,6 @@ project(':core') {
// only needed transitively, but set it explicitly to ensure it has the same version as scala-library
implementation libs.scalaReflect
implementation libs.scalaLogging
implementation libs.commonsIo // ZooKeeper dependency. Do not use, this is going away.
implementation(libs.zookeeper) {
// Dropwizard Metrics are required by ZooKeeper as of v3.6.0,
// but the library should *not* be used in Kafka code
implementation libs.dropwizardMetrics
exclude module: 'slf4j-log4j12'
exclude module: 'log4j'
// Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to logback in v3.8.0.
// We are removing Zookeeper's dependency on logback so we have a singular logging backend.
exclude module: 'logback-classic'
exclude module: 'logback-core'
}
// ZooKeeperMain depends on commons-cli but declares the dependency as `provided`
implementation libs.commonsCli
implementation log4jLibs
runtimeOnly log4jRuntimeLibs

View File

@ -451,7 +451,6 @@
<subpackage name="internals">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="org.apache.log4j" />
</subpackage>
</subpackage>

View File

@ -16,18 +16,9 @@
*/
package kafka.common
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, StateChangeHandlers}
import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.ShutdownableThread
import scala.collection.Seq
import scala.util.{Failure, Try}
/**
* Handle the notificationMessage.
@ -56,105 +47,20 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
private val notificationHandler: NotificationHandler,
private val changeExpirationMs: Long = 15 * 60 * 1000,
private val time: Time = Time.SYSTEM) extends Logging {
private var lastExecutedChange = -1L
private val queue = new LinkedBlockingQueue[ChangeNotification]
private val thread = new ChangeEventProcessThread(s"$seqNodeRoot-event-process-thread")
private val isClosed = new AtomicBoolean(false)
def init(): Unit = {
zkClient.registerStateChangeHandler(ZkStateChangeHandler)
zkClient.registerZNodeChildChangeHandler(ChangeNotificationHandler)
addChangeNotification()
thread.start()
throw new UnsupportedOperationException()
}
def close(): Unit = {
isClosed.set(true)
zkClient.unregisterStateChangeHandler(ZkStateChangeHandler.name)
zkClient.unregisterZNodeChildChangeHandler(ChangeNotificationHandler.path)
queue.clear()
thread.shutdown()
throw new UnsupportedOperationException()
}
/**
* Process notifications
*/
private def processNotifications(): Unit = {
try {
val notifications = zkClient.getChildren(seqNodeRoot).sorted
if (notifications.nonEmpty) {
info(s"Processing notification(s) to $seqNodeRoot")
val now = time.milliseconds
for (notification <- notifications) {
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
processNotification(notification)
lastExecutedChange = changeId
}
}
purgeObsoleteNotifications(now, notifications)
}
} catch {
case e: InterruptedException => if (!isClosed.get) error(s"Error while processing notification change for path = $seqNodeRoot", e)
case e: Exception => error(s"Error while processing notification change for path = $seqNodeRoot", e)
object ZkStateChangeHandler {
val name: String = null
def afterInitializingSession(): Unit = {
throw new UnsupportedOperationException()
}
}
private def processNotification(notification: String): Unit = {
val changeZnode = seqNodeRoot + "/" + notification
val (data, _) = zkClient.getDataAndStat(changeZnode)
data match {
case Some(d) => Try(notificationHandler.processNotification(d)) match {
case Failure(e) => error(s"error processing change notification ${new String(d, UTF_8)} from $changeZnode", e)
case _ =>
}
case None => warn(s"read null data from $changeZnode")
}
}
private def addChangeNotification(): Unit = {
if (!isClosed.get && queue.peek() == null)
queue.put(new ChangeNotification)
}
private class ChangeNotification {
def process(): Unit = processNotifications()
}
/**
* Purges expired notifications.
*
* @param now
* @param notifications
*/
private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]): Unit = {
for (notification <- notifications.sorted) {
val notificationNode = seqNodeRoot + "/" + notification
val (data, stat) = zkClient.getDataAndStat(notificationNode)
if (data.isDefined) {
if (now - stat.getCtime > changeExpirationMs) {
debug(s"Purging change notification $notificationNode")
zkClient.deletePath(notificationNode)
}
}
}
}
/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong
private class ChangeEventProcessThread(name: String) extends ShutdownableThread(name) {
override def doWork(): Unit = queue.take().process()
}
private object ChangeNotificationHandler extends ZNodeChildChangeHandler {
override val path: String = seqNodeRoot
override def handleChildChange(): Unit = addChangeNotification()
}
object ZkStateChangeHandler extends StateChangeHandler {
override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
override def afterInitializingSession(): Unit = addChangeNotification()
}
}

View File

@ -28,10 +28,9 @@ import kafka.utils._
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
import kafka.zk.{FeatureZNodeStatus, _}
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler, ZooKeeperClientException}
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
@ -45,8 +44,6 @@ import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer
@ -1068,21 +1065,7 @@ class KafkaController(val config: KafkaConfig,
}
private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = {
val topicAssignment = mutable.Map() ++=
controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic) +=
(topicPartition -> assignment)
val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic,
controllerContext.topicIds.get(topicPartition.topic),
topicAssignment, controllerContext.epochZkVersion)
setDataResponse.resultCode match {
case Code.OK =>
info(s"Successfully updated assignment of partition $topicPartition to $assignment")
case Code.NONODE =>
throw new IllegalStateException(s"Failed to update assignment for $topicPartition since the topic " +
"has no current assignment")
case _ => throw new KafkaException(setDataResponse.resultException.get)
}
throw new UnsupportedOperationException()
}
private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = {
@ -1186,7 +1169,7 @@ class KafkaController(val config: KafkaConfig,
try {
zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned, controllerContext.epochZkVersion)
} catch {
case e: KeeperException => throw new AdminOperationException(e)
case e: ZooKeeperClientException => throw new AdminOperationException(e)
}
}
}

View File

@ -16,40 +16,27 @@
*/
package kafka.controller
import kafka.common.StateChangeFailedException
import kafka.controller.Election._
import kafka.server.KafkaConfig
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk.TopicPartitionStateZNode
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
abstract class PartitionStateMachine(controllerContext: ControllerContext) extends Logging {
/**
* Invoked on successful controller election.
*/
def startup(): Unit = {
info("Initializing partition state")
initializePartitionState()
info("Triggering online partition state changes")
triggerOnlinePartitionStateChange()
debug(s"Started partition state machine with initial state -> ${controllerContext.partitionStates}")
throw new UnsupportedOperationException()
}
/**
* Invoked on controller shutdown.
*/
def shutdown(): Unit = {
info("Stopped partition state machine")
throw new UnsupportedOperationException()
}
/**
@ -57,53 +44,18 @@ abstract class PartitionStateMachine(controllerContext: ControllerContext) exten
* state. This is called on a successful controller election and on broker changes
*/
def triggerOnlinePartitionStateChange(): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition))
triggerOnlineStateChangeForPartitions(partitions)
throw new UnsupportedOperationException()
}
def triggerOnlinePartitionStateChange(topic: String): Unit = {
val partitions = controllerContext.partitionsInStates(topic, Set(OfflinePartition, NewPartition))
triggerOnlineStateChangeForPartitions(partitions)
}
private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
// try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
// that belong to topics to be deleted
val partitionsToTrigger = partitions.filter { partition =>
!controllerContext.isTopicQueuedUpForDeletion(partition.topic)
}.toSeq
handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)))
// TODO: If handleStateChanges catches an exception, it is not enough to bail out and log an error.
// It is important to trigger leader election for those partitions.
}
/**
* Invoked on startup of the partition's state machine to set the initial state for all existing partitions in
* zookeeper
*/
private def initializePartitionState(): Unit = {
for (topicPartition <- controllerContext.allPartitions) {
// check if leader and isr path exists for partition. If not, then it is in NEW state
controllerContext.partitionLeadershipInfo(topicPartition) match {
case Some(currentLeaderIsrAndEpoch) =>
// else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
// leader is alive
controllerContext.putPartitionState(topicPartition, OnlinePartition)
else
controllerContext.putPartitionState(topicPartition, OfflinePartition)
case None =>
controllerContext.putPartitionState(topicPartition, NewPartition)
}
}
throw new UnsupportedOperationException()
}
def handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
handleStateChanges(partitions, targetState, None)
throw new UnsupportedOperationException()
}
def handleStateChanges(
@ -111,7 +63,6 @@ abstract class PartitionStateMachine(controllerContext: ControllerContext) exten
targetState: PartitionState,
leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
}
/**
@ -133,8 +84,6 @@ class ZkPartitionStateMachine(config: KafkaConfig,
controllerBrokerRequestBatch: ControllerBrokerRequestBatch)
extends PartitionStateMachine(controllerContext) {
private val isLeaderRecoverySupported = config.interBrokerProtocolVersion.isAtLeast(IBP_3_2_IV0)
private val controllerId = config.brokerId
this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "
@ -153,405 +102,25 @@ class ZkPartitionStateMachine(config: KafkaConfig,
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
if (partitions.nonEmpty) {
try {
controllerBrokerRequestBatch.newBatch()
val result = doHandleStateChanges(
partitions,
targetState,
partitionLeaderElectionStrategyOpt
)
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
result
} catch {
case e: ControllerMovedException =>
error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
throw e
case e: Throwable =>
error(s"Error while moving some partitions to $targetState state", e)
partitions.iterator.map(_ -> Left(e)).toMap
}
} else {
Map.empty
}
}
private def partitionState(partition: TopicPartition): PartitionState = {
controllerContext.partitionState(partition)
}
/**
* This API exercises the partition's state machine. It ensures that every state transition happens from a legal
* previous state to the target state. Valid state transitions are:
* NonExistentPartition -> NewPartition:
* --load assigned replicas from ZK to controller cache
*
* NewPartition -> OnlinePartition
* --assign first live replica as the leader and all live replicas as the isr; write leader and isr to ZK for this partition
* --send LeaderAndIsr request to every live replica and UpdateMetadata request to every live broker
*
* OnlinePartition,OfflinePartition -> OnlinePartition
* --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
* --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
*
* NewPartition,OnlinePartition,OfflinePartition -> OfflinePartition
* --nothing other than marking partition state as Offline
*
* OfflinePartition -> NonExistentPartition
* --nothing other than marking the partition state as NonExistentPartition
* @param partitions The partitions for which the state transition is invoked
* @param targetState The end state that the partition should be moved to
* @return A map of failed and successful elections when targetState is OnlinePartitions. The keys are the
* topic partitions and the corresponding values are either the exception that was thrown or new
* leader & ISR.
*/
private def doHandleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
val traceEnabled = stateChangeLog.isTraceEnabled
partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
targetState match {
case NewPartition =>
validPartitions.foreach { partition =>
stateChangeLog.info(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
controllerContext.putPartitionState(partition, NewPartition)
}
Map.empty
case OnlinePartition =>
val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)
if (uninitializedPartitions.nonEmpty) {
val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
successfulInitializations.foreach { partition =>
stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr}")
controllerContext.putPartitionState(partition, OnlinePartition)
}
}
if (partitionsToElectLeader.nonEmpty) {
val electionResults = electLeaderForPartitions(
partitionsToElectLeader,
partitionLeaderElectionStrategyOpt.getOrElse(
throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
)
)
electionResults.foreach {
case (partition, Right(leaderAndIsr)) =>
stateChangeLog.info(
s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
)
controllerContext.putPartitionState(partition, OnlinePartition)
case (_, Left(_)) => // Ignore; no need to update partition state on election error
}
electionResults
} else {
Map.empty
}
case OfflinePartition | NonExistentPartition =>
validPartitions.foreach { partition =>
if (traceEnabled)
stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
controllerContext.putPartitionState(partition, targetState)
}
Map.empty
}
}
/**
* Initialize leader and isr partition state in zookeeper.
* @param partitions The partitions that we're trying to initialize.
* @return The partitions that have been successfully initialized.
*/
private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
val successfulInitializations = mutable.Buffer.empty[TopicPartition]
val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
partition -> liveReplicasForPartition
}
val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
partitionsWithoutLiveReplicas.foreach { case (partition, _) =>
val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " +
s"partition $partition from New to Online, assigned replicas are " +
s"[${controllerContext.partitionReplicaAssignment(partition).mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
"replica is alive."
logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg))
}
val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
val leaderAndIsr = new LeaderAndIsr(liveReplicas.head, liveReplicas.toList.map(Integer.valueOf).asJava)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
partition -> leaderIsrAndControllerEpoch
}.toMap
val createResponses = try {
zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, controllerContext.epochZkVersion)
} catch {
case e: ControllerMovedException =>
error("Controller moved to another broker when trying to create the topic partition state znode", e)
throw e
case e: Exception =>
partitionsWithLiveReplicas.foreach { case (partition, _) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
Seq.empty
}
createResponses.foreach { createResponse =>
val code = createResponse.resultCode
val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]
val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
if (code == Code.OK) {
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr.asScala.map(_.toInt),
partition, leaderIsrAndControllerEpoch, controllerContext.partitionFullReplicaAssignment(partition), isNew = true)
successfulInitializations += partition
} else {
logFailedStateChange(partition, NewPartition, OnlinePartition, code)
}
}
successfulInitializations
}
/**
* Repeatedly attempt to elect leaders for multiple partitions until there are no more remaining partitions to retry.
* @param partitions The partitions that we're trying to elect leaders for.
* @param partitionLeaderElectionStrategy The election strategy to use.
* @return A map of failed and successful elections. The keys are the topic partitions and the corresponding values are
* either the exception that was thrown or new leader & ISR.
*/
private def electLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
var remaining = partitions
val finishedElections = mutable.Map.empty[TopicPartition, Either[Throwable, LeaderAndIsr]]
while (remaining.nonEmpty) {
val (finished, updatesToRetry) = doElectLeaderForPartitions(remaining, partitionLeaderElectionStrategy)
remaining = updatesToRetry
finished.foreach {
case (partition, Left(e)) =>
logFailedStateChange(partition, partitionState(partition), OnlinePartition, e)
case (_, Right(_)) => // Ignore; success so no need to log failed state change
}
finishedElections ++= finished
if (remaining.nonEmpty)
logger.info(s"Retrying leader election with strategy $partitionLeaderElectionStrategy for partitions $remaining")
}
finishedElections.toMap
}
/**
* Try to elect leaders for multiple partitions.
* Electing a leader for a partition updates partition state in zookeeper.
*
* @param partitions The partitions that we're trying to elect leaders for.
* @param partitionLeaderElectionStrategy The election strategy to use.
* @return A tuple of two values:
* 1. The partitions and the expected leader and isr that successfully had a leader elected. And exceptions
* corresponding to failed elections that should not be retried.
* 2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
* the partition leader updated partition state while the controller attempted to update partition state.
*/
private def doElectLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {
val getDataResponses = try {
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
}
val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
getDataResponses.foreach { getDataResponse =>
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
val currState = partitionState(partition)
if (getDataResponse.resultCode == Code.OK) {
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
case Some(leaderIsrAndControllerEpoch) =>
if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
s"already written by another controller. This probably means that the current controller $controllerId went through " +
s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
} else {
validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
}
case None =>
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
failedElections.put(partition, Left(exception))
}
} else if (getDataResponse.resultCode == Code.NONODE) {
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
failedElections.put(partition, Left(exception))
} else {
failedElections.put(partition, Left(getDataResponse.resultException.get))
}
}
if (validLeaderAndIsrs.isEmpty) {
return (failedElections.toMap, Seq.empty)
}
val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
validLeaderAndIsrs,
allowUnclean
)
leaderForOffline(
controllerContext,
isLeaderRecoverySupported,
partitionsWithUncleanLeaderElectionState
).partition(_.leaderAndIsr.isEmpty)
case ReassignPartitionLeaderElectionStrategy =>
leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
case PreferredReplicaPartitionLeaderElectionStrategy =>
leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
case ControlledShutdownPartitionLeaderElectionStrategy =>
leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
}
partitionsWithoutLeaders.foreach { electionResult =>
val partition = electionResult.topicPartition
val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
}
val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
finishedUpdates.foreachEntry { (partition, result) =>
result.foreach { leaderAndIsr =>
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
}
}
if (isDebugEnabled) {
updatesToRetry.foreach { partition =>
debug(s"Controller failed to elect leader for partition $partition. " +
s"Attempted to write state ${adjustedLeaderAndIsrs(partition)}, but failed with bad ZK version. This will be retried.")
}
}
(finishedUpdates ++ failedElections, updatesToRetry)
}
/* For the provided set of topic partition and partition sync state it attempts to determine if unclean
* leader election should be performed. Unclean election should be performed if there are no live
* replica which are in sync and unclean leader election is allowed (allowUnclean parameter is true or
* the topic has been configured to allow unclean election).
*
* @param leaderIsrAndControllerEpochs set of partition to determine if unclean leader election should be
* allowed
* @param allowUnclean whether to allow unclean election without having to read the topic configuration
* @return a sequence of three element tuple:
* 1. topic partition
* 2. leader, isr and controller epoc. Some means election should be performed
* 3. allow unclean
*/
private def collectUncleanLeaderElectionState(
leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)],
allowUnclean: Boolean
): Seq[(TopicPartition, Option[LeaderAndIsr], Boolean)] = {
val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderAndIsrs.partition {
case (partition, leaderAndIsr) =>
val liveInSyncReplicas = leaderAndIsr.isr.asScala.filter(controllerContext.isReplicaOnline(_, partition))
liveInSyncReplicas.isEmpty
}
val electionForPartitionWithoutLiveReplicas = if (allowUnclean) {
partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
(partition, Option(leaderAndIsr), true)
}
} else {
val (logConfigs, failed) = zkClient.getLogConfigs(
partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
config.originals()
)
partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
if (failed.contains(partition.topic)) {
logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
(partition, None, false)
} else {
(
partition,
Option(leaderAndIsr),
logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue()
)
}
}
}
electionForPartitionWithoutLiveReplicas ++
partitionsWithLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
(partition, Option(leaderAndIsr), false)
}
}
private def logInvalidTransition(partition: TopicPartition, targetState: PartitionState): Unit = {
val currState = partitionState(partition)
val e = new IllegalStateException(s"Partition $partition should be in one of " +
s"${targetState.validPreviousStates.mkString(",")} states before moving to $targetState state. Instead it is in " +
s"$currState state")
logFailedStateChange(partition, currState, targetState, e)
}
private def logFailedStateChange(partition: TopicPartition, currState: PartitionState, targetState: PartitionState, code: Code): Unit = {
logFailedStateChange(partition, currState, targetState, KeeperException.create(code))
}
private def logFailedStateChange(partition: TopicPartition, currState: PartitionState, targetState: PartitionState, t: Throwable): Unit = {
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
.error(s"Controller $controllerId epoch ${controllerContext.epoch} failed to change state for partition $partition " +
s"from $currState to $targetState", t)
throw new UnsupportedOperationException()
}
}
object PartitionLeaderElectionAlgorithms {
def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
if (uncleanLeaderElectionEnabled) {
val leaderOpt = assignment.find(liveReplicas.contains)
if (leaderOpt.isDefined)
controllerContext.stats.uncleanLeaderElectionRate.mark()
leaderOpt
} else {
None
}
}
throw new UnsupportedOperationException()
}
def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))
throw new UnsupportedOperationException()
}
def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
throw new UnsupportedOperationException()
}
def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {
assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))
throw new UnsupportedOperationException()
}
}

View File

@ -16,61 +16,25 @@
*/
package kafka.controller
import kafka.common.StateChangeFailedException
import kafka.server.KafkaConfig
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk.TopicPartitionStateZNode
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.zookeeper.KeeperException.Code
import java.util.stream.Collectors
import scala.collection.{Seq, mutable}
import scala.collection.Seq
abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
/**
* Invoked on successful controller election.
*/
def startup(): Unit = {
info("Initializing replica state")
initializeReplicaState()
info("Triggering online replica state changes")
val (onlineReplicas, offlineReplicas) = controllerContext.onlineAndOfflineReplicas
handleStateChanges(onlineReplicas.toSeq, OnlineReplica)
info("Triggering offline replica state changes")
handleStateChanges(offlineReplicas.toSeq, OfflineReplica)
debug(s"Started replica state machine with initial state -> ${controllerContext.replicaStates}")
throw new UnsupportedOperationException()
}
/**
* Invoked on controller shutdown.
*/
def shutdown(): Unit = {
info("Stopped replica state machine")
}
/**
* Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions
* in zookeeper
*/
private def initializeReplicaState(): Unit = {
controllerContext.allPartitions.foreach { partition =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
replicas.foreach { replicaId =>
val partitionAndReplica = PartitionAndReplica(partition, replicaId)
if (controllerContext.isReplicaOnline(replicaId, partition)) {
controllerContext.putReplicaState(partitionAndReplica, OnlineReplica)
} else {
// mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
// This is required during controller failover since during controller failover a broker can go down,
// so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
controllerContext.putReplicaState(partitionAndReplica, ReplicaDeletionIneligible)
}
}
}
throw new UnsupportedOperationException()
}
def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit
@ -106,350 +70,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
this.logIdent = s"[ReplicaStateMachine controllerId=$controllerId] "
override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
if (replicas.nonEmpty) {
try {
controllerBrokerRequestBatch.newBatch()
replicas.groupBy(_.replica).foreachEntry { (replicaId, replicas) =>
doHandleStateChanges(replicaId, replicas, targetState)
}
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: ControllerMovedException =>
error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
throw e
case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
}
}
}
/**
* This API exercises the replica's state machine. It ensures that every state transition happens from a legal
* previous state to the target state. Valid state transitions are:
* NonExistentReplica --> NewReplica
* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
* partition to every live broker
*
* NewReplica -> OnlineReplica
* --add the new replica to the assigned replica list if needed
*
* OnlineReplica,OfflineReplica -> OnlineReplica
* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
* partition to every live broker
*
* NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica
* --send StopReplicaRequest to the replica (w/o deletion)
* --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
* UpdateMetadata request for the partition to every live broker.
*
* OfflineReplica -> ReplicaDeletionStarted
* --send StopReplicaRequest to the replica (with deletion)
*
* ReplicaDeletionStarted -> ReplicaDeletionSuccessful
* -- mark the state of the replica in the state machine
*
* ReplicaDeletionStarted -> ReplicaDeletionIneligible
* -- mark the state of the replica in the state machine
*
* ReplicaDeletionSuccessful -> NonExistentReplica
* -- remove the replica from the in memory partition replica assignment cache
*
* @param replicaId The replica for which the state transition is invoked
* @param replicas The partitions on this replica for which the state transition is invoked
* @param targetState The end state that the replica should be moved to
*/
private def doHandleStateChanges(replicaId: Int, replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
val stateLogger = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
val traceEnabled = stateLogger.isTraceEnabled
replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))
val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState)
invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))
targetState match {
case NewReplica =>
validReplicas.foreach { replica =>
val partition = replica.topicPartition
val currentState = controllerContext.replicaState(replica)
controllerContext.partitionLeadershipInfo(partition) match {
case Some(leaderIsrAndControllerEpoch) =>
if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
logFailedStateChange(replica, currentState, OfflineReplica, exception)
} else {
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
replica.topicPartition,
leaderIsrAndControllerEpoch,
controllerContext.partitionFullReplicaAssignment(replica.topicPartition),
isNew = true)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)
controllerContext.putReplicaState(replica, NewReplica)
}
case None =>
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)
controllerContext.putReplicaState(replica, NewReplica)
}
}
case OnlineReplica =>
validReplicas.foreach { replica =>
val partition = replica.topicPartition
val currentState = controllerContext.replicaState(replica)
currentState match {
case NewReplica =>
val assignment = controllerContext.partitionFullReplicaAssignment(partition)
if (!assignment.replicas.contains(replicaId)) {
error(s"Adding replica ($replicaId) that is not part of the assignment $assignment")
val newAssignment = assignment.copy(replicas = assignment.replicas :+ replicaId)
controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment)
}
case _ =>
controllerContext.partitionLeadershipInfo(partition) match {
case Some(leaderIsrAndControllerEpoch) =>
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
replica.topicPartition,
leaderIsrAndControllerEpoch,
controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
case None =>
}
}
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, partition, currentState, OnlineReplica)
controllerContext.putReplicaState(replica, OnlineReplica)
}
case OfflineReplica =>
validReplicas.foreach { replica =>
controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)
}
val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
}
val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
updatedLeaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) =>
stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
partition,
leaderIsrAndControllerEpoch,
controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
}
val replica = PartitionAndReplica(partition, replicaId)
val currentState = controllerContext.replicaState(replica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, partition, currentState, OfflineReplica)
controllerContext.putReplicaState(replica, OfflineReplica)
}
replicasWithoutLeadershipInfo.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, OfflineReplica)
controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))
controllerContext.putReplicaState(replica, OfflineReplica)
}
case ReplicaDeletionStarted =>
validReplicas.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)
controllerContext.putReplicaState(replica, ReplicaDeletionStarted)
controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = true)
}
case ReplicaDeletionIneligible =>
validReplicas.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionIneligible)
controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
}
case ReplicaDeletionSuccessful =>
validReplicas.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionSuccessful)
controllerContext.putReplicaState(replica, ReplicaDeletionSuccessful)
}
case NonExistentReplica =>
validReplicas.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
val newAssignedReplicas = controllerContext
.partitionFullReplicaAssignment(replica.topicPartition)
.removeReplica(replica.replica)
controllerContext.updatePartitionFullReplicaAssignment(replica.topicPartition, newAssignedReplicas)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, NonExistentReplica)
controllerContext.removeReplicaState(replica)
}
}
}
/**
* Repeatedly attempt to remove a replica from the isr of multiple partitions until there are no more remaining partitions
* to retry.
* @param replicaId The replica being removed from isr of multiple partitions
* @param partitions The partitions from which we're trying to remove the replica from isr
* @return The updated LeaderIsrAndControllerEpochs of all partitions for which we successfully removed the replica from isr.
*/
private def removeReplicasFromIsr(
replicaId: Int,
partitions: Seq[TopicPartition]
): Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
var results = Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
var remaining = partitions
while (remaining.nonEmpty) {
val (finishedRemoval, removalsToRetry) = doRemoveReplicasFromIsr(replicaId, remaining)
remaining = removalsToRetry
finishedRemoval.foreach {
case (partition, Left(e)) =>
val replica = PartitionAndReplica(partition, replicaId)
val currentState = controllerContext.replicaState(replica)
logFailedStateChange(replica, currentState, OfflineReplica, e)
case (partition, Right(leaderIsrAndEpoch)) =>
results += partition -> leaderIsrAndEpoch
}
}
results
}
/**
* Try to remove a replica from the isr of multiple partitions.
* Removing a replica from isr updates partition state in zookeeper.
*
* @param replicaId The replica being removed from isr of multiple partitions
* @param partitions The partitions from which we're trying to remove the replica from isr
* @return A tuple of two elements:
* 1. The updated Right[LeaderIsrAndControllerEpochs] of all partitions for which we successfully
* removed the replica from isr. Or Left[Exception] corresponding to failed removals that should
* not be retried
* 2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
* the partition leader updated partition state while the controller attempted to update partition state.
*/
private def doRemoveReplicasFromIsr(
replicaId: Int,
partitions: Seq[TopicPartition]
): (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {
val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk) = getTopicPartitionStatesFromZk(partitions)
val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, result) =>
result.map { leaderAndIsr =>
leaderAndIsr.isr.contains(replicaId)
}.getOrElse(false)
}
val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap {
case (partition, result) =>
result.toOption.map { leaderAndIsr =>
val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NO_LEADER else leaderAndIsr.leader
val adjustedIsr =
if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr
else leaderAndIsr.isr.stream().filter(_ != replicaId).collect(Collectors.toList[Integer])
partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
}
}
val UpdateLeaderAndIsrResult(finishedPartitions, updatesToRetry) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
val exceptionsForPartitionsWithNoLeaderAndIsrInZk: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
partitionsWithNoLeaderAndIsrInZk.iterator.flatMap { partition =>
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val exception = new StateChangeFailedException(
s"Failed to change state of replica $replicaId for partition $partition since the leader and isr " +
"path in zookeeper is empty"
)
Option(partition -> Left(exception))
} else None
}.toMap
val leaderIsrAndControllerEpochs: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
(leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result) =>
(partition, result.map { leaderAndIsr =>
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
leaderIsrAndControllerEpoch
})
}
if (isDebugEnabled) {
updatesToRetry.foreach { partition =>
debug(s"Controller failed to remove replica $replicaId from ISR of partition $partition. " +
s"Attempted to write state ${adjustedLeaderAndIsrs(partition)}, but failed with bad ZK version. This will be retried.")
}
}
(leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk, updatesToRetry)
}
/**
* Gets the partition state from zookeeper
* @param partitions the partitions whose state we want from zookeeper
* @return A tuple of two values:
* 1. The Right(LeaderAndIsrs) of partitions whose state we successfully read from zookeeper.
* The Left(Exception) to failed zookeeper lookups or states whose controller epoch exceeds our current epoch
* 2. The partitions that had no leader and isr state in zookeeper. This happens if the controller
* didn't finish partition initialization.
*/
private def getTopicPartitionStatesFromZk(
partitions: Seq[TopicPartition]
): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {
val getDataResponses = try {
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
}
val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicPartition]
val result = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
getDataResponses.foreach[Unit] { getDataResponse =>
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
if (getDataResponse.resultCode == Code.OK) {
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
case None =>
partitionsWithNoLeaderAndIsrInZk += partition
case Some(leaderIsrAndControllerEpoch) =>
if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
val exception = new StateChangeFailedException(
"Leader and isr path written by another controller. This probably " +
s"means the current controller with epoch ${controllerContext.epoch} went through a soft failure and " +
s"another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}. Aborting " +
"state change by this controller"
)
result += (partition -> Left(exception))
} else {
result += (partition -> Right(leaderIsrAndControllerEpoch.leaderAndIsr))
}
}
} else if (getDataResponse.resultCode == Code.NONODE) {
partitionsWithNoLeaderAndIsrInZk += partition
} else {
result += (partition -> Left(getDataResponse.resultException.get))
}
}
(result.toMap, partitionsWithNoLeaderAndIsrInZk)
}
private def logSuccessfulTransition(logger: StateChangeLogger, replicaId: Int, partition: TopicPartition,
currState: ReplicaState, targetState: ReplicaState): Unit = {
logger.trace(s"Changed state of replica $replicaId for partition $partition from $currState to $targetState")
}
private def logInvalidTransition(replica: PartitionAndReplica, targetState: ReplicaState): Unit = {
val currState = controllerContext.replicaState(replica)
val e = new IllegalStateException(s"Replica $replica should be in the ${targetState.validPreviousStates.mkString(",")} " +
s"states before moving to $targetState state. Instead it is in $currState state")
logFailedStateChange(replica, currState, targetState, e)
}
private def logFailedStateChange(replica: PartitionAndReplica, currState: ReplicaState, targetState: ReplicaState, t: Throwable): Unit = {
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
.error(s"Controller $controllerId epoch ${controllerContext.epoch} initiated state change of replica ${replica.replica} " +
s"for partition ${replica.topicPartition} from $currState to $targetState failed", t)
throw new UnsupportedOperationException()
}
}

View File

@ -49,7 +49,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.zookeeper.client.ZKClientConfig
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
@ -62,18 +61,6 @@ object KafkaConfig {
DynamicBrokerConfig.dynamicConfigUpdateModes))
}
private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
Option(clientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName)))
}
// For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
// with both a client connection socket and a key store location explicitly set.
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG).contains("true") &&
zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG).isDefined &&
zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG).isDefined
}
val configDef = AbstractKafkaConfig.CONFIG_DEF
def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted

View File

@ -18,7 +18,6 @@ package kafka.zk
import java.util.{Collections, Optional, Properties}
import kafka.admin.RackAwareMode
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment
import kafka.server.{DynamicConfig, KafkaConfig}
import kafka.utils._
@ -29,7 +28,6 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.NodeExistsException
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
@ -180,7 +178,6 @@ class AdminZkClient(zkClient: KafkaZkClient,
}
debug("Updated path %s with %s for replica assignment".format(TopicZNode.path(topic), assignment))
} catch {
case _: NodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
@ -194,8 +191,6 @@ class AdminZkClient(zkClient: KafkaZkClient,
try {
zkClient.createDeleteTopicPath(topic)
} catch {
case _: NodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
"topic %s is already marked for deletion".format(topic))
case e: Throwable => throw new AdminOperationException(e.getMessage)
}
} else {

File diff suppressed because it is too large

View File

@ -27,6 +27,7 @@ import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpo
import kafka.server.DelegationTokenManagerZk
import kafka.utils.Json
import kafka.utils.json.JsonObject
import kafka.zookeeper.Stat
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.feature.Features._
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
@ -36,17 +37,13 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.{MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, IBP_2_7_IV0}
import org.apache.kafka.server.config.ConfigType
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.{ACL, Stat}
import scala.beans.BeanProperty
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, immutable, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
@ -113,17 +110,8 @@ object BrokerInfo {
* any case).
*/
def apply(broker: Broker, metadataVersion: MetadataVersion, jmxPort: Int): BrokerInfo = {
val version = {
if (metadataVersion.isAtLeast(IBP_2_7_IV0))
5
else if (metadataVersion.isAtLeast(IBP_0_10_0_IV1))
4
else
2
}
BrokerInfo(broker, version, jmxPort)
throw new UnsupportedOperationException()
}
}
case class BrokerInfo(broker: Broker, version: Int, jmxPort: Int) {
@ -395,20 +383,7 @@ object TopicPartitionStateZNode {
}
def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
Json.parseBytes(bytes).map { js =>
val leaderIsrAndEpochInfo = js.asJsonObject
val leader = leaderIsrAndEpochInfo("leader").to[Int]
val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
val recovery = leaderIsrAndEpochInfo
.get("leader_recovery_state")
.map(jsonValue => LeaderRecoveryState.of(jsonValue.to[Int].toByte))
.getOrElse(LeaderRecoveryState.RECOVERED)
val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
val zkPathVersion = stat.getVersion
LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, epoch, isr.map(Int.box).asJava, recovery, zkPathVersion), controllerEpoch)
}
throw new UnsupportedOperationException()
}
}
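
For reference, the decode method stubbed out above used to parse the partition state znode JSON whose fields are named in the deleted lines (controller_epoch, leader, leader_epoch, isr, and an optional leader_recovery_state). A hypothetical payload of that shape, with made-up values:

// Illustrative partition-state znode payload (values invented for the example):
val examplePartitionState =
  """{"controller_epoch":12,"leader":1,"leader_epoch":5,"isr":[1,2,3],"leader_recovery_state":0}"""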
@ -763,8 +738,13 @@ case object ExtendedAclChangeStore extends ZkAclChangeStore {
object ResourceZNode {
def path(resource: ResourcePattern): String = ZkAclStore(resource.patternType).path(resource.resourceType, resource.name)
def encode(acls: Set[AclEntry]): Array[Byte] = Json.encodeAsBytes(AclEntry.toJsonCompatibleMap(acls.asJava))
def decode(bytes: Array[Byte], stat: Stat): ZkData.VersionedAcls = ZkData.VersionedAcls(AclEntry.fromBytes(bytes).asScala.toSet, stat.getVersion)
def encode(acls: Set[AclEntry]): Array[Byte] = {
throw new UnsupportedOperationException()
}
def decode(bytes: Array[Byte], stat: Stat): ZkData.VersionedAcls = {
throw new UnsupportedOperationException()
}
}
object ExtendedAclChangeEvent {
@ -1083,19 +1063,4 @@ object ZkData {
ConfigEntityTypeZNode.path(ConfigType.BROKER),
DelegationTokensZNode.path
)
def sensitivePath(path: String): Boolean = {
path != null && SensitiveRootPaths.exists(path.startsWith)
}
def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
//Old Consumer path is kept open as different consumers will write under this node.
if (!ConsumerPathZNode.path.equals(path) && isSecure) {
val acls = new ArrayBuffer[ACL]
acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
if (!sensitivePath(path))
acls ++= ZooDefs.Ids.READ_ACL_UNSAFE.asScala
acls
} else ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala
}
}
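
To make the removed defaultAcls logic concrete: on a secure cluster a non-sensitive path got CREATOR_ALL_ACL plus READ_ACL_UNSAFE, a path under a sensitive root (such as the broker config or delegation token subtrees listed above) got CREATOR_ALL_ACL only, and on an insecure cluster (or for the old consumer path, which stays writable by consumers) every path got OPEN_ACL_UNSAFE. Illustrative outcomes, with example paths:

// defaultAcls(isSecure = true,  "/brokers/ids/0")    -> CREATOR_ALL_ACL ++ READ_ACL_UNSAFE
// defaultAcls(isSecure = true,  "/config/brokers/0") -> CREATOR_ALL_ACL (sensitive root)
// defaultAcls(isSecure = false, "/brokers/ids/0")    -> OPEN_ACL_UNSAFE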

View File

@ -17,30 +17,21 @@
package kafka.zookeeper
import java.util.Locale
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
import java.util.concurrent._
import java.util.{List => JList}
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.Logging
import kafka.zookeeper.ZooKeeperClient._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.zookeeper.AsyncCallback.{Children2Callback, DataCallback, StatCallback}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
import org.apache.zookeeper.ZooKeeper.States
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper._
import org.apache.zookeeper.client.ZKClientConfig
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}
import scala.collection.Seq
object ZooKeeperClient {
val RetryBackoffMs = 1000
case class ACL() {}
case class CreateMode() {}
case class OpResult() {}
object Code {
val OK: Integer = 1
val NONODE: Integer = 1
}
case class Code() {}
case class Stat() {}
case class KeeperException() extends RuntimeException {}
/**
* A ZooKeeper client that encourages pipelined requests.
@ -59,63 +50,9 @@ class ZooKeeperClient(connectString: String,
time: Time,
metricGroup: String,
metricType: String,
private[zookeeper] val clientConfig: ZKClientConfig,
name: String) extends Logging {
private val metricsGroup: KafkaMetricsGroup = new KafkaMetricsGroup(metricGroup, metricType)
this.logIdent = s"[ZooKeeperClient $name] "
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
private val inFlightRequests = new Semaphore(maxInFlightRequests)
private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
private[zookeeper] val reinitializeScheduler = new KafkaScheduler(1, true, s"zk-client-${threadPrefix}reinit-")
private var isFirstConnectionEstablished = false
private val metricNames = mutable.Set[String]()
// The state map has to be created before creating ZooKeeper since it's needed in the ZooKeeper callback.
private val stateToMeterMap = {
import KeeperState._
val stateToEventTypeMap = Map(
Disconnected -> "Disconnects",
SyncConnected -> "SyncConnects",
AuthFailed -> "AuthFailures",
ConnectedReadOnly -> "ReadOnlyConnects",
SaslAuthenticated -> "SaslAuthentications",
Expired -> "Expires"
)
stateToEventTypeMap.map { case (state, eventType) =>
val name = s"ZooKeeper${eventType}PerSec"
metricNames += name
state -> metricsGroup.newMeter(name, eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
}
}
info(s"Initializing a new session to $connectString.")
// Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
clientConfig)
metricsGroup.newGauge("SessionState", () => connectionState.toString)
metricNames += "SessionState"
reinitializeScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
case e: Throwable =>
close()
throw e
}
/**
* Return the state of the ZooKeeper connection.
*/
def connectionState: States = zooKeeper.getState
/**
* Send a request and wait for its response. See handle(Seq[AsyncRequest]) for details.
@ -124,7 +61,7 @@ class ZooKeeperClient(connectString: String,
* @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse).
*/
def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = {
handleRequests(Seq(request)).head
throw new UnsupportedOperationException()
}
/**
@ -139,135 +76,7 @@ class ZooKeeperClient(connectString: String,
* will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
*/
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
if (requests.isEmpty)
Seq.empty
else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
requests.foreach { request =>
inFlightRequests.acquire()
try {
inReadLock(initializationLock) {
send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
}
}
} catch {
case e: Throwable =>
inFlightRequests.release()
throw e
}
}
countDownLatch.await()
responseQueue.asScala.toBuffer
}
}
// Visibility to override for testing
private[zookeeper] def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
// Safe to cast as we always create a response of the right type
def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
def responseMetadata(sendTimeMs: Long) = ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs())
val sendTimeMs = time.hiResClockMs()
// Cast to AsyncRequest to work around a scalac bug that results in a false exhaustiveness warning
// with -Xlint:strict-unsealed-patmat
(request: AsyncRequest) match {
case ExistsRequest(path, ctx) =>
zooKeeper.exists(path, shouldWatch(request), new StatCallback {
def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
case GetDataRequest(path, ctx) =>
zooKeeper.getData(path, shouldWatch(request), new DataCallback {
def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
case GetChildrenRequest(path, _, ctx) =>
zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
def processResult(rc: Int, path: String, ctx: Any, children: JList[String], stat: Stat): Unit =
callback(GetChildrenResponse(Code.get(rc), path, Option(ctx), Option(children).map(_.asScala).getOrElse(Seq.empty),
stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
case CreateRequest(path, data, acl, createMode, ctx) =>
zooKeeper.create(path, data, acl.asJava, createMode,
(rc, path, ctx, name) =>
callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs))),
ctx.orNull)
case SetDataRequest(path, data, version, ctx) =>
zooKeeper.setData(path, data, version,
(rc, path, ctx, stat) =>
callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs))),
ctx.orNull)
case DeleteRequest(path, version, ctx) =>
zooKeeper.delete(path, version,
(rc, path, ctx) => callback(DeleteResponse(Code.get(rc), path, Option(ctx), responseMetadata(sendTimeMs))),
ctx.orNull)
case GetAclRequest(path, ctx) =>
zooKeeper.getACL(path, null,
(rc, path, ctx, acl, stat) =>
callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty),
stat, responseMetadata(sendTimeMs))),
ctx.orNull)
case SetAclRequest(path, acl, version, ctx) =>
zooKeeper.setACL(path, acl.asJava, version,
(rc, path, ctx, stat) =>
callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs))),
ctx.orNull)
case MultiRequest(zkOps, ctx) =>
def toZkOpResult(opResults: JList[OpResult]): Seq[ZkOpResult] =
Option(opResults).map(results => zkOps.zip(results.asScala).map { case (zkOp, result) =>
ZkOpResult(zkOp, result)
}).orNull
zooKeeper.multi(zkOps.map(_.toZookeeperOp).asJava,
(rc, path, ctx, opResults) =>
callback(MultiResponse(Code.get(rc), path, Option(ctx), toZkOpResult(opResults), responseMetadata(sendTimeMs))),
ctx.orNull)
}
}
/**
* Wait indefinitely until the underlying ZooKeeper client reaches the CONNECTED state.
* @throws ZooKeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
* @throws ZooKeeperClientExpiredException if the session expired either before or while waiting for connection.
*/
def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
}
private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = {
info("Waiting until connected.")
var nanos = timeUnit.toNanos(timeout)
inLock(isConnectedOrExpiredLock) {
var state = connectionState
while (!state.isConnected && state.isAlive) {
if (nanos <= 0) {
throw new ZooKeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state")
}
nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
state = connectionState
}
if (state == States.AUTH_FAILED) {
throw new ZooKeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
} else if (state == States.CLOSED) {
throw new ZooKeeperClientExpiredException("Session expired either before or while waiting for connection")
}
isFirstConnectionEstablished = true
}
info("Connected.")
}
// If this method is changed, the documentation for registerZNodeChangeHandler and/or registerZNodeChildChangeHandler
// may need to be updated.
private def shouldWatch(request: AsyncRequest): Boolean = request match {
case GetChildrenRequest(_, registerWatch, _) => registerWatch && zNodeChildChangeHandlers.contains(request.path)
case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path)
case _ => throw new IllegalArgumentException(s"Request $request is not watchable")
throw new UnsupportedOperationException()
}
/**
@ -281,7 +90,7 @@ class ZooKeeperClient(connectString: String,
* @param zNodeChangeHandler the handler to register
*/
def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
zNodeChangeHandlers.put(zNodeChangeHandler.path, zNodeChangeHandler)
throw new UnsupportedOperationException()
}
/**
@ -289,7 +98,7 @@ class ZooKeeperClient(connectString: String,
* @param path the path of the handler to unregister
*/
def unregisterZNodeChangeHandler(path: String): Unit = {
zNodeChangeHandlers.remove(path)
throw new UnsupportedOperationException()
}
/**
@ -300,7 +109,7 @@ class ZooKeeperClient(connectString: String,
* @param zNodeChildChangeHandler the handler to register
*/
def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
zNodeChildChangeHandlers.put(zNodeChildChangeHandler.path, zNodeChildChangeHandler)
throw new UnsupportedOperationException()
}
/**
@ -308,148 +117,30 @@ class ZooKeeperClient(connectString: String,
* @param path the path of the handler to unregister
*/
def unregisterZNodeChildChangeHandler(path: String): Unit = {
zNodeChildChangeHandlers.remove(path)
throw new UnsupportedOperationException()
}
/**
* @param stateChangeHandler
*/
def registerStateChangeHandler(stateChangeHandler: StateChangeHandler): Unit = inReadLock(initializationLock) {
if (stateChangeHandler != null)
stateChangeHandlers.put(stateChangeHandler.name, stateChangeHandler)
def registerStateChangeHandler(stateChangeHandler: StateChangeHandler): Unit = {
throw new UnsupportedOperationException()
}
/**
*
* @param name
*/
def unregisterStateChangeHandler(name: String): Unit = inReadLock(initializationLock) {
stateChangeHandlers.remove(name)
def unregisterStateChangeHandler(name: String): Unit = {
throw new UnsupportedOperationException()
}
def close(): Unit = {
info("Closing.")
// Shutdown scheduler outside of lock to avoid deadlock if scheduler
// is waiting for lock to process session expiry. Close expiry thread
// first to ensure that new clients are not created during close().
reinitializeScheduler.shutdown()
inWriteLock(initializationLock) {
zNodeChangeHandlers.clear()
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
metricNames.foreach(metricsGroup.removeMetric)
}
info("Closed.")
throw new UnsupportedOperationException()
}
def sessionId: Long = inReadLock(initializationLock) {
zooKeeper.getSessionId
}
// Only for testing
private[kafka] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
zooKeeper
}
private def reinitialize(): Unit = {
// Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
// may require additional Zookeeper requests, which will block to acquire the initialization lock
stateChangeHandlers.values.foreach(callBeforeInitializingSession)
inWriteLock(initializationLock) {
if (!connectionState.isAlive) {
zooKeeper.close()
info(s"Initializing a new session to $connectString.")
// retry forever until ZooKeeper can be instantiated
var connected = false
while (!connected) {
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
connected = true
} catch {
case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short sleep", e)
Thread.sleep(RetryBackoffMs)
}
}
}
}
stateChangeHandlers.values.foreach(callAfterInitializingSession)
}
/**
* Close the zookeeper client to force session reinitialization. This is visible for testing only.
*/
private[zookeeper] def forceReinitialize(): Unit = {
zooKeeper.close()
reinitialize()
}
private def callBeforeInitializingSession(handler: StateChangeHandler): Unit = {
try {
handler.beforeInitializingSession()
} catch {
case t: Throwable =>
error(s"Uncaught error in handler ${handler.name}", t)
}
}
private def callAfterInitializingSession(handler: StateChangeHandler): Unit = {
try {
handler.afterInitializingSession()
} catch {
case t: Throwable =>
error(s"Uncaught error in handler ${handler.name}", t)
}
}
// Visibility for testing
private[zookeeper] def scheduleReinitialize(name: String, message: String, delayMs: Long): Unit = {
reinitializeScheduler.scheduleOnce(name, () => {
info(message)
reinitialize()
}, delayMs)
}
private def threadPrefix: String = name.replaceAll("\\s", "") + "-"
// package level visibility for testing only
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
debug(s"Received event: $event")
Option(event.getPath) match {
case None =>
val state = event.getState
stateToMeterMap.get(state).foreach(_.mark())
inLock(isConnectedOrExpiredLock) {
isConnectedOrExpiredCondition.signalAll()
}
if (state == KeeperState.AuthFailed) {
error(s"Auth failed, initialized=$isFirstConnectionEstablished connectionState=$connectionState")
stateChangeHandlers.values.foreach(_.onAuthFailure())
// If this is during initial startup, we fail fast. Otherwise, schedule retry.
val initialized = inLock(isConnectedOrExpiredLock) {
isFirstConnectionEstablished
}
if (initialized && !connectionState.isAlive)
scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs)
} else if (state == KeeperState.Expired) {
scheduleReinitialize("session-expired", "Session expired.", delayMs = 0L)
}
case Some(path) =>
(event.getType: @unchecked) match {
case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
}
}
}
def sessionId: Long = {
throw new UnsupportedOperationException()
}
}
@ -474,23 +165,18 @@ trait ZNodeChildChangeHandler {
// Thin wrapper for zookeeper.Op
sealed trait ZkOp {
def toZookeeperOp: Op
}
case class CreateOp(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode) extends ZkOp {
override def toZookeeperOp: Op = Op.create(path, data, acl.asJava, createMode)
}
case class DeleteOp(path: String, version: Int) extends ZkOp {
override def toZookeeperOp: Op = Op.delete(path, version)
}
case class SetDataOp(path: String, data: Array[Byte], version: Int) extends ZkOp {
override def toZookeeperOp: Op = Op.setData(path, data, version)
}
case class CheckOp(path: String, version: Int) extends ZkOp {
override def toZookeeperOp: Op = Op.check(path, version)
}
case class ZkOpResult(zkOp: ZkOp, rawOpResult: OpResult)
@ -550,19 +236,9 @@ sealed abstract class AsyncResponse {
def path: String
def ctx: Option[Any]
/** Return None if the result code is OK and KeeperException otherwise. */
def resultException: Option[KeeperException] =
if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path))
/**
* Throw KeeperException if the result code is not OK.
*/
def maybeThrow(): Unit = {
if (resultCode != Code.OK)
throw KeeperException.create(resultCode, path)
}
def metadata: ResponseMetadata
def resultException: Option[RuntimeException] = None
}
case class ResponseMetadata(sendTimeMs: Long, receivedTimeMs: Long) {
@ -588,7 +264,4 @@ case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any],
case class MultiResponse(resultCode: Code, path: String, ctx: Option[Any], zkOpResults: Seq[ZkOpResult],
metadata: ResponseMetadata) extends AsyncResponse
class ZooKeeperClientException(message: String) extends RuntimeException(message)
class ZooKeeperClientExpiredException(message: String) extends ZooKeeperClientException(message)
class ZooKeeperClientAuthFailedException(message: String) extends ZooKeeperClientException(message)
class ZooKeeperClientTimeoutException(message: String) extends ZooKeeperClientException(message)
case class ZooKeeperClientException(message: String) extends RuntimeException(message)

View File

@ -20,7 +20,6 @@ package kafka.api
import kafka.security.JaasTestUtils
import kafka.security.JaasTestUtils.JaasSection
import kafka.security.minikdc.MiniKdc
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ScramCredentialInfo, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
@ -30,9 +29,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ConfigType
import org.apache.zookeeper.client.ZKClientConfig
import java.io.File
import java.util
@ -203,10 +200,7 @@ trait SaslSetup {
}
def createScramCredentials(zkConnect: String, userName: String, password: String): Unit = {
val zkClientConfig = new ZKClientConfig()
Using.resource(KafkaZkClient(
zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = zkClientConfig, enableEntityConfigControllerCheck = false)) { zkClient =>
Using.resource(new KafkaZkClient()) { zkClient =>
val adminZkClient = new AdminZkClient(zkClient)
val entityType = ConfigType.USER

View File

@ -22,7 +22,6 @@ import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils.connectAndReceive
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.commons.io.FileUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@ -57,7 +56,7 @@ import org.slf4j.LoggerFactory
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{FileSystems, Files, Path}
import java.nio.file.{FileSystems, Files, Path, Paths}
import java.{lang, util}
import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
@ -1435,6 +1434,15 @@ class KRaftClusterTest {
}
}
def copyDirectory(src: String, dest: String): Unit = {
Files.walk(Paths.get(src)).forEach(p => {
val out = Paths.get(dest, p.toString().substring(src.length()))
if (!p.toString().equals(src)) {
Files.copy(p, out);
}
});
}
@Test
def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
@ -1476,7 +1484,8 @@ class KRaftClusterTest {
val parentDir = log.parentDir
val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head
val targetDirFile = new File(targetParentDir, log.dir.getName)
FileUtils.copyDirectory(log.dir, targetDirFile)
targetDirFile.mkdir()
copyDirectory(log.dir.toString(), targetDirFile.toString())
assertTrue(targetDirFile.exists())
// Rename original log to a future
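
The hand-rolled copyDirectory above replaces commons-io's FileUtils.copyDirectory with a java.nio.file.Files walk. A self-contained sketch of the same walk-and-copy technique, for illustration only (the object name, the explicit stream close, and the paths are assumptions, not part of this change):

import java.nio.file.{Files, Path, Paths}

object CopyTreeSketch {
  // Files.walk visits a directory before its contents, so each target's parent
  // directory exists by the time its children are copied.
  def copyTree(src: Path, dest: Path): Unit = {
    val stream = Files.walk(src)
    try {
      stream.forEach { p =>
        val target = dest.resolve(src.relativize(p).toString)
        if (Files.isDirectory(p)) Files.createDirectories(target)
        else Files.copy(p, target)
      }
    } finally stream.close() // release the open directory handles held by the walk
  }

  def main(args: Array[String]): Unit =
    copyTree(Paths.get("/tmp/source-dir"), Paths.get("/tmp/dest-dir")) // hypothetical paths
}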

View File

@ -60,8 +60,6 @@ versions += [
caffeine: "3.1.1",
bndlib: "7.0.0",
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
commonsCli: "1.4",
commonsIo: "2.14.0", // ZooKeeper dependency. Do not use, this is going away.
commonsValidator: "1.9.0",
classgraph: "4.8.173",
dropwizardMetrics: "4.1.12.1",
@ -114,7 +112,6 @@ versions += [
mavenArtifact: "3.9.6",
metrics: "2.2.0",
mockito: "5.14.2",
netty: "4.1.115.Final",
opentelemetryProto: "1.0.0-alpha",
protobuf: "3.25.5", // a dependency of opentelemetryProto
pcollections: "4.0.1",
@ -129,7 +126,6 @@ versions += [
snappy: "1.1.10.5",
spotbugs: "4.8.6",
zinc: "1.9.2",
zookeeper: "3.8.4",
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
// Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
zstd: "1.5.6-6",
@ -153,8 +149,6 @@ libs += [
bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib",
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsCli: "commons-cli:commons-cli:$versions.commonsCli",
commonsIo: "commons-io:commons-io:$versions.commonsIo",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
@ -218,8 +212,6 @@ libs += [
dropwizardMetrics: "io.dropwizard.metrics:metrics-core:$versions.dropwizardMetrics",
mockitoCore: "org.mockito:mockito-core:$versions.mockito",
mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito",
nettyHandler: "io.netty:netty-handler:$versions.netty",
nettyTransportNativeEpoll: "io.netty:netty-transport-native-epoll:$versions.netty",
pcollections: "org.pcollections:pcollections:$versions.pcollections",
opentelemetryProto: "io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto",
protobuf: "com.google.protobuf:protobuf-java:$versions.protobuf",
@ -233,7 +225,6 @@ libs += [
snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
spotbugs: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$swaggerVersion",
zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper",
jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
zstd: "com.github.luben:zstd-jni:$versions.zstd",