MINOR: Rename and change package of async ZooKeeper classes

- kafka.controller.ZookeeperClient -> kafka.zookeeper.ZooKeeperClient
- kafka.controller.ControllerZkUtils -> kafka.zk.KafkaZkClient
- kafka.controller.ZkData -> kafka.zk.ZkData
- Renamed various fields to match new names and for consistency
- A few clean-ups in ZkData
- Document intent

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Onur Karaman <okaraman@linkedin.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #4112 from ijuma/rename-zookeeper-client-and-move-to-zookeper-package
This commit is contained in:
Ismael Juma 2017-10-25 21:11:16 -07:00 committed by Jun Rao
parent f7f8e11213
commit ab6f848ba6
30 changed files with 371 additions and 334 deletions

View File

@ -153,11 +153,11 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
}
override def handleSessionEstablishmentError(error: Throwable) {
fatal("Could not establish session with zookeeper", error)
fatal("Could not establish session with ZooKeeper", error)
}
override def handleStateChanged(state: KeeperState) {
debug(s"New zookeeper state: ${state}")
debug(s"New ZooKeeper state: ${state}")
}
}

View File

@ -22,10 +22,12 @@ import com.yammer.metrics.core.Gauge
import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
import kafka.utils._
import kafka.zk._
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper.{ZNodeChangeHandler, ZNodeChildChangeHandler}
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -41,7 +43,7 @@ object KafkaController extends Logging {
val InitialControllerEpochZkVersion = 1
}
class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
@ -55,10 +57,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
_ => updateMetrics())
val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkUtils)
val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
@ -155,23 +157,23 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
* This ensures another controller election will be triggered and there will always be an actively serving controller
*/
def onControllerFailover() {
info("Reading controller epoch from zookeeper")
readControllerEpochFromZookeeper()
info("Incrementing controller epoch in zookeeper")
info("Reading controller epoch from ZooKeeper")
readControllerEpochFromZooKeeper()
info("Incrementing controller epoch in ZooKeeper")
incrementControllerEpoch()
info("Registering handlers")
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler)
childChangeHandlers.foreach(zkUtils.registerZNodeChildChangeHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
nodeChangeHandlers.foreach(zkUtils.registerZNodeChangeHandlerAndCheckExistence)
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
info("Deleting log dir event notifications")
zkUtils.deleteLogDirEventNotifications()
zkClient.deleteLogDirEventNotifications()
info("Deleting isr change notifications")
zkUtils.deleteIsrChangeNotifications()
zkClient.deleteIsrChangeNotifications()
info("Initializing controller context")
initializeControllerContext()
info("Fetching topic deletions in progress")
@ -213,10 +215,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
def onControllerResignation() {
debug("Resigning")
// de-register listeners
zkUtils.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
zkUtils.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
zkUtils.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
zkUtils.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
// reset topic deletion manager
topicDeletionManager.reset()
@ -232,12 +234,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
unregisterPartitionReassignmentIsrChangeHandlers()
// shutdown partition state machine
partitionStateMachine.shutdown()
zkUtils.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
zkUtils.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
// shutdown replica state machine
replicaStateMachine.shutdown()
zkUtils.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
resetControllerContext()
@ -465,7 +467,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
val partitionReassignmentIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
reassignedPartitionContext.partitionReassignmentIsrChangeHandler = partitionReassignmentIsrChangeHandler
// register listener on the leader and isr path to wait until they catch up with the current leader
zkUtils.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
zkClient.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
}
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@ -536,7 +538,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
def incrementControllerEpoch(): Unit = {
val newControllerEpoch = controllerContext.epoch + 1
val setDataResponse = zkUtils.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
val setDataResponse = zkClient.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
setDataResponse.resultCode match {
case Code.OK =>
controllerContext.epochZkVersion = setDataResponse.stat.getVersion
@ -545,7 +547,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
// if path doesn't exist, this is the first controller whose epoch should be 1
// the following call can still fail if another controller gets elected between checking if the path exists and
// trying to create the controller epoch path
val createResponse = zkUtils.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
val createResponse = zkClient.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
createResponse.resultCode match {
case Code.OK =>
controllerContext.epoch = KafkaController.InitialControllerEpoch
@ -565,10 +567,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
private def initializeControllerContext() {
// update controller cache with delete topic information
controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster.toSet
controllerContext.allTopics = zkUtils.getAllTopicsInCluster.toSet
controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// update the leader and isr cache for all existing partitions from Zookeeper
@ -582,7 +584,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
val partitionsUndergoingPreferredReplicaElection = zkUtils.getPreferredReplicaElection
val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
// check if they are already completed or topic was deleted
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
@ -618,7 +620,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
val partitionsBeingReassigned = zkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
// check if they are already completed or topic was deleted
val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
@ -637,7 +639,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
val topicsToBeDeleted = zkUtils.getTopicDeletions.toSet
val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
}.keySet.map(_.topic)
@ -661,14 +663,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
val leaderIsrAndControllerEpochs = zkUtils.getTopicPartitionStates(partitions)
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
}
}
private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = {
zkUtils.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
}
}
@ -720,7 +722,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
replicas: Seq[Int]) {
val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
partitionsAndReplicasForThisTopic.put(partition, replicas)
val setDataResponse = zkUtils.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
if (setDataResponse.resultCode == Code.OK) {
info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(",")))
// update the assigned replica list after a successful zookeeper write
@ -769,13 +771,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic)
partitionModificationsHandlers.put(topic, partitionModificationsHandler)
}
partitionModificationsHandlers.values.foreach(zkUtils.registerZNodeChangeHandler)
partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
}
def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
topics.foreach { topic =>
partitionModificationsHandlers.remove(topic)
.foreach(handler => zkUtils.unregisterZNodeChangeHandler(handler.path))
.foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
}
}
@ -784,13 +786,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
case (topicAndPartition, reassignedPartitionsContext) =>
val partitionReassignmentIsrChangeHandler =
reassignedPartitionsContext.partitionReassignmentIsrChangeHandler
zkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
}
}
private def readControllerEpochFromZookeeper() {
private def readControllerEpochFromZooKeeper() {
// initialize the controller epoch and zk version by reading from zookeeper
val epochAndStatOpt = zkUtils.getControllerEpoch
val epochAndStatOpt = zkClient.getControllerEpoch
epochAndStatOpt.foreach { case (epoch, stat) =>
controllerContext.epoch = epoch
controllerContext.epochZkVersion = stat.getVersion
@ -803,21 +805,21 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
// stop watching the ISR changes for this partition
val partitionReassignmentIsrChangeHandler =
controllerContext.partitionsBeingReassigned(topicAndPartition).partitionReassignmentIsrChangeHandler
zkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
}
// read the current list of reassigned partitions from zookeeper
val partitionsBeingReassigned = zkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
// remove this partition from that list
val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
// write the new list to zookeeper
val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
if (reassignment.isEmpty) {
info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
zkUtils.deletePartitionReassignment()
zkClient.deletePartitionReassignment()
} else {
val setDataResponse = zkUtils.setPartitionReassignmentRaw(reassignment)
val setDataResponse = zkClient.setPartitionReassignmentRaw(reassignment)
if (setDataResponse.resultCode == Code.NONODE) {
val createDataResponse = zkUtils.createPartitionReassignment(reassignment)
val createDataResponse = zkClient.createPartitionReassignment(reassignment)
createDataResponse.resultException.foreach(e => throw new AdminOperationException(e))
} else {
setDataResponse.resultException.foreach(e => throw new AdminOperationException(e))
@ -840,7 +842,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
}
if (!isTriggeredByAutoRebalance)
zkUtils.deletePreferredReplicaElection()
zkClient.deletePreferredReplicaElection()
}
/**
@ -872,7 +874,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
// refresh leader and isr from zookeeper again
zkWriteCompleteOrUnnecessary = zkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
zkWriteCompleteOrUnnecessary = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
case Some(leaderIsrAndControllerEpoch) =>
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@ -885,7 +887,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
// update the new leadership decision in zookeeper or retry
val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) =
zkUtils.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
if (successfulUpdates.contains(partition)) {
val finalLeaderAndIsr = successfulUpdates(partition)
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
@ -1065,7 +1067,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
def state = ControllerState.ControllerChange
override def process(): Unit = {
zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
}
@ -1111,7 +1113,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
private def triggerControllerMove(): Unit = {
onControllerResignation()
activeControllerId = -1
zkUtils.deleteController()
zkClient.deleteController()
}
def expire(): Unit = {
@ -1126,7 +1128,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
def elect(): Unit = {
val timestamp = time.milliseconds
activeControllerId = zkUtils.getControllerId.getOrElse(-1)
activeControllerId = zkClient.getControllerId.getOrElse(-1)
/*
* We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
* it's possible that the controller has already been elected when we get here. This check will prevent the following
@ -1138,14 +1140,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
try {
zkUtils.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
info(config.brokerId + " successfully elected as the controller")
activeControllerId = config.brokerId
onControllerFailover()
} catch {
case _: NodeExistsException =>
// If someone else has written the path, then
activeControllerId = zkUtils.getControllerId.getOrElse(-1)
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (activeControllerId != -1)
debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
@ -1163,7 +1165,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
val curBrokers = zkUtils.getAllBrokersInCluster.toSet
val curBrokers = zkClient.getAllBrokersInCluster.toSet
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
@ -1189,13 +1191,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
val topics = zkUtils.getAllTopicsInCluster.toSet
val topics = zkClient.getAllTopicsInCluster.toSet
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics)
val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
@ -1211,13 +1213,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
val sequenceNumbers = zkUtils.getAllLogDirEventNotifications
val sequenceNumbers = zkClient.getAllLogDirEventNotifications
try {
val brokerIds = zkUtils.getBrokerIdsFromLogDirEvents(sequenceNumbers)
val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
onBrokerLogDirFailure(brokerIds)
} finally {
// delete processed children
zkUtils.deleteLogDirEventNotifications(sequenceNumbers)
zkClient.deleteLogDirEventNotifications(sequenceNumbers)
}
}
}
@ -1227,7 +1229,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
@ -1248,12 +1250,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
var topicsToBeDeleted = zkUtils.getTopicDeletions.toSet
var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
zkUtils.deleteTopicDeletions(nonExistentTopics.toSeq)
zkClient.deleteTopicDeletions(nonExistentTopics.toSeq)
}
topicsToBeDeleted --= nonExistentTopics
if (config.deleteTopicEnable) {
@ -1272,7 +1274,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
} else {
// If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
info(s"Removing $topicsToBeDeleted since delete topic is disabled")
zkUtils.deleteTopicDeletions(topicsToBeDeleted.toSeq)
zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq)
}
}
}
@ -1282,8 +1284,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
zkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
val partitionReassignment = zkUtils.getPartitionReassignment
zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
val partitionReassignment = zkClient.getPartitionReassignment
val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
partitionsToBeReassigned.foreach { partitionToBeReassigned =>
if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
@ -1306,7 +1308,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
// check if this partition is still being reassigned or not
controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
zkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
@ -1334,16 +1336,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
val sequenceNumbers = zkUtils.getAllIsrChangeNotifications
val sequenceNumbers = zkClient.getAllIsrChangeNotifications
try {
val partitions = zkUtils.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
val partitions = zkClient.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
if (partitions.nonEmpty) {
updateLeaderAndIsrCache(partitions)
processUpdateNotifications(partitions)
}
} finally {
// delete the notifications
zkUtils.deleteIsrChangeNotifications(sequenceNumbers)
zkClient.deleteIsrChangeNotifications(sequenceNumbers)
}
}
@ -1359,8 +1361,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
zkUtils.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
val partitions = zkUtils.getPreferredReplicaElection
zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
val partitions = zkClient.getPreferredReplicaElection
val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
if (partitionsForTopicsToBeDeleted.nonEmpty) {
error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
@ -1375,8 +1377,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
val wasActiveBeforeChange = isActive
zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
activeControllerId = zkUtils.getControllerId.getOrElse(-1)
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (wasActiveBeforeChange && !isActive) {
onControllerResignation()
}
@ -1388,8 +1390,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
val wasActiveBeforeChange = isActive
zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
activeControllerId = zkUtils.getControllerId.getOrElse(-1)
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (wasActiveBeforeChange && !isActive) {
onControllerResignation()
}

View File

@ -18,9 +18,10 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.common.{StateChangeFailedException, TopicAndPartition}
import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
import kafka.server.KafkaConfig
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
@ -43,7 +44,7 @@ class PartitionStateMachine(config: KafkaConfig,
stateChangeLogger: StateChangeLogger,
controllerContext: ControllerContext,
topicDeletionManager: TopicDeletionManager,
zkUtils: KafkaControllerZkUtils,
zkClient: KafkaZkClient,
partitionState: mutable.Map[TopicAndPartition, PartitionState],
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
private val controllerId = config.brokerId
@ -217,7 +218,7 @@ class PartitionStateMachine(config: KafkaConfig,
partition -> leaderIsrAndControllerEpoch
}.toMap
val createResponses = try {
zkUtils.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
} catch {
case e: Exception =>
partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
@ -278,7 +279,7 @@ class PartitionStateMachine(config: KafkaConfig,
private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
(Seq[TopicAndPartition], Seq[TopicAndPartition], Map[TopicAndPartition, Exception]) = {
val getDataResponses = try {
zkUtils.getTopicPartitionStatesRaw(partitions)
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
@ -331,7 +332,7 @@ class PartitionStateMachine(config: KafkaConfig,
}
val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
successfulUpdates.foreach { case (partition, leaderAndIsr) =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
@ -349,7 +350,7 @@ class PartitionStateMachine(config: KafkaConfig,
val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
liveInSyncReplicas.isEmpty
}
val (logConfigs, failed) = zkUtils.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
val (logConfigs, failed) = zkClient.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
val partitionsWithUncleanLeaderElectionState = partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
if (failed.contains(partition.topic)) {
logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))

View File

@ -19,9 +19,10 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.common.{StateChangeFailedException, TopicAndPartition}
import kafka.controller.Callbacks.CallbackBuilder
import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
import kafka.server.KafkaConfig
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import org.apache.zookeeper.KeeperException.Code
import scala.collection.mutable
@ -48,7 +49,7 @@ class ReplicaStateMachine(config: KafkaConfig,
stateChangeLogger: StateChangeLogger,
controllerContext: ControllerContext,
topicDeletionManager: TopicDeletionManager,
zkUtils: KafkaControllerZkUtils,
zkClient: KafkaZkClient,
replicaState: mutable.Map[PartitionAndReplica, ReplicaState],
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
private val controllerId = config.brokerId
@ -292,7 +293,7 @@ class ReplicaStateMachine(config: KafkaConfig,
val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
}
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
@ -325,7 +326,7 @@ class ReplicaStateMachine(config: KafkaConfig,
val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicAndPartition]
val failed = mutable.Map.empty[TopicAndPartition, Exception]
val getDataResponses = try {
zkUtils.getTopicPartitionStatesRaw(partitions)
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
partitions.foreach(partition => failed.put(partition, e))

View File

@ -19,6 +19,7 @@ package kafka.controller
import kafka.common.TopicAndPartition
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import scala.collection.{Set, mutable}
@ -57,7 +58,7 @@ import scala.collection.{Set, mutable}
*/
class TopicDeletionManager(controller: KafkaController,
eventManager: ControllerEventManager,
kafkaControllerZkUtils: KafkaControllerZkUtils) extends Logging {
zkClient: KafkaZkClient) extends Logging {
this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
val controllerContext = controller.controllerContext
val isDeleteTopicEnabled = controller.config.deleteTopicEnable
@ -73,7 +74,7 @@ class TopicDeletionManager(controller: KafkaController,
} else {
// if delete topic is disabled clean the topic entries under /admin/delete_topics
info("Removing " + initialTopicsToBeDeleted + " since delete topic is disabled")
kafkaControllerZkUtils.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
}
}
@ -239,9 +240,9 @@ class TopicDeletionManager(controller: KafkaController,
controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
kafkaControllerZkUtils.deleteTopicZNode(topic)
kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic))
kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic))
zkClient.deleteTopicZNode(topic)
zkClient.deleteTopicConfigs(Seq(topic))
zkClient.deleteTopicDeletions(Seq(topic))
controllerContext.removeTopic(topic)
}

View File

@ -23,14 +23,14 @@ import java.util.concurrent._
import com.yammer.metrics.core.Gauge
import kafka.common.KafkaException
import kafka.controller.KafkaControllerZkUtils
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.errors.{LogDirNotFoundException, KafkaStorageException}
import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
import scala.collection.JavaConverters._
import scala.collection._
@ -102,7 +102,6 @@ class LogManager(logDirs: Seq[File],
loadLogs()
// public, so we can access this from kafka.admin.DeleteTopicTest
val cleaner: LogCleaner =
if(cleanerConfig.enableCleaner)
@ -888,7 +887,7 @@ object LogManager {
def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String],
zkUtils: KafkaControllerZkUtils,
zkClient: KafkaZkClient,
brokerState: BrokerState,
kafkaScheduler: KafkaScheduler,
time: Time,
@ -897,7 +896,7 @@ object LogManager {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
val (topicConfigs, failed) = zkUtils.getLogConfigs(zkUtils.getAllTopicsInCluster, defaultProps)
val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
if (!failed.isEmpty) throw failed.head._2
// read the log configurations from zookeeper

View File

@ -27,7 +27,7 @@ import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_9_0
import kafka.cluster.Broker
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
import kafka.controller.{KafkaController, KafkaControllerZkUtils, StateChangeHandler, ZookeeperClient}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogConfig, LogManager}
@ -36,6 +36,8 @@ import kafka.network.SocketServer
import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
import kafka.zk.KafkaZkClient
import kafka.zookeeper.{StateChangeHandler, ZooKeeperClient}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
@ -135,7 +137,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var quotaManagers: QuotaFactory.QuotaManagers = null
var zkUtils: ZkUtils = null
var kafkaControllerZkUtils: KafkaControllerZkUtils = null
private var zkClient: KafkaZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
@ -219,7 +221,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
val zookeeperClient = new ZookeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, new StateChangeHandler {
override def onReconnectionTimeout(): Unit = {
error("Reconnection timeout.")
@ -233,10 +235,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
override def beforeInitializingSession(): Unit = kafkaController.expire()
})
kafkaControllerZkUtils = new KafkaControllerZkUtils(zookeeperClient, zkUtils.isSecure)
zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure)
/* start log manager */
logManager = LogManager(config, initialOfflineDirs, kafkaControllerZkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
@ -250,7 +252,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
replicaManager.startup()
/* start kafka controller */
kafkaController = new KafkaController(config, kafkaControllerZkUtils, time, metrics, threadNamePrefix)
kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix)
kafkaController.startup()
adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
@ -561,8 +563,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
CoreUtils.swallow(kafkaController.shutdown())
if (zkUtils != null)
CoreUtils.swallow(zkUtils.close())
if (kafkaControllerZkUtils != null)
CoreUtils.swallow(kafkaControllerZkUtils.close())
if (zkClient != null)
CoreUtils.swallow(zkClient.close())
if (metrics != null)
CoreUtils.swallow(metrics.close())

View File

@ -16,9 +16,12 @@
*/
package kafka.utils
import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.utils.json.JsonValue
import scala.collection._
/**
@ -35,6 +38,13 @@ object Json {
try Option(mapper.readTree(input)).map(JsonValue(_))
catch { case _: JsonProcessingException => None }
/**
* Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
*/
def parseBytes(input: Array[Byte]): Option[JsonValue] =
try Option(mapper.readTree(input)).map(JsonValue(_))
catch { case _: JsonProcessingException => None }
/**
* Encode an object into a JSON string. This method accepts any type T where
* T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
@ -59,4 +69,13 @@ object Json {
}
}
/**
* Encode an object into a JSON value in bytes. This method accepts any type T where
* T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
* Any other type will result in an exception.
*
* This method does not properly handle non-ascii characters.
*/
def encodeAsBytes(obj: Any): Array[Byte] = encode(obj).getBytes(StandardCharsets.UTF_8)
}

View File

@ -252,6 +252,9 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
}
}
/**
* Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead.
*/
class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
val zkConnection: ZkConnection,
val isSecure: Boolean) extends Logging {

View File

@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
package kafka.zk
import java.util.Properties
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.common.TopicAndPartition
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils.{Logging, ZkUtils}
import kafka.utils._
import kafka.zookeeper._
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.{CreateMode, KeeperException}
@ -31,8 +33,18 @@ import org.apache.zookeeper.{CreateMode, KeeperException}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean) extends Logging {
import KafkaControllerZkUtils._
/**
* Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
*
* This performs better than [[kafka.utils.ZkUtils]] and should replace it completely, eventually.
*
* Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.)
* and returns instances of classes from the calling packages in some cases. This is not ideal, but it makes it
* easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are
* in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go.
*/
class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging {
import KafkaZkClient._
/**
* Gets topic partition states for the given partitions.
@ -533,7 +545,7 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean
* @param zNodeChangeHandler
*/
def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
val existsResponse = retryRequestUntilConnected(ExistsRequest(zNodeChangeHandler.path))
if (existsResponse.resultCode != Code.OK && existsResponse.resultCode != Code.NONODE) {
throw existsResponse.resultException.get
@ -541,42 +553,42 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean
}
/**
* See ZookeeperClient.registerZNodeChangeHandler
* See ZooKeeperClient.registerZNodeChangeHandler
* @param zNodeChangeHandler
*/
def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
}
/**
* See ZookeeperClient.unregisterZNodeChangeHandler
* See ZooKeeperClient.unregisterZNodeChangeHandler
* @param path
*/
def unregisterZNodeChangeHandler(path: String): Unit = {
zookeeperClient.unregisterZNodeChangeHandler(path)
zooKeeperClient.unregisterZNodeChangeHandler(path)
}
/**
* See ZookeeperClient.registerZNodeChildChangeHandler
* See ZooKeeperClient.registerZNodeChildChangeHandler
* @param zNodeChildChangeHandler
*/
def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
zooKeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
}
/**
* See ZookeeperClient.unregisterZNodeChildChangeHandler
* See ZooKeeperClient.unregisterZNodeChildChangeHandler
* @param path
*/
def unregisterZNodeChildChangeHandler(path: String): Unit = {
zookeeperClient.unregisterZNodeChildChangeHandler(path)
zooKeeperClient.unregisterZNodeChildChangeHandler(path)
}
/**
* Close the underlying ZookeeperClient.
* Close the underlying ZooKeeperClient.
*/
def close(): Unit = {
zookeeperClient.close()
zooKeeperClient.close()
}
private def deleteRecursive(path: String): Unit = {
@ -594,8 +606,7 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean
private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
val createRequests = partitions.map { partition =>
val path = TopicPartitionZNode.path(partition)
val data = TopicPartitionZNode.encode
CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition))
CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition))
}
retryRequestsUntilConnected(createRequests)
}
@ -603,8 +614,7 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean
private def createTopicPartitions(topics: Seq[String]) = {
val createRequests = topics.map { topic =>
val path = TopicPartitionsZNode.path(topic)
val data = TopicPartitionsZNode.encode
CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(topic))
CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic))
}
retryRequestsUntilConnected(createRequests)
}
@ -629,7 +639,7 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean
val remainingRequests = ArrayBuffer(requests: _*)
val responses = new ArrayBuffer[Req#Response]
while (remainingRequests.nonEmpty) {
val batchResponses = zookeeperClient.handleRequests(remainingRequests)
val batchResponses = zooKeeperClient.handleRequests(remainingRequests)
// Only execute slow path if we find a response with CONNECTIONLOSS
if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) {
@ -644,7 +654,7 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean
}
if (remainingRequests.nonEmpty)
zookeeperClient.waitUntilConnected()
zooKeeperClient.waitUntilConnected()
} else {
remainingRequests.clear()
responses ++= batchResponses
@ -684,7 +694,7 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean
val getDataResponse = retryRequestUntilConnected(getDataRequest)
val code = getDataResponse.resultCode
if (code == Code.OK) {
if (getDataResponse.stat.getEphemeralOwner != zookeeperClient.sessionId) {
if (getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId) {
error(s"Error while creating ephemeral at $path with return code: $code")
Code.NODEEXISTS
} else {
@ -701,7 +711,7 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean
}
}
object KafkaControllerZkUtils {
object KafkaZkClient {
/**
* @param successfulPartitions The successfully updated partition states with adjusted znode versions.

View File

@ -14,41 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
package kafka.zk
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.common.TopicAndPartition
import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
import kafka.utils.Json
import org.apache.zookeeper.data.Stat
import scala.collection.Seq
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
object ControllerZNode {
def path = "/controller"
def encode(brokerId: Int, timestamp: Long): Array[Byte] =
Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)).getBytes("UTF-8")
def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
js.asJsonObject("brokerid").to[Int]
}
}
object ControllerEpochZNode {
def path = "/controller_epoch"
def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes("UTF-8")
def decode(bytes: Array[Byte]) : Int = new String(bytes, "UTF-8").toInt
def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8)
def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).toInt
}
object ConfigZNode {
def path = "/config"
def encode: Array[Byte] = null
}
object BrokersZNode {
def path = "/brokers"
def encode: Array[Byte] = null
}
object BrokerIdsZNode {
@ -66,27 +68,26 @@ object BrokerIdZNode {
rack: Option[String],
apiVersion: ApiVersion): Array[Byte] = {
val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes("UTF-8")
Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes(UTF_8)
}
def decode(id: Int, bytes: Array[Byte]): Broker = {
Broker.createBroker(id, new String(bytes, "UTF-8"))
Broker.createBroker(id, new String(bytes, UTF_8))
}
}
object TopicsZNode {
def path = s"${BrokersZNode.path}/topics"
def encode: Array[Byte] = null
}
object TopicZNode {
def path(topic: String) = s"${TopicsZNode.path}/$topic"
def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
Json.encode(Map("version" -> 1, "partitions" -> assignmentJson)).getBytes("UTF-8")
Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson))
}
def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
Json.parseBytes(bytes).flatMap { js =>
val assignmentJson = js.asJsonObject
val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject)
partitionsJsonOpt.map { partitionsJson =>
@ -100,12 +101,10 @@ object TopicZNode {
object TopicPartitionsZNode {
def path(topic: String) = s"${TopicZNode.path(topic)}/partitions"
def encode: Array[Byte] = null
}
object TopicPartitionZNode {
def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
def encode: Array[Byte] = null
}
object TopicPartitionStateZNode {
@ -113,11 +112,11 @@ object TopicPartitionStateZNode {
def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)).getBytes("UTF-8")
Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
}
def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(new String(bytes, "UTF-8")).map { js =>
Json.parseBytes(bytes).map { js =>
val leaderIsrAndEpochInfo = js.asJsonObject
val leader = leaderIsrAndEpochInfo("leader").to[Int]
val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
@ -131,17 +130,16 @@ object TopicPartitionStateZNode {
object ConfigEntityTypeZNode {
def path(entityType: String) = s"${ConfigZNode.path}/$entityType"
def encode: Array[Byte] = null
}
object ConfigEntityZNode {
def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName"
def encode(config: Properties): Array[Byte] = {
import scala.collection.JavaConverters._
Json.encode(Map("version" -> 1, "config" -> config.asScala)).getBytes("UTF-8")
Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala))
}
def decode(bytes: Array[Byte]): Option[Properties] = {
Json.parseFull(new String(bytes, "UTF-8")).map { js =>
Json.parseBytes(bytes).map { js =>
val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
val props = new Properties()
configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
@ -152,7 +150,6 @@ object ConfigEntityZNode {
object IsrChangeNotificationZNode {
def path = "/isr_change_notification"
def encode: Array[Byte] = null
}
object IsrChangeNotificationSequenceZNode {
@ -160,11 +157,11 @@ object IsrChangeNotificationSequenceZNode {
def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson)).getBytes("UTF-8")
Json.encodeAsBytes(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson))
}
def decode(bytes: Array[Byte]): Set[TopicAndPartition] = {
Json.parseFull(new String(bytes, "UTF-8")).map { js =>
Json.parseBytes(bytes).map { js =>
val partitionsJson = js.asJsonObject("partitions").asJsonArray
partitionsJson.iterator.map { partitionsJson =>
val partitionJson = partitionsJson.asJsonObject
@ -179,7 +176,6 @@ object IsrChangeNotificationSequenceZNode {
object LogDirEventNotificationZNode {
def path = "/log_dir_event_notification"
def encode: Array[Byte] = null
}
object LogDirEventNotificationSequenceZNode {
@ -187,8 +183,8 @@ object LogDirEventNotificationSequenceZNode {
val LogDirFailureEvent = 1
def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
def encode(brokerId: Int) =
Json.encode(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent)).getBytes("UTF-8")
def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
Json.encodeAsBytes(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent))
def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
js.asJsonObject("broker").to[Int]
}
def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
@ -196,17 +192,14 @@ object LogDirEventNotificationSequenceZNode {
object AdminZNode {
def path = "/admin"
def encode: Array[Byte] = null
}
object DeleteTopicsZNode {
def path = s"${AdminZNode.path}/delete_topics"
def encode: Array[Byte] = null
}
object DeleteTopicsTopicZNode {
def path(topic: String) = s"${DeleteTopicsZNode.path}/$topic"
def encode: Array[Byte] = null
}
object ReassignPartitionsZNode {
@ -215,9 +208,9 @@ object ReassignPartitionsZNode {
val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) =>
Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
}
Json.encode(Map("version" -> 1, "partitions" -> reassignmentJson)).getBytes("UTF-8")
Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson))
}
def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
val reassignmentJson = js.asJsonObject
val partitionsJsonOpt = reassignmentJson.get("partitions")
partitionsJsonOpt.map { partitionsJson =>
@ -234,9 +227,12 @@ object ReassignPartitionsZNode {
object PreferredReplicaElectionZNode {
def path = s"${AdminZNode.path}/preferred_replica_election"
def encode(partitions: Set[TopicAndPartition]): Array[Byte] =
Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))).getBytes("UTF-8")
def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
val jsonMap = Map("version" -> 1,
"partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))
Json.encodeAsBytes(jsonMap)
}
def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseBytes(bytes).map { js =>
val partitionsJson = js.asJsonObject("partitions").asJsonArray
partitionsJson.iterator.map { partitionsJson =>
val partitionJson = partitionsJson.asJsonObject
@ -245,4 +241,4 @@ object PreferredReplicaElectionZNode {
TopicAndPartition(topic, partition)
}
}.map(_.toSet).getOrElse(Set.empty)
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package kafka.controller
package kafka.zookeeper
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
@ -32,16 +32,16 @@ import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher,
import scala.collection.JavaConverters._
/**
* ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper.
* A ZooKeeper client that encourages pipelined requests.
*
* @param connectString comma separated host:port pairs, each corresponding to a zk server
* @param sessionTimeoutMs session timeout in milliseconds
* @param connectionTimeoutMs connection timeout in milliseconds
* @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
*/
class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
stateChangeHandler: StateChangeHandler) extends Logging {
this.logIdent = "[ZookeeperClient] "
this.logIdent = "[ZooKeeperClient] "
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
@ -49,7 +49,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
info(s"Initializing a new session to $connectString.")
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
/**
@ -143,8 +143,8 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
/**
* Wait indefinitely until the underlying zookeeper client to reaches the CONNECTED state.
* @throws ZookeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
* @throws ZookeeperClientExpiredException if the session expired either before or while waiting for connection.
* @throws ZooKeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
* @throws ZooKeeperClientExpiredException if the session expired either before or while waiting for connection.
*/
def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
@ -157,15 +157,15 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
var state = zooKeeper.getState
while (!state.isConnected && state.isAlive) {
if (nanos <= 0) {
throw new ZookeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state")
throw new ZooKeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state")
}
nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
state = zooKeeper.getState
}
if (state == States.AUTH_FAILED) {
throw new ZookeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
throw new ZooKeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
} else if (state == States.CLOSED) {
throw new ZookeeperClientExpiredException("Session expired either before or while waiting for connection")
throw new ZooKeeperClientExpiredException("Session expired either before or while waiting for connection")
}
}
info("Connected.")
@ -180,7 +180,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
}
/**
* Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher.
* Register the handler to ZooKeeperClient. This is just a local operation. This does not actually register a watcher.
*
* The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest])
* with either a GetDataRequest or ExistsRequest.
@ -194,7 +194,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
}
/**
* Unregister the handler from ZookeeperClient. This is just a local operation.
* Unregister the handler from ZooKeeperClient. This is just a local operation.
* @param path the path of the handler to unregister
*/
def unregisterZNodeChangeHandler(path: String): Unit = {
@ -202,7 +202,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
}
/**
* Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher.
* Register the handler to ZooKeeperClient. This is just a local operation. This does not actually register a watcher.
*
* The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest]) with a GetChildrenRequest.
*
@ -213,7 +213,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
}
/**
* Unregister the handler from ZookeeperClient. This is just a local operation.
* Unregister the handler from ZooKeeperClient. This is just a local operation.
* @param path the path of the handler to unregister
*/
def unregisterZNodeChildChangeHandler(path: String): Unit = {
@ -240,7 +240,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
while (now < threshold) {
try {
zooKeeper.close()
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
return
} catch {
@ -257,7 +257,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
}
}
private object ZookeeperClientWatcher extends Watcher {
private object ZooKeeperClientWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
debug("Received event: " + event)
Option(event.getPath) match {
@ -310,7 +310,7 @@ trait ZNodeChildChangeHandler {
sealed trait AsyncRequest {
/**
* This type member allows us to define methods that take requests and return responses with the correct types.
* See ``ZookeeperClient.handleRequests`` for example.
* See ``ZooKeeperClient.handleRequests`` for example.
*/
type Response <: AsyncResponse
def path: String
@ -368,7 +368,7 @@ case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl:
case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse
class ZookeeperClientException(message: String) extends RuntimeException(message)
class ZookeeperClientExpiredException(message: String) extends ZookeeperClientException(message)
class ZookeeperClientAuthFailedException(message: String) extends ZookeeperClientException(message)
class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message)
class ZooKeeperClientException(message: String) extends RuntimeException(message)
class ZooKeeperClientExpiredException(message: String) extends ZooKeeperClientException(message)
class ZooKeeperClientAuthFailedException(message: String) extends ZooKeeperClientException(message)
class ZooKeeperClientTimeoutException(message: String) extends ZooKeeperClientException(message)

View File

@ -47,7 +47,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
/**
* Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths
* when zookeeper.set.acl=false, even if Zookeeper is SASL-enabled.
* when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled.
*/
@Test
def testZkAclsDisabled() {

View File

@ -221,7 +221,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
} catch {
case _: UnknownTopicOrPartitionException => // expected exception
}
// verify delete topic path for test2 is removed from zookeeper
// verify delete topic path for test2 is removed from ZooKeeper
TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
// verify that topic test is untouched
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),

View File

@ -53,7 +53,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
/*
* There is no easy way to test purging. Even if we mock kafka time with MockTime, the purging compares kafka time
* with the time stored in zookeeper stat and the embedded zookeeper server does not provide a way to mock time.
* with the time stored in ZooKeeper stat and the embedded ZooKeeper server does not provide a way to mock time.
* So to test purging we would have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check
* Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size). However even that the assertion
* can fail as the second node can be deleted depending on how threads get scheduled.

View File

@ -68,7 +68,7 @@ class TopicFilterTest extends JUnitSuite {
topicCount.getTopicCountMap.head._1
}
//lets make sure that the JSON strings are escaping as we expect
//if they are not then when they get saved to zookeeper and read back out they will be broken on parse
//if they are not then when they get saved to ZooKeeper and read back out they will be broken on parse
assertEquals("-\\\"-", getTopicCountMapKey("-\"-"))
assertEquals("-\\\\-", getTopicCountMapKey("-\\-"))
assertEquals("-\\/-", getTopicCountMapKey("-/-"))

View File

@ -18,10 +18,12 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
import kafka.log.LogConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper.{CreateResponse, GetDataResponse, ZooKeeperClientException}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.Stat
import org.easymock.EasyMock
@ -33,7 +35,7 @@ import scala.collection.mutable
class PartitionStateMachineTest extends JUnitSuite {
private var controllerContext: ControllerContext = null
private var mockZkUtils: KafkaControllerZkUtils = null
private var mockZkClient: KafkaZkClient = null
private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
private var mockTopicDeletionManager: TopicDeletionManager = null
private var partitionState: mutable.Map[TopicAndPartition, PartitionState] = null
@ -49,12 +51,12 @@ class PartitionStateMachineTest extends JUnitSuite {
def setUp(): Unit = {
controllerContext = new ControllerContext
controllerContext.epoch = controllerEpoch
mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]
partitionStateMachine = new PartitionStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager,
mockZkUtils, partitionState, mockControllerBrokerRequestBatch)
mockZkClient, partitionState, mockControllerBrokerRequestBatch)
}
@Test
@ -82,14 +84,14 @@ class PartitionStateMachineTest extends JUnitSuite {
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
.andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null)))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OnlinePartition, partitionState(partition))
}
@ -100,12 +102,12 @@ class PartitionStateMachineTest extends JUnitSuite {
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
.andThrow(new ZookeeperClientException("test"))
EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
.andThrow(new ZooKeeperClientException("test"))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(NewPartition, partitionState(partition))
}
@ -116,12 +118,12 @@ class PartitionStateMachineTest extends JUnitSuite {
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
.andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null)))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(NewPartition, partitionState(partition))
}
@ -150,22 +152,22 @@ class PartitionStateMachineTest extends JUnitSuite {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
.andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OnlinePartition, partitionState(partition))
}
@ -182,22 +184,22 @@ class PartitionStateMachineTest extends JUnitSuite {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
.andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
Seq(brokerId, otherBrokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OnlinePartition, partitionState(partition))
}
@ -233,23 +235,23 @@ class PartitionStateMachineTest extends JUnitSuite {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
.andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
EasyMock.expect(mockZkUtils.getLogConfigs(Seq.empty, config.originals()))
EasyMock.expect(mockZkClient.getLogConfigs(Seq.empty, config.originals()))
.andReturn((Map(partition.topic -> LogConfig()), Map.empty))
val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OnlinePartition, partitionState(partition))
}
@ -263,14 +265,14 @@ class PartitionStateMachineTest extends JUnitSuite {
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
.andThrow(new ZookeeperClientException(""))
EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
.andThrow(new ZooKeeperClientException(""))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OfflinePartition, partitionState(partition))
}
@ -285,15 +287,15 @@ class PartitionStateMachineTest extends JUnitSuite {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
.andReturn(Seq(GetDataResponse(Code.NONODE, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OfflinePartition, partitionState(partition))
}

View File

@ -18,9 +18,11 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper.GetDataResponse
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.Stat
import org.easymock.EasyMock
@ -32,7 +34,7 @@ import scala.collection.mutable
class ReplicaStateMachineTest extends JUnitSuite {
private var controllerContext: ControllerContext = null
private var mockZkUtils: KafkaControllerZkUtils = null
private var mockZkClient: KafkaZkClient = null
private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
private var mockTopicDeletionManager: TopicDeletionManager = null
private var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = null
@ -50,11 +52,11 @@ class ReplicaStateMachineTest extends JUnitSuite {
def setUp(): Unit = {
controllerContext = new ControllerContext
controllerContext.epoch = controllerEpoch
mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
replicaStateMachine = new ReplicaStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager, mockZkUtils,
replicaStateMachine = new ReplicaStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager, mockZkClient,
replicaState, mockControllerBrokerRequestBatch)
}
@ -155,9 +157,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OnlineReplica, replicaState(replica))
}
@ -178,19 +180,19 @@ class ReplicaStateMachineTest extends JUnitSuite {
val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion + 1)
val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
.andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch))
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
partition.topic, partition.partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
assertEquals(updatedLeaderIsrAndControllerEpoch, controllerContext.partitionLeadershipInfo(partition))
assertEquals(OfflineReplica, replicaState(replica))
}
@ -230,9 +232,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OnlineReplica, replicaState(replica))
}
@ -244,9 +246,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
partition.topic, partition.partition, true, callbacks.stopReplicaResponseCallback))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, callbacks)
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(ReplicaDeletionStarted, replicaState(replica))
}
@ -348,9 +350,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
assertEquals(OnlineReplica, replicaState(replica))
}

View File

@ -86,7 +86,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
for(_ <- 0 until numMessages)
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
// update offset in zookeeper for consumer to jump "forward" in time
// update offset in ZooKeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
consumerProps.put("auto.offset.reset", resetTo)

View File

@ -239,7 +239,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
//test removing last acl also deletes zookeeper path
//test removing last acl also deletes ZooKeeper path
acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
changeAclAndVerify(acls, Set.empty[Acl], acls)
assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
@ -405,7 +405,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
def testHighConcurrencyDeletionOfResourceAcls() {
val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
// Alternate authorizer to keep adding and removing zookeeper path
// Alternate authorizer to keep adding and removing ZooKeeper path
val concurrentFuctions = (0 to 50).map { _ =>
() => {
simpleAclAuthorizer.addAcls(Set(acl), resource)

View File

@ -84,7 +84,7 @@ class ClientQuotaManagerTest {
/**
* Tests parsing for <client-id> quotas.
* Quota overrides persisted in Zookeeper in /config/clients/<client-id>, default persisted in /config/clients/<default>
* Quota overrides persisted in ZooKeeper in /config/clients/<client-id>, default persisted in /config/clients/<default>
*/
@Test
def testClientIdQuotaParsing() {
@ -97,7 +97,7 @@ class ClientQuotaManagerTest {
/**
* Tests parsing for <user> quotas.
* Quota overrides persisted in Zookeeper in /config/users/<user>, default persisted in /config/users/<default>
* Quota overrides persisted in ZooKeeper in /config/users/<user>, default persisted in /config/users/<default>
*/
@Test
def testUserQuotaParsing() {
@ -111,7 +111,7 @@ class ClientQuotaManagerTest {
/**
* Tests parsing for <user, client-id> quotas.
* Quotas persisted in Zookeeper in /config/users/<user>/clients/<client-id>, default in /config/users/<default>/clients/<default>
* Quotas persisted in ZooKeeper in /config/users/<user>/clients/<client-id>, default in /config/users/<default>/clients/<default>
*/
@Test
def testUserClientIdQuotaParsing() {

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package unit.kafka.server
package kafka.server
import java.lang.{Long => JLong}
import java.net.InetAddress
@ -30,7 +30,6 @@ import kafka.log.{Log, TimestampOffset}
import kafka.network.RequestChannel
import kafka.security.auth.Authorizer
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server._
import kafka.utils.{MockTime, TestUtils, ZkUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnsupportedVersionException

View File

@ -80,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val topic = topicPartition.split("-").head
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
// setup brokers in ZooKeeper as owners of partitions for this test
AdminUtils.createTopic(zkUtils, topic, 1, 1)
val logManager = server.getLogManager
@ -115,7 +115,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val topic = topicPartition.split("-").head
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
// setup brokers in ZooKeeper as owners of partitions for this test
AdminUtils.createTopic(zkUtils, topic, 1, 1)
val logManager = server.getLogManager
@ -154,7 +154,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val topic = topicPartition.split("-").head
// setup brokers in zookeeper as owners of partitions for this test
// setup brokers in ZooKeeper as owners of partitions for this test
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
var offsetChanged = false
@ -178,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val topic = topicPartition.split("-").head
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
// setup brokers in ZooKeeper as owners of partitions for this test
AdminUtils.createTopic(zkUtils, topic, 3, 1)
val logManager = server.getLogManager
@ -207,7 +207,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val topic = topicPartition.split("-").head
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
// setup brokers in ZooKeeper as owners of partitions for this test
AdminUtils.createTopic(zkUtils, topic, 3, 1)
val logManager = server.getLogManager

View File

@ -139,7 +139,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
updateProducer()
leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0",
assertTrue("Leader must remain on broker 1, in case of ZooKeeper session expiration it can move to broker 0",
leader == 0 || leader == 1)
assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
@ -150,7 +150,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
server2.startup()
updateProducer()
leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader))
assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1",
assertTrue("Leader must remain on broker 0, in case of ZooKeeper session expiration it can move to broker 1",
leader == 0 || leader == 1)
sendMessages(1)

View File

@ -74,7 +74,7 @@ object TestUtils extends Logging {
/** Port to use for unit tests that mock/don't require a real ZK server. */
val MockZkPort = 1
/** Zookeeper connection string to use for unit tests that mock/don't require a real ZK server. */
/** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK server. */
val MockZkConnect = "127.0.0.1:" + MockZkPort
private val transactionStatusKey = "transactionStatus"
@ -273,7 +273,7 @@ object TestUtils extends Logging {
}
/**
* Create a topic in zookeeper.
* Create a topic in ZooKeeper.
* Wait until the leader is elected and the metadata is propagated to all brokers.
* Return the leader for each partition.
*/
@ -293,7 +293,7 @@ object TestUtils extends Logging {
}
/**
* Create a topic in zookeeper using a customized replica assignment.
* Create a topic in ZooKeeper using a customized replica assignment.
* Wait until the leader is elected and the metadata is propagated to all brokers.
* Return the leader for each partition.
*/
@ -1172,7 +1172,7 @@ object TestUtils extends Logging {
TestUtils.waitUntilTrue(() =>
servers.forall(server => topicPartitions.forall(tp => server.replicaManager.getPartition(tp).isEmpty)),
"Replica manager's should have deleted all of this topic's partitions")
// ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
// ensure that logs from all replicas are deleted if delete topic is marked successful in ZooKeeper
assertTrue("Replica logs not deleted after delete topic is complete",
servers.forall(server => topicPartitions.forall(tp => server.getLogManager.getLog(tp).isEmpty)))
// ensure that topic is removed from all cleaner offsets

View File

@ -25,19 +25,19 @@ import java.net.InetSocketAddress
import kafka.utils.CoreUtils
import org.apache.kafka.common.utils.Utils
class EmbeddedZookeeper() {
class EmbeddedZooKeeper() {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
val tickTime = 500
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val factory = new NIOServerCnxnFactory()
private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
factory.configure(addr, 0)
factory.startup(zookeeper)
val port = zookeeper.getClientPort()
factory.startup(zooKeeperServer)
val port = zooKeeperServer.getClientPort()
def shutdown() {
CoreUtils.swallow(zookeeper.shutdown())
CoreUtils.swallow(zooKeeperServer.shutdown())
CoreUtils.swallow(factory.shutdown())
def isDown(): Boolean = {

View File

@ -38,7 +38,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
try {
zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.createPersistentPath(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
fail("Failed to throw ConfigException for missing ZooKeeper root node")
} catch {
case _: ConfigException =>
}
@ -62,7 +62,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
try {
zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.makeSurePersistentPathExists(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
fail("Failed to throw ConfigException for missing ZooKeeper root node")
} catch {
case _: ConfigException =>
}
@ -86,7 +86,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
try {
zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.createEphemeralPathExpectConflict(path, "somedata")
fail("Failed to throw ConfigException for missing zookeeper root node")
fail("Failed to throw ConfigException for missing ZooKeeper root node")
} catch {
case _: ConfigException =>
}
@ -111,7 +111,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
try {
zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.createSequentialPersistentPath(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
fail("Failed to throw ConfigException for missing ZooKeeper root node")
} catch {
case _: ConfigException =>
}

View File

@ -41,14 +41,14 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
var zkUtils: ZkUtils = null
var zookeeper: EmbeddedZookeeper = null
var zookeeper: EmbeddedZooKeeper = null
def zkPort: Int = zookeeper.port
def zkConnect: String = s"127.0.0.1:$zkPort"
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
zookeeper = new EmbeddedZooKeeper()
zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
package kafka.zookeeper
import java.net.UnknownHostException
import java.nio.charset.StandardCharsets
@ -29,7 +29,7 @@ import org.apache.zookeeper.{CreateMode, ZooDefs}
import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
import org.junit.{After, Test}
class ZookeeperClientTest extends ZooKeeperTestHarness {
class ZooKeeperClientTest extends ZooKeeperTestHarness {
private val mockPath = "/foo"
@After
@ -41,58 +41,58 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
@Test(expected = classOf[UnknownHostException])
def testUnresolvableConnectString(): Unit = {
new ZookeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
}
@Test(expected = classOf[ZookeeperClientTimeoutException])
@Test(expected = classOf[ZooKeeperClientTimeoutException])
def testConnectionTimeout(): Unit = {
zookeeper.shutdown()
new ZookeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
}
@Test
def testConnection(): Unit = {
new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
}
@Test
def testDeleteNonExistentZNode(): Unit = {
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
}
@Test
def testDeleteExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1))
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode)
}
@Test
def testExistsNonExistentZNode(): Unit = {
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode)
}
@Test
def testExistsExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath))
val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
assertEquals("Response code for exists should be OK", Code.OK, existsResponse.resultCode)
}
@Test
def testGetDataNonExistentZNode(): Unit = {
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode)
}
@ -100,19 +100,19 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
def testGetDataExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val data = bytes
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath))
val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode)
assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data)
}
@Test
def testSetDataNonExistentZNode(): Unit = {
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode)
}
@ -120,31 +120,31 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
def testSetDataExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val data = bytes
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, data, -1))
val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, data, -1))
assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode)
val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath))
val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode)
assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data)
}
@Test
def testGetAclNonExistentZNode(): Unit = {
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode)
}
@Test
def testGetAclExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath))
val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
assertEquals("Response code for getAcl should be OK", Code.OK, getAclResponse.resultCode)
assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl)
}
@ -152,26 +152,26 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
@Test
def testSetAclNonExistentZNode(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val setAclResponse = zookeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode)
}
@Test
def testGetChildrenNonExistentZNode(): Unit = {
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode)
}
@Test
def testGetChildrenExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children)
}
@ -183,18 +183,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
val child2 = "child2"
val child1Path = mockPath + "/" + child1
val child2Path = mockPath + "/" + child2
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val createResponseChild1 = zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte],
val createResponseChild1 = zooKeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode)
val createResponseChild2 = zookeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte],
val createResponseChild2 = zooKeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create child2 should be OK", Code.OK, createResponseChild2.resultCode)
val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted)
}
@ -202,12 +202,12 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
@Test
def testPipelinedGetData(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
val createResponses = createRequests.map(zookeeperClient.handleRequest)
val createResponses = createRequests.map(zooKeeperClient.handleRequest)
createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode))
val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x))
val getDataResponses = zookeeperClient.handleRequests(getDataRequests)
val getDataResponses = zooKeeperClient.handleRequests(getDataRequests)
getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData should be OK", Code.OK,
getDataResponse.resultCode))
getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
@ -219,13 +219,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
@Test
def testMixedPipeline(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val getDataRequest = GetDataRequest(mockPath)
val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1)
val responses = zookeeperClient.handleRequests(Seq(getDataRequest, setDataRequest))
val responses = zooKeeperClient.handleRequests(Seq(getDataRequest, setDataRequest))
assertEquals("Response code for getData should be OK", Code.OK, responses.head.resultCode)
assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data)
assertEquals("Response code for setData should be NONODE", Code.NONODE, responses.last.resultCode)
@ -234,7 +234,7 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForCreation(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleCreation(): Unit = {
@ -243,10 +243,10 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
override val path: String = mockPath
}
zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
val existsRequest = ExistsRequest(mockPath)
val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
val responses = zookeeperClient.handleRequests(Seq(existsRequest, createRequest))
val responses = zooKeeperClient.handleRequests(Seq(existsRequest, createRequest))
assertEquals("Response code for exists should be NONODE", Code.NONODE, responses.head.resultCode)
assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
@ -255,7 +255,7 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForDeletion(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleDeletion(): Unit = {
@ -264,13 +264,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
override val path: String = mockPath
}
zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
val existsRequest = ExistsRequest(mockPath)
val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
val responses = zookeeperClient.handleRequests(Seq(createRequest, existsRequest))
val responses = zooKeeperClient.handleRequests(Seq(createRequest, existsRequest))
assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode)
val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1))
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode)
assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@ -278,7 +278,7 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForDataChange(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleDataChange(): Unit = {
@ -287,13 +287,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
override val path: String = mockPath
}
zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
val existsRequest = ExistsRequest(mockPath)
val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
val responses = zookeeperClient.handleRequests(Seq(createRequest, existsRequest))
val responses = zooKeeperClient.handleRequests(Seq(createRequest, existsRequest))
assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode)
val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode)
assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@ -301,7 +301,7 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChildChangeHandlerForChildChange(): Unit = {
import scala.collection.JavaConverters._
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
override def handleChildChange(): Unit = {
@ -312,12 +312,12 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
val child1 = "child1"
val child1Path = mockPath + "/" + child1
val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
zooKeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
val createResponseChild1 = zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
val createResponseChild1 = zooKeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode)
assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@ -331,7 +331,7 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
stateChangeHandlerCountDownLatch.countDown()
}
}
new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}

View File

@ -20,7 +20,7 @@ import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.EmbeddedZooKeeper;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.security.JaasUtils;
@ -47,7 +47,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
private static final int TOPIC_CREATION_TIMEOUT = 30000;
private static final int TOPIC_DELETION_TIMEOUT = 30000;
private EmbeddedZookeeper zookeeper = null;
private EmbeddedZooKeeper zookeeper = null;
private final KafkaEmbedded[] brokers;
private ZkUtils zkUtils = null;
@ -84,7 +84,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
public void start() throws IOException, InterruptedException {
log.debug("Initiating embedded Kafka cluster startup");
log.debug("Starting a ZooKeeper instance");
zookeeper = new EmbeddedZookeeper();
zookeeper = new EmbeddedZooKeeper();
log.debug("ZooKeeper instance is running at {}", zKConnectString());
zkUtils = ZkUtils.apply(