From 0bf830fc9c3915bc99b6e487e6083dabd593c5d3 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 13 Feb 2024 19:24:07 +0100 Subject: [PATCH] KAFKA-14576: Move ConsoleConsumer to tools (#15274) Reviewers: Josep Prat , Omnia Ibrahim --- bin/kafka-console-consumer.sh | 2 +- bin/windows/kafka-console-consumer.bat | 2 +- checkstyle/import-control.xml | 18 +- checkstyle/suppressions.xml | 4 +- .../scala/kafka/tools/ConsoleConsumer.scala | 647 ----------------- .../main/scala/kafka/utils/ToolsUtils.scala | 3 +- .../kafka/tools/CustomDeserializerTest.scala | 65 -- .../tools/DefaultMessageFormatterTest.scala | 234 ------ .../kafka/tools/ConsoleConsumerTest.scala | 676 ------------------ .../KStreamAggregationIntegrationTest.java | 8 +- tests/kafkatest/services/console_consumer.py | 7 +- tests/kafkatest/version.py | 4 + .../kafka/tools/consumer/ConsoleConsumer.java | 234 ++++++ .../consumer/ConsoleConsumerOptions.java | 414 +++++++++++ .../consumer/DefaultMessageFormatter.java | 210 ++++++ .../consumer/LoggingMessageFormatter.java | 49 ++ .../tools/consumer/NoOpMessageFormatter.java | 30 + .../apache/kafka/tools/ToolsTestUtils.java | 10 + .../consumer/ConsoleConsumerOptionsTest.java | 621 ++++++++++++++++ .../tools/consumer/ConsoleConsumerTest.java | 211 ++++++ .../consumer/DefaultMessageFormatterTest.java | 147 ++++ .../consumer/NoOpMessageFormatterTest.java | 41 ++ 22 files changed, 1996 insertions(+), 1641 deletions(-) delete mode 100755 core/src/main/scala/kafka/tools/ConsoleConsumer.scala delete mode 100644 core/src/test/scala/kafka/tools/CustomDeserializerTest.scala delete mode 100644 core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala delete mode 100644 core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala create mode 100644 tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/consumer/NoOpMessageFormatter.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/DefaultMessageFormatterTest.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/NoOpMessageFormatterTest.java diff --git a/bin/kafka-console-consumer.sh b/bin/kafka-console-consumer.sh index dbaac2b83b1..f2201462738 100755 --- a/bin/kafka-console-consumer.sh +++ b/bin/kafka-console-consumer.sh @@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M" fi -exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.ConsoleConsumer "$@" diff --git a/bin/windows/kafka-console-consumer.bat b/bin/windows/kafka-console-consumer.bat index bbbd33656ad..9236cfb16a3 100644 --- a/bin/windows/kafka-console-consumer.bat +++ b/bin/windows/kafka-console-consumer.bat @@ -16,5 +16,5 @@ rem limitations under the License. SetLocal set KAFKA_HEAP_OPTS=-Xmx512M -"%~dp0kafka-run-class.bat" kafka.tools.ConsoleConsumer %* +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.consumer.ConsoleConsumer %* EndLocal diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 2f9c3960483..caf1fe5ebe1 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -321,6 +321,17 @@ + + + + + + + + + + + @@ -331,13 +342,6 @@ - - - - - - - diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 925c50dec86..6a8bc0b1852 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -273,11 +273,11 @@ + files="(ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/> + files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/> - error("Authentication failed: terminating consumer process", e) - Exit.exit(1) - case e: Throwable => - error("Unknown error when running consumer: ", e) - Exit.exit(1) - } - } - - def run(conf: ConsumerConfig): Unit = { - val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs.toLong else Long.MaxValue - val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer) - - val consumerWrapper = - if (conf.partitionArg.isDefined) - new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs) - else - new ConsumerWrapper(Option(conf.topicArg), None, None, Option(conf.includedTopicsArg), consumer, timeoutMs) - - addShutdownHook(consumerWrapper, conf) - - try process(conf.maxMessages, conf.formatter, consumerWrapper, System.out, conf.skipMessageOnError) - finally { - consumerWrapper.cleanup() - conf.formatter.close() - reportRecordCount() - - shutdownLatch.countDown() - } - } - - def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = { - Exit.addShutdownHook("consumer-shutdown-hook", { - consumer.wakeup() - - shutdownLatch.await() - - if (conf.enableSystestEventsLogging) { - System.out.println("shutdown_complete") - } - }) - } - - def process(maxMessages: Integer, formatter: MessageFormatter, consumer: ConsumerWrapper, output: PrintStream, - skipMessageOnError: Boolean): Unit = { - while (messageCount < maxMessages || maxMessages == -1) { - val msg: ConsumerRecord[Array[Byte], Array[Byte]] = try { - consumer.receive() - } catch { - case _: WakeupException => - trace("Caught WakeupException because consumer is shutdown, ignore and terminate.") - // Consumer will be closed - return - case e: Throwable => - error("Error processing message, terminating consumer process: ", e) - // Consumer will be closed - return - } - messageCount += 1 - try { - formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType, - 0, 0, msg.key, msg.value, msg.headers, Optional.empty[Integer]), output) - } catch { - case e: Throwable => - if (skipMessageOnError) { - error("Error processing message, skipping this message: ", e) - } else { - // Consumer will be closed - throw e - } - } - if (checkErr(output, formatter)) { - // Consumer will be closed - return - } - } - } - - def reportRecordCount(): Unit = { - System.err.println(s"Processed a total of $messageCount messages") - } - - def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = { - val gotError = output.checkError() - if (gotError) { - // This means no one is listening to our output stream anymore, time to shutdown - System.err.println("Unable to write to standard out, closing consumer.") - } - gotError - } - - private[tools] def consumerProps(config: ConsumerConfig): Properties = { - val props = new Properties - props ++= config.consumerProps - props ++= config.extraConsumerProps - setAutoOffsetResetValue(config, props) - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) - if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer") - CommandLineUtils.maybeMergeOptions( - props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt) - props - } - - /** - * Used by consumerProps to retrieve the correct value for the consumer parameter 'auto.offset.reset'. - * - * Order of priority is: - * 1. Explicitly set parameter via --consumer.property command line parameter - * 2. Explicit --from-beginning given -> 'earliest' - * 3. Default value of 'latest' - * - * In case both --from-beginning and an explicit value are specified an error is thrown if these - * are conflicting. - */ - def setAutoOffsetResetValue(config: ConsumerConfig, props: Properties): Unit = { - val (earliestConfigValue, latestConfigValue) = ("earliest", "latest") - - if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) { - // auto.offset.reset parameter was specified on the command line - val autoResetOption = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) - if (config.options.has(config.resetBeginningOpt) && earliestConfigValue != autoResetOption) { - // conflicting options - latest und earliest, throw an error - System.err.println(s"Can't simultaneously specify --from-beginning and 'auto.offset.reset=$autoResetOption', " + - "please remove one option") - Exit.exit(1) - } - // nothing to do, checking for valid parameter values happens later and the specified - // value was already copied during .putall operation - } else { - // no explicit value for auto.offset.reset was specified - // if --from-beginning was specified use earliest, otherwise default to latest - val autoResetOption = if (config.options.has(config.resetBeginningOpt)) earliestConfigValue else latestConfigValue - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption) - } - } - - class ConsumerConfig(args: Array[String]) extends CommandDefaultOptions(args) { - val topicOpt = parser.accepts("topic", "The topic to consume on.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val whitelistOpt = parser.accepts("whitelist", - "DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.") - .withRequiredArg - .describedAs("Java regex (String)") - .ofType(classOf[String]) - val includeOpt = parser.accepts("include", - "Regular expression specifying list of topics to include for consumption.") - .withRequiredArg - .describedAs("Java regex (String)") - .ofType(classOf[String]) - val partitionIdOpt = parser.accepts("partition", "The partition to consume from. Consumption " + - "starts from the end of the partition unless '--offset' is specified.") - .withRequiredArg - .describedAs("partition") - .ofType(classOf[java.lang.Integer]) - val offsetOpt = parser.accepts("offset", "The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end") - .withRequiredArg - .describedAs("consume offset") - .ofType(classOf[String]) - .defaultsTo("latest") - val consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") - .withRequiredArg - .describedAs("consumer_prop") - .ofType(classOf[String]) - val consumerConfigOpt = parser.accepts("consumer.config", s"Consumer config properties file. Note that $consumerPropertyOpt takes precedence over this config.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) - val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") - .withRequiredArg - .describedAs("class") - .ofType(classOf[String]) - .defaultsTo(classOf[DefaultMessageFormatter].getName) - val messageFormatterArgOpt = parser.accepts("property", - """The properties to initialize the message formatter. Default properties include: - | print.timestamp=true|false - | print.key=true|false - | print.offset=true|false - | print.partition=true|false - | print.headers=true|false - | print.value=true|false - | key.separator= - | line.separator= - | headers.separator= - | null.literal= - | key.deserializer= - | value.deserializer= - | header.deserializer= - | - |Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.""" - .stripMargin) - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) - val messageFormatterConfigOpt = parser.accepts("formatter-config", s"Config properties file to initialize the message formatter. Note that $messageFormatterArgOpt takes precedence over this config.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) - val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + - "start with the earliest message present in the log rather than the latest message.") - val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.") - .withRequiredArg - .describedAs("num_messages") - .ofType(classOf[java.lang.Integer]) - val timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.") - .withRequiredArg - .describedAs("timeout_ms") - .ofType(classOf[java.lang.Integer]) - val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + - "skip it instead of halt.") - val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.") - .withRequiredArg - .describedAs("server to connect to") - .ofType(classOf[String]) - val keyDeserializerOpt = parser.accepts("key-deserializer") - .withRequiredArg - .describedAs("deserializer for key") - .ofType(classOf[String]) - val valueDeserializerOpt = parser.accepts("value-deserializer") - .withRequiredArg - .describedAs("deserializer for values") - .ofType(classOf[String]) - val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events", - "Log lifecycle events of the consumer in addition to logging consumed " + - "messages. (This is specific for system tests.)") - val isolationLevelOpt = parser.accepts("isolation-level", - "Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " + - "to read all messages.") - .withRequiredArg() - .ofType(classOf[String]) - .defaultsTo("read_uncommitted") - - val groupIdOpt = parser.accepts("group", "The consumer group id of the consumer.") - .withRequiredArg - .describedAs("consumer group id") - .ofType(classOf[String]) - - options = tryParse(parser, args) - - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.") - - var groupIdPassed = true - val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt) - - // topic must be specified. - var topicArg: String = _ - var includedTopicsArg: String = _ - val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt)) - val consumerProps = if (options.has(consumerConfigOpt)) - Utils.loadProps(options.valueOf(consumerConfigOpt)) - else - new Properties() - val fromBeginning = options.has(resetBeginningOpt) - val partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None - val skipMessageOnError = options.has(skipMessageOnErrorOpt) - val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) - val formatterArgs = if (options.has(messageFormatterConfigOpt)) - Utils.loadProps(options.valueOf(messageFormatterConfigOpt)) - else - new Properties() - formatterArgs ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) - val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 - val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1 - val bootstrapServer = options.valueOf(bootstrapServerOpt) - val keyDeserializer = options.valueOf(keyDeserializerOpt) - val valueDeserializer = options.valueOf(valueDeserializerOpt) - val formatter: MessageFormatter = messageFormatterClass.getDeclaredConstructor().newInstance().asInstanceOf[MessageFormatter] - - if (keyDeserializer != null && keyDeserializer.nonEmpty) { - formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer) - } - if (valueDeserializer != null && valueDeserializer.nonEmpty) { - formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer) - } - - formatter.configure(formatterArgs.asScala.asJava) - - topicArg = options.valueOf(topicOpt) - includedTopicsArg = if (options.has(includeOpt)) - options.valueOf(includeOpt) - else - options.valueOf(whitelistOpt) - - val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ == null) - // user need to specify value for either --topic or one of the include filters options (--include or --whitelist) - if (topicOrFilterArgs.size != 1) - CommandLineUtils.printUsageAndExit(parser, s"Exactly one of --include/--topic is required. " + - s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use --include instead; ignored if --include specified."}") - - if (partitionArg.isDefined) { - if (!options.has(topicOpt)) - CommandLineUtils.printUsageAndExit(parser, "The topic is required when partition is specified.") - if (fromBeginning && options.has(offsetOpt)) - CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and offset cannot be specified together.") - } else if (options.has(offsetOpt)) - CommandLineUtils.printUsageAndExit(parser, "The partition is required when offset is specified.") - - def invalidOffset(offset: String): Nothing = - ToolsUtils.printUsageAndExit(parser, s"The provided offset value '$offset' is incorrect. Valid values are " + - "'earliest', 'latest', or a non-negative long.") - - val offsetArg = - if (options.has(offsetOpt)) { - options.valueOf(offsetOpt).toLowerCase(Locale.ROOT) match { - case "earliest" => ListOffsetsRequest.EARLIEST_TIMESTAMP - case "latest" => ListOffsetsRequest.LATEST_TIMESTAMP - case offsetString => - try { - val offset = offsetString.toLong - if (offset < 0) - invalidOffset(offsetString) - offset - } catch { - case _: NumberFormatException => invalidOffset(offsetString) - } - } - } - else if (fromBeginning) ListOffsetsRequest.EARLIEST_TIMESTAMP - else ListOffsetsRequest.LATEST_TIMESTAMP - - CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) - - // if the group id is provided in more than place (through different means) all values must be the same - val groupIdsProvided = Set( - Option(options.valueOf(groupIdOpt)), // via --group - Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property - Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config - ).flatten - - if (groupIdsProvided.size > 1) { - CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', " - + "via '--consumer-property', or via '--consumer.config') do not match. " - + s"Detected group ids: ${groupIdsProvided.mkString("'", "', '", "'")}") - } - - groupIdsProvided.headOption match { - case Some(group) => - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group) - case None => - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}") - // By default, avoid unnecessary expansion of the coordinator cache since - // the auto-generated group and its offsets is not intended to be used again - if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - groupIdPassed = false - } - - if (groupIdPassed && partitionArg.isDefined) - CommandLineUtils.printUsageAndExit(parser, "Options group and partition cannot be specified together.") - - def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { - try - parser.parse(args: _*) - catch { - case e: OptionException => - ToolsUtils.printUsageAndExit(parser, e.getMessage) - } - } - } - - private[tools] class ConsumerWrapper( - topic: Option[String], - partitionId: Option[Int], - offset: Option[Long], - includedTopics: Option[String], - consumer: Consumer[Array[Byte], Array[Byte]], - timeoutMs: Long = Long.MaxValue, - time: Time = Time.SYSTEM - ) { - consumerInit() - var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator() - - def consumerInit(): Unit = { - (topic, partitionId, offset, includedTopics) match { - case (Some(topic), Some(partitionId), Some(offset), None) => - seek(topic, partitionId, offset) - case (Some(topic), Some(partitionId), None, None) => - // default to latest if no offset is provided - seek(topic, partitionId, ListOffsetsRequest.LATEST_TIMESTAMP) - case (Some(topic), None, None, None) => - consumer.subscribe(Collections.singletonList(topic)) - case (None, None, None, Some(include)) => - consumer.subscribe(Pattern.compile(include)) - case _ => - throw new IllegalArgumentException("An invalid combination of arguments is provided. " + - "Exactly one of 'topic' or 'include' must be provided. " + - "If 'topic' is provided, an optional 'partition' may also be provided. " + - "If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.") - } - } - - def seek(topic: String, partitionId: Int, offset: Long): Unit = { - val topicPartition = new TopicPartition(topic, partitionId) - consumer.assign(Collections.singletonList(topicPartition)) - offset match { - case ListOffsetsRequest.EARLIEST_TIMESTAMP => consumer.seekToBeginning(Collections.singletonList(topicPartition)) - case ListOffsetsRequest.LATEST_TIMESTAMP => consumer.seekToEnd(Collections.singletonList(topicPartition)) - case _ => consumer.seek(topicPartition, offset) - } - } - - def resetUnconsumedOffsets(): Unit = { - val smallestUnconsumedOffsets = collection.mutable.Map[TopicPartition, Long]() - while (recordIter.hasNext) { - val record = recordIter.next() - val tp = new TopicPartition(record.topic, record.partition) - // avoid auto-committing offsets which haven't been consumed - smallestUnconsumedOffsets.getOrElseUpdate(tp, record.offset) - } - smallestUnconsumedOffsets.forKeyValue { (tp, offset) => consumer.seek(tp, offset) } - } - - def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = { - val startTimeMs = time.milliseconds - while (!recordIter.hasNext) { - recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator - if (!recordIter.hasNext && (time.milliseconds - startTimeMs > timeoutMs)) { - throw new TimeoutException() - } - } - - recordIter.next - } - - def wakeup(): Unit = { - this.consumer.wakeup() - } - - def cleanup(): Unit = { - resetUnconsumedOffsets() - this.consumer.close() - } - - } -} - -class DefaultMessageFormatter extends MessageFormatter { - var printTimestamp = false - var printKey = false - var printValue = true - var printPartition = false - var printOffset = false - var printHeaders = false - var keySeparator = utfBytes("\t") - var lineSeparator = utfBytes("\n") - var headersSeparator = utfBytes(",") - var nullLiteral = utfBytes("null") - - var keyDeserializer: Option[Deserializer[_]] = None - var valueDeserializer: Option[Deserializer[_]] = None - var headersDeserializer: Option[Deserializer[_]] = None - - override def configure(configs: Map[String, _]): Unit = { - getPropertyIfExists(configs, "print.timestamp", getBoolProperty).foreach(printTimestamp = _) - getPropertyIfExists(configs, "print.key", getBoolProperty).foreach(printKey = _) - getPropertyIfExists(configs, "print.offset", getBoolProperty).foreach(printOffset = _) - getPropertyIfExists(configs, "print.partition", getBoolProperty).foreach(printPartition = _) - getPropertyIfExists(configs, "print.headers", getBoolProperty).foreach(printHeaders = _) - getPropertyIfExists(configs, "print.value", getBoolProperty).foreach(printValue = _) - getPropertyIfExists(configs, "key.separator", getByteProperty).foreach(keySeparator = _) - getPropertyIfExists(configs, "line.separator", getByteProperty).foreach(lineSeparator = _) - getPropertyIfExists(configs, "headers.separator", getByteProperty).foreach(headersSeparator = _) - getPropertyIfExists(configs, "null.literal", getByteProperty).foreach(nullLiteral = _) - - keyDeserializer = getPropertyIfExists(configs, "key.deserializer", getDeserializerProperty(true)) - valueDeserializer = getPropertyIfExists(configs, "value.deserializer", getDeserializerProperty(false)) - headersDeserializer = getPropertyIfExists(configs, "headers.deserializer", getDeserializerProperty(false)) - } - - def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { - - def writeSeparator(columnSeparator: Boolean): Unit = { - if (columnSeparator) - output.write(keySeparator) - else - output.write(lineSeparator) - } - - def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { - val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral) - val convertedBytes = deserializer - .map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString)) - .getOrElse(nonNullBytes) - convertedBytes - } - - import consumerRecord._ - - if (printTimestamp) { - if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) - output.write(utfBytes(s"$timestampType:$timestamp")) - else - output.write(utfBytes("NO_TIMESTAMP")) - writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printKey || printValue) - } - - if (printPartition) { - output.write(utfBytes("Partition:")) - output.write(utfBytes(partition().toString)) - writeSeparator(columnSeparator = printOffset || printHeaders || printKey || printValue) - } - - if (printOffset) { - output.write(utfBytes("Offset:")) - output.write(utfBytes(offset().toString)) - writeSeparator(columnSeparator = printHeaders || printKey || printValue) - } - - if (printHeaders) { - val headersIt = headers().iterator.asScala - if (headersIt.hasNext) { - headersIt.foreach { header => - output.write(utfBytes(header.key() + ":")) - output.write(deserialize(headersDeserializer, header.value(), topic)) - if (headersIt.hasNext) { - output.write(headersSeparator) - } - } - } else { - output.write(utfBytes("NO_HEADERS")) - } - writeSeparator(columnSeparator = printKey || printValue) - } - - if (printKey) { - output.write(deserialize(keyDeserializer, key, topic)) - writeSeparator(columnSeparator = printValue) - } - - if (printValue) { - output.write(deserialize(valueDeserializer, value, topic)) - output.write(lineSeparator) - } - } - - private def propertiesWithKeyPrefixStripped(prefix: String, configs: Map[String, _]): Map[String, _] = { - val newConfigs = collection.mutable.Map[String, Any]() - configs.asScala.foreach { case (key, value) => - if (key.startsWith(prefix) && key.length > prefix.length) - newConfigs.put(key.substring(prefix.length), value) - } - newConfigs.asJava - } - - private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8) - - private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = { - utfBytes(configs.get(key).asInstanceOf[String]) - } - - private def getBoolProperty(configs: Map[String, _], key: String): Boolean = { - configs.get(key).asInstanceOf[String].trim.equalsIgnoreCase("true") - } - - private def getDeserializerProperty(isKey: Boolean)(configs: Map[String, _], propertyName: String): Deserializer[_] = { - val deserializer = Class.forName(configs.get(propertyName).asInstanceOf[String]).getDeclaredConstructor().newInstance().asInstanceOf[Deserializer[_]] - val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", configs) - .asScala - .asJava - deserializer.configure(deserializerConfig, isKey) - deserializer - } - - private def getPropertyIfExists[T](configs: Map[String, _], key: String, getter: (Map[String, _], String) => T): Option[T] = { - if (configs.containsKey(key)) - Some(getter(configs, key)) - else - None - } -} - -class LoggingMessageFormatter extends MessageFormatter with LazyLogging { - private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter - - override def configure(configs: Map[String, _]): Unit = defaultWriter.configure(configs) - - def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { - import consumerRecord._ - defaultWriter.writeTo(consumerRecord, output) - logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} + - s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " + - s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}") - } -} - -class NoOpMessageFormatter extends MessageFormatter { - - def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {} -} - diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala index 7a2aa3111c5..edd79c00030 100644 --- a/core/src/main/scala/kafka/utils/ToolsUtils.scala +++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala @@ -69,8 +69,7 @@ object ToolsUtils { /** * This is a simple wrapper around `CommandLineUtils.printUsageAndExit`. * It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`. - * Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]] - * and [[kafka.tools.ConsoleProducer]] are migrated. + * Can be removed once [[kafka.admin.ConsumerGroupCommand]] and [[kafka.tools.ConsoleProducer]] are migrated. * * @param parser Command line options parser. * @param message Error message. diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala deleted file mode 100644 index 244a9cfb148..00000000000 --- a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.PrintStream - -import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.serialization.Deserializer -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test -import org.mockito.Mockito._ - -class CustomDeserializer extends Deserializer[String] { - - override def deserialize(topic: String, data: Array[Byte]): String = { - assertNotNull(topic, "topic must not be null") - new String(data) - } - - override def deserialize(topic: String, headers: Headers, data: Array[Byte]): String = { - println("WITH HEADERS") - new String(data) - } -} - -class CustomDeserializerTest { - - @Test - def checkFormatterCallDeserializerWithHeaders(): Unit = { - val formatter = new DefaultMessageFormatter() - formatter.valueDeserializer = Some(new CustomDeserializer) - val output = TestUtils.grabConsoleOutput(formatter.writeTo( - new ConsumerRecord("topic_test", 1, 1L, "key".getBytes, "value".getBytes), mock(classOf[PrintStream]))) - assertTrue(output.contains("WITH HEADERS"), "DefaultMessageFormatter should call `deserialize` method with headers.") - formatter.close() - } - - @Test - def checkDeserializerTopicIsNotNull(): Unit = { - val formatter = new DefaultMessageFormatter() - formatter.keyDeserializer = Some(new CustomDeserializer) - - formatter.writeTo(new ConsumerRecord("topic_test", 1, 1L, "key".getBytes, "value".getBytes), - mock(classOf[PrintStream])) - - formatter.close() - } -} diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala deleted file mode 100644 index 12bbc948110..00000000000 --- a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.{ByteArrayOutputStream, Closeable, PrintStream} -import java.nio.charset.StandardCharsets -import java.util - -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.common.header.Header -import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} -import org.apache.kafka.common.record.TimestampType -import org.apache.kafka.common.serialization.Deserializer -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{Arguments, MethodSource} - -import java.util.Optional -import scala.jdk.CollectionConverters._ - -class DefaultMessageFormatterTest { - import DefaultMessageFormatterTest._ - - @ParameterizedTest - @MethodSource(Array("parameters")) - def testWriteRecord(name: String, record: ConsumerRecord[Array[Byte], Array[Byte]], properties: Map[String, String], expected: String): Unit = { - withResource(new ByteArrayOutputStream()) { baos => - withResource(new PrintStream(baos)) { ps => - val formatter = buildFormatter(properties) - formatter.writeTo(record, ps) - val actual = new String(baos.toByteArray(), StandardCharsets.UTF_8) - assertEquals(expected, actual) - - } - } - } -} - -object DefaultMessageFormatterTest { - def parameters: java.util.stream.Stream[Arguments] = { - Seq( - Arguments.of( - "print nothing", - consumerRecord(), - Map("print.value" -> "false"), - ""), - Arguments.of( - "print key", - consumerRecord(), - Map("print.key" -> "true", - "print.value" -> "false"), - "someKey\n"), - Arguments.of( - "print value", - consumerRecord(), - Map(), - "someValue\n"), - Arguments.of( - "print empty timestamp", - consumerRecord(timestampType = TimestampType.NO_TIMESTAMP_TYPE), - Map("print.timestamp" -> "true", - "print.value" -> "false"), - "NO_TIMESTAMP\n"), - Arguments.of( - "print log append time timestamp", - consumerRecord(timestampType = TimestampType.LOG_APPEND_TIME), - Map("print.timestamp" -> "true", - "print.value" -> "false"), - "LogAppendTime:1234\n"), - Arguments.of( - "print create time timestamp", - consumerRecord(timestampType = TimestampType.CREATE_TIME), - Map("print.timestamp" -> "true", - "print.value" -> "false"), - "CreateTime:1234\n"), - Arguments.of( - "print partition", - consumerRecord(), - Map("print.partition" -> "true", - "print.value" -> "false"), - "Partition:9\n"), - Arguments.of( - "print offset", - consumerRecord(), - Map("print.offset" -> "true", - "print.value" -> "false"), - "Offset:9876\n"), - Arguments.of( - "print headers", - consumerRecord(), - Map("print.headers" -> "true", - "print.value" -> "false"), - "h1:v1,h2:v2\n"), - Arguments.of( - "print empty headers", - consumerRecord(headers = Nil), - Map("print.headers" -> "true", - "print.value" -> "false"), - "NO_HEADERS\n"), - Arguments.of( - "print all possible fields with default delimiters", - consumerRecord(), - Map("print.key" -> "true", - "print.timestamp" -> "true", - "print.partition" -> "true", - "print.offset" -> "true", - "print.headers" -> "true", - "print.value" -> "true"), - "CreateTime:1234\tPartition:9\tOffset:9876\th1:v1,h2:v2\tsomeKey\tsomeValue\n"), - Arguments.of( - "print all possible fields with custom delimiters", - consumerRecord(), - Map("key.separator" -> "|", - "line.separator" -> "^", - "headers.separator" -> "#", - "print.key" -> "true", - "print.timestamp" -> "true", - "print.partition" -> "true", - "print.offset" -> "true", - "print.headers" -> "true", - "print.value" -> "true"), - "CreateTime:1234|Partition:9|Offset:9876|h1:v1#h2:v2|someKey|someValue^"), - Arguments.of( - "print key with custom deserializer", - consumerRecord(), - Map("print.key" -> "true", - "print.headers" -> "true", - "print.value" -> "true", - "key.deserializer" -> "kafka.tools.UpperCaseDeserializer"), - "h1:v1,h2:v2\tSOMEKEY\tsomeValue\n"), - Arguments.of( - "print value with custom deserializer", - consumerRecord(), - Map("print.key" -> "true", - "print.headers" -> "true", - "print.value" -> "true", - "value.deserializer" -> "kafka.tools.UpperCaseDeserializer"), - "h1:v1,h2:v2\tsomeKey\tSOMEVALUE\n"), - Arguments.of( - "print headers with custom deserializer", - consumerRecord(), - Map("print.key" -> "true", - "print.headers" -> "true", - "print.value" -> "true", - "headers.deserializer" -> "kafka.tools.UpperCaseDeserializer"), - "h1:V1,h2:V2\tsomeKey\tsomeValue\n"), - Arguments.of( - "print key and value", - consumerRecord(), - Map("print.key" -> "true", - "print.value" -> "true"), - "someKey\tsomeValue\n"), - Arguments.of( - "print fields in the beginning, middle and the end", - consumerRecord(), - Map("print.key" -> "true", - "print.value" -> "true", - "print.partition" -> "true"), - "Partition:9\tsomeKey\tsomeValue\n"), - Arguments.of( - "null value without custom null literal", - consumerRecord(value = null), - Map("print.key" -> "true"), - "someKey\tnull\n"), - Arguments.of( - "null value with custom null literal", - consumerRecord(value = null), - Map("print.key" -> "true", - "null.literal" -> "NULL"), - "someKey\tNULL\n"), - ).asJava.stream() - } - - private def buildFormatter(propsToSet: Map[String, String]): DefaultMessageFormatter = { - val formatter = new DefaultMessageFormatter() - formatter.configure(propsToSet.asJava) - formatter - } - - - private def header(key: String, value: String) = { - new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8)) - } - - private def consumerRecord(key: String = "someKey", - value: String = "someValue", - headers: Iterable[Header] = Seq(header("h1", "v1"), header("h2", "v2")), - partition: Int = 9, - offset: Long = 9876, - timestamp: Long = 1234, - timestampType: TimestampType = TimestampType.CREATE_TIME) = { - new ConsumerRecord[Array[Byte], Array[Byte]]( - "someTopic", - partition, - offset, - timestamp, - timestampType, - 0, - 0, - if (key == null) null else key.getBytes(StandardCharsets.UTF_8), - if (value == null) null else value.getBytes(StandardCharsets.UTF_8), - new RecordHeaders(headers.asJava), - Optional.empty[Integer]) - } - - private def withResource[Resource <: Closeable, Result](resource: Resource)(handler: Resource => Result): Result = { - try { - handler(resource) - } finally { - resource.close() - } - } -} - -class UpperCaseDeserializer extends Deserializer[String] { - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {} - override def deserialize(topic: String, data: Array[Byte]): String = new String(data, StandardCharsets.UTF_8).toUpperCase - override def close(): Unit = {} -} diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala deleted file mode 100644 index 054cede8e62..00000000000 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ /dev/null @@ -1,676 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.{ByteArrayOutputStream, PrintStream} -import java.util.{HashMap, Optional, Map => JMap} -import java.time.Duration -import kafka.tools.ConsoleConsumer.ConsumerWrapper -import kafka.utils.{Exit, TestUtils} -import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy} -import org.apache.kafka.common.{MessageFormatter, TopicPartition} -import org.apache.kafka.common.record.TimestampType -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.test.MockDeserializer -import org.mockito.Mockito._ -import org.mockito.ArgumentMatchers -import ArgumentMatchers._ -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.ConsumerRecords -import org.apache.kafka.common.errors.TimeoutException -import org.apache.kafka.common.header.internals.RecordHeaders -import org.apache.kafka.server.util.MockTime -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{BeforeEach, Test} - -import scala.jdk.CollectionConverters._ - -class ConsoleConsumerTest { - - @BeforeEach - def setup(): Unit = { - ConsoleConsumer.messageCount = 0 - } - - @Test - def shouldThrowTimeoutExceptionWhenTimeoutIsReached(): Unit = { - val topic = "test" - val time = new MockTime - val timeoutMs = 1000 - - val mockConsumer = mock(classOf[Consumer[Array[Byte], Array[Byte]]]) - - when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer { _ => - time.sleep(timeoutMs / 2 + 1) - ConsumerRecords.EMPTY - } - - val consumer = new ConsumerWrapper( - topic = Some(topic), - partitionId = None, - offset = None, - includedTopics = None, - consumer = mockConsumer, - timeoutMs = timeoutMs, - time = time - ) - - assertThrows(classOf[TimeoutException], () => consumer.receive()) - } - - @Test - def shouldResetUnConsumedOffsetsBeforeExit(): Unit = { - val topic = "test" - val maxMessages: Int = 123 - val totalMessages: Int = 700 - val startOffset: java.lang.Long = 0L - - val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.EARLIEST) - val tp1 = new TopicPartition(topic, 0) - val tp2 = new TopicPartition(topic, 1) - - val consumer = new ConsumerWrapper(Some(topic), None, None, None, mockConsumer) - - mockConsumer.rebalance(List(tp1, tp2).asJava) - mockConsumer.updateBeginningOffsets(Map(tp1 -> startOffset, tp2 -> startOffset).asJava) - - 0 until totalMessages foreach { i => - // add all records, each partition should have half of `totalMessages` - mockConsumer.addRecord(new ConsumerRecord[Array[Byte], Array[Byte]](topic, i % 2, i / 2, "key".getBytes, "value".getBytes)) - } - - val formatter = mock(classOf[MessageFormatter]) - - ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, skipMessageOnError = false) - assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2)) - - consumer.resetUnconsumedOffsets() - assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2)) - - verify(formatter, times(maxMessages)).writeTo(any(), any()) - } - - @Test - def shouldLimitReadsToMaxMessageLimit(): Unit = { - val consumer = mock(classOf[ConsumerWrapper]) - val formatter = mock(classOf[MessageFormatter]) - val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]()) - - val messageLimit: Int = 10 - when(consumer.receive()).thenReturn(record) - - ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true) - - verify(consumer, times(messageLimit)).receive() - verify(formatter, times(messageLimit)).writeTo(any(), any()) - - consumer.cleanup() - } - - @Test - def shouldStopWhenOutputCheckErrorFails(): Unit = { - val consumer = mock(classOf[ConsumerWrapper]) - val formatter = mock(classOf[MessageFormatter]) - val printStream = mock(classOf[PrintStream]) - - val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]()) - - when(consumer.receive()).thenReturn(record) - //Simulate an error on System.out after the first record has been printed - when(printStream.checkError()).thenReturn(true) - - ConsoleConsumer.process(-1, formatter, consumer, printStream, true) - - verify(formatter).writeTo(any(), ArgumentMatchers.eq(printStream)) - verify(consumer).receive() - verify(printStream).checkError() - - consumer.cleanup() - } - - @Test - def shouldParseValidConsumerValidConfig(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--from-beginning") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(true, config.fromBeginning) - } - - @Test - def shouldParseIncludeArgument(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--include", "includeTest*", - "--from-beginning") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("includeTest*", config.includedTopicsArg) - assertEquals(true, config.fromBeginning) - } - - @Test - def shouldParseWhitelistArgument(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--whitelist", "whitelistTest*", - "--from-beginning") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("whitelistTest*", config.includedTopicsArg) - assertEquals(true, config.fromBeginning) - } - - @Test - def shouldIgnoreWhitelistArgumentIfIncludeSpecified(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--include", "includeTest*", - "--whitelist", "whitelistTest*", - "--from-beginning") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("includeTest*", config.includedTopicsArg) - assertEquals(true, config.fromBeginning) - } - - @Test - def shouldParseValidSimpleConsumerValidConfigWithNumericOffset(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--partition", "0", - "--offset", "3") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(0, config.partitionArg.get) - assertEquals(3, config.offsetArg) - assertEquals(false, config.fromBeginning) - - } - - @Test - def shouldExitOnUnrecognizedNewConsumerOption(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - - //Given - val args: Array[String] = Array( - "--new-consumer", - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--from-beginning") - - try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - finally Exit.resetExitProcedure() - } - - @Test - def shouldParseValidSimpleConsumerValidConfigWithStringOffset(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--partition", "0", - "--offset", "LatEst", - "--property", "print.value=false") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(0, config.partitionArg.get) - assertEquals(-1, config.offsetArg) - assertEquals(false, config.fromBeginning) - assertEquals(false, config.formatter.asInstanceOf[DefaultMessageFormatter].printValue) - } - - @Test - def shouldParseValidConsumerConfigWithAutoOffsetResetLatest(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--consumer-property", "auto.offset.reset=latest") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - val consumerProperties = ConsoleConsumer.consumerProps(config) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(false, config.fromBeginning) - assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) - } - - @Test - def shouldParseValidConsumerConfigWithAutoOffsetResetEarliest(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--consumer-property", "auto.offset.reset=earliest") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - val consumerProperties = ConsoleConsumer.consumerProps(config) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(false, config.fromBeginning) - assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) - } - - @Test - def shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--consumer-property", "auto.offset.reset=earliest", - "--from-beginning") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - val consumerProperties = ConsoleConsumer.consumerProps(config) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(true, config.fromBeginning) - assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) - } - - @Test - def shouldParseValidConsumerConfigWithNoOffsetReset(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - val consumerProperties = ConsoleConsumer.consumerProps(config) - - //Then - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(false, config.fromBeginning) - assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) - } - - @Test - def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--consumer-property", "auto.offset.reset=latest", - "--from-beginning") - try { - val config = new ConsoleConsumer.ConsumerConfig(args) - assertThrows(classOf[IllegalArgumentException], () => ConsoleConsumer.consumerProps(config)) - } - finally Exit.resetExitProcedure() - } - - @Test - def shouldParseConfigsFromFile(): Unit = { - val propsFile = TestUtils.tempPropertiesFile(Map("request.timeout.ms" -> "1000", "group.id" -> "group1")) - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--consumer.config", propsFile.getAbsolutePath - ) - - val config = new ConsoleConsumer.ConsumerConfig(args) - - assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms")) - assertEquals("group1", config.consumerProps.getProperty("group.id")) - } - - @Test - def groupIdsProvidedInDifferentPlacesMustMatch(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - - // different in all three places - var propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file")) - var args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer-property", "group.id=group-from-properties", - "--consumer.config", propsFile.getAbsolutePath - ) - - assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - - // the same in all three places - propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "test-group")) - args = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "test-group", - "--consumer-property", "group.id=test-group", - "--consumer.config", propsFile.getAbsolutePath - ) - - var config = new ConsoleConsumer.ConsumerConfig(args) - var props = ConsoleConsumer.consumerProps(config) - assertEquals("test-group", props.getProperty("group.id")) - - // different via --consumer-property and --consumer.config - propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file")) - args = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--consumer-property", "group.id=group-from-properties", - "--consumer.config", propsFile.getAbsolutePath - ) - - assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - - // different via --consumer-property and --group - args = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer-property", "group.id=group-from-properties" - ) - - assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - - // different via --group and --consumer.config - propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file")) - args = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer.config", propsFile.getAbsolutePath - ) - assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - - // via --group only - args = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments" - ) - - config = new ConsoleConsumer.ConsumerConfig(args) - props = ConsoleConsumer.consumerProps(config) - assertEquals("group-from-arguments", props.getProperty("group.id")) - - Exit.resetExitProcedure() - } - - @Test - def testCustomPropertyShouldBePassedToConfigureMethod(): Unit = { - val args = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--property", "print.key=true", - "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer", - "--property", "key.deserializer.my-props=abc" - ) - val config = new ConsoleConsumer.ConsumerConfig(args) - assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter]) - assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props")) - val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter] - assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer]) - val keyDeserializer = formatter.keyDeserializer.get.asInstanceOf[MockDeserializer] - assertEquals(1, keyDeserializer.configs.size) - assertEquals("abc", keyDeserializer.configs.get("my-props")) - assertTrue(keyDeserializer.isKey) - } - - @Test - def testCustomConfigShouldBePassedToConfigureMethod(): Unit = { - val propsFile = TestUtils.tempPropertiesFile(Map("key.deserializer.my-props" -> "abc", "print.key" -> "false")) - val args = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--property", "print.key=true", - "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer", - "--formatter-config", propsFile.getAbsolutePath - ) - val config = new ConsoleConsumer.ConsumerConfig(args) - assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter]) - assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props")) - val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter] - assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer]) - val keyDeserializer = formatter.keyDeserializer.get.asInstanceOf[MockDeserializer] - assertEquals(1, keyDeserializer.configs.size) - assertEquals("abc", keyDeserializer.configs.get("my-props")) - assertTrue(keyDeserializer.isKey) - } - - @Test - def shouldParseGroupIdFromBeginningGivenTogether(): Unit = { - // Start from earliest - var args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "test-group", - "--from-beginning") - - var config = new ConsoleConsumer.ConsumerConfig(args) - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(-2, config.offsetArg) - assertEquals(true, config.fromBeginning) - - // Start from latest - args = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "test-group" - ) - - config = new ConsoleConsumer.ConsumerConfig(args) - assertEquals("localhost:9092", config.bootstrapServer) - assertEquals("test", config.topicArg) - assertEquals(-1, config.offsetArg) - assertEquals(false, config.fromBeginning) - } - - @Test - def shouldExitOnGroupIdAndPartitionGivenTogether(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "test-group", - "--partition", "0") - - try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - finally Exit.resetExitProcedure() - } - - @Test - def shouldExitOnOffsetWithoutPartition(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--offset", "10") - - try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - finally Exit.resetExitProcedure() - } - - @Test - def testDefaultMessageFormatter(): Unit = { - val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes) - val formatter = new DefaultMessageFormatter() - val configs: JMap[String, String] = new HashMap() - - formatter.configure(configs) - var out = new ByteArrayOutputStream() - formatter.writeTo(record, new PrintStream(out)) - assertEquals("value\n", out.toString) - - configs.put("print.key", "true") - formatter.configure(configs) - out = new ByteArrayOutputStream() - formatter.writeTo(record, new PrintStream(out)) - assertEquals("key\tvalue\n", out.toString) - - configs.put("print.partition", "true") - formatter.configure(configs) - out = new ByteArrayOutputStream() - formatter.writeTo(record, new PrintStream(out)) - assertEquals("Partition:0\tkey\tvalue\n", out.toString) - - configs.put("print.timestamp", "true") - formatter.configure(configs) - out = new ByteArrayOutputStream() - formatter.writeTo(record, new PrintStream(out)) - assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString) - - configs.put("print.offset", "true") - formatter.configure(configs) - out = new ByteArrayOutputStream() - formatter.writeTo(record, new PrintStream(out)) - assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString) - - out = new ByteArrayOutputStream() - val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes, "value".getBytes, - new RecordHeaders(), Optional.empty[Integer]) - formatter.writeTo(record2, new PrintStream(out)) - assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString) - formatter.close() - } - - @Test - def testNoOpMessageFormatter(): Unit = { - val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes) - val formatter = new NoOpMessageFormatter() - - formatter.configure(new HashMap()) - val out = new ByteArrayOutputStream() - formatter.writeTo(record, new PrintStream(out)) - assertEquals("", out.toString) - } - - @Test - def shouldExitIfNoTopicOrFilterSpecified(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092") - - try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - finally Exit.resetExitProcedure() - } - - @Test - def shouldExitIfTopicAndIncludeSpecified(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--include", "includeTest*") - - try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - finally Exit.resetExitProcedure() - } - - @Test - def shouldExitIfTopicAndWhitelistSpecified(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--whitelist", "whitelistTest*") - - try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) - finally Exit.resetExitProcedure() - } - - @Test - def testClientIdOverride(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--from-beginning", - "--consumer-property", "client.id=consumer-1") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - val consumerProperties = ConsoleConsumer.consumerProps(config) - - //Then - assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)) - } - - @Test - def testDefaultClientId(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--from-beginning") - - //When - val config = new ConsoleConsumer.ConsumerConfig(args) - val consumerProperties = ConsoleConsumer.consumerProps(config) - - //Then - assertEquals("console-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)) - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 03877c68595..1f3fc98b778 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.integration; -import kafka.tools.ConsoleConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; @@ -62,6 +61,8 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tools.consumer.ConsoleConsumer; +import org.apache.kafka.tools.consumer.ConsoleConsumerOptions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -1117,7 +1118,7 @@ public class KStreamAggregationIntegrationTest { final Deserializer valueDeserializer, final Class innerClass, final int numMessages, - final boolean printTimestamp) { + final boolean printTimestamp) throws Exception { final ByteArrayOutputStream newConsole = new ByteArrayOutputStream(); final PrintStream originalStream = System.out; try (final PrintStream newStream = new PrintStream(newConsole)) { @@ -1139,8 +1140,7 @@ public class KStreamAggregationIntegrationTest { "--property", "key.deserializer.window.size.ms=500", }; - ConsoleConsumer.messageCount_$eq(0); //reset the message count - ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args)); + ConsoleConsumer.run(new ConsoleConsumerOptions(args)); newStream.flush(); System.setOut(originalStream); return newConsole.toString(); diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index eb5d3d9f85d..fb87f20df19 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin, JmxTool -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 +from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7 from kafkatest.services.kafka.util import fix_opts_for_new_jvm """ @@ -210,7 +210,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) # LoggingMessageFormatter was introduced after 0.9 if node.version > LATEST_0_9: - cmd += " --formatter kafka.tools.LoggingMessageFormatter" + if node.version > LATEST_3_7: + cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter" + else: + cmd += " --formatter kafka.tools.LoggingMessageFormatter" if self.enable_systest_events: # enable systest events is only available in 0.10.0 and later diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 6d0b1ae329f..3e3da017bc4 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -263,3 +263,7 @@ LATEST_3_5 = V_3_5_2 V_3_6_0 = KafkaVersion("3.6.0") V_3_6_1 = KafkaVersion("3.6.1") LATEST_3_6 = V_3_6_1 + +# 3.7.x version +V_3_7_0 = KafkaVersion("3.7.0") +LATEST_3_7 = V_3_7_0 diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java new file mode 100644 index 00000000000..f84fb88c23f --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Consumer that dumps messages to standard out. + */ +public class ConsoleConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class); + private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1); + + static int messageCount = 0; + + public static void main(String[] args) throws Exception { + ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args); + try { + run(opts); + } catch (AuthenticationException ae) { + LOG.error("Authentication failed: terminating consumer process", ae); + Exit.exit(1); + } catch (Throwable t) { + LOG.error("Unknown error when running consumer: ", t); + Exit.exit(1); + } + } + + public static void run(ConsoleConsumerOptions opts) { + messageCount = 0; + long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE; + Consumer consumer = new KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); + ConsumerWrapper consumerWrapper = opts.partitionArg().isPresent() + ? new ConsumerWrapper(Optional.of(opts.topicArg()), opts.partitionArg(), OptionalLong.of(opts.offsetArg()), Optional.empty(), consumer, timeoutMs) + : new ConsumerWrapper(Optional.of(opts.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(opts.includedTopicsArg()), consumer, timeoutMs); + + addShutdownHook(consumerWrapper, opts); + + try { + process(opts.maxMessages(), opts.formatter(), consumerWrapper, System.out, opts.skipMessageOnError()); + } finally { + consumerWrapper.cleanup(); + opts.formatter().close(); + reportRecordCount(); + + SHUTDOWN_LATCH.countDown(); + } + } + + static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) { + Exit.addShutdownHook("consumer-shutdown-hook", () -> { + try { + consumer.wakeup(); + SHUTDOWN_LATCH.await(); + } catch (Throwable t) { + LOG.error("Exception while running shutdown hook: ", t); + } + if (conf.enableSystestEventsLogging()) { + System.out.println("shutdown_complete"); + } + }); + } + + static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) { + while (messageCount < maxMessages || maxMessages == -1) { + ConsumerRecord msg; + try { + msg = consumer.receive(); + } catch (WakeupException we) { + LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate."); + // Consumer will be closed + return; + } catch (Throwable t) { + LOG.error("Error processing message, terminating consumer process: ", t); + // Consumer will be closed + return; + } + messageCount += 1; + try { + formatter.writeTo(new ConsumerRecord<>(msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(), + 0, 0, msg.key(), msg.value(), msg.headers(), Optional.empty()), output); + } catch (Throwable t) { + if (skipMessageOnError) { + LOG.error("Error processing message, skipping this message: ", t); + } else { + // Consumer will be closed + throw t; + } + } + if (checkErr(output)) { + // Consumer will be closed + return; + } + } + } + + static void reportRecordCount() { + System.err.println("Processed a total of " + messageCount + " messages"); + } + + static boolean checkErr(PrintStream output) { + boolean gotError = output.checkError(); + if (gotError) { + // This means no one is listening to our output stream anymore, time to shut down + System.err.println("Unable to write to standard out, closing consumer."); + } + return gotError; + } + + public static class ConsumerWrapper { + final Optional topic; + final OptionalInt partitionId; + final OptionalLong offset; + final Optional includedTopics; + final Consumer consumer; + final long timeoutMs; + final Time time = Time.SYSTEM; + + Iterator> recordIter = Collections.emptyIterator(); + + public ConsumerWrapper(Optional topic, + OptionalInt partitionId, + OptionalLong offset, + Optional includedTopics, + Consumer consumer, + long timeoutMs) { + this.topic = topic; + this.partitionId = partitionId; + this.offset = offset; + this.includedTopics = includedTopics; + this.consumer = consumer; + this.timeoutMs = timeoutMs; + + if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) { + seek(topic.get(), partitionId.getAsInt(), offset.getAsLong()); + } else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { + // default to latest if no offset is provided + seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP); + } else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { + consumer.subscribe(Collections.singletonList(topic.get())); + } else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) { + consumer.subscribe(Pattern.compile(includedTopics.get())); + } else { + throw new IllegalArgumentException("An invalid combination of arguments is provided. " + + "Exactly one of 'topic' or 'include' must be provided. " + + "If 'topic' is provided, an optional 'partition' may also be provided. " + + "If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition."); + } + } + + private void seek(String topic, int partitionId, long offset) { + TopicPartition topicPartition = new TopicPartition(topic, partitionId); + consumer.assign(Collections.singletonList(topicPartition)); + if (offset == ListOffsetsRequest.EARLIEST_TIMESTAMP) { + consumer.seekToBeginning(Collections.singletonList(topicPartition)); + } else if (offset == ListOffsetsRequest.LATEST_TIMESTAMP) { + consumer.seekToEnd(Collections.singletonList(topicPartition)); + } else { + consumer.seek(topicPartition, offset); + } + } + + void resetUnconsumedOffsets() { + Map smallestUnconsumedOffsets = new HashMap<>(); + while (recordIter.hasNext()) { + ConsumerRecord record = recordIter.next(); + TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + // avoid auto-committing offsets which haven't been consumed + smallestUnconsumedOffsets.putIfAbsent(tp, record.offset()); + } + smallestUnconsumedOffsets.forEach(consumer::seek); + } + + ConsumerRecord receive() { + long startTimeMs = time.milliseconds(); + while (!recordIter.hasNext()) { + recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator(); + if (!recordIter.hasNext() && (time.milliseconds() - startTimeMs > timeoutMs)) { + throw new TimeoutException(); + } + } + return recordIter.next(); + } + + void wakeup() { + this.consumer.wakeup(); + } + + void cleanup() { + resetUnconsumedOffsets(); + this.consumer.close(); + } + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java new file mode 100644 index 00000000000..a713afb2bf2 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +public final class ConsoleConsumerOptions extends CommandDefaultOptions { + + private static final Random RANDOM = new Random(); + + private final OptionSpec topicOpt; + private final OptionSpec whitelistOpt; + private final OptionSpec includeOpt; + private final OptionSpec partitionIdOpt; + private final OptionSpec offsetOpt; + private final OptionSpec messageFormatterOpt; + private final OptionSpec messageFormatterArgOpt; + private final OptionSpec messageFormatterConfigOpt; + private final OptionSpec resetBeginningOpt; + private final OptionSpec maxMessagesOpt; + private final OptionSpec timeoutMsOpt; + private final OptionSpec skipMessageOnErrorOpt; + private final OptionSpec bootstrapServerOpt; + private final OptionSpec keyDeserializerOpt; + private final OptionSpec valueDeserializerOpt; + private final OptionSpec enableSystestEventsLoggingOpt; + private final OptionSpec isolationLevelOpt; + private final OptionSpec groupIdOpt; + + private final Properties consumerProps; + private final long offset; + private final MessageFormatter formatter; + + public ConsoleConsumerOptions(String[] args) throws IOException { + super(args); + topicOpt = parser.accepts("topic", "The topic to consume on.") + .withRequiredArg() + .describedAs("topic") + .ofType(String.class); + whitelistOpt = parser.accepts("whitelist", + "DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(String.class); + includeOpt = parser.accepts("include", + "Regular expression specifying list of topics to include for consumption.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(String.class); + partitionIdOpt = parser.accepts("partition", + "The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.") + .withRequiredArg() + .describedAs("partition") + .ofType(Integer.class); + offsetOpt = parser.accepts("offset", "The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end") + .withRequiredArg() + .describedAs("consume offset") + .ofType(String.class) + .defaultsTo("latest"); + OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") + .withRequiredArg() + .describedAs("consumer_prop") + .ofType(String.class); + OptionSpec consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") + .withRequiredArg() + .describedAs("class") + .ofType(String.class) + .defaultsTo(DefaultMessageFormatter.class.getName()); + messageFormatterArgOpt = parser.accepts("property", + "The properties to initialize the message formatter. Default properties include: \n" + + " print.timestamp=true|false\n" + + " print.key=true|false\n" + + " print.offset=true|false\n" + + " print.partition=true|false\n" + + " print.headers=true|false\n" + + " print.value=true|false\n" + + " key.separator=\n" + + " line.separator=\n" + + " headers.separator=\n" + + " null.literal=\n" + + " key.deserializer=\n" + + " value.deserializer=\n" + + " header.deserializer=\n" + + "\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.") + .withRequiredArg() + .describedAs("prop") + .ofType(String.class); + messageFormatterConfigOpt = parser.accepts("formatter-config", "Config properties file to initialize the message formatter. Note that " + messageFormatterArgOpt + " takes precedence over this config.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + + "start with the earliest message present in the log rather than the latest message."); + maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.") + .withRequiredArg() + .describedAs("num_messages") + .ofType(Integer.class); + timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.") + .withRequiredArg() + .describedAs("timeout_ms") + .ofType(Integer.class); + skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + + "skip it instead of halt."); + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.") + .withRequiredArg() + .describedAs("server to connect to") + .ofType(String.class); + keyDeserializerOpt = parser.accepts("key-deserializer") + .withRequiredArg() + .describedAs("deserializer for key") + .ofType(String.class); + valueDeserializerOpt = parser.accepts("value-deserializer") + .withRequiredArg() + .describedAs("deserializer for values") + .ofType(String.class); + enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events", + "Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)"); + isolationLevelOpt = parser.accepts("isolation-level", + "Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " + + "to read all messages.") + .withRequiredArg() + .ofType(String.class) + .defaultsTo("read_uncommitted"); + groupIdOpt = parser.accepts("group", "The consumer group id of the consumer.") + .withRequiredArg() + .describedAs("consumer group id") + .ofType(String.class); + + try { + options = parser.parse(args); + } catch (OptionException oe) { + CommandLineUtils.printUsageAndExit(parser, oe.getMessage()); + } + + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output."); + + checkRequiredArgs(); + + Properties consumerPropsFromFile = options.has(consumerConfigOpt) + ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + : new Properties(); + Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt)); + Set groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps); + consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided); + offset = parseOffset(); + formatter = buildFormatter(); + } + + private void checkRequiredArgs() { + List topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg())); + topicOrFilterArgs.removeIf(Objects::isNull); + // user need to specify value for either --topic or one of the include filters options (--include or --whitelist) + if (topicOrFilterArgs.size() != 1) { + CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. " + + (options.has(whitelistOpt) ? "--whitelist is DEPRECATED use --include instead; ignored if --include specified." : "")); + } + + if (partitionArg().isPresent()) { + if (!options.has(topicOpt)) { + CommandLineUtils.printUsageAndExit(parser, "The topic is required when partition is specified."); + } + if (fromBeginning() && options.has(offsetOpt)) { + CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and offset cannot be specified together."); + } + } else if (options.has(offsetOpt)) { + CommandLineUtils.printUsageAndExit(parser, "The partition is required when offset is specified."); + } + + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); + } + + private Set checkConsumerGroup(Properties consumerPropsFromFile, Properties extraConsumerProps) { + // if the group id is provided in more than place (through different means) all values must be the same + Set groupIdsProvided = new HashSet<>(); + if (options.has(groupIdOpt)) { + groupIdsProvided.add(options.valueOf(groupIdOpt)); + } + + if (consumerPropsFromFile.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + groupIdsProvided.add((String) consumerPropsFromFile.get(ConsumerConfig.GROUP_ID_CONFIG)); + } + + if (extraConsumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + groupIdsProvided.add(extraConsumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + } + if (groupIdsProvided.size() > 1) { + CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', " + + "via '--consumer-property', or via '--consumer.config') do not match. " + + "Detected group ids: " + + groupIdsProvided.stream().map(group -> "'" + group + "'").collect(Collectors.joining(", "))); + } + if (!groupIdsProvided.isEmpty() && partitionArg().isPresent()) { + CommandLineUtils.printUsageAndExit(parser, "Options group and partition cannot be specified together."); + } + return groupIdsProvided; + } + + private Properties buildConsumerProps(Properties consumerPropsFromFile, Properties extraConsumerProps, Set groupIdsProvided) { + Properties consumerProps = new Properties(); + consumerProps.putAll(consumerPropsFromFile); + consumerProps.putAll(extraConsumerProps); + setAutoOffsetResetValue(consumerProps); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer()); + if (consumerProps.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) { + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer"); + } + CommandLineUtils.maybeMergeOptions(consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, options, isolationLevelOpt); + + if (groupIdsProvided.isEmpty()) { + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-" + RANDOM.nextInt(100000)); + // By default, avoid unnecessary expansion of the coordinator cache since + // the auto-generated group and its offsets is not intended to be used again + if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + } + } else { + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdsProvided.iterator().next()); + } + return consumerProps; + } + + /** + * Used to retrieve the correct value for the consumer parameter 'auto.offset.reset'. + * Order of priority is: + * 1. Explicitly set parameter via --consumer.property command line parameter + * 2. Explicit --from-beginning given -> 'earliest' + * 3. Default value of 'latest' + * In case both --from-beginning and an explicit value are specified an error is thrown if these + * are conflicting. + */ + private void setAutoOffsetResetValue(Properties props) { + String earliestConfigValue = "earliest"; + String latestConfigValue = "latest"; + + if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) { + // auto.offset.reset parameter was specified on the command line + String autoResetOption = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + if (fromBeginning() && !earliestConfigValue.equals(autoResetOption)) { + // conflicting options - latest und earliest, throw an error + System.err.println("Can't simultaneously specify --from-beginning and 'auto.offset.reset=" + autoResetOption + "', " + + "please remove one option"); + Exit.exit(1); + } + // nothing to do, checking for valid parameter values happens later and the specified + // value was already copied during .putall operation + } else { + // no explicit value for auto.offset.reset was specified + // if --from-beginning was specified use earliest, otherwise default to latest + String autoResetOption = fromBeginning() ? earliestConfigValue : latestConfigValue; + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption); + } + } + + private long parseOffset() { + if (options.has(offsetOpt)) { + switch (options.valueOf(offsetOpt).toLowerCase(Locale.ROOT)) { + case "earliest": + return ListOffsetsRequest.EARLIEST_TIMESTAMP; + case "latest": + return ListOffsetsRequest.LATEST_TIMESTAMP; + default: + String offsetString = options.valueOf(offsetOpt); + try { + long offset = Long.parseLong(offsetString); + if (offset < 0) { + invalidOffset(offsetString); + } + return offset; + } catch (NumberFormatException nfe) { + invalidOffset(offsetString); + } + } + } else if (fromBeginning()) { + return ListOffsetsRequest.EARLIEST_TIMESTAMP; + } + return ListOffsetsRequest.LATEST_TIMESTAMP; + } + + private void invalidOffset(String offset) { + CommandLineUtils.printUsageAndExit(parser, "The provided offset value '" + offset + "' is incorrect. Valid values are " + + "'earliest', 'latest', or a non-negative long."); + } + + private MessageFormatter buildFormatter() { + MessageFormatter formatter = null; + try { + Class messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)); + formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance(); + + Properties formatterArgs = formatterArgs(); + Map formatterConfigs = new HashMap<>(); + for (final String name : formatterArgs.stringPropertyNames()) { + formatterConfigs.put(name, formatterArgs.getProperty(name)); + } + + formatter.configure(formatterConfigs); + + } catch (Exception e) { + CommandLineUtils.printUsageAndExit(parser, e.getMessage()); + } + return formatter; + } + + Properties consumerProps() { + return consumerProps; + } + + boolean fromBeginning() { + return options.has(resetBeginningOpt); + } + + long offsetArg() { + return offset; + } + + boolean skipMessageOnError() { + return options.has(skipMessageOnErrorOpt); + } + + OptionalInt partitionArg() { + if (options.has(partitionIdOpt)) { + return OptionalInt.of(options.valueOf(partitionIdOpt)); + } + return OptionalInt.empty(); + } + + String topicArg() { + return options.valueOf(topicOpt); + } + + int maxMessages() { + return options.has(maxMessagesOpt) ? options.valueOf(maxMessagesOpt) : -1; + } + + int timeoutMs() { + return options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1; + } + + boolean enableSystestEventsLogging() { + return options.has(enableSystestEventsLoggingOpt); + } + + String bootstrapServer() { + return options.valueOf(bootstrapServerOpt); + } + + String includedTopicsArg() { + return options.has(includeOpt) + ? options.valueOf(includeOpt) + : options.valueOf(whitelistOpt); + } + + Properties formatterArgs() throws IOException { + Properties formatterArgs = options.has(messageFormatterConfigOpt) + ? Utils.loadProps(options.valueOf(messageFormatterConfigOpt)) + : new Properties(); + String keyDeserializer = options.valueOf(keyDeserializerOpt); + if (keyDeserializer != null && !keyDeserializer.isEmpty()) { + formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer); + } + String valueDeserializer = options.valueOf(valueDeserializerOpt); + if (valueDeserializer != null && !valueDeserializer.isEmpty()) { + formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); + } + formatterArgs.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))); + + return formatterArgs; + } + + MessageFormatter formatter() { + return formatter; + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java new file mode 100644 index 00000000000..402deb8358e --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.Deserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; + +class DefaultMessageFormatter implements MessageFormatter { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultMessageFormatter.class); + + private boolean printTimestamp = false; + private boolean printKey = false; + private boolean printValue = true; + private boolean printPartition = false; + private boolean printOffset = false; + private boolean printHeaders = false; + private byte[] keySeparator = utfBytes("\t"); + private byte[] lineSeparator = utfBytes("\n"); + private byte[] headersSeparator = utfBytes(","); + private byte[] nullLiteral = utfBytes("null"); + + private Optional> keyDeserializer = Optional.empty(); + private Optional> valueDeserializer = Optional.empty(); + private Optional> headersDeserializer = Optional.empty(); + + @Override + public void configure(Map configs) { + if (configs.containsKey("print.timestamp")) { + printTimestamp = getBoolProperty(configs, "print.timestamp"); + } + if (configs.containsKey("print.key")) { + printKey = getBoolProperty(configs, "print.key"); + } + if (configs.containsKey("print.offset")) { + printOffset = getBoolProperty(configs, "print.offset"); + } + if (configs.containsKey("print.partition")) { + printPartition = getBoolProperty(configs, "print.partition"); + } + if (configs.containsKey("print.headers")) { + printHeaders = getBoolProperty(configs, "print.headers"); + } + if (configs.containsKey("print.value")) { + printValue = getBoolProperty(configs, "print.value"); + } + if (configs.containsKey("key.separator")) { + keySeparator = getByteProperty(configs, "key.separator"); + } + if (configs.containsKey("line.separator")) { + lineSeparator = getByteProperty(configs, "line.separator"); + } + if (configs.containsKey("headers.separator")) { + headersSeparator = getByteProperty(configs, "headers.separator"); + } + if (configs.containsKey("null.literal")) { + nullLiteral = getByteProperty(configs, "null.literal"); + } + + keyDeserializer = getDeserializerProperty(configs, true, "key.deserializer"); + valueDeserializer = getDeserializerProperty(configs, false, "value.deserializer"); + headersDeserializer = getDeserializerProperty(configs, false, "headers.deserializer"); + } + + // for testing + public boolean printValue() { + return printValue; + } + + // for testing + public Optional> keyDeserializer() { + return keyDeserializer; + } + + private void writeSeparator(PrintStream output, boolean columnSeparator) { + try { + if (columnSeparator) { + output.write(keySeparator); + } else { + output.write(lineSeparator); + } + } catch (IOException ioe) { + LOG.error("Unable to write the separator to the output", ioe); + } + } + + private byte[] deserialize(ConsumerRecord consumerRecord, Optional> deserializer, byte[] sourceBytes, String topic) { + byte[] nonNullBytes = sourceBytes != null ? sourceBytes : nullLiteral; + return deserializer.map(value -> utfBytes(value.deserialize(topic, consumerRecord.headers(), nonNullBytes).toString())).orElse(nonNullBytes); + } + + @Override + public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { + try { + if (printTimestamp) { + if (consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { + output.print(consumerRecord.timestampType() + ":" + consumerRecord.timestamp()); + } else { + output.print("NO_TIMESTAMP"); + } + writeSeparator(output, printOffset || printPartition || printHeaders || printKey || printValue); + } + + if (printPartition) { + output.print("Partition:"); + output.print(consumerRecord.partition()); + writeSeparator(output, printOffset || printHeaders || printKey || printValue); + } + + if (printOffset) { + output.print("Offset:"); + output.print(consumerRecord.offset()); + writeSeparator(output, printHeaders || printKey || printValue); + } + + if (printHeaders) { + Iterator
headersIt = consumerRecord.headers().iterator(); + if (!headersIt.hasNext()) { + output.print("NO_HEADERS"); + } else { + while (headersIt.hasNext()) { + Header header = headersIt.next(); + output.print(header.key() + ":"); + output.write(deserialize(consumerRecord, headersDeserializer, header.value(), consumerRecord.topic())); + if (headersIt.hasNext()) { + output.write(headersSeparator); + } + } + } + writeSeparator(output, printKey || printValue); + } + + if (printKey) { + output.write(deserialize(consumerRecord, keyDeserializer, consumerRecord.key(), consumerRecord.topic())); + writeSeparator(output, printValue); + } + + if (printValue) { + output.write(deserialize(consumerRecord, valueDeserializer, consumerRecord.value(), consumerRecord.topic())); + output.write(lineSeparator); + } + } catch (IOException ioe) { + LOG.error("Unable to write the consumer record to the output", ioe); + } + } + + private Map propertiesWithKeyPrefixStripped(String prefix, Map configs) { + final Map newConfigs = new HashMap<>(); + for (Map.Entry entry : configs.entrySet()) { + if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) { + newConfigs.put(entry.getKey().substring(prefix.length()), entry.getValue()); + } + } + return newConfigs; + } + + private byte[] utfBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] getByteProperty(Map configs, String key) { + return utfBytes((String) configs.get(key)); + } + + private boolean getBoolProperty(Map configs, String key) { + return ((String) configs.get(key)).trim().equalsIgnoreCase("true"); + } + + private Optional> getDeserializerProperty(Map configs, boolean isKey, String key) { + if (configs.containsKey(key)) { + String name = (String) configs.get(key); + try { + Deserializer deserializer = (Deserializer) Class.forName(name).getDeclaredConstructor().newInstance(); + Map deserializerConfig = propertiesWithKeyPrefixStripped(key + ".", configs); + deserializer.configure(deserializerConfig, isKey); + return Optional.of(deserializer); + } catch (Exception e) { + LOG.error("Unable to instantiate a deserializer from " + name, e); + } + } + return Optional.empty(); + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java new file mode 100644 index 00000000000..f010bbaea9b --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.record.TimestampType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +class LoggingMessageFormatter implements MessageFormatter { + + private static final Logger LOG = LoggerFactory.getLogger(LoggingMessageFormatter.class); + private final DefaultMessageFormatter defaultWriter = new DefaultMessageFormatter(); + + @Override + public void configure(Map configs) { + defaultWriter.configure(configs); + } + + @Override + public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { + defaultWriter.writeTo(consumerRecord, output); + String timestamp = consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE + ? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + ", " + : ""; + String key = "key:" + (consumerRecord.key() == null ? "null " : new String(consumerRecord.key(), StandardCharsets.UTF_8) + ", "); + String value = "value:" + (consumerRecord.value() == null ? "null" : new String(consumerRecord.value(), StandardCharsets.UTF_8)); + LOG.info(timestamp + key + value); + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/NoOpMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/NoOpMessageFormatter.java new file mode 100644 index 00000000000..cba9e9b6728 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/NoOpMessageFormatter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; + +import java.io.PrintStream; + +class NoOpMessageFormatter implements MessageFormatter { + + @Override + public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { + + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java index 9b698b0cc5f..fdc732ea29a 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java @@ -28,6 +28,8 @@ import org.apache.kafka.common.utils.Exit; import org.apache.kafka.storage.internals.log.LogConfig; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; @@ -198,6 +200,14 @@ public class ToolsTestUtils { .collect(Collectors.joining(",")); } + public static File tempPropertiesFile(Map properties) throws IOException { + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : properties.entrySet()) { + sb.append(entry.getKey() + "=" + entry.getValue() + System.lineSeparator()); + } + return org.apache.kafka.test.TestUtils.tempFile(sb.toString()); + } + public static class MockExitProcedure implements Exit.Procedure { private boolean hasExited = false; private int statusCode; diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java new file mode 100644 index 00000000000..523122c4cdd --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java @@ -0,0 +1,621 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.test.MockDeserializer; +import org.apache.kafka.tools.ToolsTestUtils; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsoleConsumerOptionsTest { + + @Test + public void shouldParseValidConsumerValidConfig() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertTrue(config.fromBeginning()); + assertFalse(config.enableSystestEventsLogging()); + assertFalse(config.skipMessageOnError()); + assertEquals(-1, config.maxMessages()); + assertEquals(-1, config.timeoutMs()); + } + + @Test + public void shouldParseIncludeArgument() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--include", "includeTest*", + "--from-beginning" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("includeTest*", config.includedTopicsArg()); + assertTrue(config.fromBeginning()); + } + + @Test + public void shouldParseWhitelistArgument() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--whitelist", "whitelistTest*", + "--from-beginning" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("whitelistTest*", config.includedTopicsArg()); + assertTrue(config.fromBeginning()); + } + + @Test + public void shouldIgnoreWhitelistArgumentIfIncludeSpecified() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--include", "includeTest*", + "--whitelist", "whitelistTest*", + "--from-beginning" + }; + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("includeTest*", config.includedTopicsArg()); + assertTrue(config.fromBeginning()); + } + + @Test + public void shouldParseValidSimpleConsumerValidConfigWithNumericOffset() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0", + "--offset", "3" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertTrue(config.partitionArg().isPresent()); + assertEquals(0, config.partitionArg().getAsInt()); + assertEquals(3, config.offsetArg()); + assertFalse(config.fromBeginning()); + } + + @Test + public void shouldExitOnUnrecognizedNewConsumerOption() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--new-consumer", + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitIfPartitionButNoTopic() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--include", "test.*", + "--partition", "0" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitIfFromBeginningAndOffset() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0", + "--from-beginning", + "--offset", "123" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldParseValidSimpleConsumerValidConfigWithStringOffset() throws Exception { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0", + "--offset", "LatEst", + "--property", "print.value=false" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertTrue(config.partitionArg().isPresent()); + assertEquals(0, config.partitionArg().getAsInt()); + assertEquals(-1, config.offsetArg()); + assertFalse(config.fromBeginning()); + assertFalse(((DefaultMessageFormatter) config.formatter()).printValue()); + } + + @Test + public void shouldParseValidConsumerConfigWithAutoOffsetResetLatest() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "auto.offset.reset=latest" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertFalse(config.fromBeginning()); + assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + } + + @Test + public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliest() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "auto.offset.reset=earliest" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertFalse(config.fromBeginning()); + assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + } + + @Test + public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "auto.offset.reset=earliest", + "--from-beginning" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertTrue(config.fromBeginning()); + assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + } + + @Test + public void shouldParseValidConsumerConfigWithNoOffsetReset() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertFalse(config.fromBeginning()); + assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + } + + @Test + public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "auto.offset.reset=latest", + "--from-beginning" + }; + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldParseConfigsFromFile() throws IOException { + Map configs = new HashMap<>(); + configs.put("request.timeout.ms", "1000"); + configs.put("group.id", "group1"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer.config", propsFile.getAbsolutePath() + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + assertEquals("1000", config.consumerProps().get("request.timeout.ms")); + assertEquals("group1", config.consumerProps().get("group.id")); + } + + @Test + public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + // different in all three places + File propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file")); + final String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties", + "--consumer.config", propsFile.getAbsolutePath() + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + + // the same in all three places + propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "test-group")); + final String[] args1 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--consumer-property", "group.id=test-group", + "--consumer.config", propsFile.getAbsolutePath() + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1); + Properties props = config.consumerProps(); + assertEquals("test-group", props.getProperty("group.id")); + + // different via --consumer-property and --consumer.config + propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file")); + final String[] args2 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "group.id=group-from-properties", + "--consumer.config", propsFile.getAbsolutePath() + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2)); + + // different via --consumer-property and --group + final String[] args3 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties" + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3)); + + // different via --group and --consumer.config + propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file")); + final String[] args4 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer.config", propsFile.getAbsolutePath() + }; + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4)); + + // via --group only + final String[] args5 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments" + }; + + config = new ConsoleConsumerOptions(args5); + props = config.consumerProps(); + assertEquals("group-from-arguments", props.getProperty("group.id")); + + Exit.resetExitProcedure(); + } + + @Test + public void testCustomPropertyShouldBePassedToConfigureMethod() throws Exception { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--property", "print.key=true", + "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer", + "--property", "key.deserializer.my-props=abc" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + + assertTrue(config.formatter() instanceof DefaultMessageFormatter); + assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props")); + DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter(); + assertTrue(formatter.keyDeserializer().isPresent()); + assertTrue(formatter.keyDeserializer().get() instanceof MockDeserializer); + MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get(); + assertEquals(1, keyDeserializer.configs.size()); + assertEquals("abc", keyDeserializer.configs.get("my-props")); + assertTrue(keyDeserializer.isKey); + } + + @Test + public void testCustomConfigShouldBePassedToConfigureMethod() throws Exception { + Map configs = new HashMap<>(); + configs.put("key.deserializer.my-props", "abc"); + configs.put("print.key", "false"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--property", "print.key=true", + "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer", + "--formatter-config", propsFile.getAbsolutePath() + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + + assertTrue(config.formatter() instanceof DefaultMessageFormatter); + assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props")); + DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter(); + assertTrue(formatter.keyDeserializer().isPresent()); + assertTrue(formatter.keyDeserializer().get() instanceof MockDeserializer); + MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get(); + assertEquals(1, keyDeserializer.configs.size()); + assertEquals("abc", keyDeserializer.configs.get("my-props")); + assertTrue(keyDeserializer.isKey); + } + + @Test + public void shouldParseGroupIdFromBeginningGivenTogether() throws IOException { + // Start from earliest + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--from-beginning" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertEquals(-2, config.offsetArg()); + assertTrue(config.fromBeginning()); + + // Start from latest + args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group" + }; + + config = new ConsoleConsumerOptions(args); + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertEquals(-1, config.offsetArg()); + assertFalse(config.fromBeginning()); + } + + @Test + public void shouldExitOnGroupIdAndPartitionGivenTogether() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--partition", "0" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitOnOffsetWithoutPartition() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--offset", "10" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitIfNoTopicOrFilterSpecified() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitIfTopicAndIncludeSpecified() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--include", "includeTest*" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitIfTopicAndWhitelistSpecified() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--whitelist", "whitelistTest*" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void testClientIdOverride() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning", + "--consumer-property", "client.id=consumer-1" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testDefaultClientId() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("console-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testParseOffset() throws Exception { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + try { + final String[] badOffset = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0", + "--offset", "bad" + }; + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(badOffset)); + + final String[] negativeOffset = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0", + "--offset", "-100" + }; + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(negativeOffset)); + + final String[] earliestOffset = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0", + "--offset", "earliest" + }; + ConsoleConsumerOptions config = new ConsoleConsumerOptions(earliestOffset); + assertEquals(ListOffsetsRequest.EARLIEST_TIMESTAMP, config.offsetArg()); + } finally { + Exit.resetExitProcedure(); + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java new file mode 100644 index 00000000000..008893f9c50 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.util.MockTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ConsoleConsumerTest { + + @BeforeEach + public void setup() { + ConsoleConsumer.messageCount = 0; + } + + @Test + public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() { + String topic = "test"; + final Time time = new MockTime(); + final int timeoutMs = 1000; + + @SuppressWarnings("unchecked") + Consumer mockConsumer = mock(Consumer.class); + + when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer(invocation -> { + time.sleep(timeoutMs / 2 + 1); + return ConsumerRecords.EMPTY; + }); + + ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper( + Optional.of(topic), + OptionalInt.empty(), + OptionalLong.empty(), + Optional.empty(), + mockConsumer, + timeoutMs + ); + + assertThrows(TimeoutException.class, consumer::receive); + } + + @Test + public void shouldResetUnConsumedOffsetsBeforeExit() { + String topic = "test"; + int maxMessages = 123; + int totalMessages = 700; + long startOffset = 0L; + + MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + TopicPartition tp1 = new TopicPartition(topic, 0); + TopicPartition tp2 = new TopicPartition(topic, 1); + + ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper( + Optional.of(topic), + OptionalInt.empty(), + OptionalLong.empty(), + Optional.empty(), + mockConsumer, + 1000L); + + mockConsumer.rebalance(Arrays.asList(tp1, tp2)); + Map offsets = new HashMap<>(); + offsets.put(tp1, startOffset); + offsets.put(tp2, startOffset); + mockConsumer.updateBeginningOffsets(offsets); + + for (int i = 0; i < totalMessages; i++) { + // add all records, each partition should have half of `totalMessages` + mockConsumer.addRecord(new ConsumerRecord<>(topic, i % 2, i / 2, "key".getBytes(), "value".getBytes())); + } + + MessageFormatter formatter = mock(MessageFormatter.class); + + ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, false); + assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2)); + + consumer.resetUnconsumedOffsets(); + assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2)); + + verify(formatter, times(maxMessages)).writeTo(any(), any()); + consumer.cleanup(); + } + + @Test + public void shouldLimitReadsToMaxMessageLimit() { + ConsoleConsumer.ConsumerWrapper consumer = mock(ConsoleConsumer.ConsumerWrapper.class); + MessageFormatter formatter = mock(MessageFormatter.class); + ConsumerRecord record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]); + + int messageLimit = 10; + when(consumer.receive()).thenReturn(record); + + ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true); + + verify(consumer, times(messageLimit)).receive(); + verify(formatter, times(messageLimit)).writeTo(any(), any()); + + consumer.cleanup(); + } + + @Test + public void shouldStopWhenOutputCheckErrorFails() { + ConsoleConsumer.ConsumerWrapper consumer = mock(ConsoleConsumer.ConsumerWrapper.class); + MessageFormatter formatter = mock(MessageFormatter.class); + PrintStream printStream = mock(PrintStream.class); + + ConsumerRecord record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]); + + when(consumer.receive()).thenReturn(record); + //Simulate an error on System.out after the first record has been printed + when(printStream.checkError()).thenReturn(true); + + ConsoleConsumer.process(-1, formatter, consumer, printStream, true); + + verify(formatter).writeTo(any(), eq(printStream)); + verify(consumer).receive(); + verify(printStream).checkError(); + + consumer.cleanup(); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldSeekWhenOffsetIsSet() { + Consumer mockConsumer = mock(Consumer.class); + TopicPartition tp0 = new TopicPartition("test", 0); + + ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper( + Optional.of(tp0.topic()), + OptionalInt.of(tp0.partition()), + OptionalLong.empty(), + Optional.empty(), + mockConsumer, + 1000L); + + verify(mockConsumer).assign(eq(Collections.singletonList(tp0))); + verify(mockConsumer).seekToEnd(eq(Collections.singletonList(tp0))); + consumer.cleanup(); + reset(mockConsumer); + + consumer = new ConsoleConsumer.ConsumerWrapper( + Optional.of(tp0.topic()), + OptionalInt.of(tp0.partition()), + OptionalLong.of(123L), + Optional.empty(), + mockConsumer, + 1000L); + + verify(mockConsumer).assign(eq(Collections.singletonList(tp0))); + verify(mockConsumer).seek(eq(tp0), eq(123L)); + consumer.cleanup(); + reset(mockConsumer); + + consumer = new ConsoleConsumer.ConsumerWrapper( + Optional.of(tp0.topic()), + OptionalInt.of(tp0.partition()), + OptionalLong.of(ListOffsetsRequest.EARLIEST_TIMESTAMP), + Optional.empty(), + mockConsumer, + 1000L); + + verify(mockConsumer).assign(eq(Collections.singletonList(tp0))); + verify(mockConsumer).seekToBeginning(eq(Collections.singletonList(tp0))); + consumer.cleanup(); + reset(mockConsumer); + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/DefaultMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/DefaultMessageFormatterTest.java new file mode 100644 index 00000000000..917d44d3a4f --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/DefaultMessageFormatterTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.Deserializer; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DefaultMessageFormatterTest { + + @Test + public void testDefaultMessageFormatter() { + ConsumerRecord record = new ConsumerRecord<>("topic", 0, 123, "key".getBytes(), "value".getBytes()); + MessageFormatter formatter = new DefaultMessageFormatter(); + Map configs = new HashMap<>(); + + formatter.configure(configs); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("value\n", out.toString()); + + configs.put("print.key", "true"); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("key\tvalue\n", out.toString()); + + configs.put("print.partition", "true"); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("Partition:0\tkey\tvalue\n", out.toString()); + + configs.put("print.timestamp", "true"); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString()); + + configs.put("print.offset", "true"); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString()); + + configs.put("print.headers", "true"); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tNO_HEADERS\tkey\tvalue\n", out.toString()); + + RecordHeaders headers = new RecordHeaders(); + headers.add("h1", "v1".getBytes()); + headers.add("h2", "v2".getBytes()); + record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), "value".getBytes(), + headers, Optional.empty()); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("CreateTime:123\tPartition:0\tOffset:123\th1:v1,h2:v2\tkey\tvalue\n", out.toString()); + + configs.put("print.value", "false"); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("CreateTime:123\tPartition:0\tOffset:123\th1:v1,h2:v2\tkey\n", out.toString()); + + configs.put("key.separator", ""); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("CreateTime:123Partition:0Offset:123h1:v1,h2:v2key\n", out.toString()); + + configs.put("line.separator", ""); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("CreateTime:123Partition:0Offset:123h1:v1,h2:v2key", out.toString()); + + configs.put("headers.separator", "|"); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("CreateTime:123Partition:0Offset:123h1:v1|h2:v2key", out.toString()); + + record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), "value".getBytes(), + headers, Optional.empty()); + + configs.put("key.deserializer", UpperCaseDeserializer.class.getName()); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("CreateTime:123Partition:0Offset:123h1:v1|h2:v2KEY", out.toString()); + + configs.put("headers.deserializer", UpperCaseDeserializer.class.getName()); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("CreateTime:123Partition:0Offset:123h1:V1|h2:V2KEY", out.toString()); + + record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), null, + headers, Optional.empty()); + + configs.put("print.value", "true"); + configs.put("null.literal", ""); + formatter.configure(configs); + out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("CreateTime:123Partition:0Offset:123h1:V1|h2:V2KEY", out.toString()); + formatter.close(); + } + + static class UpperCaseDeserializer implements Deserializer { + + @Override + public String deserialize(String topic, byte[] data) { + return new String(data, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT); + } + } +} + diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/NoOpMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/NoOpMessageFormatterTest.java new file mode 100644 index 00000000000..1652a484d10 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/NoOpMessageFormatterTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.HashMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class NoOpMessageFormatterTest { + + @Test + public void testNoOpMessageFormatter() { + ConsumerRecord record = new ConsumerRecord<>("topic", 0, 123, "key".getBytes(), "value".getBytes()); + try (MessageFormatter formatter = new NoOpMessageFormatter()) { + formatter.configure(new HashMap<>()); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + formatter.writeTo(record, new PrintStream(out)); + assertEquals("", out.toString()); + } + } +}