KAFKA-2603: Add timeout arg to ConsoleConsumer for new consumer

Added --timeout-ms argument to ConsoleConsumer that works with both old and new consumer. Also modified ducktape ConsoleConsumer service to use this arg instead of consumer.timeout.ms config that works only with the old consumer.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Aditya Auradkar, Ismael Juma, Guozhang Wang

Closes #274 from rajinisivaram/KAFKA-2603
This commit is contained in:
Rajini Sivaram 2015-10-14 13:35:49 -07:00 committed by Guozhang Wang
parent 27c099b043
commit f13d115596
4 changed files with 20 additions and 11 deletions

View File

@ -32,7 +32,7 @@ trait BaseConsumer {
case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseConsumer {
class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConversions._
@ -41,8 +41,11 @@ class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseCon
var recordIter = consumer.poll(0).iterator
override def receive(): BaseConsumerRecord = {
while (!recordIter.hasNext)
recordIter = consumer.poll(Long.MaxValue).iterator
if (!recordIter.hasNext) {
recordIter = consumer.poll(timeoutMs).iterator
if (!recordIter.hasNext)
throw new ConsumerTimeoutException
}
val record = recordIter.next
BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)

View File

@ -51,7 +51,8 @@ object ConsoleConsumer extends Logging {
val consumer =
if (conf.useNewConsumer) {
new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf))
val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf), timeoutMs)
} else {
checkZk(conf)
new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
@ -100,8 +101,8 @@ object ConsoleConsumer extends Logging {
consumer.receive()
} catch {
case e: Throwable => {
error("Error processing message, stopping consumer: ", e)
consumer.stop()
error("Error processing message, terminating consumer process: ", e)
// Consumer will be closed
return
}
}
@ -112,7 +113,7 @@ object ConsoleConsumer extends Logging {
if (skipMessageOnError) {
error("Error processing message, skipping this message: ", e)
} else {
consumer.stop()
// Consumer will be closed
throw e
}
}
@ -149,6 +150,8 @@ object ConsoleConsumer extends Logging {
if (config.options.has(config.deleteConsumerOffsetsOpt))
ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id"))
if (config.timeoutMs >= 0)
props.put("consumer.timeout.ms", config.timeoutMs.toString)
props
}
@ -204,6 +207,10 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("num_messages")
.ofType(classOf[java.lang.Integer])
val timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
.withRequiredArg
.describedAs("timeout_ms")
.ofType(classOf[java.lang.Integer])
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.")
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
@ -246,6 +253,7 @@ object ConsoleConsumer extends Logging {
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
val bootstrapServer = options.valueOf(bootstrapServerOpt)
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)

View File

@ -161,6 +161,8 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning:
cmd += " --from-beginning"
if self.consumer_timeout_ms is not None:
cmd += " --timeout-ms %s" % self.consumer_timeout_ms
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd

View File

@ -14,10 +14,6 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}
group.id={{ group_id|default('test-consumer-group') }}
{% if client_id is defined and client_id is not none %}