KAFKA-262 Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own; patched by Neha Narkhede; reviewed by Jun Rao

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1242552 13f79535-47bb-0310-9956-ffa450edef68
Neha Narkhede 2012-02-09 22:04:32 +00:00
parent 345bc7bad5
commit a888a9d09e
23 changed files with 37 additions and 114 deletions
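
In short: the old rebalancing path filtered topics through the cached `oldPartitionsPerTopicMap` and `oldConsumersPerTopicMap` snapshots and mutated the live `topicRegistry` while ZooKeeper ownership claims were still in flight; after a failed attempt, the registry could list partitions this consumer never actually claimed, and the next release pass would delete owner znodes that belonged to another consumer. The patch below removes the caches and the incremental path entirely: every rebalance is a full one that records its decisions into a scratch `currentTopicRegistry` and assigns it to `topicRegistry` only once every claim has been reflected in ZooKeeper. A minimal sketch of that build-then-swap shape, with illustrative names rather than the actual Kafka classes:

import scala.collection.mutable

object RebalanceSketch {
  // Live view read by the fetchers: topic -> partitions this consumer owns.
  @volatile private var topicRegistry: Map[String, Set[String]] = Map.empty

  // Stand-in for the ZooKeeper ephemeral-node write; false if already owned.
  private def claimInZk(topic: String, partition: String): Boolean = true

  def rebalance(assignment: Map[String, Set[String]]): Boolean = {
    val scratch = mutable.Map.empty[String, Set[String]]
    for ((topic, partitions) <- assignment; p <- partitions)
      scratch(topic) = scratch.getOrElse(topic, Set.empty) + p // decide locally first

    // Reflect every decision in ZooKeeper; a single conflict fails the attempt.
    val allClaimed = assignment.forall { case (topic, partitions) =>
      partitions.forall(p => claimInZk(topic, p))
    }
    if (allClaimed) { topicRegistry = scratch.toMap; true } // publish atomically
    else false // previously published registry stays intact; the caller retries
  }
}

Because the live registry is now replaced wholesale instead of mutated in place, `topicRegistry` also changes from `val` to `var` in the first hunk below.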

View File

@@ -22,6 +22,7 @@ import java.util.concurrent.atomic._
import scala.collection._
import kafka.cluster._
import kafka.utils._
import mutable.ListBuffer
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
@@ -91,7 +92,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val rebalanceLock = new Object
private var fetcher: Option[Fetcher] = None
private var zkClient: ZkClient = null
private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
// queues : (topic,consumerThreadId) -> queue
private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
@@ -373,8 +374,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
extends IZkChildListener {
private val dirs = new ZKGroupDirs(group)
private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
@@ -390,25 +389,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
deletePath(zkClient, znode)
debug("Consumer " + consumerIdString + " releasing " + znode)
}
topicRegistry.remove(topic)
}
}
private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
newPartMap: Map[String,List[String]],
oldPartMap: Map[String,List[String]],
newConsumerMap: Map[String,List[String]],
oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
relevantTopicThreadIdsMap
}
def resetState() {
topicRegistry.clear
oldConsumersPerTopicMap.clear
oldPartitionsPerTopicMap.clear
}
def syncedRebalance() {
@@ -437,11 +423,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
/* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
* clear the cache */
info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
oldConsumersPerTopicMap.clear()
oldPartitionsPerTopicMap.clear()
}
// commit offsets
commitOffsets()
// stop all fetchers and clear all the queues to avoid data duplication
closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
// release all partitions, reset state and retry
@@ -457,14 +439,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
if (relevantTopicThreadIdsMap.size <= 0) {
info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.".
format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap))
debug("Partitions per topic cache " + oldPartitionsPerTopicMap)
debug("Consumers per topic cache " + oldConsumersPerTopicMap)
return true
}
/**
* fetchers must be stopped to avoid data duplication, since if the current
@@ -472,14 +446,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
* But if we don't stop the fetchers first, this consumer would continue returning data for released
* partitions in parallel. So, not stopping the fetchers leads to duplicate data.
*/
closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
releasePartitionOwnership()
var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
topicRegistry.remove(topic)
topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
currentTopicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
val topicDirs = new ZKGroupTopicDirs(group, topic)
val curConsumers = consumersPerTopicMap.get(topic).get
@@ -507,10 +482,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
if (!ownPartition)
return false
else // record the partition ownership decision
addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
// record the partition ownership decision
partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
}
}
@@ -525,8 +498,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
info("Updating the cache")
debug("Partitions per topic cache " + partitionsPerTopicMap)
debug("Consumers per topic cache " + consumersPerTopicMap)
oldPartitionsPerTopicMap = partitionsPerTopicMap
oldConsumersPerTopicMap = consumersPerTopicMap
topicRegistry = currentTopicRegistry
updateFetcher(cluster, kafkaMessageStreams)
true
}else
@@ -579,27 +551,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
topic: String, consumerThreadId: String) : Boolean = {
val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
// check if some other consumer owns this partition at this time
val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath)
if(currentPartitionOwner != null) {
if(currentPartitionOwner.equals(consumerThreadId)) {
info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok")
addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
true
}
else {
info(partitionOwnerPath + " exists with value " + currentPartitionOwner)
false
}
} else {
addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
true
}
}
private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
val topic = partitionOwner._1._1
@@ -620,15 +571,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
case e2 => throw e2
}
}
val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1)
if(success > 0) false /* even if one of the partition ownership attempt has failed, return false */
val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
/* even if one of the partition ownership attempts has failed, return false */
if(hasPartitionOwnershipFailed > 0) false
else true
}
private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]],
topicDirs: ZKGroupTopicDirs, partitionString: String,
topic: String, consumerThreadId: String) {
val partition = Partition.parse(partitionString)
val partTopicInfoMap = topicRegistry.get(topic)
val partTopicInfoMap = currentTopicRegistry.get(topic)
val znode = topicDirs.consumerOffsetDir + "/" + partition.name
val offsetString = readDataMaybeNull(zkClient, znode)
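
Two details in the hunks above are easy to miss. First, `addPartitionTopicInfo` now writes into the scratch `currentTopicRegistry` handed to it by the rebalance loop rather than into the live `topicRegistry`, which is what makes the final `topicRegistry = currentTopicRegistry` assignment safe. Second, the replaced fold fixes a counting bug: the old `foldLeft(0)((sum, decision) => if(decision) 0 else 1)` discarded the accumulator, so only the last ownership decision was ever inspected, whereas the patched fold adds each failure into `sum`. An illustrative equivalent of the fixed check:

// Illustrative equivalent of the patched failure count (not the Kafka code).
val decisions = List(true, true, false)  // per-partition ownership claim results
val failures = decisions.count(d => !d)  // what the corrected foldLeft computes
val ownershipSuccessful = failures == 0  // false here: one claim failed, so retry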

View File

@@ -18,13 +18,13 @@
package kafka.producer
import async.MissingConfigException
import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.helpers.LogLog
import kafka.utils.{Utils, Logging}
import kafka.utils.Logging
import kafka.serializer.Encoder
import java.util.{Properties, Date}
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.message.Message
import scala.collection._
class KafkaLog4jAppender extends AppenderSkeleton with Logging {

View File

@@ -57,9 +57,9 @@ object VerifyConsumerRebalance extends Logging {
// check if the rebalancing operation succeeded.
try {
if(validateRebalancingOperation(zkClient, group))
info("Rebalance operation successful !")
println("Rebalance operation successful !")
else
error("Rebalance operation failed !")
println("Rebalance operation failed !")
} catch {
case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
}
@@ -132,6 +132,4 @@ object VerifyConsumerRebalance extends Logging {
rebalanceSucceeded
}
}

View File

@@ -18,7 +18,7 @@
package kafka
import message.Message
import org.apache.log4j.{Logger, PropertyConfigurator}
import org.apache.log4j.PropertyConfigurator
import kafka.utils.Logging
import serializer.Encoder

View File

@@ -17,8 +17,6 @@
package kafka
import java.net.URI
import java.util.Arrays.asList
import java.io._
import java.nio._
import java.nio.channels._

View File

@@ -17,7 +17,6 @@
package kafka.log
import kafka.log._
import kafka.message._
import kafka.utils.{TestUtils, Utils}

View File

@@ -22,10 +22,9 @@ import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.Logger
import java.util.Properties
import kafka.consumer.SimpleConsumer
import kafka.utils.{Utils, TestUtils}
import kafka.utils.TestUtils
import kafka.api.{OffsetRequest, FetchRequest}
import junit.framework.Assert._
import java.io.File
class BackwardsCompatibilityTest extends JUnit3Suite {

View File

@@ -17,11 +17,6 @@
package kafka.integration
import java.util.Properties
import junit.framework.Assert._
import kafka.producer._
import kafka.consumer._
import kafka.message._
import kafka.server._
import kafka.utils.{Utils, TestUtils}
import org.scalatest.junit.JUnit3Suite

View File

@@ -35,7 +35,6 @@ trait BaseMessageSetTestCases extends JUnitSuite {
@Test
def testWrittenEqualsRead {
import scala.collection.JavaConversions._
val messageSet = createMessageSet(messages)
TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
}

View File

@@ -18,7 +18,6 @@
package kafka.javaapi.message
import java.nio._
import junit.framework.TestCase
import junit.framework.Assert._
import org.junit.Test
import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}

View File

@@ -24,7 +24,6 @@ import kafka.zk.EmbeddedZookeeper
import kafka.utils.{TestZKUtils, TestUtils}
import org.junit.{After, Before, Test}
import junit.framework.Assert
import collection.mutable.HashMap
import org.easymock.EasyMock
import kafka.utils.Utils
import java.util.concurrent.ConcurrentHashMap
@@ -34,7 +33,7 @@ import org.scalatest.junit.JUnitSuite
import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner}
import kafka.producer.ProducerPool
import kafka.javaapi.message.ByteBufferMessageSet
import kafka.producer.async.{AsyncProducer, AsyncProducerConfig}
import kafka.producer.async.AsyncProducer
import kafka.javaapi.Implicits._
import kafka.serializer.{StringEncoder, Encoder}
import kafka.javaapi.consumer.SimpleConsumer

View File

@@ -17,11 +17,11 @@
package kafka.javaapi.producer
import junit.framework.{Assert, TestCase}
import junit.framework.Assert
import kafka.utils.SystemTime
import kafka.utils.TestUtils
import kafka.server.{KafkaServer, KafkaConfig}
import org.apache.log4j.{Logger, Level}
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import java.util.Properties

View File

@@ -18,14 +18,13 @@
package kafka.log
import java.io._
import java.nio._
import java.util.ArrayList
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils.{Utils, TestUtils, Range}
import kafka.common.OffsetOutOfRangeException
import kafka.message.{NoCompressionCodec, MessageSet, ByteBufferMessageSet, Message}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
class LogTest extends JUnitSuite {

View File

@@ -23,13 +23,12 @@ import java.util.Properties
import java.io.File
import kafka.consumer.SimpleConsumer
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import kafka.utils.{TestUtils, TestZKUtils,Utils, Logging}
import kafka.zk.EmbeddedZookeeper
import junit.framework.Assert._
import kafka.api.FetchRequest
import kafka.serializer.Encoder
import kafka.message.{MessageSet, Message}
import kafka.message.Message
import kafka.producer.async.MissingConfigException
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}

View File

@@ -17,7 +17,6 @@
package kafka.message
import java.util.Arrays
import junit.framework.Assert._
import kafka.utils.TestUtils._
import org.scalatest.junit.JUnitSuite

View File

@@ -18,12 +18,9 @@
package kafka.message
import java.nio._
import java.util.Arrays
import junit.framework.TestCase
import junit.framework.Assert._
import kafka.utils.TestUtils._
import org.junit.Test
import kafka.message._
class FileMessageSetTest extends BaseMessageSetTestCases {

View File

@@ -19,9 +19,6 @@ package kafka.message
import java.util._
import java.nio._
import java.nio.channels._
import java.io._
import junit.framework.TestCase
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{Before, Test}

View File

@@ -19,13 +19,9 @@ package kafka.network;
import java.net._
import java.io._
import java.nio._
import java.nio.channels._
import org.junit._
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import kafka.utils.TestUtils
import kafka.network._
import java.util.Random
import org.apache.log4j._

View File

@@ -17,7 +17,7 @@
package kafka.producer
import junit.framework.{Assert, TestCase}
import junit.framework.Assert
import java.util.Properties
import org.easymock.EasyMock
import kafka.api.ProducerRequest

View File

@@ -17,14 +17,13 @@
package kafka.producer
import async.{AsyncProducerConfig, AsyncProducer}
import async.AsyncProducer
import java.util.Properties
import org.apache.log4j.{Logger, Level}
import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
import kafka.zk.EmbeddedZookeeper
import org.junit.{After, Before, Test}
import junit.framework.Assert
import collection.mutable.HashMap
import org.easymock.EasyMock
import java.util.concurrent.ConcurrentHashMap
import kafka.cluster.Partition

View File

@@ -17,11 +17,11 @@
package kafka.producer
import junit.framework.{Assert, TestCase}
import junit.framework.Assert
import kafka.utils.SystemTime
import kafka.utils.TestUtils
import kafka.server.{KafkaServer, KafkaConfig}
import org.apache.log4j.{Logger, Level}
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.common.MessageSizeTooLargeException

View File

@@ -20,13 +20,11 @@ import kafka.utils.TestUtils
import java.io.File
import kafka.utils.Utils
import kafka.api.FetchRequest
import kafka.integration.ProducerConsumerTestHarness
import kafka.producer.{SyncProducer, SyncProducerConfig}
import kafka.consumer.SimpleConsumer
import java.util.Properties
import org.scalatest.junit.JUnitSuite
import junit.framework.{Assert, TestCase}
import org.junit.{After, Before, Test}
import org.junit.Test
import junit.framework.Assert._
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}