mirror of https://github.com/apache/kafka.git
make # of consumer rebalance retries configurable; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-213
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1207645 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cb2564c45e
commit
078bd591e1
|
|
@ -30,6 +30,7 @@ object ConsumerConfig {
|
||||||
val AutoCommit = true
|
val AutoCommit = true
|
||||||
val AutoCommitInterval = 10 * 1000
|
val AutoCommitInterval = 10 * 1000
|
||||||
val MaxQueuedChunks = 100
|
val MaxQueuedChunks = 100
|
||||||
|
val MaxRebalanceRetries = 4
|
||||||
val AutoOffsetReset = OffsetRequest.SmallestTimeString
|
val AutoOffsetReset = OffsetRequest.SmallestTimeString
|
||||||
val ConsumerTimeoutMs = -1
|
val ConsumerTimeoutMs = -1
|
||||||
val MirrorTopicsWhitelist = ""
|
val MirrorTopicsWhitelist = ""
|
||||||
|
|
@ -77,6 +78,9 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
|
||||||
/** max number of messages buffered for consumption */
|
/** max number of messages buffered for consumption */
|
||||||
val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
|
val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
|
||||||
|
|
||||||
|
/** max number of retries during rebalance */
|
||||||
|
val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
|
||||||
|
|
||||||
/* what to do if an offset is out of range.
|
/* what to do if an offset is out of range.
|
||||||
smallest : automatically reset the offset to the smallest offset
|
smallest : automatically reset the offset to the smallest offset
|
||||||
largest : automatically reset the offset to the largest offset
|
largest : automatically reset the offset to the largest offset
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,6 @@ import kafka.common.InvalidConfigException
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private[kafka] object ZookeeperConsumerConnector {
|
private[kafka] object ZookeeperConsumerConnector {
|
||||||
val MAX_N_RETRIES = 4
|
|
||||||
val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
|
val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -424,7 +423,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
|
||||||
|
|
||||||
def syncedRebalance() {
|
def syncedRebalance() {
|
||||||
rebalanceLock synchronized {
|
rebalanceLock synchronized {
|
||||||
for (i <- 0 until ZookeeperConsumerConnector.MAX_N_RETRIES) {
|
for (i <- 0 until config.maxRebalanceRetries) {
|
||||||
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
|
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
|
||||||
var done = false
|
var done = false
|
||||||
try {
|
try {
|
||||||
|
|
@ -447,7 +446,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new RuntimeException(consumerIdString + " can't rebalance after " + ZookeeperConsumerConnector.MAX_N_RETRIES +" retires")
|
throw new RuntimeException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
|
||||||
}
|
}
|
||||||
|
|
||||||
private def rebalance(): Boolean = {
|
private def rebalance(): Boolean = {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue