diff --git a/bin/kafka-console-consumer.sh b/bin/kafka-console-consumer.sh
index dbaac2b83b1..f2201462738 100755
--- a/bin/kafka-console-consumer.sh
+++ b/bin/kafka-console-consumer.sh
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.ConsoleConsumer "$@"
diff --git a/bin/windows/kafka-console-consumer.bat b/bin/windows/kafka-console-consumer.bat
index bbbd33656ad..9236cfb16a3 100644
--- a/bin/windows/kafka-console-consumer.bat
+++ b/bin/windows/kafka-console-consumer.bat
@@ -16,5 +16,5 @@ rem limitations under the License.
SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
-"%~dp0kafka-run-class.bat" kafka.tools.ConsoleConsumer %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.consumer.ConsoleConsumer %*
EndLocal
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 2f9c3960483..caf1fe5ebe1 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -321,6 +321,17 @@
+
+
+
+
+
+
+
+
+
+
+
@@ -331,13 +342,6 @@
-
-
-
-
-
-
-
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 925c50dec86..6a8bc0b1852 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -273,11 +273,11 @@
+ files="(ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
+ files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/>
- error("Authentication failed: terminating consumer process", e)
- Exit.exit(1)
- case e: Throwable =>
- error("Unknown error when running consumer: ", e)
- Exit.exit(1)
- }
- }
-
- def run(conf: ConsumerConfig): Unit = {
- val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs.toLong else Long.MaxValue
- val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)
-
- val consumerWrapper =
- if (conf.partitionArg.isDefined)
- new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs)
- else
- new ConsumerWrapper(Option(conf.topicArg), None, None, Option(conf.includedTopicsArg), consumer, timeoutMs)
-
- addShutdownHook(consumerWrapper, conf)
-
- try process(conf.maxMessages, conf.formatter, consumerWrapper, System.out, conf.skipMessageOnError)
- finally {
- consumerWrapper.cleanup()
- conf.formatter.close()
- reportRecordCount()
-
- shutdownLatch.countDown()
- }
- }
-
- def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = {
- Exit.addShutdownHook("consumer-shutdown-hook", {
- consumer.wakeup()
-
- shutdownLatch.await()
-
- if (conf.enableSystestEventsLogging) {
- System.out.println("shutdown_complete")
- }
- })
- }
-
- def process(maxMessages: Integer, formatter: MessageFormatter, consumer: ConsumerWrapper, output: PrintStream,
- skipMessageOnError: Boolean): Unit = {
- while (messageCount < maxMessages || maxMessages == -1) {
- val msg: ConsumerRecord[Array[Byte], Array[Byte]] = try {
- consumer.receive()
- } catch {
- case _: WakeupException =>
- trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
- // Consumer will be closed
- return
- case e: Throwable =>
- error("Error processing message, terminating consumer process: ", e)
- // Consumer will be closed
- return
- }
- messageCount += 1
- try {
- formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
- 0, 0, msg.key, msg.value, msg.headers, Optional.empty[Integer]), output)
- } catch {
- case e: Throwable =>
- if (skipMessageOnError) {
- error("Error processing message, skipping this message: ", e)
- } else {
- // Consumer will be closed
- throw e
- }
- }
- if (checkErr(output, formatter)) {
- // Consumer will be closed
- return
- }
- }
- }
-
- def reportRecordCount(): Unit = {
- System.err.println(s"Processed a total of $messageCount messages")
- }
-
- def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = {
- val gotError = output.checkError()
- if (gotError) {
- // This means no one is listening to our output stream anymore, time to shutdown
- System.err.println("Unable to write to standard out, closing consumer.")
- }
- gotError
- }
-
- private[tools] def consumerProps(config: ConsumerConfig): Properties = {
- val props = new Properties
- props ++= config.consumerProps
- props ++= config.extraConsumerProps
- setAutoOffsetResetValue(config, props)
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
- if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer")
- CommandLineUtils.maybeMergeOptions(
- props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt)
- props
- }
-
- /**
- * Used by consumerProps to retrieve the correct value for the consumer parameter 'auto.offset.reset'.
- *
- * Order of priority is:
- * 1. Explicitly set parameter via --consumer.property command line parameter
- * 2. Explicit --from-beginning given -> 'earliest'
- * 3. Default value of 'latest'
- *
- * In case both --from-beginning and an explicit value are specified an error is thrown if these
- * are conflicting.
- */
- def setAutoOffsetResetValue(config: ConsumerConfig, props: Properties): Unit = {
- val (earliestConfigValue, latestConfigValue) = ("earliest", "latest")
-
- if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
- // auto.offset.reset parameter was specified on the command line
- val autoResetOption = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
- if (config.options.has(config.resetBeginningOpt) && earliestConfigValue != autoResetOption) {
- // conflicting options - latest und earliest, throw an error
- System.err.println(s"Can't simultaneously specify --from-beginning and 'auto.offset.reset=$autoResetOption', " +
- "please remove one option")
- Exit.exit(1)
- }
- // nothing to do, checking for valid parameter values happens later and the specified
- // value was already copied during .putall operation
- } else {
- // no explicit value for auto.offset.reset was specified
- // if --from-beginning was specified use earliest, otherwise default to latest
- val autoResetOption = if (config.options.has(config.resetBeginningOpt)) earliestConfigValue else latestConfigValue
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption)
- }
- }
-
- class ConsumerConfig(args: Array[String]) extends CommandDefaultOptions(args) {
- val topicOpt = parser.accepts("topic", "The topic to consume on.")
- .withRequiredArg
- .describedAs("topic")
- .ofType(classOf[String])
- val whitelistOpt = parser.accepts("whitelist",
- "DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.")
- .withRequiredArg
- .describedAs("Java regex (String)")
- .ofType(classOf[String])
- val includeOpt = parser.accepts("include",
- "Regular expression specifying list of topics to include for consumption.")
- .withRequiredArg
- .describedAs("Java regex (String)")
- .ofType(classOf[String])
- val partitionIdOpt = parser.accepts("partition", "The partition to consume from. Consumption " +
- "starts from the end of the partition unless '--offset' is specified.")
- .withRequiredArg
- .describedAs("partition")
- .ofType(classOf[java.lang.Integer])
- val offsetOpt = parser.accepts("offset", "The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end")
- .withRequiredArg
- .describedAs("consume offset")
- .ofType(classOf[String])
- .defaultsTo("latest")
- val consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
- .withRequiredArg
- .describedAs("consumer_prop")
- .ofType(classOf[String])
- val consumerConfigOpt = parser.accepts("consumer.config", s"Consumer config properties file. Note that $consumerPropertyOpt takes precedence over this config.")
- .withRequiredArg
- .describedAs("config file")
- .ofType(classOf[String])
- val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
- .withRequiredArg
- .describedAs("class")
- .ofType(classOf[String])
- .defaultsTo(classOf[DefaultMessageFormatter].getName)
- val messageFormatterArgOpt = parser.accepts("property",
- """The properties to initialize the message formatter. Default properties include:
- | print.timestamp=true|false
- | print.key=true|false
- | print.offset=true|false
- | print.partition=true|false
- | print.headers=true|false
- | print.value=true|false
- | key.separator=
- | line.separator=
- | headers.separator=
- | null.literal=
- | key.deserializer=
- | value.deserializer=
- | header.deserializer=
- |
- |Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers."""
- .stripMargin)
- .withRequiredArg
- .describedAs("prop")
- .ofType(classOf[String])
- val messageFormatterConfigOpt = parser.accepts("formatter-config", s"Config properties file to initialize the message formatter. Note that $messageFormatterArgOpt takes precedence over this config.")
- .withRequiredArg
- .describedAs("config file")
- .ofType(classOf[String])
- val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
- "start with the earliest message present in the log rather than the latest message.")
- val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
- .withRequiredArg
- .describedAs("num_messages")
- .ofType(classOf[java.lang.Integer])
- val timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
- .withRequiredArg
- .describedAs("timeout_ms")
- .ofType(classOf[java.lang.Integer])
- val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
- "skip it instead of halt.")
- val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.")
- .withRequiredArg
- .describedAs("server to connect to")
- .ofType(classOf[String])
- val keyDeserializerOpt = parser.accepts("key-deserializer")
- .withRequiredArg
- .describedAs("deserializer for key")
- .ofType(classOf[String])
- val valueDeserializerOpt = parser.accepts("value-deserializer")
- .withRequiredArg
- .describedAs("deserializer for values")
- .ofType(classOf[String])
- val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
- "Log lifecycle events of the consumer in addition to logging consumed " +
- "messages. (This is specific for system tests.)")
- val isolationLevelOpt = parser.accepts("isolation-level",
- "Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " +
- "to read all messages.")
- .withRequiredArg()
- .ofType(classOf[String])
- .defaultsTo("read_uncommitted")
-
- val groupIdOpt = parser.accepts("group", "The consumer group id of the consumer.")
- .withRequiredArg
- .describedAs("consumer group id")
- .ofType(classOf[String])
-
- options = tryParse(parser, args)
-
- CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.")
-
- var groupIdPassed = true
- val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
-
- // topic must be specified.
- var topicArg: String = _
- var includedTopicsArg: String = _
- val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt))
- val consumerProps = if (options.has(consumerConfigOpt))
- Utils.loadProps(options.valueOf(consumerConfigOpt))
- else
- new Properties()
- val fromBeginning = options.has(resetBeginningOpt)
- val partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None
- val skipMessageOnError = options.has(skipMessageOnErrorOpt)
- val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
- val formatterArgs = if (options.has(messageFormatterConfigOpt))
- Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
- else
- new Properties()
- formatterArgs ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
- val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
- val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
- val bootstrapServer = options.valueOf(bootstrapServerOpt)
- val keyDeserializer = options.valueOf(keyDeserializerOpt)
- val valueDeserializer = options.valueOf(valueDeserializerOpt)
- val formatter: MessageFormatter = messageFormatterClass.getDeclaredConstructor().newInstance().asInstanceOf[MessageFormatter]
-
- if (keyDeserializer != null && keyDeserializer.nonEmpty) {
- formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
- }
- if (valueDeserializer != null && valueDeserializer.nonEmpty) {
- formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
- }
-
- formatter.configure(formatterArgs.asScala.asJava)
-
- topicArg = options.valueOf(topicOpt)
- includedTopicsArg = if (options.has(includeOpt))
- options.valueOf(includeOpt)
- else
- options.valueOf(whitelistOpt)
-
- val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ == null)
- // user need to specify value for either --topic or one of the include filters options (--include or --whitelist)
- if (topicOrFilterArgs.size != 1)
- CommandLineUtils.printUsageAndExit(parser, s"Exactly one of --include/--topic is required. " +
- s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use --include instead; ignored if --include specified."}")
-
- if (partitionArg.isDefined) {
- if (!options.has(topicOpt))
- CommandLineUtils.printUsageAndExit(parser, "The topic is required when partition is specified.")
- if (fromBeginning && options.has(offsetOpt))
- CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and offset cannot be specified together.")
- } else if (options.has(offsetOpt))
- CommandLineUtils.printUsageAndExit(parser, "The partition is required when offset is specified.")
-
- def invalidOffset(offset: String): Nothing =
- ToolsUtils.printUsageAndExit(parser, s"The provided offset value '$offset' is incorrect. Valid values are " +
- "'earliest', 'latest', or a non-negative long.")
-
- val offsetArg =
- if (options.has(offsetOpt)) {
- options.valueOf(offsetOpt).toLowerCase(Locale.ROOT) match {
- case "earliest" => ListOffsetsRequest.EARLIEST_TIMESTAMP
- case "latest" => ListOffsetsRequest.LATEST_TIMESTAMP
- case offsetString =>
- try {
- val offset = offsetString.toLong
- if (offset < 0)
- invalidOffset(offsetString)
- offset
- } catch {
- case _: NumberFormatException => invalidOffset(offsetString)
- }
- }
- }
- else if (fromBeginning) ListOffsetsRequest.EARLIEST_TIMESTAMP
- else ListOffsetsRequest.LATEST_TIMESTAMP
-
- CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
-
- // if the group id is provided in more than place (through different means) all values must be the same
- val groupIdsProvided = Set(
- Option(options.valueOf(groupIdOpt)), // via --group
- Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property
- Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config
- ).flatten
-
- if (groupIdsProvided.size > 1) {
- CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', "
- + "via '--consumer-property', or via '--consumer.config') do not match. "
- + s"Detected group ids: ${groupIdsProvided.mkString("'", "', '", "'")}")
- }
-
- groupIdsProvided.headOption match {
- case Some(group) =>
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group)
- case None =>
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
- // By default, avoid unnecessary expansion of the coordinator cache since
- // the auto-generated group and its offsets is not intended to be used again
- if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- groupIdPassed = false
- }
-
- if (groupIdPassed && partitionArg.isDefined)
- CommandLineUtils.printUsageAndExit(parser, "Options group and partition cannot be specified together.")
-
- def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
- try
- parser.parse(args: _*)
- catch {
- case e: OptionException =>
- ToolsUtils.printUsageAndExit(parser, e.getMessage)
- }
- }
- }
-
- private[tools] class ConsumerWrapper(
- topic: Option[String],
- partitionId: Option[Int],
- offset: Option[Long],
- includedTopics: Option[String],
- consumer: Consumer[Array[Byte], Array[Byte]],
- timeoutMs: Long = Long.MaxValue,
- time: Time = Time.SYSTEM
- ) {
- consumerInit()
- var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator()
-
- def consumerInit(): Unit = {
- (topic, partitionId, offset, includedTopics) match {
- case (Some(topic), Some(partitionId), Some(offset), None) =>
- seek(topic, partitionId, offset)
- case (Some(topic), Some(partitionId), None, None) =>
- // default to latest if no offset is provided
- seek(topic, partitionId, ListOffsetsRequest.LATEST_TIMESTAMP)
- case (Some(topic), None, None, None) =>
- consumer.subscribe(Collections.singletonList(topic))
- case (None, None, None, Some(include)) =>
- consumer.subscribe(Pattern.compile(include))
- case _ =>
- throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
- "Exactly one of 'topic' or 'include' must be provided. " +
- "If 'topic' is provided, an optional 'partition' may also be provided. " +
- "If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.")
- }
- }
-
- def seek(topic: String, partitionId: Int, offset: Long): Unit = {
- val topicPartition = new TopicPartition(topic, partitionId)
- consumer.assign(Collections.singletonList(topicPartition))
- offset match {
- case ListOffsetsRequest.EARLIEST_TIMESTAMP => consumer.seekToBeginning(Collections.singletonList(topicPartition))
- case ListOffsetsRequest.LATEST_TIMESTAMP => consumer.seekToEnd(Collections.singletonList(topicPartition))
- case _ => consumer.seek(topicPartition, offset)
- }
- }
-
- def resetUnconsumedOffsets(): Unit = {
- val smallestUnconsumedOffsets = collection.mutable.Map[TopicPartition, Long]()
- while (recordIter.hasNext) {
- val record = recordIter.next()
- val tp = new TopicPartition(record.topic, record.partition)
- // avoid auto-committing offsets which haven't been consumed
- smallestUnconsumedOffsets.getOrElseUpdate(tp, record.offset)
- }
- smallestUnconsumedOffsets.forKeyValue { (tp, offset) => consumer.seek(tp, offset) }
- }
-
- def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
- val startTimeMs = time.milliseconds
- while (!recordIter.hasNext) {
- recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
- if (!recordIter.hasNext && (time.milliseconds - startTimeMs > timeoutMs)) {
- throw new TimeoutException()
- }
- }
-
- recordIter.next
- }
-
- def wakeup(): Unit = {
- this.consumer.wakeup()
- }
-
- def cleanup(): Unit = {
- resetUnconsumedOffsets()
- this.consumer.close()
- }
-
- }
-}
-
-class DefaultMessageFormatter extends MessageFormatter {
- var printTimestamp = false
- var printKey = false
- var printValue = true
- var printPartition = false
- var printOffset = false
- var printHeaders = false
- var keySeparator = utfBytes("\t")
- var lineSeparator = utfBytes("\n")
- var headersSeparator = utfBytes(",")
- var nullLiteral = utfBytes("null")
-
- var keyDeserializer: Option[Deserializer[_]] = None
- var valueDeserializer: Option[Deserializer[_]] = None
- var headersDeserializer: Option[Deserializer[_]] = None
-
- override def configure(configs: Map[String, _]): Unit = {
- getPropertyIfExists(configs, "print.timestamp", getBoolProperty).foreach(printTimestamp = _)
- getPropertyIfExists(configs, "print.key", getBoolProperty).foreach(printKey = _)
- getPropertyIfExists(configs, "print.offset", getBoolProperty).foreach(printOffset = _)
- getPropertyIfExists(configs, "print.partition", getBoolProperty).foreach(printPartition = _)
- getPropertyIfExists(configs, "print.headers", getBoolProperty).foreach(printHeaders = _)
- getPropertyIfExists(configs, "print.value", getBoolProperty).foreach(printValue = _)
- getPropertyIfExists(configs, "key.separator", getByteProperty).foreach(keySeparator = _)
- getPropertyIfExists(configs, "line.separator", getByteProperty).foreach(lineSeparator = _)
- getPropertyIfExists(configs, "headers.separator", getByteProperty).foreach(headersSeparator = _)
- getPropertyIfExists(configs, "null.literal", getByteProperty).foreach(nullLiteral = _)
-
- keyDeserializer = getPropertyIfExists(configs, "key.deserializer", getDeserializerProperty(true))
- valueDeserializer = getPropertyIfExists(configs, "value.deserializer", getDeserializerProperty(false))
- headersDeserializer = getPropertyIfExists(configs, "headers.deserializer", getDeserializerProperty(false))
- }
-
- def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-
- def writeSeparator(columnSeparator: Boolean): Unit = {
- if (columnSeparator)
- output.write(keySeparator)
- else
- output.write(lineSeparator)
- }
-
- def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
- val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral)
- val convertedBytes = deserializer
- .map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString))
- .getOrElse(nonNullBytes)
- convertedBytes
- }
-
- import consumerRecord._
-
- if (printTimestamp) {
- if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
- output.write(utfBytes(s"$timestampType:$timestamp"))
- else
- output.write(utfBytes("NO_TIMESTAMP"))
- writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printKey || printValue)
- }
-
- if (printPartition) {
- output.write(utfBytes("Partition:"))
- output.write(utfBytes(partition().toString))
- writeSeparator(columnSeparator = printOffset || printHeaders || printKey || printValue)
- }
-
- if (printOffset) {
- output.write(utfBytes("Offset:"))
- output.write(utfBytes(offset().toString))
- writeSeparator(columnSeparator = printHeaders || printKey || printValue)
- }
-
- if (printHeaders) {
- val headersIt = headers().iterator.asScala
- if (headersIt.hasNext) {
- headersIt.foreach { header =>
- output.write(utfBytes(header.key() + ":"))
- output.write(deserialize(headersDeserializer, header.value(), topic))
- if (headersIt.hasNext) {
- output.write(headersSeparator)
- }
- }
- } else {
- output.write(utfBytes("NO_HEADERS"))
- }
- writeSeparator(columnSeparator = printKey || printValue)
- }
-
- if (printKey) {
- output.write(deserialize(keyDeserializer, key, topic))
- writeSeparator(columnSeparator = printValue)
- }
-
- if (printValue) {
- output.write(deserialize(valueDeserializer, value, topic))
- output.write(lineSeparator)
- }
- }
-
- private def propertiesWithKeyPrefixStripped(prefix: String, configs: Map[String, _]): Map[String, _] = {
- val newConfigs = collection.mutable.Map[String, Any]()
- configs.asScala.foreach { case (key, value) =>
- if (key.startsWith(prefix) && key.length > prefix.length)
- newConfigs.put(key.substring(prefix.length), value)
- }
- newConfigs.asJava
- }
-
- private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8)
-
- private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = {
- utfBytes(configs.get(key).asInstanceOf[String])
- }
-
- private def getBoolProperty(configs: Map[String, _], key: String): Boolean = {
- configs.get(key).asInstanceOf[String].trim.equalsIgnoreCase("true")
- }
-
- private def getDeserializerProperty(isKey: Boolean)(configs: Map[String, _], propertyName: String): Deserializer[_] = {
- val deserializer = Class.forName(configs.get(propertyName).asInstanceOf[String]).getDeclaredConstructor().newInstance().asInstanceOf[Deserializer[_]]
- val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", configs)
- .asScala
- .asJava
- deserializer.configure(deserializerConfig, isKey)
- deserializer
- }
-
- private def getPropertyIfExists[T](configs: Map[String, _], key: String, getter: (Map[String, _], String) => T): Option[T] = {
- if (configs.containsKey(key))
- Some(getter(configs, key))
- else
- None
- }
-}
-
-class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
- private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
-
- override def configure(configs: Map[String, _]): Unit = defaultWriter.configure(configs)
-
- def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
- import consumerRecord._
- defaultWriter.writeTo(consumerRecord, output)
- logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
- s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " +
- s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}")
- }
-}
-
-class NoOpMessageFormatter extends MessageFormatter {
-
- def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
-}
-
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 7a2aa3111c5..edd79c00030 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -69,8 +69,7 @@ object ToolsUtils {
/**
* This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
* It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`.
- * Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]]
- * and [[kafka.tools.ConsoleProducer]] are migrated.
+ * Can be removed once [[kafka.admin.ConsumerGroupCommand]] and [[kafka.tools.ConsoleProducer]] are migrated.
*
* @param parser Command line options parser.
* @param message Error message.
diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
deleted file mode 100644
index 244a9cfb148..00000000000
--- a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io.PrintStream
-
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.header.Headers
-import org.apache.kafka.common.serialization.Deserializer
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.mockito.Mockito._
-
-class CustomDeserializer extends Deserializer[String] {
-
- override def deserialize(topic: String, data: Array[Byte]): String = {
- assertNotNull(topic, "topic must not be null")
- new String(data)
- }
-
- override def deserialize(topic: String, headers: Headers, data: Array[Byte]): String = {
- println("WITH HEADERS")
- new String(data)
- }
-}
-
-class CustomDeserializerTest {
-
- @Test
- def checkFormatterCallDeserializerWithHeaders(): Unit = {
- val formatter = new DefaultMessageFormatter()
- formatter.valueDeserializer = Some(new CustomDeserializer)
- val output = TestUtils.grabConsoleOutput(formatter.writeTo(
- new ConsumerRecord("topic_test", 1, 1L, "key".getBytes, "value".getBytes), mock(classOf[PrintStream])))
- assertTrue(output.contains("WITH HEADERS"), "DefaultMessageFormatter should call `deserialize` method with headers.")
- formatter.close()
- }
-
- @Test
- def checkDeserializerTopicIsNotNull(): Unit = {
- val formatter = new DefaultMessageFormatter()
- formatter.keyDeserializer = Some(new CustomDeserializer)
-
- formatter.writeTo(new ConsumerRecord("topic_test", 1, 1L, "key".getBytes, "value".getBytes),
- mock(classOf[PrintStream]))
-
- formatter.close()
- }
-}
diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
deleted file mode 100644
index 12bbc948110..00000000000
--- a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io.{ByteArrayOutputStream, Closeable, PrintStream}
-import java.nio.charset.StandardCharsets
-import java.util
-
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.header.Header
-import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.serialization.Deserializer
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-
-import java.util.Optional
-import scala.jdk.CollectionConverters._
-
-class DefaultMessageFormatterTest {
- import DefaultMessageFormatterTest._
-
- @ParameterizedTest
- @MethodSource(Array("parameters"))
- def testWriteRecord(name: String, record: ConsumerRecord[Array[Byte], Array[Byte]], properties: Map[String, String], expected: String): Unit = {
- withResource(new ByteArrayOutputStream()) { baos =>
- withResource(new PrintStream(baos)) { ps =>
- val formatter = buildFormatter(properties)
- formatter.writeTo(record, ps)
- val actual = new String(baos.toByteArray(), StandardCharsets.UTF_8)
- assertEquals(expected, actual)
-
- }
- }
- }
-}
-
-object DefaultMessageFormatterTest {
- def parameters: java.util.stream.Stream[Arguments] = {
- Seq(
- Arguments.of(
- "print nothing",
- consumerRecord(),
- Map("print.value" -> "false"),
- ""),
- Arguments.of(
- "print key",
- consumerRecord(),
- Map("print.key" -> "true",
- "print.value" -> "false"),
- "someKey\n"),
- Arguments.of(
- "print value",
- consumerRecord(),
- Map(),
- "someValue\n"),
- Arguments.of(
- "print empty timestamp",
- consumerRecord(timestampType = TimestampType.NO_TIMESTAMP_TYPE),
- Map("print.timestamp" -> "true",
- "print.value" -> "false"),
- "NO_TIMESTAMP\n"),
- Arguments.of(
- "print log append time timestamp",
- consumerRecord(timestampType = TimestampType.LOG_APPEND_TIME),
- Map("print.timestamp" -> "true",
- "print.value" -> "false"),
- "LogAppendTime:1234\n"),
- Arguments.of(
- "print create time timestamp",
- consumerRecord(timestampType = TimestampType.CREATE_TIME),
- Map("print.timestamp" -> "true",
- "print.value" -> "false"),
- "CreateTime:1234\n"),
- Arguments.of(
- "print partition",
- consumerRecord(),
- Map("print.partition" -> "true",
- "print.value" -> "false"),
- "Partition:9\n"),
- Arguments.of(
- "print offset",
- consumerRecord(),
- Map("print.offset" -> "true",
- "print.value" -> "false"),
- "Offset:9876\n"),
- Arguments.of(
- "print headers",
- consumerRecord(),
- Map("print.headers" -> "true",
- "print.value" -> "false"),
- "h1:v1,h2:v2\n"),
- Arguments.of(
- "print empty headers",
- consumerRecord(headers = Nil),
- Map("print.headers" -> "true",
- "print.value" -> "false"),
- "NO_HEADERS\n"),
- Arguments.of(
- "print all possible fields with default delimiters",
- consumerRecord(),
- Map("print.key" -> "true",
- "print.timestamp" -> "true",
- "print.partition" -> "true",
- "print.offset" -> "true",
- "print.headers" -> "true",
- "print.value" -> "true"),
- "CreateTime:1234\tPartition:9\tOffset:9876\th1:v1,h2:v2\tsomeKey\tsomeValue\n"),
- Arguments.of(
- "print all possible fields with custom delimiters",
- consumerRecord(),
- Map("key.separator" -> "|",
- "line.separator" -> "^",
- "headers.separator" -> "#",
- "print.key" -> "true",
- "print.timestamp" -> "true",
- "print.partition" -> "true",
- "print.offset" -> "true",
- "print.headers" -> "true",
- "print.value" -> "true"),
- "CreateTime:1234|Partition:9|Offset:9876|h1:v1#h2:v2|someKey|someValue^"),
- Arguments.of(
- "print key with custom deserializer",
- consumerRecord(),
- Map("print.key" -> "true",
- "print.headers" -> "true",
- "print.value" -> "true",
- "key.deserializer" -> "kafka.tools.UpperCaseDeserializer"),
- "h1:v1,h2:v2\tSOMEKEY\tsomeValue\n"),
- Arguments.of(
- "print value with custom deserializer",
- consumerRecord(),
- Map("print.key" -> "true",
- "print.headers" -> "true",
- "print.value" -> "true",
- "value.deserializer" -> "kafka.tools.UpperCaseDeserializer"),
- "h1:v1,h2:v2\tsomeKey\tSOMEVALUE\n"),
- Arguments.of(
- "print headers with custom deserializer",
- consumerRecord(),
- Map("print.key" -> "true",
- "print.headers" -> "true",
- "print.value" -> "true",
- "headers.deserializer" -> "kafka.tools.UpperCaseDeserializer"),
- "h1:V1,h2:V2\tsomeKey\tsomeValue\n"),
- Arguments.of(
- "print key and value",
- consumerRecord(),
- Map("print.key" -> "true",
- "print.value" -> "true"),
- "someKey\tsomeValue\n"),
- Arguments.of(
- "print fields in the beginning, middle and the end",
- consumerRecord(),
- Map("print.key" -> "true",
- "print.value" -> "true",
- "print.partition" -> "true"),
- "Partition:9\tsomeKey\tsomeValue\n"),
- Arguments.of(
- "null value without custom null literal",
- consumerRecord(value = null),
- Map("print.key" -> "true"),
- "someKey\tnull\n"),
- Arguments.of(
- "null value with custom null literal",
- consumerRecord(value = null),
- Map("print.key" -> "true",
- "null.literal" -> "NULL"),
- "someKey\tNULL\n"),
- ).asJava.stream()
- }
-
- private def buildFormatter(propsToSet: Map[String, String]): DefaultMessageFormatter = {
- val formatter = new DefaultMessageFormatter()
- formatter.configure(propsToSet.asJava)
- formatter
- }
-
-
- private def header(key: String, value: String) = {
- new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8))
- }
-
- private def consumerRecord(key: String = "someKey",
- value: String = "someValue",
- headers: Iterable[Header] = Seq(header("h1", "v1"), header("h2", "v2")),
- partition: Int = 9,
- offset: Long = 9876,
- timestamp: Long = 1234,
- timestampType: TimestampType = TimestampType.CREATE_TIME) = {
- new ConsumerRecord[Array[Byte], Array[Byte]](
- "someTopic",
- partition,
- offset,
- timestamp,
- timestampType,
- 0,
- 0,
- if (key == null) null else key.getBytes(StandardCharsets.UTF_8),
- if (value == null) null else value.getBytes(StandardCharsets.UTF_8),
- new RecordHeaders(headers.asJava),
- Optional.empty[Integer])
- }
-
- private def withResource[Resource <: Closeable, Result](resource: Resource)(handler: Resource => Result): Result = {
- try {
- handler(resource)
- } finally {
- resource.close()
- }
- }
-}
-
-class UpperCaseDeserializer extends Deserializer[String] {
- override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}
- override def deserialize(topic: String, data: Array[Byte]): String = new String(data, StandardCharsets.UTF_8).toUpperCase
- override def close(): Unit = {}
-}
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
deleted file mode 100644
index 054cede8e62..00000000000
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ /dev/null
@@ -1,676 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io.{ByteArrayOutputStream, PrintStream}
-import java.util.{HashMap, Optional, Map => JMap}
-import java.time.Duration
-import kafka.tools.ConsoleConsumer.ConsumerWrapper
-import kafka.utils.{Exit, TestUtils}
-import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
-import org.apache.kafka.common.{MessageFormatter, TopicPartition}
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.test.MockDeserializer
-import org.mockito.Mockito._
-import org.mockito.ArgumentMatchers
-import ArgumentMatchers._
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerRecords
-import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.header.internals.RecordHeaders
-import org.apache.kafka.server.util.MockTime
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
-
-import scala.jdk.CollectionConverters._
-
-class ConsoleConsumerTest {
-
- @BeforeEach
- def setup(): Unit = {
- ConsoleConsumer.messageCount = 0
- }
-
- @Test
- def shouldThrowTimeoutExceptionWhenTimeoutIsReached(): Unit = {
- val topic = "test"
- val time = new MockTime
- val timeoutMs = 1000
-
- val mockConsumer = mock(classOf[Consumer[Array[Byte], Array[Byte]]])
-
- when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer { _ =>
- time.sleep(timeoutMs / 2 + 1)
- ConsumerRecords.EMPTY
- }
-
- val consumer = new ConsumerWrapper(
- topic = Some(topic),
- partitionId = None,
- offset = None,
- includedTopics = None,
- consumer = mockConsumer,
- timeoutMs = timeoutMs,
- time = time
- )
-
- assertThrows(classOf[TimeoutException], () => consumer.receive())
- }
-
- @Test
- def shouldResetUnConsumedOffsetsBeforeExit(): Unit = {
- val topic = "test"
- val maxMessages: Int = 123
- val totalMessages: Int = 700
- val startOffset: java.lang.Long = 0L
-
- val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.EARLIEST)
- val tp1 = new TopicPartition(topic, 0)
- val tp2 = new TopicPartition(topic, 1)
-
- val consumer = new ConsumerWrapper(Some(topic), None, None, None, mockConsumer)
-
- mockConsumer.rebalance(List(tp1, tp2).asJava)
- mockConsumer.updateBeginningOffsets(Map(tp1 -> startOffset, tp2 -> startOffset).asJava)
-
- 0 until totalMessages foreach { i =>
- // add all records, each partition should have half of `totalMessages`
- mockConsumer.addRecord(new ConsumerRecord[Array[Byte], Array[Byte]](topic, i % 2, i / 2, "key".getBytes, "value".getBytes))
- }
-
- val formatter = mock(classOf[MessageFormatter])
-
- ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, skipMessageOnError = false)
- assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))
-
- consumer.resetUnconsumedOffsets()
- assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))
-
- verify(formatter, times(maxMessages)).writeTo(any(), any())
- }
-
- @Test
- def shouldLimitReadsToMaxMessageLimit(): Unit = {
- val consumer = mock(classOf[ConsumerWrapper])
- val formatter = mock(classOf[MessageFormatter])
- val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())
-
- val messageLimit: Int = 10
- when(consumer.receive()).thenReturn(record)
-
- ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true)
-
- verify(consumer, times(messageLimit)).receive()
- verify(formatter, times(messageLimit)).writeTo(any(), any())
-
- consumer.cleanup()
- }
-
- @Test
- def shouldStopWhenOutputCheckErrorFails(): Unit = {
- val consumer = mock(classOf[ConsumerWrapper])
- val formatter = mock(classOf[MessageFormatter])
- val printStream = mock(classOf[PrintStream])
-
- val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())
-
- when(consumer.receive()).thenReturn(record)
- //Simulate an error on System.out after the first record has been printed
- when(printStream.checkError()).thenReturn(true)
-
- ConsoleConsumer.process(-1, formatter, consumer, printStream, true)
-
- verify(formatter).writeTo(any(), ArgumentMatchers.eq(printStream))
- verify(consumer).receive()
- verify(printStream).checkError()
-
- consumer.cleanup()
- }
-
- @Test
- def shouldParseValidConsumerValidConfig(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--from-beginning")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(true, config.fromBeginning)
- }
-
- @Test
- def shouldParseIncludeArgument(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--include", "includeTest*",
- "--from-beginning")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("includeTest*", config.includedTopicsArg)
- assertEquals(true, config.fromBeginning)
- }
-
- @Test
- def shouldParseWhitelistArgument(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--whitelist", "whitelistTest*",
- "--from-beginning")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("whitelistTest*", config.includedTopicsArg)
- assertEquals(true, config.fromBeginning)
- }
-
- @Test
- def shouldIgnoreWhitelistArgumentIfIncludeSpecified(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--include", "includeTest*",
- "--whitelist", "whitelistTest*",
- "--from-beginning")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("includeTest*", config.includedTopicsArg)
- assertEquals(true, config.fromBeginning)
- }
-
- @Test
- def shouldParseValidSimpleConsumerValidConfigWithNumericOffset(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--partition", "0",
- "--offset", "3")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(0, config.partitionArg.get)
- assertEquals(3, config.offsetArg)
- assertEquals(false, config.fromBeginning)
-
- }
-
- @Test
- def shouldExitOnUnrecognizedNewConsumerOption(): Unit = {
- Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
-
- //Given
- val args: Array[String] = Array(
- "--new-consumer",
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--from-beginning")
-
- try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def shouldParseValidSimpleConsumerValidConfigWithStringOffset(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--partition", "0",
- "--offset", "LatEst",
- "--property", "print.value=false")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(0, config.partitionArg.get)
- assertEquals(-1, config.offsetArg)
- assertEquals(false, config.fromBeginning)
- assertEquals(false, config.formatter.asInstanceOf[DefaultMessageFormatter].printValue)
- }
-
- @Test
- def shouldParseValidConsumerConfigWithAutoOffsetResetLatest(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--consumer-property", "auto.offset.reset=latest")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
- val consumerProperties = ConsoleConsumer.consumerProps(config)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(false, config.fromBeginning)
- assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
- }
-
- @Test
- def shouldParseValidConsumerConfigWithAutoOffsetResetEarliest(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--consumer-property", "auto.offset.reset=earliest")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
- val consumerProperties = ConsoleConsumer.consumerProps(config)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(false, config.fromBeginning)
- assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
- }
-
- @Test
- def shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--consumer-property", "auto.offset.reset=earliest",
- "--from-beginning")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
- val consumerProperties = ConsoleConsumer.consumerProps(config)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(true, config.fromBeginning)
- assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
- }
-
- @Test
- def shouldParseValidConsumerConfigWithNoOffsetReset(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
- val consumerProperties = ConsoleConsumer.consumerProps(config)
-
- //Then
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(false, config.fromBeginning)
- assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
- }
-
- @Test
- def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning(): Unit = {
- Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
-
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--consumer-property", "auto.offset.reset=latest",
- "--from-beginning")
- try {
- val config = new ConsoleConsumer.ConsumerConfig(args)
- assertThrows(classOf[IllegalArgumentException], () => ConsoleConsumer.consumerProps(config))
- }
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def shouldParseConfigsFromFile(): Unit = {
- val propsFile = TestUtils.tempPropertiesFile(Map("request.timeout.ms" -> "1000", "group.id" -> "group1"))
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--consumer.config", propsFile.getAbsolutePath
- )
-
- val config = new ConsoleConsumer.ConsumerConfig(args)
-
- assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms"))
- assertEquals("group1", config.consumerProps.getProperty("group.id"))
- }
-
- @Test
- def groupIdsProvidedInDifferentPlacesMustMatch(): Unit = {
- Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
-
- // different in all three places
- var propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file"))
- var args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--group", "group-from-arguments",
- "--consumer-property", "group.id=group-from-properties",
- "--consumer.config", propsFile.getAbsolutePath
- )
-
- assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
-
- // the same in all three places
- propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "test-group"))
- args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--group", "test-group",
- "--consumer-property", "group.id=test-group",
- "--consumer.config", propsFile.getAbsolutePath
- )
-
- var config = new ConsoleConsumer.ConsumerConfig(args)
- var props = ConsoleConsumer.consumerProps(config)
- assertEquals("test-group", props.getProperty("group.id"))
-
- // different via --consumer-property and --consumer.config
- propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file"))
- args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--consumer-property", "group.id=group-from-properties",
- "--consumer.config", propsFile.getAbsolutePath
- )
-
- assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
-
- // different via --consumer-property and --group
- args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--group", "group-from-arguments",
- "--consumer-property", "group.id=group-from-properties"
- )
-
- assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
-
- // different via --group and --consumer.config
- propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file"))
- args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--group", "group-from-arguments",
- "--consumer.config", propsFile.getAbsolutePath
- )
- assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
-
- // via --group only
- args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--group", "group-from-arguments"
- )
-
- config = new ConsoleConsumer.ConsumerConfig(args)
- props = ConsoleConsumer.consumerProps(config)
- assertEquals("group-from-arguments", props.getProperty("group.id"))
-
- Exit.resetExitProcedure()
- }
-
- @Test
- def testCustomPropertyShouldBePassedToConfigureMethod(): Unit = {
- val args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--property", "print.key=true",
- "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
- "--property", "key.deserializer.my-props=abc"
- )
- val config = new ConsoleConsumer.ConsumerConfig(args)
- assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
- assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
- val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
- assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
- val keyDeserializer = formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]
- assertEquals(1, keyDeserializer.configs.size)
- assertEquals("abc", keyDeserializer.configs.get("my-props"))
- assertTrue(keyDeserializer.isKey)
- }
-
- @Test
- def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
- val propsFile = TestUtils.tempPropertiesFile(Map("key.deserializer.my-props" -> "abc", "print.key" -> "false"))
- val args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--property", "print.key=true",
- "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
- "--formatter-config", propsFile.getAbsolutePath
- )
- val config = new ConsoleConsumer.ConsumerConfig(args)
- assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
- assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
- val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
- assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
- val keyDeserializer = formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]
- assertEquals(1, keyDeserializer.configs.size)
- assertEquals("abc", keyDeserializer.configs.get("my-props"))
- assertTrue(keyDeserializer.isKey)
- }
-
- @Test
- def shouldParseGroupIdFromBeginningGivenTogether(): Unit = {
- // Start from earliest
- var args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--group", "test-group",
- "--from-beginning")
-
- var config = new ConsoleConsumer.ConsumerConfig(args)
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(-2, config.offsetArg)
- assertEquals(true, config.fromBeginning)
-
- // Start from latest
- args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--group", "test-group"
- )
-
- config = new ConsoleConsumer.ConsumerConfig(args)
- assertEquals("localhost:9092", config.bootstrapServer)
- assertEquals("test", config.topicArg)
- assertEquals(-1, config.offsetArg)
- assertEquals(false, config.fromBeginning)
- }
-
- @Test
- def shouldExitOnGroupIdAndPartitionGivenTogether(): Unit = {
- Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--group", "test-group",
- "--partition", "0")
-
- try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def shouldExitOnOffsetWithoutPartition(): Unit = {
- Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--offset", "10")
-
- try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def testDefaultMessageFormatter(): Unit = {
- val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
- val formatter = new DefaultMessageFormatter()
- val configs: JMap[String, String] = new HashMap()
-
- formatter.configure(configs)
- var out = new ByteArrayOutputStream()
- formatter.writeTo(record, new PrintStream(out))
- assertEquals("value\n", out.toString)
-
- configs.put("print.key", "true")
- formatter.configure(configs)
- out = new ByteArrayOutputStream()
- formatter.writeTo(record, new PrintStream(out))
- assertEquals("key\tvalue\n", out.toString)
-
- configs.put("print.partition", "true")
- formatter.configure(configs)
- out = new ByteArrayOutputStream()
- formatter.writeTo(record, new PrintStream(out))
- assertEquals("Partition:0\tkey\tvalue\n", out.toString)
-
- configs.put("print.timestamp", "true")
- formatter.configure(configs)
- out = new ByteArrayOutputStream()
- formatter.writeTo(record, new PrintStream(out))
- assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString)
-
- configs.put("print.offset", "true")
- formatter.configure(configs)
- out = new ByteArrayOutputStream()
- formatter.writeTo(record, new PrintStream(out))
- assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
-
- out = new ByteArrayOutputStream()
- val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes, "value".getBytes,
- new RecordHeaders(), Optional.empty[Integer])
- formatter.writeTo(record2, new PrintStream(out))
- assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
- formatter.close()
- }
-
- @Test
- def testNoOpMessageFormatter(): Unit = {
- val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
- val formatter = new NoOpMessageFormatter()
-
- formatter.configure(new HashMap())
- val out = new ByteArrayOutputStream()
- formatter.writeTo(record, new PrintStream(out))
- assertEquals("", out.toString)
- }
-
- @Test
- def shouldExitIfNoTopicOrFilterSpecified(): Unit = {
- Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092")
-
- try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def shouldExitIfTopicAndIncludeSpecified(): Unit = {
- Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--include", "includeTest*")
-
- try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def shouldExitIfTopicAndWhitelistSpecified(): Unit = {
- Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--whitelist", "whitelistTest*")
-
- try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def testClientIdOverride(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--from-beginning",
- "--consumer-property", "client.id=consumer-1")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
- val consumerProperties = ConsoleConsumer.consumerProps(config)
-
- //Then
- assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
- }
-
- @Test
- def testDefaultClientId(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--from-beginning")
-
- //When
- val config = new ConsoleConsumer.ConsumerConfig(args)
- val consumerProperties = ConsoleConsumer.consumerProps(config)
-
- //Then
- assertEquals("console-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 03877c68595..1f3fc98b778 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
-import kafka.tools.ConsoleConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -62,6 +61,8 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.consumer.ConsoleConsumer;
+import org.apache.kafka.tools.consumer.ConsoleConsumerOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -1117,7 +1118,7 @@ public class KStreamAggregationIntegrationTest {
final Deserializer valueDeserializer,
final Class innerClass,
final int numMessages,
- final boolean printTimestamp) {
+ final boolean printTimestamp) throws Exception {
final ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
final PrintStream originalStream = System.out;
try (final PrintStream newStream = new PrintStream(newConsole)) {
@@ -1139,8 +1140,7 @@ public class KStreamAggregationIntegrationTest {
"--property", "key.deserializer.window.size.ms=500",
};
- ConsoleConsumer.messageCount_$eq(0); //reset the message count
- ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
+ ConsoleConsumer.run(new ConsoleConsumerOptions(args));
newStream.flush();
System.setOut(originalStream);
return newConsole.toString();
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index eb5d3d9f85d..fb87f20df19 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
"""
@@ -210,7 +210,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
# LoggingMessageFormatter was introduced after 0.9
if node.version > LATEST_0_9:
- cmd += " --formatter kafka.tools.LoggingMessageFormatter"
+ if node.version > LATEST_3_7:
+ cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter"
+ else:
+ cmd += " --formatter kafka.tools.LoggingMessageFormatter"
if self.enable_systest_events:
# enable systest events is only available in 0.10.0 and later
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 6d0b1ae329f..3e3da017bc4 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -263,3 +263,7 @@ LATEST_3_5 = V_3_5_2
V_3_6_0 = KafkaVersion("3.6.0")
V_3_6_1 = KafkaVersion("3.6.1")
LATEST_3_6 = V_3_6_1
+
+# 3.7.x version
+V_3_7_0 = KafkaVersion("3.7.0")
+LATEST_3_7 = V_3_7_0
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
new file mode 100644
index 00000000000..f84fb88c23f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Consumer that dumps messages to standard out.
+ */
+public class ConsoleConsumer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class);
+ private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
+
+ static int messageCount = 0;
+
+ public static void main(String[] args) throws Exception {
+ ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
+ try {
+ run(opts);
+ } catch (AuthenticationException ae) {
+ LOG.error("Authentication failed: terminating consumer process", ae);
+ Exit.exit(1);
+ } catch (Throwable t) {
+ LOG.error("Unknown error when running consumer: ", t);
+ Exit.exit(1);
+ }
+ }
+
+ public static void run(ConsoleConsumerOptions opts) {
+ messageCount = 0;
+ long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;
+ Consumer consumer = new KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ ConsumerWrapper consumerWrapper = opts.partitionArg().isPresent()
+ ? new ConsumerWrapper(Optional.of(opts.topicArg()), opts.partitionArg(), OptionalLong.of(opts.offsetArg()), Optional.empty(), consumer, timeoutMs)
+ : new ConsumerWrapper(Optional.of(opts.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(opts.includedTopicsArg()), consumer, timeoutMs);
+
+ addShutdownHook(consumerWrapper, opts);
+
+ try {
+ process(opts.maxMessages(), opts.formatter(), consumerWrapper, System.out, opts.skipMessageOnError());
+ } finally {
+ consumerWrapper.cleanup();
+ opts.formatter().close();
+ reportRecordCount();
+
+ SHUTDOWN_LATCH.countDown();
+ }
+ }
+
+ static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) {
+ Exit.addShutdownHook("consumer-shutdown-hook", () -> {
+ try {
+ consumer.wakeup();
+ SHUTDOWN_LATCH.await();
+ } catch (Throwable t) {
+ LOG.error("Exception while running shutdown hook: ", t);
+ }
+ if (conf.enableSystestEventsLogging()) {
+ System.out.println("shutdown_complete");
+ }
+ });
+ }
+
+ static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) {
+ while (messageCount < maxMessages || maxMessages == -1) {
+ ConsumerRecord msg;
+ try {
+ msg = consumer.receive();
+ } catch (WakeupException we) {
+ LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
+ // Consumer will be closed
+ return;
+ } catch (Throwable t) {
+ LOG.error("Error processing message, terminating consumer process: ", t);
+ // Consumer will be closed
+ return;
+ }
+ messageCount += 1;
+ try {
+ formatter.writeTo(new ConsumerRecord<>(msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(),
+ 0, 0, msg.key(), msg.value(), msg.headers(), Optional.empty()), output);
+ } catch (Throwable t) {
+ if (skipMessageOnError) {
+ LOG.error("Error processing message, skipping this message: ", t);
+ } else {
+ // Consumer will be closed
+ throw t;
+ }
+ }
+ if (checkErr(output)) {
+ // Consumer will be closed
+ return;
+ }
+ }
+ }
+
+ static void reportRecordCount() {
+ System.err.println("Processed a total of " + messageCount + " messages");
+ }
+
+ static boolean checkErr(PrintStream output) {
+ boolean gotError = output.checkError();
+ if (gotError) {
+ // This means no one is listening to our output stream anymore, time to shut down
+ System.err.println("Unable to write to standard out, closing consumer.");
+ }
+ return gotError;
+ }
+
+ public static class ConsumerWrapper {
+ final Optional topic;
+ final OptionalInt partitionId;
+ final OptionalLong offset;
+ final Optional includedTopics;
+ final Consumer consumer;
+ final long timeoutMs;
+ final Time time = Time.SYSTEM;
+
+ Iterator> recordIter = Collections.emptyIterator();
+
+ public ConsumerWrapper(Optional topic,
+ OptionalInt partitionId,
+ OptionalLong offset,
+ Optional includedTopics,
+ Consumer consumer,
+ long timeoutMs) {
+ this.topic = topic;
+ this.partitionId = partitionId;
+ this.offset = offset;
+ this.includedTopics = includedTopics;
+ this.consumer = consumer;
+ this.timeoutMs = timeoutMs;
+
+ if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) {
+ seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
+ } else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
+ // default to latest if no offset is provided
+ seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP);
+ } else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
+ consumer.subscribe(Collections.singletonList(topic.get()));
+ } else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) {
+ consumer.subscribe(Pattern.compile(includedTopics.get()));
+ } else {
+ throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
+ "Exactly one of 'topic' or 'include' must be provided. " +
+ "If 'topic' is provided, an optional 'partition' may also be provided. " +
+ "If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.");
+ }
+ }
+
+ private void seek(String topic, int partitionId, long offset) {
+ TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+ consumer.assign(Collections.singletonList(topicPartition));
+ if (offset == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+ consumer.seekToBeginning(Collections.singletonList(topicPartition));
+ } else if (offset == ListOffsetsRequest.LATEST_TIMESTAMP) {
+ consumer.seekToEnd(Collections.singletonList(topicPartition));
+ } else {
+ consumer.seek(topicPartition, offset);
+ }
+ }
+
+ void resetUnconsumedOffsets() {
+ Map smallestUnconsumedOffsets = new HashMap<>();
+ while (recordIter.hasNext()) {
+ ConsumerRecord record = recordIter.next();
+ TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ // avoid auto-committing offsets which haven't been consumed
+ smallestUnconsumedOffsets.putIfAbsent(tp, record.offset());
+ }
+ smallestUnconsumedOffsets.forEach(consumer::seek);
+ }
+
+ ConsumerRecord receive() {
+ long startTimeMs = time.milliseconds();
+ while (!recordIter.hasNext()) {
+ recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator();
+ if (!recordIter.hasNext() && (time.milliseconds() - startTimeMs > timeoutMs)) {
+ throw new TimeoutException();
+ }
+ }
+ return recordIter.next();
+ }
+
+ void wakeup() {
+ this.consumer.wakeup();
+ }
+
+ void cleanup() {
+ resetUnconsumedOffsets();
+ this.consumer.close();
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
new file mode 100644
index 00000000000..a713afb2bf2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public final class ConsoleConsumerOptions extends CommandDefaultOptions {
+
+ private static final Random RANDOM = new Random();
+
+ private final OptionSpec topicOpt;
+ private final OptionSpec whitelistOpt;
+ private final OptionSpec includeOpt;
+ private final OptionSpec partitionIdOpt;
+ private final OptionSpec offsetOpt;
+ private final OptionSpec messageFormatterOpt;
+ private final OptionSpec messageFormatterArgOpt;
+ private final OptionSpec messageFormatterConfigOpt;
+ private final OptionSpec> resetBeginningOpt;
+ private final OptionSpec maxMessagesOpt;
+ private final OptionSpec timeoutMsOpt;
+ private final OptionSpec> skipMessageOnErrorOpt;
+ private final OptionSpec bootstrapServerOpt;
+ private final OptionSpec keyDeserializerOpt;
+ private final OptionSpec valueDeserializerOpt;
+ private final OptionSpec> enableSystestEventsLoggingOpt;
+ private final OptionSpec isolationLevelOpt;
+ private final OptionSpec groupIdOpt;
+
+ private final Properties consumerProps;
+ private final long offset;
+ private final MessageFormatter formatter;
+
+ public ConsoleConsumerOptions(String[] args) throws IOException {
+ super(args);
+ topicOpt = parser.accepts("topic", "The topic to consume on.")
+ .withRequiredArg()
+ .describedAs("topic")
+ .ofType(String.class);
+ whitelistOpt = parser.accepts("whitelist",
+ "DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class);
+ includeOpt = parser.accepts("include",
+ "Regular expression specifying list of topics to include for consumption.")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class);
+ partitionIdOpt = parser.accepts("partition",
+ "The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.")
+ .withRequiredArg()
+ .describedAs("partition")
+ .ofType(Integer.class);
+ offsetOpt = parser.accepts("offset", "The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end")
+ .withRequiredArg()
+ .describedAs("consume offset")
+ .ofType(String.class)
+ .defaultsTo("latest");
+ OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
+ .withRequiredArg()
+ .describedAs("consumer_prop")
+ .ofType(String.class);
+ OptionSpec consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
+ messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
+ .withRequiredArg()
+ .describedAs("class")
+ .ofType(String.class)
+ .defaultsTo(DefaultMessageFormatter.class.getName());
+ messageFormatterArgOpt = parser.accepts("property",
+ "The properties to initialize the message formatter. Default properties include: \n" +
+ " print.timestamp=true|false\n" +
+ " print.key=true|false\n" +
+ " print.offset=true|false\n" +
+ " print.partition=true|false\n" +
+ " print.headers=true|false\n" +
+ " print.value=true|false\n" +
+ " key.separator=\n" +
+ " line.separator=\n" +
+ " headers.separator=\n" +
+ " null.literal=\n" +
+ " key.deserializer=\n" +
+ " value.deserializer=\n" +
+ " header.deserializer=\n" +
+ "\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.")
+ .withRequiredArg()
+ .describedAs("prop")
+ .ofType(String.class);
+ messageFormatterConfigOpt = parser.accepts("formatter-config", "Config properties file to initialize the message formatter. Note that " + messageFormatterArgOpt + " takes precedence over this config.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
+ resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
+ "start with the earliest message present in the log rather than the latest message.");
+ maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
+ .withRequiredArg()
+ .describedAs("num_messages")
+ .ofType(Integer.class);
+ timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
+ .withRequiredArg()
+ .describedAs("timeout_ms")
+ .ofType(Integer.class);
+ skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
+ "skip it instead of halt.");
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.")
+ .withRequiredArg()
+ .describedAs("server to connect to")
+ .ofType(String.class);
+ keyDeserializerOpt = parser.accepts("key-deserializer")
+ .withRequiredArg()
+ .describedAs("deserializer for key")
+ .ofType(String.class);
+ valueDeserializerOpt = parser.accepts("value-deserializer")
+ .withRequiredArg()
+ .describedAs("deserializer for values")
+ .ofType(String.class);
+ enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
+ "Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)");
+ isolationLevelOpt = parser.accepts("isolation-level",
+ "Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " +
+ "to read all messages.")
+ .withRequiredArg()
+ .ofType(String.class)
+ .defaultsTo("read_uncommitted");
+ groupIdOpt = parser.accepts("group", "The consumer group id of the consumer.")
+ .withRequiredArg()
+ .describedAs("consumer group id")
+ .ofType(String.class);
+
+ try {
+ options = parser.parse(args);
+ } catch (OptionException oe) {
+ CommandLineUtils.printUsageAndExit(parser, oe.getMessage());
+ }
+
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.");
+
+ checkRequiredArgs();
+
+ Properties consumerPropsFromFile = options.has(consumerConfigOpt)
+ ? Utils.loadProps(options.valueOf(consumerConfigOpt))
+ : new Properties();
+ Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt));
+ Set groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps);
+ consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided);
+ offset = parseOffset();
+ formatter = buildFormatter();
+ }
+
+ private void checkRequiredArgs() {
+ List topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
+ topicOrFilterArgs.removeIf(Objects::isNull);
+ // user need to specify value for either --topic or one of the include filters options (--include or --whitelist)
+ if (topicOrFilterArgs.size() != 1) {
+ CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. " +
+ (options.has(whitelistOpt) ? "--whitelist is DEPRECATED use --include instead; ignored if --include specified." : ""));
+ }
+
+ if (partitionArg().isPresent()) {
+ if (!options.has(topicOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "The topic is required when partition is specified.");
+ }
+ if (fromBeginning() && options.has(offsetOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and offset cannot be specified together.");
+ }
+ } else if (options.has(offsetOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "The partition is required when offset is specified.");
+ }
+
+ CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
+ }
+
+ private Set checkConsumerGroup(Properties consumerPropsFromFile, Properties extraConsumerProps) {
+ // if the group id is provided in more than place (through different means) all values must be the same
+ Set groupIdsProvided = new HashSet<>();
+ if (options.has(groupIdOpt)) {
+ groupIdsProvided.add(options.valueOf(groupIdOpt));
+ }
+
+ if (consumerPropsFromFile.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ groupIdsProvided.add((String) consumerPropsFromFile.get(ConsumerConfig.GROUP_ID_CONFIG));
+ }
+
+ if (extraConsumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ groupIdsProvided.add(extraConsumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+ }
+ if (groupIdsProvided.size() > 1) {
+ CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', "
+ + "via '--consumer-property', or via '--consumer.config') do not match. "
+ + "Detected group ids: "
+ + groupIdsProvided.stream().map(group -> "'" + group + "'").collect(Collectors.joining(", ")));
+ }
+ if (!groupIdsProvided.isEmpty() && partitionArg().isPresent()) {
+ CommandLineUtils.printUsageAndExit(parser, "Options group and partition cannot be specified together.");
+ }
+ return groupIdsProvided;
+ }
+
+ private Properties buildConsumerProps(Properties consumerPropsFromFile, Properties extraConsumerProps, Set groupIdsProvided) {
+ Properties consumerProps = new Properties();
+ consumerProps.putAll(consumerPropsFromFile);
+ consumerProps.putAll(extraConsumerProps);
+ setAutoOffsetResetValue(consumerProps);
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer());
+ if (consumerProps.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) {
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer");
+ }
+ CommandLineUtils.maybeMergeOptions(consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, options, isolationLevelOpt);
+
+ if (groupIdsProvided.isEmpty()) {
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-" + RANDOM.nextInt(100000));
+ // By default, avoid unnecessary expansion of the coordinator cache since
+ // the auto-generated group and its offsets is not intended to be used again
+ if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ }
+ } else {
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdsProvided.iterator().next());
+ }
+ return consumerProps;
+ }
+
+ /**
+ * Used to retrieve the correct value for the consumer parameter 'auto.offset.reset'.
+ * Order of priority is:
+ * 1. Explicitly set parameter via --consumer.property command line parameter
+ * 2. Explicit --from-beginning given -> 'earliest'
+ * 3. Default value of 'latest'
+ * In case both --from-beginning and an explicit value are specified an error is thrown if these
+ * are conflicting.
+ */
+ private void setAutoOffsetResetValue(Properties props) {
+ String earliestConfigValue = "earliest";
+ String latestConfigValue = "latest";
+
+ if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
+ // auto.offset.reset parameter was specified on the command line
+ String autoResetOption = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ if (fromBeginning() && !earliestConfigValue.equals(autoResetOption)) {
+ // conflicting options - latest und earliest, throw an error
+ System.err.println("Can't simultaneously specify --from-beginning and 'auto.offset.reset=" + autoResetOption + "', " +
+ "please remove one option");
+ Exit.exit(1);
+ }
+ // nothing to do, checking for valid parameter values happens later and the specified
+ // value was already copied during .putall operation
+ } else {
+ // no explicit value for auto.offset.reset was specified
+ // if --from-beginning was specified use earliest, otherwise default to latest
+ String autoResetOption = fromBeginning() ? earliestConfigValue : latestConfigValue;
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption);
+ }
+ }
+
+ private long parseOffset() {
+ if (options.has(offsetOpt)) {
+ switch (options.valueOf(offsetOpt).toLowerCase(Locale.ROOT)) {
+ case "earliest":
+ return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+ case "latest":
+ return ListOffsetsRequest.LATEST_TIMESTAMP;
+ default:
+ String offsetString = options.valueOf(offsetOpt);
+ try {
+ long offset = Long.parseLong(offsetString);
+ if (offset < 0) {
+ invalidOffset(offsetString);
+ }
+ return offset;
+ } catch (NumberFormatException nfe) {
+ invalidOffset(offsetString);
+ }
+ }
+ } else if (fromBeginning()) {
+ return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+ }
+ return ListOffsetsRequest.LATEST_TIMESTAMP;
+ }
+
+ private void invalidOffset(String offset) {
+ CommandLineUtils.printUsageAndExit(parser, "The provided offset value '" + offset + "' is incorrect. Valid values are " +
+ "'earliest', 'latest', or a non-negative long.");
+ }
+
+ private MessageFormatter buildFormatter() {
+ MessageFormatter formatter = null;
+ try {
+ Class> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt));
+ formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance();
+
+ Properties formatterArgs = formatterArgs();
+ Map formatterConfigs = new HashMap<>();
+ for (final String name : formatterArgs.stringPropertyNames()) {
+ formatterConfigs.put(name, formatterArgs.getProperty(name));
+ }
+
+ formatter.configure(formatterConfigs);
+
+ } catch (Exception e) {
+ CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+ }
+ return formatter;
+ }
+
+ Properties consumerProps() {
+ return consumerProps;
+ }
+
+ boolean fromBeginning() {
+ return options.has(resetBeginningOpt);
+ }
+
+ long offsetArg() {
+ return offset;
+ }
+
+ boolean skipMessageOnError() {
+ return options.has(skipMessageOnErrorOpt);
+ }
+
+ OptionalInt partitionArg() {
+ if (options.has(partitionIdOpt)) {
+ return OptionalInt.of(options.valueOf(partitionIdOpt));
+ }
+ return OptionalInt.empty();
+ }
+
+ String topicArg() {
+ return options.valueOf(topicOpt);
+ }
+
+ int maxMessages() {
+ return options.has(maxMessagesOpt) ? options.valueOf(maxMessagesOpt) : -1;
+ }
+
+ int timeoutMs() {
+ return options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1;
+ }
+
+ boolean enableSystestEventsLogging() {
+ return options.has(enableSystestEventsLoggingOpt);
+ }
+
+ String bootstrapServer() {
+ return options.valueOf(bootstrapServerOpt);
+ }
+
+ String includedTopicsArg() {
+ return options.has(includeOpt)
+ ? options.valueOf(includeOpt)
+ : options.valueOf(whitelistOpt);
+ }
+
+ Properties formatterArgs() throws IOException {
+ Properties formatterArgs = options.has(messageFormatterConfigOpt)
+ ? Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
+ : new Properties();
+ String keyDeserializer = options.valueOf(keyDeserializerOpt);
+ if (keyDeserializer != null && !keyDeserializer.isEmpty()) {
+ formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
+ }
+ String valueDeserializer = options.valueOf(valueDeserializerOpt);
+ if (valueDeserializer != null && !valueDeserializer.isEmpty()) {
+ formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
+ }
+ formatterArgs.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)));
+
+ return formatterArgs;
+ }
+
+ MessageFormatter formatter() {
+ return formatter;
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java
new file mode 100644
index 00000000000..402deb8358e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+
+class DefaultMessageFormatter implements MessageFormatter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultMessageFormatter.class);
+
+ private boolean printTimestamp = false;
+ private boolean printKey = false;
+ private boolean printValue = true;
+ private boolean printPartition = false;
+ private boolean printOffset = false;
+ private boolean printHeaders = false;
+ private byte[] keySeparator = utfBytes("\t");
+ private byte[] lineSeparator = utfBytes("\n");
+ private byte[] headersSeparator = utfBytes(",");
+ private byte[] nullLiteral = utfBytes("null");
+
+ private Optional> keyDeserializer = Optional.empty();
+ private Optional> valueDeserializer = Optional.empty();
+ private Optional> headersDeserializer = Optional.empty();
+
+ @Override
+ public void configure(Map configs) {
+ if (configs.containsKey("print.timestamp")) {
+ printTimestamp = getBoolProperty(configs, "print.timestamp");
+ }
+ if (configs.containsKey("print.key")) {
+ printKey = getBoolProperty(configs, "print.key");
+ }
+ if (configs.containsKey("print.offset")) {
+ printOffset = getBoolProperty(configs, "print.offset");
+ }
+ if (configs.containsKey("print.partition")) {
+ printPartition = getBoolProperty(configs, "print.partition");
+ }
+ if (configs.containsKey("print.headers")) {
+ printHeaders = getBoolProperty(configs, "print.headers");
+ }
+ if (configs.containsKey("print.value")) {
+ printValue = getBoolProperty(configs, "print.value");
+ }
+ if (configs.containsKey("key.separator")) {
+ keySeparator = getByteProperty(configs, "key.separator");
+ }
+ if (configs.containsKey("line.separator")) {
+ lineSeparator = getByteProperty(configs, "line.separator");
+ }
+ if (configs.containsKey("headers.separator")) {
+ headersSeparator = getByteProperty(configs, "headers.separator");
+ }
+ if (configs.containsKey("null.literal")) {
+ nullLiteral = getByteProperty(configs, "null.literal");
+ }
+
+ keyDeserializer = getDeserializerProperty(configs, true, "key.deserializer");
+ valueDeserializer = getDeserializerProperty(configs, false, "value.deserializer");
+ headersDeserializer = getDeserializerProperty(configs, false, "headers.deserializer");
+ }
+
+ // for testing
+ public boolean printValue() {
+ return printValue;
+ }
+
+ // for testing
+ public Optional> keyDeserializer() {
+ return keyDeserializer;
+ }
+
+ private void writeSeparator(PrintStream output, boolean columnSeparator) {
+ try {
+ if (columnSeparator) {
+ output.write(keySeparator);
+ } else {
+ output.write(lineSeparator);
+ }
+ } catch (IOException ioe) {
+ LOG.error("Unable to write the separator to the output", ioe);
+ }
+ }
+
+ private byte[] deserialize(ConsumerRecord consumerRecord, Optional> deserializer, byte[] sourceBytes, String topic) {
+ byte[] nonNullBytes = sourceBytes != null ? sourceBytes : nullLiteral;
+ return deserializer.map(value -> utfBytes(value.deserialize(topic, consumerRecord.headers(), nonNullBytes).toString())).orElse(nonNullBytes);
+ }
+
+ @Override
+ public void writeTo(ConsumerRecord consumerRecord, PrintStream output) {
+ try {
+ if (printTimestamp) {
+ if (consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
+ output.print(consumerRecord.timestampType() + ":" + consumerRecord.timestamp());
+ } else {
+ output.print("NO_TIMESTAMP");
+ }
+ writeSeparator(output, printOffset || printPartition || printHeaders || printKey || printValue);
+ }
+
+ if (printPartition) {
+ output.print("Partition:");
+ output.print(consumerRecord.partition());
+ writeSeparator(output, printOffset || printHeaders || printKey || printValue);
+ }
+
+ if (printOffset) {
+ output.print("Offset:");
+ output.print(consumerRecord.offset());
+ writeSeparator(output, printHeaders || printKey || printValue);
+ }
+
+ if (printHeaders) {
+ Iterator headersIt = consumerRecord.headers().iterator();
+ if (!headersIt.hasNext()) {
+ output.print("NO_HEADERS");
+ } else {
+ while (headersIt.hasNext()) {
+ Header header = headersIt.next();
+ output.print(header.key() + ":");
+ output.write(deserialize(consumerRecord, headersDeserializer, header.value(), consumerRecord.topic()));
+ if (headersIt.hasNext()) {
+ output.write(headersSeparator);
+ }
+ }
+ }
+ writeSeparator(output, printKey || printValue);
+ }
+
+ if (printKey) {
+ output.write(deserialize(consumerRecord, keyDeserializer, consumerRecord.key(), consumerRecord.topic()));
+ writeSeparator(output, printValue);
+ }
+
+ if (printValue) {
+ output.write(deserialize(consumerRecord, valueDeserializer, consumerRecord.value(), consumerRecord.topic()));
+ output.write(lineSeparator);
+ }
+ } catch (IOException ioe) {
+ LOG.error("Unable to write the consumer record to the output", ioe);
+ }
+ }
+
+ private Map propertiesWithKeyPrefixStripped(String prefix, Map configs) {
+ final Map newConfigs = new HashMap<>();
+ for (Map.Entry entry : configs.entrySet()) {
+ if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
+ newConfigs.put(entry.getKey().substring(prefix.length()), entry.getValue());
+ }
+ }
+ return newConfigs;
+ }
+
+ private byte[] utfBytes(String str) {
+ return str.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private byte[] getByteProperty(Map configs, String key) {
+ return utfBytes((String) configs.get(key));
+ }
+
+ private boolean getBoolProperty(Map configs, String key) {
+ return ((String) configs.get(key)).trim().equalsIgnoreCase("true");
+ }
+
+ private Optional> getDeserializerProperty(Map configs, boolean isKey, String key) {
+ if (configs.containsKey(key)) {
+ String name = (String) configs.get(key);
+ try {
+ Deserializer> deserializer = (Deserializer>) Class.forName(name).getDeclaredConstructor().newInstance();
+ Map deserializerConfig = propertiesWithKeyPrefixStripped(key + ".", configs);
+ deserializer.configure(deserializerConfig, isKey);
+ return Optional.of(deserializer);
+ } catch (Exception e) {
+ LOG.error("Unable to instantiate a deserializer from " + name, e);
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java
new file mode 100644
index 00000000000..f010bbaea9b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.record.TimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+class LoggingMessageFormatter implements MessageFormatter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingMessageFormatter.class);
+ private final DefaultMessageFormatter defaultWriter = new DefaultMessageFormatter();
+
+ @Override
+ public void configure(Map configs) {
+ defaultWriter.configure(configs);
+ }
+
+ @Override
+ public void writeTo(ConsumerRecord consumerRecord, PrintStream output) {
+ defaultWriter.writeTo(consumerRecord, output);
+ String timestamp = consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE
+ ? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + ", "
+ : "";
+ String key = "key:" + (consumerRecord.key() == null ? "null " : new String(consumerRecord.key(), StandardCharsets.UTF_8) + ", ");
+ String value = "value:" + (consumerRecord.value() == null ? "null" : new String(consumerRecord.value(), StandardCharsets.UTF_8));
+ LOG.info(timestamp + key + value);
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/NoOpMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/NoOpMessageFormatter.java
new file mode 100644
index 00000000000..cba9e9b6728
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/NoOpMessageFormatter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+
+import java.io.PrintStream;
+
+class NoOpMessageFormatter implements MessageFormatter {
+
+ @Override
+ public void writeTo(ConsumerRecord consumerRecord, PrintStream output) {
+
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
index 9b698b0cc5f..fdc732ea29a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
@@ -28,6 +28,8 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
@@ -198,6 +200,14 @@ public class ToolsTestUtils {
.collect(Collectors.joining(","));
}
+ public static File tempPropertiesFile(Map properties) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry entry : properties.entrySet()) {
+ sb.append(entry.getKey() + "=" + entry.getValue() + System.lineSeparator());
+ }
+ return org.apache.kafka.test.TestUtils.tempFile(sb.toString());
+ }
+
public static class MockExitProcedure implements Exit.Procedure {
private boolean hasExited = false;
private int statusCode;
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
new file mode 100644
index 00000000000..523122c4cdd
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
@@ -0,0 +1,621 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.MockDeserializer;
+import org.apache.kafka.tools.ToolsTestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsoleConsumerOptionsTest {
+
+ @Test
+ public void shouldParseValidConsumerValidConfig() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--from-beginning"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertTrue(config.fromBeginning());
+ assertFalse(config.enableSystestEventsLogging());
+ assertFalse(config.skipMessageOnError());
+ assertEquals(-1, config.maxMessages());
+ assertEquals(-1, config.timeoutMs());
+ }
+
+ @Test
+ public void shouldParseIncludeArgument() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--include", "includeTest*",
+ "--from-beginning"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("includeTest*", config.includedTopicsArg());
+ assertTrue(config.fromBeginning());
+ }
+
+ @Test
+ public void shouldParseWhitelistArgument() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--whitelist", "whitelistTest*",
+ "--from-beginning"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("whitelistTest*", config.includedTopicsArg());
+ assertTrue(config.fromBeginning());
+ }
+
+ @Test
+ public void shouldIgnoreWhitelistArgumentIfIncludeSpecified() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--include", "includeTest*",
+ "--whitelist", "whitelistTest*",
+ "--from-beginning"
+ };
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("includeTest*", config.includedTopicsArg());
+ assertTrue(config.fromBeginning());
+ }
+
+ @Test
+ public void shouldParseValidSimpleConsumerValidConfigWithNumericOffset() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--partition", "0",
+ "--offset", "3"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertTrue(config.partitionArg().isPresent());
+ assertEquals(0, config.partitionArg().getAsInt());
+ assertEquals(3, config.offsetArg());
+ assertFalse(config.fromBeginning());
+ }
+
+ @Test
+ public void shouldExitOnUnrecognizedNewConsumerOption() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--new-consumer",
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--from-beginning"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldExitIfPartitionButNoTopic() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--include", "test.*",
+ "--partition", "0"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldExitIfFromBeginningAndOffset() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--partition", "0",
+ "--from-beginning",
+ "--offset", "123"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldParseValidSimpleConsumerValidConfigWithStringOffset() throws Exception {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--partition", "0",
+ "--offset", "LatEst",
+ "--property", "print.value=false"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertTrue(config.partitionArg().isPresent());
+ assertEquals(0, config.partitionArg().getAsInt());
+ assertEquals(-1, config.offsetArg());
+ assertFalse(config.fromBeginning());
+ assertFalse(((DefaultMessageFormatter) config.formatter()).printValue());
+ }
+
+ @Test
+ public void shouldParseValidConsumerConfigWithAutoOffsetResetLatest() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer-property", "auto.offset.reset=latest"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertFalse(config.fromBeginning());
+ assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ }
+
+ @Test
+ public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliest() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer-property", "auto.offset.reset=earliest"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertFalse(config.fromBeginning());
+ assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ }
+
+ @Test
+ public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer-property", "auto.offset.reset=earliest",
+ "--from-beginning"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertTrue(config.fromBeginning());
+ assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ }
+
+ @Test
+ public void shouldParseValidConsumerConfigWithNoOffsetReset() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertFalse(config.fromBeginning());
+ assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ }
+
+ @Test
+ public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer-property", "auto.offset.reset=latest",
+ "--from-beginning"
+ };
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldParseConfigsFromFile() throws IOException {
+ Map configs = new HashMap<>();
+ configs.put("request.timeout.ms", "1000");
+ configs.put("group.id", "group1");
+ File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer.config", propsFile.getAbsolutePath()
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ assertEquals("1000", config.consumerProps().get("request.timeout.ms"));
+ assertEquals("group1", config.consumerProps().get("group.id"));
+ }
+
+ @Test
+ public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ // different in all three places
+ File propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
+ final String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--consumer-property", "group.id=group-from-properties",
+ "--consumer.config", propsFile.getAbsolutePath()
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+
+ // the same in all three places
+ propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "test-group"));
+ final String[] args1 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "test-group",
+ "--consumer-property", "group.id=test-group",
+ "--consumer.config", propsFile.getAbsolutePath()
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
+ Properties props = config.consumerProps();
+ assertEquals("test-group", props.getProperty("group.id"));
+
+ // different via --consumer-property and --consumer.config
+ propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
+ final String[] args2 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer-property", "group.id=group-from-properties",
+ "--consumer.config", propsFile.getAbsolutePath()
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2));
+
+ // different via --consumer-property and --group
+ final String[] args3 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--consumer-property", "group.id=group-from-properties"
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3));
+
+ // different via --group and --consumer.config
+ propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
+ final String[] args4 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--consumer.config", propsFile.getAbsolutePath()
+ };
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4));
+
+ // via --group only
+ final String[] args5 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments"
+ };
+
+ config = new ConsoleConsumerOptions(args5);
+ props = config.consumerProps();
+ assertEquals("group-from-arguments", props.getProperty("group.id"));
+
+ Exit.resetExitProcedure();
+ }
+
+ @Test
+ public void testCustomPropertyShouldBePassedToConfigureMethod() throws Exception {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "print.key=true",
+ "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
+ "--property", "key.deserializer.my-props=abc"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+ assertTrue(config.formatter() instanceof DefaultMessageFormatter);
+ assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props"));
+ DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter();
+ assertTrue(formatter.keyDeserializer().isPresent());
+ assertTrue(formatter.keyDeserializer().get() instanceof MockDeserializer);
+ MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get();
+ assertEquals(1, keyDeserializer.configs.size());
+ assertEquals("abc", keyDeserializer.configs.get("my-props"));
+ assertTrue(keyDeserializer.isKey);
+ }
+
+ @Test
+ public void testCustomConfigShouldBePassedToConfigureMethod() throws Exception {
+ Map configs = new HashMap<>();
+ configs.put("key.deserializer.my-props", "abc");
+ configs.put("print.key", "false");
+ File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "print.key=true",
+ "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
+ "--formatter-config", propsFile.getAbsolutePath()
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+ assertTrue(config.formatter() instanceof DefaultMessageFormatter);
+ assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props"));
+ DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter();
+ assertTrue(formatter.keyDeserializer().isPresent());
+ assertTrue(formatter.keyDeserializer().get() instanceof MockDeserializer);
+ MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get();
+ assertEquals(1, keyDeserializer.configs.size());
+ assertEquals("abc", keyDeserializer.configs.get("my-props"));
+ assertTrue(keyDeserializer.isKey);
+ }
+
+ @Test
+ public void shouldParseGroupIdFromBeginningGivenTogether() throws IOException {
+ // Start from earliest
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "test-group",
+ "--from-beginning"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertEquals(-2, config.offsetArg());
+ assertTrue(config.fromBeginning());
+
+ // Start from latest
+ args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "test-group"
+ };
+
+ config = new ConsoleConsumerOptions(args);
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertEquals(-1, config.offsetArg());
+ assertFalse(config.fromBeginning());
+ }
+
+ @Test
+ public void shouldExitOnGroupIdAndPartitionGivenTogether() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "test-group",
+ "--partition", "0"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldExitOnOffsetWithoutPartition() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--offset", "10"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldExitIfNoTopicOrFilterSpecified() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldExitIfTopicAndIncludeSpecified() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--include", "includeTest*"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldExitIfTopicAndWhitelistSpecified() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--whitelist", "whitelistTest*"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void testClientIdOverride() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--from-beginning",
+ "--consumer-property", "client.id=consumer-1"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+ }
+
+ @Test
+ public void testDefaultClientId() throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--from-beginning"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("console-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+ }
+
+ @Test
+ public void testParseOffset() throws Exception {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ try {
+ final String[] badOffset = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--partition", "0",
+ "--offset", "bad"
+ };
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(badOffset));
+
+ final String[] negativeOffset = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--partition", "0",
+ "--offset", "-100"
+ };
+ assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(negativeOffset));
+
+ final String[] earliestOffset = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--partition", "0",
+ "--offset", "earliest"
+ };
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(earliestOffset);
+ assertEquals(ListOffsetsRequest.EARLIEST_TIMESTAMP, config.offsetArg());
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
new file mode 100644
index 00000000000..008893f9c50
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.util.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsoleConsumerTest {
+
+ @BeforeEach
+ public void setup() {
+ ConsoleConsumer.messageCount = 0;
+ }
+
+ @Test
+ public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() {
+ String topic = "test";
+ final Time time = new MockTime();
+ final int timeoutMs = 1000;
+
+ @SuppressWarnings("unchecked")
+ Consumer mockConsumer = mock(Consumer.class);
+
+ when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer(invocation -> {
+ time.sleep(timeoutMs / 2 + 1);
+ return ConsumerRecords.EMPTY;
+ });
+
+ ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
+ Optional.of(topic),
+ OptionalInt.empty(),
+ OptionalLong.empty(),
+ Optional.empty(),
+ mockConsumer,
+ timeoutMs
+ );
+
+ assertThrows(TimeoutException.class, consumer::receive);
+ }
+
+ @Test
+ public void shouldResetUnConsumedOffsetsBeforeExit() {
+ String topic = "test";
+ int maxMessages = 123;
+ int totalMessages = 700;
+ long startOffset = 0L;
+
+ MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ TopicPartition tp1 = new TopicPartition(topic, 0);
+ TopicPartition tp2 = new TopicPartition(topic, 1);
+
+ ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
+ Optional.of(topic),
+ OptionalInt.empty(),
+ OptionalLong.empty(),
+ Optional.empty(),
+ mockConsumer,
+ 1000L);
+
+ mockConsumer.rebalance(Arrays.asList(tp1, tp2));
+ Map offsets = new HashMap<>();
+ offsets.put(tp1, startOffset);
+ offsets.put(tp2, startOffset);
+ mockConsumer.updateBeginningOffsets(offsets);
+
+ for (int i = 0; i < totalMessages; i++) {
+ // add all records, each partition should have half of `totalMessages`
+ mockConsumer.addRecord(new ConsumerRecord<>(topic, i % 2, i / 2, "key".getBytes(), "value".getBytes()));
+ }
+
+ MessageFormatter formatter = mock(MessageFormatter.class);
+
+ ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, false);
+ assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2));
+
+ consumer.resetUnconsumedOffsets();
+ assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2));
+
+ verify(formatter, times(maxMessages)).writeTo(any(), any());
+ consumer.cleanup();
+ }
+
+ @Test
+ public void shouldLimitReadsToMaxMessageLimit() {
+ ConsoleConsumer.ConsumerWrapper consumer = mock(ConsoleConsumer.ConsumerWrapper.class);
+ MessageFormatter formatter = mock(MessageFormatter.class);
+ ConsumerRecord record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]);
+
+ int messageLimit = 10;
+ when(consumer.receive()).thenReturn(record);
+
+ ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true);
+
+ verify(consumer, times(messageLimit)).receive();
+ verify(formatter, times(messageLimit)).writeTo(any(), any());
+
+ consumer.cleanup();
+ }
+
+ @Test
+ public void shouldStopWhenOutputCheckErrorFails() {
+ ConsoleConsumer.ConsumerWrapper consumer = mock(ConsoleConsumer.ConsumerWrapper.class);
+ MessageFormatter formatter = mock(MessageFormatter.class);
+ PrintStream printStream = mock(PrintStream.class);
+
+ ConsumerRecord record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]);
+
+ when(consumer.receive()).thenReturn(record);
+ //Simulate an error on System.out after the first record has been printed
+ when(printStream.checkError()).thenReturn(true);
+
+ ConsoleConsumer.process(-1, formatter, consumer, printStream, true);
+
+ verify(formatter).writeTo(any(), eq(printStream));
+ verify(consumer).receive();
+ verify(printStream).checkError();
+
+ consumer.cleanup();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldSeekWhenOffsetIsSet() {
+ Consumer mockConsumer = mock(Consumer.class);
+ TopicPartition tp0 = new TopicPartition("test", 0);
+
+ ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
+ Optional.of(tp0.topic()),
+ OptionalInt.of(tp0.partition()),
+ OptionalLong.empty(),
+ Optional.empty(),
+ mockConsumer,
+ 1000L);
+
+ verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
+ verify(mockConsumer).seekToEnd(eq(Collections.singletonList(tp0)));
+ consumer.cleanup();
+ reset(mockConsumer);
+
+ consumer = new ConsoleConsumer.ConsumerWrapper(
+ Optional.of(tp0.topic()),
+ OptionalInt.of(tp0.partition()),
+ OptionalLong.of(123L),
+ Optional.empty(),
+ mockConsumer,
+ 1000L);
+
+ verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
+ verify(mockConsumer).seek(eq(tp0), eq(123L));
+ consumer.cleanup();
+ reset(mockConsumer);
+
+ consumer = new ConsoleConsumer.ConsumerWrapper(
+ Optional.of(tp0.topic()),
+ OptionalInt.of(tp0.partition()),
+ OptionalLong.of(ListOffsetsRequest.EARLIEST_TIMESTAMP),
+ Optional.empty(),
+ mockConsumer,
+ 1000L);
+
+ verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
+ verify(mockConsumer).seekToBeginning(eq(Collections.singletonList(tp0)));
+ consumer.cleanup();
+ reset(mockConsumer);
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/DefaultMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/DefaultMessageFormatterTest.java
new file mode 100644
index 00000000000..917d44d3a4f
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/DefaultMessageFormatterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DefaultMessageFormatterTest {
+
+ @Test
+ public void testDefaultMessageFormatter() {
+ ConsumerRecord record = new ConsumerRecord<>("topic", 0, 123, "key".getBytes(), "value".getBytes());
+ MessageFormatter formatter = new DefaultMessageFormatter();
+ Map configs = new HashMap<>();
+
+ formatter.configure(configs);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("value\n", out.toString());
+
+ configs.put("print.key", "true");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("key\tvalue\n", out.toString());
+
+ configs.put("print.partition", "true");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("Partition:0\tkey\tvalue\n", out.toString());
+
+ configs.put("print.timestamp", "true");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString());
+
+ configs.put("print.offset", "true");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString());
+
+ configs.put("print.headers", "true");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tNO_HEADERS\tkey\tvalue\n", out.toString());
+
+ RecordHeaders headers = new RecordHeaders();
+ headers.add("h1", "v1".getBytes());
+ headers.add("h2", "v2".getBytes());
+ record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), "value".getBytes(),
+ headers, Optional.empty());
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("CreateTime:123\tPartition:0\tOffset:123\th1:v1,h2:v2\tkey\tvalue\n", out.toString());
+
+ configs.put("print.value", "false");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("CreateTime:123\tPartition:0\tOffset:123\th1:v1,h2:v2\tkey\n", out.toString());
+
+ configs.put("key.separator", "");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("CreateTime:123Partition:0Offset:123h1:v1,h2:v2key\n", out.toString());
+
+ configs.put("line.separator", "");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("CreateTime:123Partition:0Offset:123h1:v1,h2:v2key", out.toString());
+
+ configs.put("headers.separator", "|");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("CreateTime:123Partition:0Offset:123h1:v1|h2:v2key", out.toString());
+
+ record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), "value".getBytes(),
+ headers, Optional.empty());
+
+ configs.put("key.deserializer", UpperCaseDeserializer.class.getName());
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("CreateTime:123Partition:0Offset:123h1:v1|h2:v2KEY", out.toString());
+
+ configs.put("headers.deserializer", UpperCaseDeserializer.class.getName());
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("CreateTime:123Partition:0Offset:123h1:V1|h2:V2KEY", out.toString());
+
+ record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), null,
+ headers, Optional.empty());
+
+ configs.put("print.value", "true");
+ configs.put("null.literal", "");
+ formatter.configure(configs);
+ out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("CreateTime:123Partition:0Offset:123h1:V1|h2:V2KEY", out.toString());
+ formatter.close();
+ }
+
+ static class UpperCaseDeserializer implements Deserializer {
+
+ @Override
+ public String deserialize(String topic, byte[] data) {
+ return new String(data, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
+ }
+ }
+}
+
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/NoOpMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/NoOpMessageFormatterTest.java
new file mode 100644
index 00000000000..1652a484d10
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/NoOpMessageFormatterTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class NoOpMessageFormatterTest {
+
+ @Test
+ public void testNoOpMessageFormatter() {
+ ConsumerRecord record = new ConsumerRecord<>("topic", 0, 123, "key".getBytes(), "value".getBytes());
+ try (MessageFormatter formatter = new NoOpMessageFormatter()) {
+ formatter.configure(new HashMap<>());
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ formatter.writeTo(record, new PrintStream(out));
+ assertEquals("", out.toString());
+ }
+ }
+}