embedded controller; patched by Yang Ye; reviewed by Jun Rao; KAFKA-335

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1350291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2012-06-14 15:17:35 +00:00
parent 7d3a782e50
commit 83c82a3ecc
10 changed files with 471 additions and 7 deletions

View File

@ -39,7 +39,7 @@ class Partition(val topic: String,
try {
leaderISRUpdateLock.lock()
if(newLeader.isDefined) {
info("Updating leader for for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
info("Updating leader for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
leaderReplicaId = newLeader
}
leaderReplicaId

View File

@ -31,7 +31,7 @@ class SimpleConsumer( val host: String,
val bufferSize: Int ) extends Logging {
private val lock = new Object()
private val blockingChannel = new BlockingChannel(host, port, bufferSize, 0, soTimeout)
private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
private def connect(): BlockingChannel = {
close

View File

@ -22,6 +22,11 @@ import java.nio.channels._
import kafka.utils.{nonthreadsafe, Logging}
import kafka.api.RequestOrResponse
object BlockingChannel{
val UseDefaultBufferSize = -1
}
/**
* A simple blocking channel with timeouts correctly enabled.
*
@ -32,7 +37,6 @@ class BlockingChannel( val host: String,
val readBufferSize: Int,
val writeBufferSize: Int,
val readTimeoutMs: Int ) extends Logging {
private var connected = false
private var channel: SocketChannel = null
private var readChannel: ReadableByteChannel = null

View File

@ -42,7 +42,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
private val lock = new Object()
@volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs)
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.bufferSize, config.socketTimeoutMs)
trace("Instantiating Scala Sync Producer")

View File

@ -99,6 +99,13 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
* Following properties are relevant to Kafka replication
*/
/* the socket timeout for controller-to-broker channels */
val controllerSocketTimeoutMs = Utils.getInt(props, "controller.socket.timeout.ms", 30000)
/* the buffer size for controller-to-broker-channels */
val controllerMessageQueueSize= Utils.getInt(props, "controller.message.queue.size", 10)
/* default replication factors for automatically created topics */
val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)

View File

@ -0,0 +1,288 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.common.KafkaZookeeperClient
import collection.mutable.HashMap
import collection.immutable.Set
import kafka.cluster.Broker
import kafka.api._
import java.lang.Object
import kafka.network.{Receive, BlockingChannel}
import kafka.utils.{ZkUtils, Logging}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import java.util.concurrent.atomic.AtomicBoolean
import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
import org.apache.zookeeper.Watcher.Event.KeeperState
class RequestSendThread(val brokerId: Int,
val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
val channel: BlockingChannel)
extends Thread("requestSendThread-" + brokerId) with Logging {
val isRunning: AtomicBoolean = new AtomicBoolean(true)
private val shutDownLatch = new CountDownLatch(1)
private val lock = new Object
def shutDown(): Unit = {
info("Shutting down controller request send thread to broker %d".format(brokerId))
isRunning.set(false)
interrupt()
shutDownLatch.await()
info("Controller request send thread to broker %d shutting down completed".format(brokerId))
}
override def run(): Unit = {
try{
info("In controller, thread for broker: " + brokerId + " started running")
while(isRunning.get()){
val queueItem = queue.take()
val request = queueItem._1
val callback = queueItem._2
var receive: Receive = null
lock synchronized {
channel.send(request)
receive = channel.receive()
}
var response: RequestOrResponse = null
request.requestId.get match {
case RequestKeys.LeaderAndISRRequest =>
response = LeaderAndISRResponse.readFrom(receive.buffer)
case RequestKeys.StopReplicaRequest =>
response = StopReplicaResponse.readFrom(receive.buffer)
}
if(callback != null){
callback(response)
}
}
} catch{
case e: InterruptedException => warn("Controller request send thread to broker %d is intterrupted. Shutting down".format(brokerId))
case e1 => error("Error in controller request send thread to broker %d down due to ".format(brokerId), e1)
}
shutDownLatch.countDown()
}
}
class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) extends Logging{
private val brokers = new HashMap[Int, Broker]
private val messageChannels = new HashMap[Int, BlockingChannel]
private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
private val messageThreads = new HashMap[Int, RequestSendThread]
for(broker <- allBrokers){
brokers.put(broker.id, broker)
info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
messageChannels.put(broker.id, channel)
messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
}
def startUp() = {
for((brokerId, broker) <- brokers){
val thread = new RequestSendThread(brokerId, messageQueues(brokerId), messageChannels(brokerId))
thread.setDaemon(false)
thread.start()
messageThreads.put(broker.id, thread)
}
}
def shutDown() = {
for((brokerId, broker) <- brokers){
removeBroker(brokerId)
}
}
def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null){
messageQueues(brokerId).put((request, callback))
}
def addBroker(broker: Broker){
brokers.put(broker.id, broker)
messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
messageChannels.put(broker.id, channel)
val thread = new RequestSendThread(broker.id, messageQueues(broker.id), messageChannels(broker.id))
thread.setDaemon(false)
thread.start()
messageThreads.put(broker.id, thread)
}
def removeBroker(brokerId: Int){
brokers.remove(brokerId)
messageChannels(brokerId).disconnect()
messageChannels.remove(brokerId)
messageQueues.remove(brokerId)
messageThreads(brokerId).shutDown()
messageThreads.remove(brokerId)
}
}
class KafkaController(config : KafkaConfig) extends Logging {
info("controller startup");
private val lock = new Object
private var zkClient: ZkClient = null
private var controllerChannelManager: ControllerChannelManager = null
private var allBrokers : Set[Broker] = null
private var allTopics: Set[String] = null
private def tryToBecomeController() = {
lock synchronized {
val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
if (curController == null){
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString())
// Only the broker successfully registering as the controller can execute following code, otherwise
// some exception will be thrown.
registerBrokerChangeListener()
registerTopicChangeListener()
allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
allTopics = ZkUtils.getAllTopics(zkClient).toSet
controllerChannelManager = new ControllerChannelManager(allBrokers, config)
controllerChannelManager.startUp()
} catch {
case e: ZkNodeExistsException =>
registerControllerExistListener()
info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else")
case e2 => throw e2
}
}
else info("Broker " + config.brokerId + " see not null skip " + " current controller " + curController)
}
}
def isActive(): Boolean = {
controllerChannelManager != null
}
def startup() = {
zkClient = KafkaZookeeperClient.getZookeeperClient(config)
registerSessionExpirationListener()
registerControllerExistListener()
tryToBecomeController()
}
def shutDown() = {
if(controllerChannelManager != null){
controllerChannelManager.shutDown()
}
zkClient.close()
}
def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
controllerChannelManager.sendRequest(brokerId, request, callback)
}
private def registerBrokerChangeListener() = {
zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener())
}
private def registerTopicChangeListener() = {
zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener())
}
private def registerSessionExpirationListener() = {
zkClient.subscribeStateChanges(new SessionExpireListener())
}
private def registerControllerExistListener(){
zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistListener())
}
class SessionExpireListener() extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
}
/**
* Called after the zookeeper session has expired and a new session has been created. You would have to re-create
* any ephemeral nodes here.
*
* @throws Exception
* On any error.
*/
@throws(classOf[Exception])
def handleNewSession() {
info("Controller session expires, clean up the state, current controller: " + config.brokerId)
controllerChannelManager.shutDown()
controllerChannelManager = null
info("Controller session expires, the channel manager shut downr: " + config.brokerId)
tryToBecomeController()
}
}
class BrokerChangeListener() extends IZkChildListener with Logging {
def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
import scala.collection.JavaConversions._
lock synchronized {
info("Broker change listener at controller triggerred")
val allBrokerIds = allBrokers.map(_.id)
val curChildrenSeq: Seq[String] = javaCurChildren
val curBrokerIdsSeq = curChildrenSeq.map(_.toInt)
val curBrokerIds = curBrokerIdsSeq.toSet
val addedBrokerIds = curBrokerIds -- allBrokerIds
val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
info("Added brokers: " + addedBrokerIds.toString())
val deletedBrokerIds = allBrokerIds -- curBrokerIds
info("Deleted brokers: " + deletedBrokerIds.toString())
allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
for(broker <- addedBrokersSeq){
controllerChannelManager.addBroker(broker)
}
for (brokerId <- deletedBrokerIds){
controllerChannelManager.removeBroker(brokerId)
}
/** TODO: add other broker change handler logic**/
}
}
}
class TopicChangeListener extends IZkChildListener with Logging {
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
// TODO: Incomplete, do not need to review this time
}
}
class ControllerExistListener extends IZkDataListener with Logging {
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
// do nothing, since No logic is needed here
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
info("Controller fail over, broker " + config.brokerId + " try to become controller")
tryToBecomeController()
}
}
}

View File

@ -42,13 +42,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var kafkaZookeeper: KafkaZooKeeper = null
private var replicaManager: ReplicaManager = null
private var apis: KafkaApis = null
var kafkaController: KafkaController = new KafkaController(config)
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
info("Starting Kafka server...")
info("Starting Kafka server..." + config.brokerId)
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
var needRecovery = true
@ -89,6 +90,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
// starting relevant replicas and leader election for partitions assigned to this broker
kafkaZookeeper.startup
kafkaController.startup()
info("Server started.")
}
@ -110,13 +113,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
Utils.unregisterMBean(statsMBeanName)
if(logManager != null)
logManager.close()
if(kafkaController != null)
kafkaController.shutDown()
kafkaZookeeper.close
val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
cleanShutDownFile.createNewFile
shutdownLatch.countDown()
info("Kafka server shut down completed")
info("Kafka server with id %d shut down completed".format(config.brokerId))
}
}

View File

@ -32,6 +32,7 @@ object ZkUtils extends Logging {
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val BrokerStatePath = "/brokers/state"
val ControllerPath = "/controller"
def getTopicPath(topic: String): String ={
BrokerTopicsPath + "/" + topic
@ -41,10 +42,19 @@ object ZkUtils extends Logging {
getTopicPath(topic) + "/partitions"
}
def getController(zkClient: ZkClient): Int= {
val controller = readDataMaybeNull(zkClient, ControllerPath)
controller.toInt
}
def getTopicPartitionPath(topic: String, partitionId: String): String ={
getTopicPartitionsPath(topic) + "/" + partitionId
}
def getTopicPartitionLeaderAndISR(topic: String, partitionId: String): String ={
getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR"
}
def getTopicVersion(zkClient: ZkClient, topic: String): String ={
readDataMaybeNull(zkClient, getTopicPath(topic))
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.server.{KafkaServer, KafkaConfig}
import kafka.api._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import kafka.utils.{ControllerTestUtils, ZkUtils, TestUtils}
class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(4)
val configs = props.map(p => new KafkaConfig(p))
var brokers: Seq[KafkaServer] = null
override def setUp() {
super.setUp()
brokers = configs.map(config => TestUtils.createServer(config))
}
override def tearDown() {
super.tearDown()
brokers.foreach(_.shutdown())
}
def testControllerFailOver(){
brokers(0).shutdown()
brokers(1).shutdown()
brokers(3).shutdown()
Thread.sleep(1000)
var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
assertEquals(curController, "2")
brokers(1).startup()
brokers(2).shutdown()
Thread.sleep(1000)
curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
assertEquals(curController, "1")
}
def testControllerCommandSend(){
Thread.sleep(1000)
for(broker <- brokers){
if(broker.kafkaController.isActive){
val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()
val stopReplicaRequest = ControllerTestUtils.createSampleStopReplicaRequest()
val successCount: AtomicInteger = new AtomicInteger(0)
val countDownLatch: CountDownLatch = new CountDownLatch(8)
def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){
val expectedResponse = ControllerTestUtils.createSampleLeaderAndISRResponse()
if(response.equals(expectedResponse))
successCount.addAndGet(1)
countDownLatch.countDown()
}
def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){
val expectedResponse = ControllerTestUtils.createSampleStopReplicaResponse()
if(response.equals(expectedResponse))
successCount.addAndGet(1)
countDownLatch.countDown()
}
broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(0, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
broker.kafkaController.sendRequest(1, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
broker.kafkaController.sendRequest(2, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
broker.kafkaController.sendRequest(3, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
countDownLatch.await()
assertEquals(successCount.get(), 8)
}
}
}
}

View File

@ -31,10 +31,13 @@ import org.I0Itec.zkclient.ZkClient
import kafka.cluster.Broker
import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
import kafka.api.{TopicData, PartitionData}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import kafka.serializer.{DefaultEncoder, Encoder}
import kafka.common.ErrorMapping
import kafka.api._
import collection.mutable.{Map, Set}
/**
* Utility functions to help with testing
@ -396,6 +399,52 @@ object TestUtils extends Logging {
}
object ControllerTestUtils{
def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
val topic1 = "test1"
val topic2 = "test2"
val leader1 = 1;
val ISR1 = List(1, 2, 3)
val leader2 = 2;
val ISR2 = List(2, 3, 4)
val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1)
val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2)
val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1),
((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2))
new LeaderAndISRRequest(1, "client 1", 1, 4, map)
}
def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
val topic1 = "test1"
val topic2 = "test2"
val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
new LeaderAndISRResponse(1, responseMap)
}
def createSampleStopReplicaRequest() : StopReplicaRequest = {
val topic1 = "test1"
val topic2 = "test2"
new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2),
(topic2, 1), (topic2, 2)))
}
def createSampleStopReplicaResponse() : StopReplicaResponse = {
val topic1 = "test1"
val topic2 = "test2"
val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
new StopReplicaResponse(1, responseMap)
}
}
object TestZKUtils {
val zookeeperConnect = "127.0.0.1:2182"
}