diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala index df3db912f5d..13a3f2820bb 100644 --- a/core/src/main/scala/kafka/common/TopicAndPartition.scala +++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala @@ -1,6 +1,7 @@ package kafka.common import kafka.cluster.{Replica, Partition} +import kafka.utils.Json /** * Licensed to the Apache Software Foundation (ASF) under one or more @@ -24,6 +25,8 @@ import kafka.cluster.{Replica, Partition} */ case class TopicAndPartition(topic: String, partition: Int) { + private val version: Long = 1L + def this(tuple: (String, Int)) = this(tuple._1, tuple._2) def this(partition: Partition) = this(partition.topic, partition.partitionId) @@ -33,5 +36,6 @@ case class TopicAndPartition(topic: String, partition: Int) { def asTuple = (topic, partition) override def toString = "[%s,%d]".format(topic, partition) -} + def toJson = Json.encode(Map("version" -> version, "topic" -> topic, "partition" -> partition)) +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 36350579b16..09630d07afc 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -16,8 +16,9 @@ */ package kafka.controller -import collection._ -import collection.Set +import java.util + +import scala.collection._ import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit import kafka.admin.AdminUtils @@ -31,7 +32,7 @@ import kafka.utils.ZkUtils._ import kafka.utils._ import kafka.utils.CoreUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState -import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock @@ -169,6 +170,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt private val partitionReassignedListener = new PartitionsReassignedListener(this) private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) + private val isrChangeNotificationListener = new IsrChangeNotificationListener(this) newGauge( "ActiveControllerCount", @@ -307,6 +309,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt incrementControllerEpoch(zkClient) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() + registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() @@ -792,8 +795,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt controllerContext.controllerChannelManager.startup() } - private def updateLeaderAndIsrCache() { - val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.partitionReplicaAssignment.keySet) + def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) { + val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions) for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch) } @@ -892,6 +895,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } + private def registerIsrChangeNotificationListener() = { + debug("Registering IsrChangeNotificationListener") + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath) + zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) + } + private def deregisterReassignedPartitionsListener() = { zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } @@ -1280,6 +1289,56 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: } } +/** + * Called when leader intimates of isr change + * @param controller + */ +class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging { + var topicAndPartitionSet: Set[TopicAndPartition] = Set() + + override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = { + import scala.collection.JavaConverters._ + + inLock(controller.controllerContext.controllerLock) { + debug("[IsrChangeNotificationListener] Fired!!!") + val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala + val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet + controller.updateLeaderAndIsrCache(topicAndPartitions) + processUpdateNotifications(topicAndPartitions) + + // delete processed children + childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.TopicConfigChangesPath + "/" + x)) + } + } + + private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) { + val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq + controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions) + debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions) + } + + private def getTopicAndPartition(child: String): Option[TopicAndPartition] = { + val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode) + if (jsonOpt.isDefined) { + val json = Json.parseFull(jsonOpt.get) + + json match { + case Some(m) => + val topicAndPartition = m.asInstanceOf[Map[String, Any]] + val topic = topicAndPartition("topic").asInstanceOf[String] + val partition = topicAndPartition("partition").asInstanceOf[Int] + Some(TopicAndPartition(topic, partition)) + case None => + error("Invalid topic and partition JSON: " + json + " in ZK: " + changeZnode) + None + } + } else { + None + } + } +} + /** * Starts the preferred replica leader election for the list of partitions specified under * /admin/preferred_replica_election - diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index 60687332b4c..783ba1026d0 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -18,22 +18,32 @@ package kafka.utils import kafka.api.LeaderAndIsr +import kafka.common.TopicAndPartition import kafka.controller.LeaderIsrAndControllerEpoch -import org.apache.zookeeper.data.Stat import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.data.Stat -import scala.Some import scala.collection._ object ReplicationUtils extends Logging { + val IsrChangeNotificationPrefix = "isr_change_" + def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int, zkVersion: Int): (Boolean,Int) = { debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(","))) val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId) val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch) // use the epoch of the controller that made the leadership decision, instead of the current controller epoch - ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData)) + val updatePersistentPath: (Boolean, Int) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData)) + if (updatePersistentPath._1) { + val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partitionId) + val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath( + zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix, + topicAndPartition.toJson) + debug("Added " + isrChangeNotificationPath + " for " + topicAndPartition) + } + updatePersistentPath } def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 78475e3d5ec..166814c2959 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -47,6 +47,7 @@ object ZkUtils extends Logging { val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" val BrokerSequenceIdPath = "/brokers/seqid" + val IsrChangeNotificationPath = "/isr_change_notification" def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 995b0590149..a95ee5e0849 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -17,28 +17,32 @@ package kafka.integration -import org.apache.kafka.common.protocol.SecurityProtocol -import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness -import kafka.admin.AdminUtils import java.nio.ByteBuffer + import junit.framework.Assert._ -import kafka.cluster.{BrokerEndPoint, Broker} +import kafka.admin.AdminUtils +import kafka.api.{TopicMetadataResponse, TopicMetadataRequest} +import kafka.client.ClientUtils +import kafka.cluster.{Broker, BrokerEndPoint} +import kafka.common.ErrorMapping +import kafka.server.{NotRunning, KafkaConfig, KafkaServer} import kafka.utils.TestUtils import kafka.utils.TestUtils._ -import kafka.server.{KafkaServer, KafkaConfig} -import kafka.api.TopicMetadataRequest -import kafka.common.ErrorMapping -import kafka.client.ClientUtils +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.protocol.SecurityProtocol +import org.scalatest.junit.JUnit3Suite class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { private var server1: KafkaServer = null var brokerEndPoints: Seq[BrokerEndPoint] = null + var adHocConfigs: Seq[KafkaConfig] = null + val numConfigs: Int = 2 override def setUp() { super.setUp() - val props = createBrokerConfigs(1, zkConnect) - val configs = props.map(KafkaConfig.fromProps) + val props = createBrokerConfigs(numConfigs, zkConnect) + val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps) + adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases server1 = TestUtils.createServer(configs.head) brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) } @@ -130,4 +134,62 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(1, partitionMetadata.head.replicas.size) assertTrue(partitionMetadata.head.leader.isDefined) } + + private def checkIsr(servers: Seq[KafkaServer]): Unit = { + val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state) + val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map( + x => new BrokerEndPoint(x.config.brokerId, + if (x.config.hostName.nonEmpty) x.config.hostName else "localhost", + x.boundPort()) + ) + + // Assert that topic metadata at new brokers is updated correctly + activeBrokers.foreach(x => { + var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1) + waitUntilTrue(() => { + metadata = ClientUtils.fetchTopicMetadata( + Set.empty, + Seq(new BrokerEndPoint( + x.config.brokerId, + if (x.config.hostName.nonEmpty) x.config.hostName else "localhost", + x.boundPort())), + "TopicMetadataTest-testBasicTopicMetadata", + 2000, 0) + metadata.topicsMetadata.nonEmpty && + metadata.topicsMetadata.head.partitionsMetadata.nonEmpty && + expectedIsr == metadata.topicsMetadata.head.partitionsMetadata.head.isr + }, + "Topic metadata is not correctly updated for broker " + x + ".\n" + + "Expected ISR: " + expectedIsr + "\n" + + "Actual ISR : " + (if (metadata.topicsMetadata.nonEmpty && + metadata.topicsMetadata.head.partitionsMetadata.nonEmpty) + metadata.topicsMetadata.head.partitionsMetadata.head.isr + else + "")) + }) + } + + + def testIsrAfterBrokerShutDownAndJoinsBack { + // start adHoc brokers + val adHocServers = adHocConfigs.map(p => createServer(p)) + val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers + + // create topic + val topic: String = "test" + AdminUtils.createTopic(zkClient, topic, 1, numConfigs) + + // shutdown a broker + adHocServers.last.shutdown() + adHocServers.last.awaitShutdown() + + // startup a broker + adHocServers.last.startup() + + // check metadata is still correct and updated at all brokers + checkIsr(allServers) + + // shutdown adHoc brokers + adHocServers.map(p => p.shutdown()) + } } diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala index c96c0ffd958..b9de8d677a4 100644 --- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala @@ -70,6 +70,8 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) EasyMock.replay(replicaManager) + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath) + val replicas = List(0,1) // regular update