KAFKA-300 Leader election; patched by nehanarkhede; reviewed by junrao

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1303473 13f79535-47bb-0310-9956-ffa450edef68
Neha Narkhede 2012-03-21 17:29:32 +00:00
parent 4dd7f95569
commit 854fb29c26
31 changed files with 711 additions and 201 deletions
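
For orientation, the election mechanism this patch introduces boils down to racing on an ephemeral leader znode per partition. A condensed sketch, assembled from the ZkUtils.tryToBecomeLeaderForPartition and KafkaZooKeeper.leaderElection hunks further down (the wrapper function and logging here are illustrative, not part of the patch):

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils
    import kafka.cluster.Replica

    def electLeader(zkClient: ZkClient, replica: Replica, preferredReplicaWaitTimeMs: Long) {
      // every broker hosting a replica of the partition races to create the ephemeral leader path;
      // tryToBecomeLeaderForPartition returns false if another replica already won
      val assignedReplicas =
        ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partId).map(r => r.toInt)
      if (assignedReplicas.contains(replica.brokerId)) {
        if (replica.brokerId != assignedReplicas.head)
          Thread.sleep(preferredReplicaWaitTimeMs)   // give the preferred (first) replica a head start
        if (ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId))
          println("broker %d is now leader for topic %s partition %d"
            .format(replica.brokerId, replica.topic, replica.partition.partId))
      }
    }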


@ -80,10 +80,6 @@ object AdminUtils extends Logging {
for (i <- 0 until replicaAssignmentList.size) {
val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
// TODO: Remove this with leader election patch
// assign leader for the partition i
// ZkUtils.updateEphemeralPath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, i.toString),
// replicaAssignmentList(i).head)
debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
}
}


@ -16,14 +16,12 @@
*/
package kafka.cluster
case class Partition(brokerId: Int, partId: Int, topic: String = "") extends Ordered[Partition] {
def name = partId
def compare(that: Partition) =
if (this.topic == that.topic)
this.partId - that.partId
else
this.topic.compareTo(that.topic)
}
/**
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
* TODO: Commit queue to be added as part of KAFKA-46. Add AR, ISR, CUR, RAR state maintenance as part of KAFKA-302
*/
case class Partition(topic: String, val partId: Int, var leader: Option[Replica] = None,
assignedReplicas: Set[Replica] = Set.empty[Replica],
inSyncReplicas: Set[Replica] = Set.empty[Replica],
catchUpReplicas: Set[Replica] = Set.empty[Replica],
reassignedReplicas: Set[Replica] = Set.empty[Replica])


@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.cluster
import kafka.log.Log
case class Replica(brokerId: Int, partition: Partition, topic: String,
var log: Option[Log] = None, var hw: Long = -1, var leo: Long = -1, isLocal: Boolean = false)
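
For reference, a minimal sketch of how the reworked Partition and the new Replica case classes compose; the values are illustrative, and the leader is attached the same way BrokerPartitionInfo does later in this diff:

    import kafka.cluster.{Partition, Replica}

    // a partition for which no leader has been elected yet
    val partition = new Partition("test-topic", 0)

    // attach a leader replica hosted on broker 0; log, hw and leo keep their defaults
    val leaderReplica = new Replica(0, partition, "test-topic")
    partition.leader = Some(leaderReplica)

    // the AR/ISR/CUR/RAR sets default to empty until KAFKA-302 adds their maintenance
    assert(partition.assignedReplicas.isEmpty && partition.inSyncReplicas.isEmpty)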


@ -20,7 +20,7 @@ package kafka.consumer
import java.io.IOException
import java.util.concurrent.CountDownLatch
import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.cluster.{Partition, Broker}
import kafka.cluster.Broker
import kafka.common.ErrorMapping
import kafka.message.ByteBufferMessageSet
import kafka.utils._
@ -48,7 +48,7 @@ class FetcherRunnable(val name: String,
override def run() {
for (infopti <- partitionTopicInfos)
info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: "
info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partitionId + " offset: "
+ infopti.getFetchOffset + " from " + broker.host + ":" + broker.port)
var reqId = 0
@ -61,7 +61,7 @@ class FetcherRunnable(val name: String,
maxWait(0).
minBytes(0)
partitionTopicInfos.foreach(pti =>
builder.addFetch(pti.topic, pti.partition.partId, pti.getFetchOffset(), config.fetchSize)
builder.addFetch(pti.topic, pti.partitionId, pti.getFetchOffset(), config.fetchSize)
)
val fetchRequest = builder.build()
@ -70,13 +70,13 @@ class FetcherRunnable(val name: String,
var read = 0L
for(infopti <- partitionTopicInfos) {
val messages = response.messageSet(infopti.topic, infopti.partition.partId).asInstanceOf[ByteBufferMessageSet]
val messages = response.messageSet(infopti.topic, infopti.partitionId).asInstanceOf[ByteBufferMessageSet]
try {
var done = false
if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
info("offset for " + infopti + " out of range")
// see if we can fix this error
val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partition)
val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partitionId)
if(resetOffset >= 0) {
infopti.resetFetchOffset(resetOffset)
infopti.resetConsumeOffset(resetOffset)
@ -126,7 +126,7 @@ class FetcherRunnable(val name: String,
private def shutdownComplete() = shutdownLatch.countDown
private def resetConsumerOffsets(topic : String,
partition: Partition) : Long = {
partitionId: Int) : Long = {
var offset : Long = 0
config.autoOffsetReset match {
case OffsetRequest.SmallestTimeString => offset = OffsetRequest.EarliestTime
@ -135,13 +135,13 @@ class FetcherRunnable(val name: String,
}
// get mentioned offset from the broker
val offsets = simpleConsumer.getOffsetsBefore(topic, partition.partId, offset, 1)
val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, offset, 1)
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
// reset manually in zookeeper
info("updating partition " + partition.name + " for topic " + topic + " with " +
info("updating partition " + partitionId + " for topic " + topic + " with " +
(if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString)
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partitionId, offsets(0).toString)
offsets(0)
}


@ -20,13 +20,12 @@ package kafka.consumer
import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.message._
import kafka.cluster._
import kafka.utils.Logging
import kafka.common.ErrorMapping
private[consumer] class PartitionTopicInfo(val topic: String,
val brokerId: Int,
val partition: Partition,
val partitionId: Int,
private val chunkQueue: BlockingQueue[FetchedDataChunk],
private val consumedOffset: AtomicLong,
private val fetchedOffset: AtomicLong,
@ -74,6 +73,6 @@ private[consumer] class PartitionTopicInfo(val topic: String,
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
}
override def toString(): String = topic + ":" + partition.toString + ": fetched offset = " + fetchedOffset.get +
override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
": consumed offset = " + consumedOffset.get
}


@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val rebalanceLock = new Object
private var fetcher: Option[Fetcher] = None
private var zkClient: ZkClient = null
private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
// queues : (topic,consumerThreadId) -> queue
private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
@ -202,7 +202,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
// this API is used by unit tests only
def getTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]] = topicRegistry
def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
info("begin registering consumer " + consumerIdString + " in ZK")
@ -241,7 +241,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (info <- infos.values) {
val newOffset = info.getConsumeOffset
try {
updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId,
newOffset.toString)
} catch {
case t: Throwable =>
@ -261,7 +261,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
for(partition <- infos.values) {
builder.append("\n {")
builder.append{partition.partition.name}
builder.append{partition}
builder.append(",fetch offset:" + partition.getFetchOffset)
builder.append(",consumer offset:" + partition.getConsumeOffset)
builder.append("}")
@ -278,10 +278,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId)
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
val partition = new Partition(brokerId, partitionId)
val partitionInfos = topicRegistry.get(topic)
if (partitionInfos != null) {
val partitionInfo = partitionInfos.get(partition)
val partitionInfo = partitionInfos.get(partitionId)
if (partitionInfo != null)
return partitionInfo.getConsumeOffset
}
@ -289,7 +288,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
//otherwise, try to get it from zookeeper
try {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
val znode = topicDirs.consumerOffsetDir + "/" + partition.name
val znode = topicDirs.consumerOffsetDir + "/" + partitionId
val offsetString = readDataMaybeNull(zkClient, znode)
if (offsetString != null)
return offsetString.toLong
@ -383,7 +382,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
info("Releasing partition ownership")
for ((topic, infos) <- topicRegistry) {
for(partition <- infos.keys) {
val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.partId.toString)
val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString)
deletePath(zkClient, partitionOwnerPath)
debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath)
}
@ -475,7 +474,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
topicRegistry.remove(topic)
topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
topicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
val topicDirs = new ZKGroupTopicDirs(group, topic)
val curConsumers = consumersPerTopicMap.get(topic).get
@ -566,7 +565,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (partition <- partitionInfos.values)
allPartitionInfos ::= partition
info("Consumer " + consumerIdString + " selected partitions : " +
allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))
allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))
fetcher match {
case Some(f) =>
@ -648,18 +647,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
else
offset = offsetString.toLong
val partitionObject = new Partition(leader, partition.toInt, topic)
val queue = queues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
leader,
partitionObject,
partition.toInt,
queue,
consumedOffset,
fetchedOffset,
new AtomicInteger(config.fetchSize))
partTopicInfoMap.put(partitionObject, partTopicInfo)
partTopicInfoMap.put(partition.toInt, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
}
}


@ -26,7 +26,6 @@ class ProducerRequest(val correlationId: Int,
val ackTimeout: Int,
val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
import Implicits._
val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }


@ -26,7 +26,7 @@ import kafka.common._
import kafka.api.OffsetRequest
import java.util._
private[log] object Log {
private[kafka] object Log {
val FileSuffix = ".kafka"
/**
@ -100,7 +100,7 @@ private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, va
* An append-only log for storing messages.
*/
@threadsafe
private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
/* A lock that guards all modifications to the log */
private val lock = new Object


@ -25,6 +25,7 @@ import kafka.server.{KafkaConfig, KafkaZooKeeper}
import kafka.common.{InvalidTopicException, InvalidPartitionException}
import kafka.api.OffsetRequest
import org.I0Itec.zkclient.ZkClient
import kafka.cluster.{Partition, Replica}
/**
* The guy who creates and hands out logs
@ -51,6 +52,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
private val logRetentionSize = config.logRetentionSize
private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
private var replicas: Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
@ -121,6 +123,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
}
def getReplicaForPartition(topic: String, partition: Int): Option[Replica] = replicas.get((topic, partition))
/**
* Return the Pool (partitions) for a specific log
*/
@ -145,17 +149,23 @@ private[kafka] class LogManager(val config: KafkaConfig,
def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
val log = getLog(offsetRequest.topic, offsetRequest.partition)
if (log != null) return log.getOffsetsBefore(offsetRequest)
Log.getEmptyOffsets(offsetRequest)
log match {
case Some(l) => l.getOffsetsBefore(offsetRequest)
case None => Log.getEmptyOffsets(offsetRequest)
}
}
/**
* Get the log if exists
*/
def getLog(topic: String, partition: Int): Log = {
def getLog(topic: String, partition: Int): Option[Log] = {
val parts = getLogPool(topic, partition)
if (parts == null) return null
parts.get(partition)
if (parts == null) None
else {
val log = parts.get(partition)
if(log == null) None
else Some(log)
}
}
/**
@ -188,9 +198,40 @@ private[kafka] class LogManager(val config: KafkaConfig,
info("Created log for '" + topic + "'-" + partition)
}
// add this log to the list of replicas hosted on this broker
addReplicaForPartition(topic, partition)
log
}
def addReplicaForPartition(topic: String, partitionId: Int): Replica = {
val replica = replicas.get((topic, partitionId))
val log = getLog(topic, partitionId)
replica match {
case Some(r) =>
r.log match {
case None =>
val log = getLog(topic, partitionId)
r.log = log
case Some(l) => // nothing to do since log already exists
}
case None =>
val partition = new Partition(topic, partitionId)
log match {
case Some(l) =>
val replica = new Replica(config.brokerId, partition, topic, log, l.getHighwaterMark, l.maxSize, true)
replicas += (topic, partitionId) -> replica
info("Added replica for topic %s partition %s on broker %d"
.format(replica.topic, replica.partition.partId, replica.brokerId))
case None =>
val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
replicas += (topic, partitionId) -> replica
info("Added replica for topic %s partition %s on broker %d"
.format(replica.topic, replica.partition.partId, replica.brokerId))
}
}
replicas.get((topic, partitionId)).get
}
/* Attemps to delete all provided segments from a log and returns how many it was able to */
private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
var total = 0
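
A small caller-side sketch of the new Option-returning lookups added to LogManager above; logManager stands for an already-started LogManager instance, and the topic/partition values are illustrative:

    // getLog now returns Option[Log] instead of a nullable Log
    logManager.getLog("test-topic", 0) match {
      case Some(log) => println("local log for test-topic-0 exists at " + log.dir)
      case None      => println("no local log for test-topic-0 yet")
    }

    // replicas hosted on this broker are registered as their logs are created
    val replica = logManager.addReplicaForPartition("test-topic", 0)
    assert(logManager.getReplicaForPartition("test-topic", 0) == Some(replica))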


@ -16,12 +16,11 @@
*/
package kafka.producer
import kafka.cluster.{Broker, Partition}
import collection.mutable.HashMap
import kafka.api.{TopicMetadataRequest, TopicMetadata}
import java.lang.IllegalStateException
import kafka.common.NoLeaderForPartitionException
import kafka.utils.Logging
import kafka.cluster.{Replica, Partition}
class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
val topicPartitionInfo = new HashMap[String, TopicMetadata]()
@ -33,7 +32,8 @@ class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
* @return a sequence of (brokerId, numPartitions). Returns a zero-length
* sequence if no brokers are available.
*/
def getBrokerPartitionInfo(topic: String): Seq[(Partition, Broker)] = {
def getBrokerPartitionInfo(topic: String): Seq[Partition] = {
debug("Getting broker partition info for topic %s".format(topic))
// check if the cache has metadata for this topic
val topicMetadata = topicPartitionInfo.get(topic)
val metadata: TopicMetadata =
@ -51,11 +51,18 @@ class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
}
val partitionMetadata = metadata.partitionsMetadata
partitionMetadata.map { m =>
val partition = new Partition(topic, m.partitionId)
m.leader match {
case Some(leader) => (new Partition(leader.id, m.partitionId, topic) -> leader)
case None => throw new NoLeaderForPartitionException("No leader for topic %s, partition %d".format(topic, m.partitionId))
case Some(leader) =>
val leaderReplica = new Replica(leader.id, partition, topic)
partition.leader = Some(leaderReplica)
debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id))
partition
case None =>
debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId))
partition
}
}.sortWith((s, t) => s._1.partId < t._1.partId)
}.sortWith((s, t) => s.partId < t.partId)
}
/**
@ -78,8 +85,8 @@ class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
// refresh cache for all topics
val topics = topicPartitionInfo.keySet.toList
val topicMetadata = producer.send(new TopicMetadataRequest(topics))
info("Fetched metadata for topics %s".format(topicMetadata.mkString(",")))
topicMetadata.foreach(metadata => topicPartitionInfo += (metadata.topic -> metadata))
}
}
}
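
Caller-side, the new Seq[Partition] return value is consumed by checking each partition's optional leader, roughly as DefaultEventHandler.partitionAndCollate does in the next file; brokerPartitionInfo here stands for a BrokerPartitionInfo instance and -1 marks a partition whose leader is not yet known:

    val partitions = brokerPartitionInfo.getBrokerPartitionInfo("test-topic")
    val leaderByPartition = partitions.map { p =>
      val leaderBrokerId = p.leader match {
        case Some(replica) => replica.brokerId
        case None          => -1   // no leader elected yet; defer the failure to send time
      }
      (p.partId, leaderBrokerId)
    }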


@ -18,14 +18,14 @@
package kafka.producer.async
import kafka.api.{ProducerRequest, TopicData, PartitionData}
import kafka.common.{FailedToSendMessageException, InvalidPartitionException, NoBrokersForPartitionException}
import kafka.cluster.{Partition, Broker}
import kafka.cluster.Partition
import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
import scala.collection.Map
import scala.collection.mutable.{ListBuffer, HashMap}
import kafka.common.{NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException}
import kafka.utils.{Utils, Logging}
class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing
private val partitioner: Partitioner[K], // use the other constructor
@ -43,41 +43,37 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
def handle(events: Seq[ProducerData[K,V]]) {
lock synchronized {
val serializedData = serialize(events)
handleSerializedData(serializedData, config.producerRetries)
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries
Stream.continually(dispatchSerializedData(outstandingProduceRequests))
.takeWhile(requests => (remainingRetries > 0) && (requests.size > 0)).foreach {
currentOutstandingRequests =>
outstandingProduceRequests = currentOutstandingRequests
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.producerRetryBackoffMs)
brokerPartitionInfo.updateInfo()
remainingRetries -= 1
}
}
}
private def handleSerializedData(messages: Seq[ProducerData[K,Message]], requiredRetries: Int) {
private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = {
val partitionedData = partitionAndCollate(messages)
val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
try {
for ( (brokerid, eventsPerBrokerMap) <- partitionedData ) {
if (logger.isTraceEnabled)
eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
.format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
try {
send(brokerid, messageSetPerBroker)
} catch {
case t =>
warn("error sending data to broker " + brokerid, t)
var numRetries = 0
val eventsPerBroker = new ListBuffer[ProducerData[K,Message]]
eventsPerBrokerMap.foreach(e => eventsPerBroker.appendAll(e._2))
while (numRetries < requiredRetries) {
numRetries +=1
Thread.sleep(config.producerRetryBackoffMs)
try {
brokerPartitionInfo.updateInfo()
handleSerializedData(eventsPerBroker, 0)
return
}
catch {
case t => warn("error sending data to broker " + brokerid + " in " + numRetries + " retry", t)
}
}
throw new FailedToSendMessageException("can't send data after " + numRetries + " retries", t)
if((brokerid < 0) || (!send(brokerid, messageSetPerBroker)))
failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten)
}
}catch {
case t: Throwable => error("Failed to send messages")
}
failedProduceRequests
}
def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
@ -93,16 +89,22 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val partitionIndex = getPartition(event.getKey, totalNumPartitions)
val brokerPartition = topicPartitionsList(partitionIndex)
val leaderBrokerId = brokerPartition.leader match {
case Some(leader) => leader.brokerId
case None => -1
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
}
var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
ret.get(brokerPartition._2.id) match {
ret.get(leaderBrokerId) match {
case Some(element) =>
dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
case None =>
dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
ret.put(brokerPartition._2.id, dataPerBroker)
ret.put(leaderBrokerId, dataPerBroker)
}
val topicAndPartition = (event.getTopic, brokerPartition._1.partId)
val topicAndPartition = (event.getTopic, brokerPartition.partId)
var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
dataPerBroker.get(topicAndPartition) match {
case Some(element) =>
@ -116,10 +118,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
ret
}
private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[(Partition, Broker)] = {
private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
debug("Broker partitions registered for topic: %s are %s"
.format(pd.getTopic, topicPartitionsList.map(p => p.partId).mkString(",")))
val totalNumPartitions = topicPartitionsList.length
if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
topicPartitionsList
@ -150,7 +153,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
* @param brokerId the broker that will receive the request
* @param messagesPerTopic the messages as a map from (topic, partition) -> messages
*/
private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) {
private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Boolean = {
try {
if(brokerId < 0)
throw new NoLeaderForPartitionException("No leader for some partition(s) on broker %d".format(brokerId))
if(messagesPerTopic.size > 0) {
val topics = new HashMap[String, ListBuffer[PartitionData]]()
for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
@ -165,8 +171,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest)
// TODO: possibly send response to response callback handler
trace("kafka producer sent messages for topics %s to broker %s:%d"
.format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
trace("kafka producer sent messages for topics %s to broker %d on %s:%d"
.format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
}
true
}catch {
case t: Throwable => false
}
}
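
The rewritten handle() above retries with Stream.continually(...).takeWhile(...); a toy, self-contained version of the same idiom, where attemptSend is an illustrative stand-in that returns whichever items failed:

    // Retry whatever failed, up to maxRetries more rounds, backing off between rounds.
    // attemptSend takes the outstanding items and returns the subset that still failed.
    def sendWithRetries[T](initial: Seq[T], maxRetries: Int, backoffMs: Long)
                          (attemptSend: Seq[T] => Seq[T]) {
      var outstanding = initial
      var remainingRetries = maxRetries
      Stream.continually(attemptSend(outstanding))
            .takeWhile(failed => remainingRetries > 0 && failed.size > 0)
            .foreach { failed =>
              outstanding = failed          // only the failed items are retried
              Thread.sleep(backoffMs)       // back off before the next attempt
              remainingRetries -= 1
            }
    }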


@ -63,6 +63,10 @@ class ProducerSendThread[K,V](val threadName: String,
// returns a null object
val expired = currentQueueItem == null
if(currentQueueItem != null) {
if(currentQueueItem.getKey == null)
trace("Dequeued item for topic %s, no partition key, data: %s"
.format(currentQueueItem.getTopic, currentQueueItem.getData.toString))
else
trace("Dequeued item for topic %s, partition key: %s, data: %s"
.format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
events += currentQueueItem


@ -130,7 +130,7 @@ class KafkaApis(val logManager: LogManager) extends Logging {
try {
trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
val log = logManager.getLog(topic, partition)
response = Right(if(log != null) log.read(offset, maxSize) else MessageSet.Empty)
response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
} catch {
case e =>
error("error when processing request " + (topic, partition, offset, maxSize), e)
@ -168,7 +168,7 @@ class KafkaApis(val logManager: LogManager) extends Logging {
if(config.autoCreateTopics) {
CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions,
config.defaultReplicationFactor)
info("Auto creation of topic %s with partitions %d and replication factor %d is successful!"
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.numPartitions, config.defaultReplicationFactor))
val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
newTopicMetadata match {


@ -94,6 +94,15 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
/* enable auto creation of topic on the server */
val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
/**
* Following properties are relevant to Kafka replication
*/
/* default replication factors for automatically created topics */
val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
}
/* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
* leader election on all replicas minus the preferred replica */
val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
}
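
A hedged example of wiring the new replication settings; the property names come from the hunk above, the values are illustrative, and the rest of the usual broker configuration (broker id, log directory, ZK connect string, ...) is assumed to be set elsewhere:

    import java.util.Properties

    val props = new Properties()
    props.put("default.replication.factor", "2")     // replication factor used when auto.create.topics creates a topic
    props.put("preferred.replica.wait.time", "300")  // ms head start given to the preferred replica during leader election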


@ -20,9 +20,9 @@ package kafka.server
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.io.File
import kafka.utils.{Mx4jLoader, Utils, SystemTime, Logging}
import kafka.network.{SocketServerStats, SocketServer}
import kafka.log.LogManager
import kafka.utils._
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
@ -72,6 +72,11 @@ class KafkaServer(val config: KafkaConfig) extends Logging {
* So this should happen after socket server start.
*/
logManager.startup
// starting relevant replicas and leader election for partitions assigned to this broker
// TODO: Some part of the broker startup logic is hidden inside KafkaZookeeper, but some of it has to be done here
// since it requires the log manager to come up. Ideally log manager should not hide KafkaZookeeper inside it
logManager.kafkaZookeeper.startReplicasForTopics(ZkUtils.getAllTopics(logManager.getZookeeperClient))
info("Server started.")
}
@ -82,7 +87,7 @@ class KafkaServer(val config: KafkaConfig) extends Logging {
def shutdown() {
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
info("Shutting down Kafka server")
info("Shutting down Kafka server with id " + config.brokerId)
if (socketServer != null)
socketServer.shutdown()
if(requestHandlerPool != null)
@ -108,3 +113,5 @@ class KafkaServer(val config: KafkaConfig) extends Logging {
def getStats(): SocketServerStats = socketServer.stats
}


@ -18,11 +18,12 @@
package kafka.server
import kafka.utils._
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
import kafka.log.LogManager
import java.net.InetAddress
import kafka.common.KafkaZookeeperClient
import kafka.cluster.Replica
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
/**
* Handles the server's interaction with zookeeper. The server needs to register the following paths:
@ -36,12 +37,18 @@ class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Loggin
var zkClient: ZkClient = null
var topics: List[String] = Nil
val lock = new Object()
var existingTopics: Set[String] = Set.empty[String]
val leaderChangeListener = new LeaderChangeListener
val topicPartitionsChangeListener = new TopicChangeListener
private val topicListenerLock = new Object
private val leaderChangeLock = new Object
def startup() {
/* start client */
info("connecting to ZK: " + config.zkConnect)
zkClient = KafkaZookeeperClient.getZookeeperClient(config)
zkClient.subscribeStateChanges(new SessionExpireListener)
subscribeToTopicAndPartitionsChanges
}
def registerBrokerInZk() {
@ -73,6 +80,12 @@ class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Loggin
info("re-registering broker info in ZK for broker " + config.brokerId)
registerBrokerInZk()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
val topics = ZkUtils.getAllTopics(zkClient)
debug("Existing topics are %s".format(topics.mkString(",")))
topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
handleNewTopics(topics)
}
}
@ -83,4 +96,167 @@ class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Loggin
}
}
def handleNewTopics(topics: Seq[String]) {
// get relevant partitions to this broker
val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
topicsAndPartitionsOnThisBroker.foreach { tp =>
val topic = tp._1
val partitionsAssignedToThisBroker = tp._2
// subscribe to leader changes for these partitions
subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
// start replicas for these partitions
startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
}
}
def handleNewPartitions(topic: String, partitions: Seq[Int]) {
info("Handling topic %s partitions %s".format(topic, partitions.mkString(",")))
// find the partitions relevant to this broker
val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topic, partitions, config.brokerId)
info("Partitions assigned to broker %d for topic %s are %s"
.format(config.brokerId, topic, partitionsAssignedToThisBroker.mkString(",")))
// subscribe to leader changes for these partitions
subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
// start replicas for these partitions
startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
}
def subscribeToTopicAndPartitionsChanges {
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
val topics = ZkUtils.getAllTopics(zkClient)
debug("Existing topics are %s".format(topics.mkString(",")))
topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
debug("Partitions assigned to broker %d are %s".format(config.brokerId, partitionsAssignedToThisBroker.mkString(",")))
partitionsAssignedToThisBroker.foreach { tp =>
val topic = tp._1
val partitions = tp._2.map(p => p.toInt)
partitions.foreach { partition =>
// register leader change listener
zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
}
}
}
private def subscribeToLeaderForPartitions(topic: String, partitions: Seq[Int]) {
partitions.foreach { partition =>
info("Broker %d subscribing to leader changes for topic %s partition %d".format(config.brokerId, topic, partition))
// register leader change listener
zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
}
}
def startReplicasForTopics(topics: Seq[String]) {
val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
partitionsAssignedToThisBroker.foreach(tp => startReplicasForPartitions(tp._1, tp._2))
}
private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
partitions.foreach { partition =>
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
if(assignedReplicas.contains(config.brokerId)) {
val replica = logManager.addReplicaForPartition(topic, partition)
startReplica(replica)
} else
warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
.format(partition, topic, config.brokerId))
}
}
private def startReplica(replica: Replica) {
info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partId, replica.brokerId))
replica.log match {
case Some(log) => // log is already started
case None =>
// TODO: Add log recovery upto the last checkpointed HW as part of KAFKA-46
}
ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partId) match {
case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
case None => // leader election
leaderElection(replica)
}
}
def leaderElection(replica: Replica) {
info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partId))
// read the AR list for replica.partition from ZK
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partId).map(r => r.toInt)
// TODO: read the ISR as part of KAFKA-302
if(assignedReplicas.contains(replica.brokerId)) {
// wait for some time if it is not the preferred replica
try {
if(replica.brokerId != assignedReplicas.head)
Thread.sleep(config.preferredReplicaWaitTime)
}catch {
case e => // ignoring
}
if(ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)) {
info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
// TODO: Become leader as part of KAFKA-302
}
}
}
class TopicChangeListener extends IZkChildListener with Logging {
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
topicListenerLock.synchronized {
debug("Topic/partition change listener fired for path " + parentPath)
import scala.collection.JavaConversions._
val currentChildren = asBuffer(curChilds)
// check if topic has changed or a partition for an existing topic has changed
if(parentPath == ZkUtils.BrokerTopicsPath) {
val currentTopics = currentChildren
debug("New topics " + currentTopics.mkString(","))
// for each new topic [topic], watch the path /brokers/topics/[topic]/partitions
currentTopics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this))
handleNewTopics(currentTopics)
}else {
val topic = parentPath.split("/").takeRight(2).head
debug("Partitions changed for topic %s on broker %d with new value %s"
.format(topic, config.brokerId, currentChildren.mkString(",")))
handleNewPartitions(topic, currentChildren.map(p => p.toInt).toSeq)
}
}
}
}
class LeaderChangeListener extends IZkDataListener with Logging {
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
// handle leader change event for path
val newLeader: String = data.asInstanceOf[String]
debug("Leader change listener fired for path %s. New leader is %s".format(dataPath, newLeader))
// TODO: update the leader in the list of replicas maintained by the log manager
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
leaderChangeLock.synchronized {
// leader is deleted for topic partition
val topic = dataPath.split("/").takeRight(4).head
val partitionId = dataPath.split("/").takeRight(2).head.toInt
debug("Leader deleted listener fired for topic %s partition %d on broker %d"
.format(topic, partitionId, config.brokerId))
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt)
if(assignedReplicas.contains(config.brokerId)) {
val replica = logManager.getReplicaForPartition(topic, partitionId)
replica match {
case Some(r) => leaderElection(r)
case None => error("No replica exists for topic %s partition %s on broker %d"
.format(topic, partitionId, config.brokerId))
}
}
}
}
}
}


@ -246,6 +246,22 @@ object Utils extends Logging {
else value
}
def getLong(props: Properties, name: String, default: Long): Long =
getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue))
def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
val v =
if(props.containsKey(name))
props.getProperty(name).toInt
else
default
if(v < range._1 || v > range._2)
throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
else
v
}
def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
val value = buffer.getLong
if(value < range._1 || value > range._2)


@ -17,13 +17,14 @@
package kafka.utils
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer
import kafka.cluster.{Broker, Cluster}
import scala.collection._
import java.util.Properties
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
import kafka.consumer.TopicCount
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import java.util.concurrent.locks.Condition
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
@ -68,15 +69,9 @@ object ZkUtils extends Logging {
}
def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
// TODO: When leader election is implemented, change this method to return the leader as follows
// until then, assume the first replica as the leader
// val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
val replicas = Utils.getCSVList(replicaListString)
replicas.size match {
case 0 => None
case _ => Some(replicas.head.toInt)
}
val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
if(leader == null) None
else Some(leader.toInt)
}
def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
@ -94,6 +89,16 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}
def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
try {
createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
true
} catch {
case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false
case oe => false
}
}
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val broker = new Broker(id, creator, host, port)
@ -317,6 +322,27 @@ object ZkUtils extends Logging {
ret
}
def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
val topicsAndPartitions = getPartitionsForTopics(zkClient, topics.iterator)
topicsAndPartitions.map { tp =>
val topic = tp._1
val partitions = tp._2.map(p => p.toInt)
val relevantPartitions = partitions.filter { partition =>
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
assignedReplicas.contains(brokerId)
}
(topic -> relevantPartitions)
}
}
def getPartitionsAssignedToBroker(zkClient: ZkClient, topic: String, partitions: Seq[Int], broker: Int): Seq[Int] = {
partitions.filter { p =>
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, p).map(r => r.toInt)
assignedReplicas.contains(broker)
}
}
def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
val brokerIdPath = BrokerIdsPath + "/" + brokerId
zkClient.delete(brokerIdPath)
@ -372,6 +398,29 @@ object ZkUtils extends Logging {
def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
def getAllTopics(zkClient: ZkClient): Seq[String] = {
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
if(topics == null) Seq.empty[String]
else topics
}
}
class LeaderExists(topic: String, partition: Int, leaderExists: Condition) extends IZkDataListener {
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
val t = dataPath.split("/").takeRight(3).head
val p = dataPath.split("/").takeRight(2).head.toInt
if(t == topic && p == partition)
leaderExists.signal()
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
leaderExists.signal()
}
}
object ZKStringSerializer extends ZkSerializer {
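
The tests later in this commit call waitUntilLeaderIsElected(zkClient, topic, partition, timeoutMs) from TestUtils, which is not part of this excerpt. A minimal polling sketch with that shape, built only on the getLeaderForPartition helper above (the real helper may instead block on the LeaderExists listener just added); the helper name, return type, and poll interval are assumptions:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    // Poll the leader path until a leader appears or the timeout expires.
    def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
      val deadline = System.currentTimeMillis + timeoutMs
      var leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
      while (leader.isEmpty && System.currentTimeMillis < deadline) {
        Thread.sleep(100)   // re-check every 100 ms
        leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
      }
      leader
    }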


@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.kafka=OFF
log4j.logger.kafka=INFO
# zkclient can be verbose, during debugging it is common to adjust it separately
log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
log4j.logger.org.apache.zookeeper=WARN
log4j.logger.org.I0Itec.zkclient.ZkClient=OFF
log4j.logger.org.apache.zookeeper=OFF


@ -169,8 +169,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
case Some(metadata) => assertEquals(topic, metadata.topic)
assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
assertEquals("leader of partition 0 should be 0", 0, metadata.partitionsMetadata.head.leader.get.id)
assertEquals("leader of partition 1 should be 1", 1, metadata.partitionsMetadata.last.leader.get.id)
val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
assertEquals(expectedReplicaAssignment.toList, actualReplicaList)


@ -43,7 +43,7 @@ class TopicCountTest extends JUnitSuite {
@Test
def testPartition() {
assertTrue(new Partition(10, 0) == new Partition(10, 0))
assertTrue(new Partition(10, 1) != new Partition(10, 0))
assertTrue(new Partition("foo", 10) == new Partition("foo", 10))
assertTrue(new Partition("foo", 1) != new Partition("foo", 0))
}
}


@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient
import kafka.utils._
import kafka.producer.{ProducerConfig, ProducerData, Producer}
import java.util.{Collections, Properties}
import kafka.utils.TestUtils._
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
@ -51,10 +52,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val consumer2 = "consumer2"
val consumer3 = "consumer3"
val nMessages = 2
var zkClient: ZkClient = null
override def setUp() {
super.setUp()
dirs = new ZKGroupTopicDirs(group, topic)
zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
}
def testBasic() {
@ -94,6 +97,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1.size, receivedMessages1.size)
assertEquals(sentMessages1, receivedMessages1)
@ -102,7 +109,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_1 = List( ("0", "group1_consumer1-0"),
("1", "group1_consumer1-0"))
// assertEquals(expected_1, actual_1)
assertEquals(expected_1, actual_1)
// commit consumed offsets
@ -118,7 +124,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
Thread.sleep(200)
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@ -141,7 +148,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
Thread.sleep(200)
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
@ -168,6 +178,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@ -196,7 +209,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
Thread.sleep(200)
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@ -219,7 +233,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
Thread.sleep(200)
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
@ -300,11 +317,15 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
waitUntilLeaderIsElected(zkClient, topic, 0, 500)
waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val zkConsumerConnector =
new ZookeeperConsumerConnector(consumerConfig, true)
val topicMessageStreams =
zkConsumerConnector.createMessageStreams(Predef.Map(topic -> 1), new StringDecoder)
var receivedMessages: List[String] = Nil
for ((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
@ -341,10 +362,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val topicRegistry = zkConsumerConnector1.getTopicRegistry
assertEquals(1, topicRegistry.map(r => r._1).size)
assertEquals(topic, topicRegistry.map(r => r._1).head)
val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._1)))
val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
assertEquals(0, brokerPartition.brokerId)
assertEquals(0, brokerPartition.partId)
assertEquals(0, brokerPartition.partitionId)
// also check partition ownership
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)


@ -27,8 +27,9 @@ import kafka.message._
import kafka.server._
import org.scalatest.junit.JUnit3Suite
import kafka.integration.KafkaServerTestHarness
import kafka.utils.TestUtils
import kafka.producer.{ProducerData, Producer}
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
@ -43,7 +44,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
c.brokerId,
new Partition(c.brokerId, 0),
0,
queue,
new AtomicLong(0),
new AtomicLong(0),
@ -66,6 +67,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
def testFetcher() {
val perNode = 2
var count = sendMessages(perNode)
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
fetch(count)
Thread.sleep(100)
assertQueueEmpty()


@ -21,11 +21,11 @@ import kafka.api.FetchRequestBuilder
import kafka.common.OffsetOutOfRangeException
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.producer.ProducerData
import kafka.utils.TestUtils
/**
* End to end tests of the primitive apis against a local server
@ -61,6 +61,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
val producerData = new ProducerData[String, Message](topic, topic, sentMessages)
producer.send(producerData)
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())


@ -39,6 +39,8 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
props.put("buffer.size", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
props.put("producer.retry.backoff.ms", "1000")
props.put("producer.num.retries", "3")
producer = new Producer(new ProducerConfig(props))
consumer = new SimpleConsumer(host,
port,


@ -20,14 +20,14 @@ package kafka.javaapi.consumer
import junit.framework.Assert._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
import kafka.utils.{Utils, Logging}
import kafka.utils.TestUtils
import org.scalatest.junit.JUnit3Suite
import scala.collection.JavaConversions._
import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.javaapi.producer.{ProducerData, Producer}
import kafka.utils.{Utils, Logging, TestUtils}
import kafka.utils.TestUtils._
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
@ -52,11 +52,15 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// send some messages to each broker
val sentMessages1 = sendMessages(nMessages, "batch1")
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1, receivedMessages1)


@ -26,7 +26,7 @@ import kafka.message.Message
import kafka.producer.async.MissingConfigException
import kafka.serializer.Encoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness}
import kafka.zk.ZooKeeperTestHarness
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.{PropertyConfigurator, Logger}
import org.junit.{After, Before, Test}
@ -36,22 +36,16 @@ import kafka.utils._
class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
var logDirZk: File = null
var logDirBl: File = null
var serverBl: KafkaServer = null
var serverZk: KafkaServer = null
var simpleConsumerZk: SimpleConsumer = null
var simpleConsumerBl: SimpleConsumer = null
val tLogger = Logger.getLogger(getClass())
private val brokerZk = 0
private val brokerBl = 1
private val ports = TestUtils.choosePorts(2)
private val (portZk, portBl) = (ports(0), ports(1))
private var zkServer:EmbeddedZookeeper = null
private val portZk = ports(0)
@Before
override def setUp() {
@ -62,26 +56,17 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
logDirZk = new File(logDirZkPath)
serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
val propsBl: Properties = TestUtils.createBrokerConfig(brokerBl, portBl)
val logDirBlPath = propsBl.getProperty("log.dir")
logDirBl = new File(logDirBlPath)
serverBl = TestUtils.createServer(new KafkaConfig(propsBl))
Thread.sleep(100)
simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
simpleConsumerBl = new SimpleConsumer("localhost", portBl, 1000000, 64*1024)
}
@After
override def tearDown() {
simpleConsumerZk.close
simpleConsumerBl.close
serverZk.shutdown
serverBl.shutdown
Utils.rm(logDirZk)
Utils.rm(logDirBl)
Thread.sleep(500)
super.tearDown()
@ -174,13 +159,6 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
count = count + 1
}
val response2 = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
val messagesFromOtherBroker = response2.messageSet("test-topic", 0)
for(message <- messagesFromOtherBroker) {
count = count + 1
}
assertEquals(5, count)
}
@ -195,11 +173,6 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
props
}
private def getLogDir(): File = {
val dir = TestUtils.tempDir()
dir
}
}
class AppenderStringEncoder extends Encoder[LoggingEvent] {


@ -124,10 +124,58 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.close
}
// @Test
// def testZKSendWithDeadBroker() {
// val props = new Properties()
// props.put("serializer.class", "kafka.serializer.StringEncoder")
// props.put("partitioner.class", "kafka.utils.StaticPartitioner")
// props.put("zk.connect", TestZKUtils.zookeeperConnect)
//
// // create topic
// CreateTopicCommand.createTopic(zkClient, "new-topic", 2, 1, "0,0")
//
// val config = new ProducerConfig(props)
//
// val producer = new Producer[String, String](config)
// val message = new Message("test1".getBytes)
// try {
//// // kill 2nd broker
//// server1.shutdown
//// Thread.sleep(100)
//
// // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
// // all partitions have broker 0 as the leader.
// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
// Thread.sleep(100)
//
// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
// Thread.sleep(3000)
//
// // restart server 1
//// server1.startup()
//// Thread.sleep(100)
//
// // cross check if brokers got the messages
// val response = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
// val messageSet = response.messageSet("new-topic", 0).iterator
// var numMessagesReceived = 0
// while(messageSet.hasNext) {
// val messageAndOffset = messageSet.next()
// assertEquals(message, messageSet.next.message)
// println("Received message at offset %d".format(messageAndOffset.offset))
// numMessagesReceived += 1
// }
// assertEquals("Message set should have 2 messages", 2, numMessagesReceived)
// } catch {
// case e: Exception => fail("Not expected", e)
// }
// producer.close
// }
// TODO: Need to rewrite when SyncProducer changes to throw timeout exceptions
// and when leader logic is changed.
@Test
def testZKSendWithDeadBroker() {
// @Test
// def testZKSendWithDeadBroker2() {
// val props = new Properties()
// props.put("serializer.class", "kafka.serializer.StringEncoder")
// props.put("partitioner.class", "kafka.utils.StaticPartitioner")
@ -172,7 +220,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
// case e: Exception => fail("Not expected", e)
// }
// producer.close
}
// }
@Test
def testZKSendToExistingTopicWithNoBrokers() {
@ -227,7 +275,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
} catch {
case e: Exception => fail("Not expected", e)
} finally {
server.shutdown
if(server != null) server.shutdown
producer.close
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
import org.I0Itec.zkclient.ZkClient
import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.utils.{ZKStringSerializer, Utils, TestUtils}
class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
val port1 = TestUtils.choosePort()
val port2 = TestUtils.choosePort()
val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
var zkClient: ZkClient = null
override def setUp() {
super.setUp()
zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
// start both servers
val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
servers ++= List(server1, server2)
}
override def tearDown() {
// shutdown the servers and delete data hosted on them
servers.map(server => server.shutdown())
servers.map(server => Utils.rm(server.config.logDir))
super.tearDown()
}
def testLeaderElectionWithCreateTopic {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
// wait until leader is elected
var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
// kill the server hosting the preferred replica
servers.head.shutdown()
// check if leader moves to the other server
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
Thread.sleep(500)
// bring the preferred replica back
servers.head.startup()
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
// TODO: Once the optimization for preferred replica re-election is in, this check should change to broker 0
assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
// shutdown current leader (broker 1)
servers.last.shutdown()
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
// test if the leader is the preferred replica
assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
}
}
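
For reference only (not part of this patch): the leader checks in this test ultimately come down to reading the leader znode for the topic partition. A minimal polling helper, assuming ZkUtils.getLeaderForPartition returns Option[Int] as it is used in TestUtils below, could look like the sketch that follows; the patch itself uses the condition-based wait instead.

import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZkUtils

// Hypothetical polling alternative to TestUtils.waitUntilLeaderIsElected (illustrative sketch only)
def pollForLeader(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
  val deadline = System.currentTimeMillis + timeoutMs
  var leader: Option[Int] = None
  while (leader.isEmpty && System.currentTimeMillis < deadline) {
    leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) // read the leader znode
    if (leader.isEmpty) Thread.sleep(50)                               // back off briefly before re-reading
  }
  leader
}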

View File

@ -24,10 +24,11 @@ import junit.framework.Assert._
import kafka.message.{Message, ByteBufferMessageSet}
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestUtils, Utils}
import kafka.producer._
import kafka.utils.TestUtils._
import kafka.admin.CreateTopicCommand
import kafka.api.FetchRequestBuilder
import kafka.utils.{TestUtils, Utils}
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val port = TestUtils.choosePort
@ -72,7 +73,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val server = new KafkaServer(config)
server.startup()
Thread.sleep(100)
waitUntilLeaderIsElected(zookeeper.client, topic, 0, 1000)
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
@ -83,7 +84,6 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val newOffset = fetchedMessage.validBytes
// send some more messages
println("Sending messages to topic " + topic)
producer.send(new ProducerData[Int, Message](topic, 0, sent2))
Thread.sleep(200)

View File

@ -34,11 +34,13 @@ import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
import scala.collection.Map
import kafka.serializer.Encoder
import kafka.api.{ProducerRequest, TopicData, PartitionData}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
/**
* Utility functions to help with testing
*/
object TestUtils {
object TestUtils extends Logging {
val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
val Digits = "0123456789"
@ -385,6 +387,35 @@ object TestUtils {
val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
pr
}
def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
val leaderLock = new ReentrantLock()
val leaderExists = leaderLock.newCondition()
info("Waiting for leader to be elected for topic %s partition %d".format(topic, partition))
leaderLock.lock()
try {
// check if leader already exists
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
leader match {
case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition))
leader
case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString),
new LeaderExists(topic, partition, leaderExists))
leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
// check if leader is elected
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
leader match {
case Some(l) => info("Leader %d elected for topic %s partition %d".format(l, topic, partition))
case None => error("Timing out after %d ms since leader is not elected for topic %s partition %d"
.format(timeoutMs, topic, partition))
}
leader
}
} finally {
leaderLock.unlock()
}
}
}
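
The wait above blocks on a Condition that is signalled by the LeaderExists data listener registered on the partition's leader path; that listener class is added elsewhere in this patch and does not appear in this hunk. The following is only a rough sketch of what such a listener might look like, not the actual class: note that it also takes the lock, since Condition.signal() must be called while holding the lock that created the condition, whereas the real LeaderExists above is constructed with just (topic, partition, condition) and may handle this differently.

import java.util.concurrent.locks.{Condition, ReentrantLock}
import org.I0Itec.zkclient.IZkDataListener

// Hypothetical sketch of a leader-exists listener (not the class shipped with this patch)
class LeaderExistsSketch(topic: String, partition: Int, leaderLock: ReentrantLock, leaderExists: Condition)
  extends IZkDataListener {
  def handleDataChange(dataPath: String, data: Object) {
    // data written to the leader path means a leader was elected; wake up the waiting test thread
    leaderLock.lock()
    try { leaderExists.signal() } finally { leaderLock.unlock() }
  }
  def handleDataDeleted(dataPath: String) {
    // the leader znode disappeared (e.g. the leader broker died); nothing to signal here
  }
}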
object TestZKUtils {