KAFKA-1367; Broker topic metadata not kept in sync with ZooKeeper; patched by Ashish Singh; reviewed by Jun Rao

Ashish Singh 2015-07-07 09:45:26 -07:00 committed by Jun Rao
parent 271b18d119
commit f77dc386c0
6 changed files with 158 additions and 20 deletions

core/src/main/scala/kafka/common/TopicAndPartition.scala

@@ -1,6 +1,7 @@
package kafka.common
import kafka.cluster.{Replica, Partition}
import kafka.utils.Json
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -24,6 +25,8 @@ import kafka.cluster.{Replica, Partition}
*/
case class TopicAndPartition(topic: String, partition: Int) {
private val version: Long = 1L
def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
def this(partition: Partition) = this(partition.topic, partition.partitionId)
@@ -33,5 +36,6 @@ case class TopicAndPartition(topic: String, partition: Int) {
def asTuple = (topic, partition)
override def toString = "[%s,%d]".format(topic, partition)
def toJson = Json.encode(Map("version" -> version, "topic" -> topic, "partition" -> partition))
}
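
A quick usage sketch of the class as it stands after this hunk (the expected output assumes kafka.utils.Json renders the three-entry map in insertion order, which Scala's small immutable maps preserve):

import kafka.common.TopicAndPartition

val tp = TopicAndPartition("test", 0)
assert(tp.asTuple == ("test", 0))
assert(tp.toString == "[test,0]")
println(tp.toJson) // {"version":1,"topic":"test","partition":0}

The private version field gives readers of this JSON a way to detect format changes later.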

core/src/main/scala/kafka/controller/KafkaController.scala

@@ -16,8 +16,9 @@
*/
package kafka.controller
import collection.Set
import java.util
import scala.collection._
import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
import kafka.admin.AdminUtils
@@ -31,7 +32,7 @@ import kafka.utils.ZkUtils._
import kafka.utils._
import kafka.utils.CoreUtils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
@@ -169,6 +170,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
private val partitionReassignedListener = new PartitionsReassignedListener(this)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
newGauge(
"ActiveControllerCount",
@@ -307,6 +309,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
incrementControllerEpoch(zkClient)
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
registerReassignedPartitionsListener()
registerIsrChangeNotificationListener()
registerPreferredReplicaElectionListener()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
@@ -792,8 +795,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
controllerContext.controllerChannelManager.startup()
}
def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions)
for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
}
@@ -892,6 +895,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
private def registerIsrChangeNotificationListener() = {
debug("Registering IsrChangeNotificationListener")
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath)
zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
}
private def deregisterReassignedPartitionsListener() = {
zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
@@ -1280,6 +1289,56 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
}
}
/**
* Called when a partition leader notifies the controller of an ISR change via ZooKeeper
* @param controller
*/
class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
var topicAndPartitionSet: Set[TopicAndPartition] = Set()
override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
import scala.collection.JavaConverters._
inLock(controller.controllerContext.controllerLock) {
debug("[IsrChangeNotificationListener] Fired!!!")
val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
controller.updateLeaderAndIsrCache(topicAndPartitions)
processUpdateNotifications(topicAndPartitions)
// delete processed children
childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.TopicConfigChangesPath + "/" + x))
}
}
private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
debug("Sent UpdateMetadataRequest to brokers " + liveBrokers + " for partitions " + topicAndPartitions)
}
private def getTopicAndPartition(child: String): Option[TopicAndPartition] = {
val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
val (jsonOpt, _) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode)
if (jsonOpt.isDefined) {
val json = Json.parseFull(jsonOpt.get)
json match {
case Some(m) =>
val topicAndPartition = m.asInstanceOf[Map[String, Any]]
val topic = topicAndPartition("topic").asInstanceOf[String]
val partition = topicAndPartition("partition").asInstanceOf[Int]
Some(TopicAndPartition(topic, partition))
case None =>
error("Invalid topic and partition JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
None
}
} else {
None
}
}
}
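
To see what the listener consumes end to end, here is a minimal round-trip sketch using the same kafka.utils.Json calls and casts as getTopicAndPartition above; the object wrapper is scaffolding for the example only:

import kafka.common.TopicAndPartition
import kafka.utils.Json

object IsrNotificationRoundTrip extends App {
  // Encode the way ReplicationUtils writes the notification znode data...
  val json = TopicAndPartition("test", 0).toJson
  // ...and decode the way IsrChangeNotificationListener reads it back.
  // Kafka's Json object parses numbers as Int, which the partition cast relies on.
  val decoded = Json.parseFull(json).map { m =>
    val fields = m.asInstanceOf[Map[String, Any]]
    TopicAndPartition(fields("topic").asInstanceOf[String],
                      fields("partition").asInstanceOf[Int])
  }
  assert(decoded == Some(TopicAndPartition("test", 0)))
}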
/**
* Starts the preferred replica leader election for the list of partitions specified under
* /admin/preferred_replica_election -

core/src/main/scala/kafka/utils/ReplicationUtils.scala

@@ -18,22 +18,32 @@
package kafka.utils
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.LeaderIsrAndControllerEpoch
import org.I0Itec.zkclient.ZkClient
import org.apache.zookeeper.data.Stat
import scala.collection._
object ReplicationUtils extends Logging {
val IsrChangeNotificationPrefix = "isr_change_"
def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
zkVersion: Int): (Boolean,Int) = {
debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(",")))
val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
// use the epoch of the controller that made the leadership decision, instead of the current controller epoch
val updatePersistentPath: (Boolean, Int) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
if (updatePersistentPath._1) {
val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partitionId)
val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath(
zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
topicAndPartition.toJson)
debug("Added " + isrChangeNotificationPath + " for " + topicAndPartition)
}
updatePersistentPath
}
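
Because each successful ISR update creates a fresh persistent sequential znode, concurrent notifications from different partition leaders never collide; ZooKeeper appends a monotonically increasing ten-digit counter to the name. A sketch of the resulting path for a first notification (the numeric suffix is assigned by ZooKeeper and is illustrative only):

import kafka.utils.{ReplicationUtils, ZkUtils}

// Prefix under which updateLeaderAndIsr creates notification znodes
val prefix = ZkUtils.IsrChangeNotificationPath + "/" + ReplicationUtils.IsrChangeNotificationPrefix
println(prefix + "0000000000") // /isr_change_notification/isr_change_0000000000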
def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = {

core/src/main/scala/kafka/utils/ZkUtils.scala

@@ -47,6 +47,7 @@ object ZkUtils extends Logging {
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
val BrokerSequenceIdPath = "/brokers/seqid"
val IsrChangeNotificationPath = "/isr_change_notification"
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic

core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala

@@ -17,28 +17,32 @@
package kafka.integration
import java.nio.ByteBuffer
import junit.framework.Assert._
import kafka.admin.AdminUtils
import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
import kafka.client.ClientUtils
import kafka.cluster.{Broker, BrokerEndPoint}
import kafka.common.ErrorMapping
import kafka.server.{NotRunning, KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.scalatest.junit.JUnit3Suite
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
private var server1: KafkaServer = null
var brokerEndPoints: Seq[BrokerEndPoint] = null
var adHocConfigs: Seq[KafkaConfig] = null
val numConfigs: Int = 2
override def setUp() {
super.setUp()
val props = createBrokerConfigs(numConfigs, zkConnect)
val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
server1 = TestUtils.createServer(configs.head)
brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
}
@@ -130,4 +134,62 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(1, partitionMetadata.head.replicas.size)
assertTrue(partitionMetadata.head.leader.isDefined)
}
private def checkIsr(servers: Seq[KafkaServer]): Unit = {
val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map(
x => new BrokerEndPoint(x.config.brokerId,
if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
x.boundPort())
)
// Assert that topic metadata at new brokers is updated correctly
activeBrokers.foreach(x => {
var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
waitUntilTrue(() => {
metadata = ClientUtils.fetchTopicMetadata(
Set.empty,
Seq(new BrokerEndPoint(
x.config.brokerId,
if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
x.boundPort())),
"TopicMetadataTest-testBasicTopicMetadata",
2000, 0)
metadata.topicsMetadata.nonEmpty &&
metadata.topicsMetadata.head.partitionsMetadata.nonEmpty &&
expectedIsr == metadata.topicsMetadata.head.partitionsMetadata.head.isr
},
"Topic metadata is not correctly updated for broker " + x + ".\n" +
"Expected ISR: " + expectedIsr + "\n" +
"Actual ISR : " + (if (metadata.topicsMetadata.nonEmpty &&
metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
metadata.topicsMetadata.head.partitionsMetadata.head.isr
else
""))
})
}
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
// start adHoc brokers
val adHocServers = adHocConfigs.map(p => createServer(p))
val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
// create topic
val topic: String = "test"
AdminUtils.createTopic(zkClient, topic, 1, numConfigs)
// shutdown a broker
adHocServers.last.shutdown()
adHocServers.last.awaitShutdown()
// startup a broker
adHocServers.last.startup()
// check metadata is still correct and updated at all brokers
checkIsr(allServers)
// shutdown adHoc brokers
adHocServers.foreach(_.shutdown())
}
}
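
checkIsr polls because metadata propagation is asynchronous: the controller's UpdateMetadataRequest reaches each broker some time after the ISR change lands in ZooKeeper. A stripped-down sketch of the retry idiom that TestUtils.waitUntilTrue provides (plain Scala, independent of the test harness; the name and defaults here are illustrative):

// Poll a condition until it holds or the deadline passes.
def waitUntil(condition: () => Boolean, timeoutMs: Long = 5000L, intervalMs: Long = 100L): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (System.currentTimeMillis() < deadline) {
    if (condition()) return true
    Thread.sleep(intervalMs)
  }
  condition() // one last check at the deadline
}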

core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala

@@ -70,6 +70,8 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
EasyMock.replay(replicaManager)
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath)
val replicas = List(0,1)
// regular update