KAFKA-1184 High-Level Consumer: expose fetcher threads number as a parameter; reviewed by Neha Narkhede

This commit is contained in:
Evelina Stepanova 2014-01-30 08:36:53 -08:00 committed by Neha Narkhede
parent 269d16d3c9
commit 9c1d8e35c5
4 changed files with 12 additions and 1 deletions

View File

@ -28,6 +28,7 @@ object ConsumerConfig extends Config {
val SocketBufferSize = 64*1024
val FetchSize = 1024 * 1024
val MaxFetchSize = 10*FetchSize
val NumConsumerFetchers = 1
val DefaultFetcherBackoffMs = 1000
val AutoCommit = true
val AutoCommitInterval = 60 * 1000
@ -94,6 +95,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
/** the number of byes of messages to attempt to fetch */
val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
/** the number threads used to fetch data */
val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)

View File

@ -41,7 +41,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
private val config: ConsumerConfig,
private val zkClient : ZkClient)
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
config.clientId, 1) {
config.clientId, config.numConsumerFetchers) {
private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
private var cluster: Cluster = null
private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]

View File

@ -150,6 +150,7 @@ object TestUtils extends Logging {
props.put("auto.commit.interval.ms", "1000")
props.put("rebalance.max.retries", "4")
props.put("auto.offset.reset", "smallest")
props.put("num.consumer.fetchers", "2")
props
}

View File

@ -112,6 +112,11 @@ object ConsumerPerformance {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(10)
val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val options = parser.parse(args : _*)
@ -130,6 +135,7 @@ object ConsumerPerformance {
props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", "5000")
props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
val consumerConfig = new ConsumerConfig(props)
val numThreads = options.valueOf(numThreadsOpt).intValue
val topic = options.valueOf(topicOpt)