mirror of https://github.com/apache/kafka.git
KAFKA-1030 Addition of partitions requires bouncing all the consumers of that topic; reviewed by Neha, Swapnil, Joel
This commit is contained in:
parent
aebf746190
commit
c6ca971738
|
@ -31,7 +31,6 @@ import java.util.UUID
|
|||
import kafka.serializer._
|
||||
import kafka.utils.ZkUtils._
|
||||
import kafka.common._
|
||||
import kafka.client.ClientUtils
|
||||
import com.yammer.metrics.core.Gauge
|
||||
import kafka.metrics._
|
||||
import scala.Some
|
||||
|
@ -422,17 +421,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
|
|||
true
|
||||
}
|
||||
else {
|
||||
val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
|
||||
brokers,
|
||||
config.clientId,
|
||||
config.socketTimeoutMs,
|
||||
correlationId.getAndIncrement).topicsMetadata
|
||||
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
|
||||
topicsMetadata.foreach(m => {
|
||||
val topic = m.topic
|
||||
val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
|
||||
partitionsPerTopicMap.put(topic, partitions)
|
||||
})
|
||||
val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
|
||||
val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq))
|
||||
|
||||
/**
|
||||
* fetchers must be stopped to avoid data duplication, since if the current
|
||||
|
|
Loading…
Reference in New Issue