KAFKA-4447; Controller resigned but it also acts as a controller for a long time

Author: Ismael Juma <ismael@juma.me.uk>
Author: xiguantiaozhan <kafkausr@126.com>
Author: tuyang <tuyang@meituan.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Onur Karaman <okaraman@linkedin.com>

Closes #2191 from xiguantiaozhan/avoid_swamp_controllerLog
Json Tu authored 2016-12-19 18:35:30 -08:00; committed by Jiangjie Qin
parent 8b84d14c6f
commit a786be9478
12 changed files with 156 additions and 132 deletions
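
The essence of the fix: zkclient can keep delivering watcher callbacks after this broker has lost the controller election, so a resigned controller kept acting on events (and flooding its logs, hence the branch name) long after the controllership had moved. This commit funnels all controller-side ZooKeeper listeners through shared traits that drop any callback arriving once `controller.isActive` is false. A minimal sketch of the guard pattern, condensed from the new ControllerZkListener.scala below (the trait name here is illustrative; the commit's real trait is `ControllerZkChildListener`):

    import org.I0Itec.zkclient.IZkChildListener
    import scala.collection.JavaConverters._

    // Condensed sketch of the pattern this commit introduces: the trait owns
    // the zkclient-facing callback and forwards it only while this broker is
    // still the active controller, so late callbacks become no-ops.
    trait GuardedChildListener extends IZkChildListener {
      protected def controller: KafkaController

      final def handleChildChange(parentPath: String, children: java.util.List[String]): Unit = {
        // zkclient may fire this after the controller has moved to another broker
        if (controller.isActive)
          doHandleChildChange(parentPath, children.asScala)
      }

      // Concrete listeners implement only the guarded variant.
      def doHandleChildChange(parentPath: String, children: Seq[String]): Unit
    }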

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

@@ -497,7 +497,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                  val topicCount: TopicCount,
                                  val loadBalancerListener: ZKRebalancerListener)
    extends IZkStateListener {
-    @throws(classOf[Exception])
+    @throws[Exception]
    def handleStateChanged(state: KeeperState) {
      // do nothing, since zkclient will do reconnect for us.
    }
@@ -509,7 +509,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     * @throws Exception
     *             On any error.
     */
-    @throws(classOf[Exception])
+    @throws[Exception]
    def handleNewSession() {
      /**
       * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
@@ -545,7 +545,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
      }
    }

-    @throws(classOf[Exception])
+    @throws[Exception]
    def handleDataDeleted(dataPath : String) {
      // TODO: This need to be implemented when we support delete topic
      warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time")
@@ -597,7 +597,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
    }
    watcherExecutorThread.start()

-    @throws(classOf[Exception])
+    @throws[Exception]
    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
      rebalanceEventTriggered()
    }
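
An aside on the annotation change that repeats throughout this diff: `@throws(classOf[Exception])` and `@throws[Exception]` produce the same checked-exception metadata for Java callers; the commit merely standardizes on the shorter type-argument form that Scala 2.10 introduced. A minimal illustration (`oldStyle`/`newStyle` are made-up names):

    // Both forms are equivalent for the JVM; the diff standardizes on the second.
    @throws(classOf[Exception])   // older constructor-argument form
    def oldStyle(): Unit = ()

    @throws[Exception]            // newer type-argument form
    def newStyle(): Unit = ()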

core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala

@@ -59,7 +59,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
  class ZkTopicEventListener extends IZkChildListener {

-    @throws(classOf[Exception])
+    @throws[Exception]
    def handleChildChange(parent: String, children: java.util.List[String]) {
      lock.synchronized {
        try {
@@ -81,10 +81,10 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
  class ZkSessionExpireListener(val topicEventListener: ZkTopicEventListener)
    extends IZkStateListener {

-    @throws(classOf[Exception])
+    @throws[Exception]
    def handleStateChanged(state: KeeperState) { }

-    @throws(classOf[Exception])
+    @throws[Exception]
    def handleNewSession() {
      lock.synchronized {
        if (zkUtils != null) {

core/src/main/scala/kafka/controller/ControllerZkListener.scala (new file)

@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller

import kafka.utils.Logging
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener}

import scala.collection.JavaConverters._

trait ControllerZkListener extends Logging {
  logIdent = s"[$logName on Controller " + controller.config.brokerId + "]: "

  protected def logName: String
  protected def controller: KafkaController
}

trait ControllerZkChildListener extends IZkChildListener with ControllerZkListener {
  @throws[Exception]
  final def handleChildChange(parentPath: String, currentChildren: java.util.List[String]): Unit = {
    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
    if (controller.isActive)
      doHandleChildChange(parentPath, currentChildren.asScala)
  }

  @throws[Exception]
  def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit
}

trait ControllerZkDataListener extends IZkDataListener with ControllerZkListener {
  @throws[Exception]
  final def handleDataChange(dataPath: String, data: AnyRef): Unit = {
    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
    if (controller.isActive)
      doHandleDataChange(dataPath, data)
  }

  @throws[Exception]
  def doHandleDataChange(dataPath: String, data: AnyRef): Unit

  @throws[Exception]
  final def handleDataDeleted(dataPath: String): Unit = {
    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
    if (controller.isActive)
      doHandleDataDeleted(dataPath)
  }

  @throws[Exception]
  def doHandleDataDeleted(dataPath: String): Unit
}
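
For illustration, a listener built on these traits supplies only `controller`, a `logName`, and the guarded handler body; the `isActive` check and the log prefix come from the traits. A hypothetical example (not part of this commit):

    // Hypothetical listener (not in this commit) showing how the trait is used.
    class BrokerCountLogger(protected val controller: KafkaController)
        extends ControllerZkChildListener {

      protected def logName = "BrokerCountLogger"

      // Only invoked while this broker is the active controller.
      def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit =
        info(s"$parentPath now has ${currentChildren.size} children")
    }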

core/src/main/scala/kafka/controller/KafkaController.scala

@@ -16,8 +16,6 @@
 */
package kafka.controller

-import java.util
-
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
@@ -39,7 +37,7 @@ import kafka.utils.CoreUtils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import java.util.concurrent.locks.ReentrantLock
@@ -152,7 +150,7 @@ object KafkaController extends Logging {
  }
}

-class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
  this.logIdent = "[Controller " + config.brokerId + "]: "
  private var isRunning = true
  private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -187,7 +185,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
    new Gauge[Int] {
      def value(): Int = {
        inLock(controllerContext.controllerLock) {
-          if (!isActive())
+          if (!isActive)
            0
          else
            controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader))
@@ -201,7 +199,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
    new Gauge[Int] {
      def value(): Int = {
        inLock(controllerContext.controllerLock) {
-          if (!isActive())
+          if (!isActive)
            0
          else
            controllerContext.partitionReplicaAssignment.count {
@@ -230,7 +228,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
   */
  def shutdownBroker(id: Int) : Set[TopicAndPartition] = {

-    if (!isActive()) {
+    if (!isActive) {
      throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
    }
@@ -398,7 +396,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
  /**
   * Returns true if this broker is the current controller.
   */
-  def isActive(): Boolean = {
+  def isActive: Boolean = {
    inLock(controllerContext.controllerLock) {
      controllerContext.controllerChannelManager != null
    }
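
Dropping the empty parameter list makes `isActive` a parameterless `def`, the usual Scala spelling for a side-effect-free accessor, which is why every call site in the rest of the diff changes from `isActive()` to `isActive`. For example, the controlled-shutdown guard above now reads:

    // Call sites simply drop the parentheses; behavior is unchanged.
    if (!isActive) {
      throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
    }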
@@ -1150,7 +1148,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
  class SessionExpirationListener() extends IZkStateListener with Logging {
    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "

-    @throws(classOf[Exception])
    def handleStateChanged(state: KeeperState) {
      // do nothing, since zkclient will do reconnect for us.
    }
@@ -1161,7 +1159,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     *
     * @throws Exception On any error.
     */
-    @throws(classOf[Exception])
+    @throws[Exception]
    def handleNewSession() {
      info("ZK expired; shut down all controller components and try to re-elect")
      onControllerResignation()
@@ -1170,13 +1168,13 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
      }
    }

-    override def handleSessionEstablishmentError(error: Throwable): Unit = {
+    def handleSessionEstablishmentError(error: Throwable): Unit = {
      //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
    }
  }

  private def checkAndTriggerPartitionRebalance(): Unit = {
-    if (isActive()) {
+    if (isActive) {
      trace("checking need to trigger partition rebalance")
      // get all the active brokers
      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
@@ -1234,18 +1232,18 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
 * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
 * partitions.
 */
-class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
-  this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
-  val zkUtils = controller.controllerContext.zkUtils
-  val controllerContext = controller.controllerContext
+class PartitionsReassignedListener(protected val controller: KafkaController) extends ControllerZkDataListener {
+  private val controllerContext = controller.controllerContext
+
+  protected def logName = "PartitionsReassignedListener"

  /**
   * Invoked when some partitions are reassigned by the admin command
   *
   * @throws Exception On any error.
   */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
+  @throws[Exception]
+  def doHandleDataChange(dataPath: String, data: AnyRef) {
    debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
      .format(dataPath, data))
    val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
@@ -1266,30 +1264,20 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
    }
  }

-  /**
-   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
-   *
-   * @throws Exception On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
+  def doHandleDataDeleted(dataPath: String) {}
}

-class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
-                                            reassignedReplicas: Set[Int])
-  extends IZkDataListener with Logging {
-  this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
-  val zkUtils = controller.controllerContext.zkUtils
-  val controllerContext = controller.controllerContext
+class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaController, topic: String, partition: Int,
+                                            reassignedReplicas: Set[Int]) extends ControllerZkDataListener {
+  private val zkUtils = controller.controllerContext.zkUtils
+  private val controllerContext = controller.controllerContext
+
+  protected def logName = "ReassignedPartitionsIsrChangeListener"

  /**
   * Invoked when some partitions need to move leader to preferred replica
-   *
-   * @throws Exception On any error.
   */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
+  def doHandleDataChange(dataPath: String, data: AnyRef) {
    inLock(controllerContext.controllerLock) {
      debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
      val topicAndPartition = TopicAndPartition(topic, partition)
@@ -1325,13 +1313,8 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
    }
  }

-  /**
-   * @throws Exception
-   *             On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
+  def doHandleDataDeleted(dataPath: String) {}
}

/**
@@ -1339,23 +1322,22 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
 *
 * @param controller
 */
-class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
-  override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
-    import scala.collection.JavaConverters._
+class IsrChangeNotificationListener(protected val controller: KafkaController) extends ControllerZkChildListener {
+
+  protected def logName = "IsrChangeNotificationListener"
+
+  def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = {
    inLock(controller.controllerContext.controllerLock) {
-      debug("[IsrChangeNotificationListener] Fired!!!")
-      val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
+      debug("ISR change notification listener fired")
      try {
-        val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.flatMap(x => getTopicAndPartition(x)).toSet
+        val topicAndPartitions = currentChildren.flatMap(getTopicAndPartition).toSet
        if (topicAndPartitions.nonEmpty) {
          controller.updateLeaderAndIsrCache(topicAndPartitions)
          processUpdateNotifications(topicAndPartitions)
        }
      } finally {
        // delete processed children
-        childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
+        currentChildren.map(x => controller.controllerContext.zkUtils.deletePath(
          ZkUtils.IsrChangeNotificationPath + "/" + x))
      }
    }
@@ -1404,28 +1386,28 @@ object IsrChangeNotificationListener {
 * Starts the preferred replica leader election for the list of partitions specified under
 * /admin/preferred_replica_election -
 */
-class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
-  this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
-  val zkUtils = controller.controllerContext.zkUtils
-  val controllerContext = controller.controllerContext
+class PreferredReplicaElectionListener(protected val controller: KafkaController) extends ControllerZkDataListener {
+  private val controllerContext = controller.controllerContext
+
+  protected def logName = "PreferredReplicaElectionListener"

  /**
   * Invoked when some partitions are reassigned by the admin command
   *
   * @throws Exception On any error.
   */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
+  @throws[Exception]
+  def doHandleDataChange(dataPath: String, data: AnyRef) {
    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
      .format(dataPath, data.toString))
    inLock(controllerContext.controllerLock) {
      val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
-      if(controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
+      if (controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
        info("These partitions are already undergoing preferred replica election: %s"
          .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
      val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-      if(partitionsForTopicsToBeDeleted.nonEmpty) {
+      if (partitionsForTopicsToBeDeleted.nonEmpty) {
        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
          .format(partitionsForTopicsToBeDeleted))
      }
@@ -1433,12 +1415,7 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
    }
  }

-  /**
-   * @throws Exception On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
+  def doHandleDataDeleted(dataPath: String) {}
}

case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,

core/src/main/scala/kafka/controller/PartitionStateMachine.scala

@@ -17,13 +17,11 @@
package kafka.controller

import collection._
-import collection.JavaConverters._
import java.util.concurrent.atomic.AtomicBoolean

import kafka.api.LeaderAndIsr
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.utils.{Logging, ReplicationUtils}
import kafka.utils.ZkUtils._
-import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.controller.Callbacks.CallbackBuilder
import kafka.utils.CoreUtils._
@@ -48,8 +46,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
  private val hasStarted = new AtomicBoolean(false)
  private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
-  private val topicChangeListener = new TopicChangeListener()
-  private val deleteTopicsListener = new DeleteTopicsListener()
+  private val topicChangeListener = new TopicChangeListener(controller)
+  private val deleteTopicsListener = new DeleteTopicsListener(controller)
  private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
  private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -375,7 +373,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
  }

  def registerPartitionChangeListener(topic: String) = {
-    partitionModificationsListeners.put(topic, new PartitionModificationsListener(topic))
+    partitionModificationsListeners.put(topic, new PartitionModificationsListener(controller, topic))
    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
  }
@@ -406,17 +404,17 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
  /**
   * This is the zookeeper listener that triggers all the state transitions for a partition
   */
-  class TopicChangeListener extends IZkChildListener with Logging {
-    this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "
+  class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
+
+    protected def logName = "TopicChangeListener"

-    @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+    def doHandleChildChange(parentPath: String, children: Seq[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          try {
            val currentChildren = {
-              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.asScala.mkString(",")))
-              children.asScala.toSet
+              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+              children.toSet
            }
            val newTopics = currentChildren -- controllerContext.allTopics
            val deletedTopics = controllerContext.allTopics -- currentChildren
@@ -431,7 +429,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
          if (newTopics.nonEmpty)
            controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
        } catch {
-          case e: Throwable => error("Error while handling new topic", e )
+          case e: Throwable => error("Error while handling new topic", e)
        }
      }
    }
@@ -443,21 +441,22 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists
   * 2. If there are topics to be deleted, it signals the delete topic thread
   */
-  class DeleteTopicsListener() extends IZkChildListener with Logging {
-    this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
-    val zkUtils = controllerContext.zkUtils
+  class DeleteTopicsListener(protected val controller: KafkaController) extends ControllerZkChildListener {
+    private val zkUtils = controllerContext.zkUtils
+
+    protected def logName = "DeleteTopicsListener"

    /**
     * Invoked when a topic is being deleted
     * @throws Exception On any error.
     */
-    @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+    @throws[Exception]
+    def doHandleChildChange(parentPath: String, children: Seq[String]) {
      inLock(controllerContext.controllerLock) {
-        var topicsToBeDeleted = children.asScala.toSet
+        var topicsToBeDeleted = children.toSet
        debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
        val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
-        if(nonExistentTopics.nonEmpty) {
+        if (nonExistentTopics.nonEmpty) {
          warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
          nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
        }
@@ -481,29 +480,20 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
          // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
          for (topic <- topicsToBeDeleted) {
            info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
-            val zkUtils = controllerContext.zkUtils
            zkUtils.zkClient.delete(getDeleteTopicPath(topic))
          }
        }
      }
    }

-    /**
-     *
-     * @throws Exception
-     *             On any error.
-     */
-    @throws(classOf[Exception])
-    def handleDataDeleted(dataPath: String) {
-    }
+    def doHandleDataDeleted(dataPath: String) {}
  }

-  class PartitionModificationsListener(topic: String) extends IZkDataListener with Logging {
-    this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
+  class PartitionModificationsListener(protected val controller: KafkaController, topic: String) extends ControllerZkDataListener {
+
+    protected def logName = "AddPartitionsListener"

-    @throws(classOf[Exception])
-    def handleDataChange(dataPath : String, data: Object) {
+    def doHandleDataChange(dataPath: String, data: AnyRef) {
      inLock(controllerContext.controllerLock) {
        try {
          info(s"Partition modification triggered $data for path $dataPath")
@@ -521,15 +511,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
          }
        }
      } catch {
-        case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
+        case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e)
      }
    }

-    @throws(classOf[Exception])
-    def handleDataDeleted(parentPath : String) {
-      // this is not implemented for partition change
-    }
+    // this is not implemented for partition change
+    def doHandleDataDeleted(parentPath: String): Unit = {}
  }
}

core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

@@ -17,13 +17,11 @@
package kafka.controller

import collection._
-import collection.JavaConverters._
import java.util.concurrent.atomic.AtomicBoolean

import kafka.common.{StateChangeFailedException, TopicAndPartition}
import kafka.controller.Callbacks.CallbackBuilder
import kafka.utils.{Logging, ReplicationUtils, ZkUtils}
-import org.I0Itec.zkclient.IZkChildListener
import kafka.utils.CoreUtils._

/**
@@ -49,7 +47,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
  private val controllerId = controller.config.brokerId
  private val zkUtils = controllerContext.zkUtils
  private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
-  private val brokerChangeListener = new BrokerChangeListener()
+  private val brokerChangeListener = new BrokerChangeListener(controller)
  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
  private val hasStarted = new AtomicBoolean(false)
  private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -348,15 +346,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
  /**
   * This is the zookeeper listener that triggers all the state transitions for a replica
   */
-  class BrokerChangeListener() extends IZkChildListener with Logging {
-    this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
-    def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-      info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.asScala.sorted.mkString(",")))
+  class BrokerChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
+
+    protected def logName = "BrokerChangeListener"
+
+    def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) {
+      info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          ControllerStats.leaderElectionTimer.time {
            try {
-              val curBrokers = currentBrokerList.asScala.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
+              val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
              val curBrokerIds = curBrokers.map(_.id)
              val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
              val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds

core/src/main/scala/kafka/server/KafkaApis.scala

@@ -1153,7 +1153,7 @@ class KafkaApis(val requestChannel: RequestChannel,
      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
    }

-    if (!controller.isActive()) {
+    if (!controller.isActive) {
      val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
        (topic, Errors.NOT_CONTROLLER)
      }
@@ -1203,7 +1203,7 @@ class KafkaApis(val requestChannel: RequestChannel,
      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
    }

-    if (!controller.isActive()) {
+    if (!controller.isActive) {
      val results = deleteTopicRequest.topics.asScala.map { topic =>
        (topic, Errors.NOT_CONTROLLER)
      }.toMap

core/src/main/scala/kafka/server/KafkaHealthcheck.scala

@@ -92,12 +92,12 @@ class KafkaHealthcheck(brokerId: Int,
      }
    }

-    @throws(classOf[Exception])
+    @throws[Exception]
    override def handleStateChanged(state: KeeperState) {
      stateToMeterMap.get(state).foreach(_.mark())
    }

-    @throws(classOf[Exception])
+    @throws[Exception]
    override def handleNewSession() {
      info("re-registering broker info in ZK for broker " + brokerId)
      register()

core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala

@@ -16,7 +16,6 @@
 */
package kafka.server

-import kafka.utils.ZkUtils._
import kafka.utils.CoreUtils._
import kafka.utils.{Json, Logging, ZKCheckedEphemeral}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -121,7 +120,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
   * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
   * @throws Exception On any error.
   */
-  @throws(classOf[Exception])
+  @throws[Exception]
  def handleDataChange(dataPath: String, data: Object) {
    val shouldResign = inLock(controllerContext.controllerLock) {
      val amILeaderBeforeDataChange = amILeader
@@ -131,17 +130,16 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
      amILeaderBeforeDataChange && !amILeader
    }

-    if (shouldResign) {
+    if (shouldResign)
      onResigningAsLeader()
-    }
  }

  /**
   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
   * @throws Exception
   *             On any error.
   */
-  @throws(classOf[Exception])
+  @throws[Exception]
  def handleDataDeleted(dataPath: String) {
    val shouldResign = inLock(controllerContext.controllerLock) {
      debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
@@ -149,9 +147,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
      amILeader
    }

-    if(shouldResign) {
+    if (shouldResign)
      onResigningAsLeader()
-    }

    inLock(controllerContext.controllerLock) {
      elect

core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala

@@ -68,7 +68,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
    val epochMap: mutable.Map[Int, Int] = mutable.Map.empty
    for (server <- this.servers) {
      epochMap += (server.config.brokerId -> server.kafkaController.epoch)
-      if(server.kafkaController.isActive()) {
+      if(server.kafkaController.isActive) {
        controller = server
      }
    }

core/src/test/scala/unit/kafka/server/BaseRequestTest.scala

@@ -56,13 +56,13 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {

  def controllerSocketServer = {
    servers.find { server =>
-      server.kafkaController.isActive()
+      server.kafkaController.isActive
    }.map(_.socketServer).getOrElse(throw new IllegalStateException("No controller broker is available"))
  }

  def notControllerSocketServer = {
    servers.find { server =>
-      !server.kafkaController.isActive()
+      !server.kafkaController.isActive
    }.map(_.socketServer).getOrElse(throw new IllegalStateException("No non-controller broker is available"))
  }

core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala

@@ -49,7 +49,7 @@ class MetadataRequestTest extends BaseRequestTest {

  @Test
  def testControllerId() {
-    val controllerServer = servers.find(_.kafkaController.isActive()).get
+    val controllerServer = servers.find(_.kafkaController.isActive).get
    val controllerId = controllerServer.config.brokerId

    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
@@ -60,7 +60,7 @@ class MetadataRequestTest extends BaseRequestTest {
    controllerServer.shutdown()
    controllerServer.startup()

-    val controllerServer2 = servers.find(_.kafkaController.isActive()).get
+    val controllerServer2 = servers.find(_.kafkaController.isActive).get
    val controllerId2 = controllerServer2.config.brokerId
    assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)
    TestUtils.waitUntilTrue(() => {