KAFKA-14576: Move ConsoleConsumer to tools (#15274)

Reviewers: Josep Prat <josep.prat@aiven.io>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
This commit is contained in:
Mickael Maison 2024-02-13 19:24:07 +01:00 committed by GitHub
parent 8c0488b887
commit 0bf830fc9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 1996 additions and 1641 deletions

View File

@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.ConsoleConsumer "$@"

View File

@ -16,5 +16,5 @@ rem limitations under the License.
SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
"%~dp0kafka-run-class.bat" kafka.tools.ConsoleConsumer %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.consumer.ConsoleConsumer %*
EndLocal

View File

@ -321,6 +321,17 @@
<allow pkg="kafka.utils" />
<allow pkg="scala.collection" />
<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>
<subpackage name="group">
<allow pkg="kafka.api"/>
<allow pkg="org.apache.kafka.tools"/>
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="scala"/>
</subpackage>
</subpackage>
<subpackage name="reassign">
<allow pkg="org.apache.kafka.admin"/>
<allow pkg="org.apache.kafka.tools"/>
@ -331,13 +342,6 @@
<allow pkg="scala" />
</subpackage>
<subpackage name="consumer.group">
<allow pkg="kafka.api"/>
<allow pkg="org.apache.kafka.tools"/>
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="scala"/>
</subpackage>
<subpackage name="other">
<allow pkg="org.apache.kafka.tools.reassign"/>
<allow pkg="kafka.log" />

View File

@ -273,11 +273,11 @@
<suppress checks="ClassDataAbstractionCoupling"
files="VerifiableConsumer.java"/>
<suppress checks="CyclomaticComplexity"
files="(StreamsResetter|ProducerPerformance|Agent).java"/>
files="(ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
<suppress checks="BooleanExpressionComplexity"
files="StreamsResetter.java"/>
<suppress checks="NPathComplexity"
files="(ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/>
files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/>
<suppress checks="ImportControl"
files="SignalLogger.java"/>
<suppress checks="IllegalImport"

View File

@ -1,647 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.io.PrintStream
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.regex.Pattern
import java.util.{Collections, Locale, Map, Optional, Properties, Random}
import com.typesafe.scalalogging.LazyLogging
import joptsimple._
import kafka.utils.Implicits._
import kafka.utils.{Exit, _}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.{MessageFormatter, TopicPartition}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
/**
* Consumer that dumps messages to standard out.
*/
object ConsoleConsumer extends Logging {
var messageCount = 0
private val shutdownLatch = new CountDownLatch(1)
def main(args: Array[String]): Unit = {
val conf = new ConsumerConfig(args)
try {
run(conf)
} catch {
case e: AuthenticationException =>
error("Authentication failed: terminating consumer process", e)
Exit.exit(1)
case e: Throwable =>
error("Unknown error when running consumer: ", e)
Exit.exit(1)
}
}
def run(conf: ConsumerConfig): Unit = {
val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs.toLong else Long.MaxValue
val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)
val consumerWrapper =
if (conf.partitionArg.isDefined)
new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs)
else
new ConsumerWrapper(Option(conf.topicArg), None, None, Option(conf.includedTopicsArg), consumer, timeoutMs)
addShutdownHook(consumerWrapper, conf)
try process(conf.maxMessages, conf.formatter, consumerWrapper, System.out, conf.skipMessageOnError)
finally {
consumerWrapper.cleanup()
conf.formatter.close()
reportRecordCount()
shutdownLatch.countDown()
}
}
def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = {
Exit.addShutdownHook("consumer-shutdown-hook", {
consumer.wakeup()
shutdownLatch.await()
if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
})
}
def process(maxMessages: Integer, formatter: MessageFormatter, consumer: ConsumerWrapper, output: PrintStream,
skipMessageOnError: Boolean): Unit = {
while (messageCount < maxMessages || maxMessages == -1) {
val msg: ConsumerRecord[Array[Byte], Array[Byte]] = try {
consumer.receive()
} catch {
case _: WakeupException =>
trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
// Consumer will be closed
return
case e: Throwable =>
error("Error processing message, terminating consumer process: ", e)
// Consumer will be closed
return
}
messageCount += 1
try {
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
0, 0, msg.key, msg.value, msg.headers, Optional.empty[Integer]), output)
} catch {
case e: Throwable =>
if (skipMessageOnError) {
error("Error processing message, skipping this message: ", e)
} else {
// Consumer will be closed
throw e
}
}
if (checkErr(output, formatter)) {
// Consumer will be closed
return
}
}
}
def reportRecordCount(): Unit = {
System.err.println(s"Processed a total of $messageCount messages")
}
def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = {
val gotError = output.checkError()
if (gotError) {
// This means no one is listening to our output stream anymore, time to shutdown
System.err.println("Unable to write to standard out, closing consumer.")
}
gotError
}
private[tools] def consumerProps(config: ConsumerConfig): Properties = {
val props = new Properties
props ++= config.consumerProps
props ++= config.extraConsumerProps
setAutoOffsetResetValue(config, props)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer")
CommandLineUtils.maybeMergeOptions(
props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt)
props
}
/**
* Used by consumerProps to retrieve the correct value for the consumer parameter 'auto.offset.reset'.
*
* Order of priority is:
* 1. Explicitly set parameter via --consumer.property command line parameter
* 2. Explicit --from-beginning given -> 'earliest'
* 3. Default value of 'latest'
*
* In case both --from-beginning and an explicit value are specified an error is thrown if these
* are conflicting.
*/
def setAutoOffsetResetValue(config: ConsumerConfig, props: Properties): Unit = {
val (earliestConfigValue, latestConfigValue) = ("earliest", "latest")
if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
// auto.offset.reset parameter was specified on the command line
val autoResetOption = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
if (config.options.has(config.resetBeginningOpt) && earliestConfigValue != autoResetOption) {
// conflicting options - latest und earliest, throw an error
System.err.println(s"Can't simultaneously specify --from-beginning and 'auto.offset.reset=$autoResetOption', " +
"please remove one option")
Exit.exit(1)
}
// nothing to do, checking for valid parameter values happens later and the specified
// value was already copied during .putall operation
} else {
// no explicit value for auto.offset.reset was specified
// if --from-beginning was specified use earliest, otherwise default to latest
val autoResetOption = if (config.options.has(config.resetBeginningOpt)) earliestConfigValue else latestConfigValue
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption)
}
}
class ConsumerConfig(args: Array[String]) extends CommandDefaultOptions(args) {
val topicOpt = parser.accepts("topic", "The topic to consume on.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val whitelistOpt = parser.accepts("whitelist",
"DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.")
.withRequiredArg
.describedAs("Java regex (String)")
.ofType(classOf[String])
val includeOpt = parser.accepts("include",
"Regular expression specifying list of topics to include for consumption.")
.withRequiredArg
.describedAs("Java regex (String)")
.ofType(classOf[String])
val partitionIdOpt = parser.accepts("partition", "The partition to consume from. Consumption " +
"starts from the end of the partition unless '--offset' is specified.")
.withRequiredArg
.describedAs("partition")
.ofType(classOf[java.lang.Integer])
val offsetOpt = parser.accepts("offset", "The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end")
.withRequiredArg
.describedAs("consume offset")
.ofType(classOf[String])
.defaultsTo("latest")
val consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
.withRequiredArg
.describedAs("consumer_prop")
.ofType(classOf[String])
val consumerConfigOpt = parser.accepts("consumer.config", s"Consumer config properties file. Note that $consumerPropertyOpt takes precedence over this config.")
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
.withRequiredArg
.describedAs("class")
.ofType(classOf[String])
.defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property",
"""The properties to initialize the message formatter. Default properties include:
| print.timestamp=true|false
| print.key=true|false
| print.offset=true|false
| print.partition=true|false
| print.headers=true|false
| print.value=true|false
| key.separator=<key.separator>
| line.separator=<line.separator>
| headers.separator=<line.separator>
| null.literal=<null.literal>
| key.deserializer=<key.deserializer>
| value.deserializer=<value.deserializer>
| header.deserializer=<header.deserializer>
|
|Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers."""
.stripMargin)
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
val messageFormatterConfigOpt = parser.accepts("formatter-config", s"Config properties file to initialize the message formatter. Note that $messageFormatterArgOpt takes precedence over this config.")
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
"start with the earliest message present in the log rather than the latest message.")
val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
.withRequiredArg
.describedAs("num_messages")
.ofType(classOf[java.lang.Integer])
val timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
.withRequiredArg
.describedAs("timeout_ms")
.ofType(classOf[java.lang.Integer])
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.")
val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
val keyDeserializerOpt = parser.accepts("key-deserializer")
.withRequiredArg
.describedAs("deserializer for key")
.ofType(classOf[String])
val valueDeserializerOpt = parser.accepts("value-deserializer")
.withRequiredArg
.describedAs("deserializer for values")
.ofType(classOf[String])
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed " +
"messages. (This is specific for system tests.)")
val isolationLevelOpt = parser.accepts("isolation-level",
"Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " +
"to read all messages.")
.withRequiredArg()
.ofType(classOf[String])
.defaultsTo("read_uncommitted")
val groupIdOpt = parser.accepts("group", "The consumer group id of the consumer.")
.withRequiredArg
.describedAs("consumer group id")
.ofType(classOf[String])
options = tryParse(parser, args)
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.")
var groupIdPassed = true
val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
// topic must be specified.
var topicArg: String = _
var includedTopicsArg: String = _
val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt))
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else
new Properties()
val fromBeginning = options.has(resetBeginningOpt)
val partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None
val skipMessageOnError = options.has(skipMessageOnErrorOpt)
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = if (options.has(messageFormatterConfigOpt))
Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
else
new Properties()
formatterArgs ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
val bootstrapServer = options.valueOf(bootstrapServerOpt)
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)
val formatter: MessageFormatter = messageFormatterClass.getDeclaredConstructor().newInstance().asInstanceOf[MessageFormatter]
if (keyDeserializer != null && keyDeserializer.nonEmpty) {
formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
}
if (valueDeserializer != null && valueDeserializer.nonEmpty) {
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
}
formatter.configure(formatterArgs.asScala.asJava)
topicArg = options.valueOf(topicOpt)
includedTopicsArg = if (options.has(includeOpt))
options.valueOf(includeOpt)
else
options.valueOf(whitelistOpt)
val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ == null)
// user need to specify value for either --topic or one of the include filters options (--include or --whitelist)
if (topicOrFilterArgs.size != 1)
CommandLineUtils.printUsageAndExit(parser, s"Exactly one of --include/--topic is required. " +
s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use --include instead; ignored if --include specified."}")
if (partitionArg.isDefined) {
if (!options.has(topicOpt))
CommandLineUtils.printUsageAndExit(parser, "The topic is required when partition is specified.")
if (fromBeginning && options.has(offsetOpt))
CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and offset cannot be specified together.")
} else if (options.has(offsetOpt))
CommandLineUtils.printUsageAndExit(parser, "The partition is required when offset is specified.")
def invalidOffset(offset: String): Nothing =
ToolsUtils.printUsageAndExit(parser, s"The provided offset value '$offset' is incorrect. Valid values are " +
"'earliest', 'latest', or a non-negative long.")
val offsetArg =
if (options.has(offsetOpt)) {
options.valueOf(offsetOpt).toLowerCase(Locale.ROOT) match {
case "earliest" => ListOffsetsRequest.EARLIEST_TIMESTAMP
case "latest" => ListOffsetsRequest.LATEST_TIMESTAMP
case offsetString =>
try {
val offset = offsetString.toLong
if (offset < 0)
invalidOffset(offsetString)
offset
} catch {
case _: NumberFormatException => invalidOffset(offsetString)
}
}
}
else if (fromBeginning) ListOffsetsRequest.EARLIEST_TIMESTAMP
else ListOffsetsRequest.LATEST_TIMESTAMP
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
// if the group id is provided in more than place (through different means) all values must be the same
val groupIdsProvided = Set(
Option(options.valueOf(groupIdOpt)), // via --group
Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property
Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config
).flatten
if (groupIdsProvided.size > 1) {
CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', "
+ "via '--consumer-property', or via '--consumer.config') do not match. "
+ s"Detected group ids: ${groupIdsProvided.mkString("'", "', '", "'")}")
}
groupIdsProvided.headOption match {
case Some(group) =>
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group)
case None =>
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
// By default, avoid unnecessary expansion of the coordinator cache since
// the auto-generated group and its offsets is not intended to be used again
if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
groupIdPassed = false
}
if (groupIdPassed && partitionArg.isDefined)
CommandLineUtils.printUsageAndExit(parser, "Options group and partition cannot be specified together.")
def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
ToolsUtils.printUsageAndExit(parser, e.getMessage)
}
}
}
private[tools] class ConsumerWrapper(
topic: Option[String],
partitionId: Option[Int],
offset: Option[Long],
includedTopics: Option[String],
consumer: Consumer[Array[Byte], Array[Byte]],
timeoutMs: Long = Long.MaxValue,
time: Time = Time.SYSTEM
) {
consumerInit()
var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator()
def consumerInit(): Unit = {
(topic, partitionId, offset, includedTopics) match {
case (Some(topic), Some(partitionId), Some(offset), None) =>
seek(topic, partitionId, offset)
case (Some(topic), Some(partitionId), None, None) =>
// default to latest if no offset is provided
seek(topic, partitionId, ListOffsetsRequest.LATEST_TIMESTAMP)
case (Some(topic), None, None, None) =>
consumer.subscribe(Collections.singletonList(topic))
case (None, None, None, Some(include)) =>
consumer.subscribe(Pattern.compile(include))
case _ =>
throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
"Exactly one of 'topic' or 'include' must be provided. " +
"If 'topic' is provided, an optional 'partition' may also be provided. " +
"If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.")
}
}
def seek(topic: String, partitionId: Int, offset: Long): Unit = {
val topicPartition = new TopicPartition(topic, partitionId)
consumer.assign(Collections.singletonList(topicPartition))
offset match {
case ListOffsetsRequest.EARLIEST_TIMESTAMP => consumer.seekToBeginning(Collections.singletonList(topicPartition))
case ListOffsetsRequest.LATEST_TIMESTAMP => consumer.seekToEnd(Collections.singletonList(topicPartition))
case _ => consumer.seek(topicPartition, offset)
}
}
def resetUnconsumedOffsets(): Unit = {
val smallestUnconsumedOffsets = collection.mutable.Map[TopicPartition, Long]()
while (recordIter.hasNext) {
val record = recordIter.next()
val tp = new TopicPartition(record.topic, record.partition)
// avoid auto-committing offsets which haven't been consumed
smallestUnconsumedOffsets.getOrElseUpdate(tp, record.offset)
}
smallestUnconsumedOffsets.forKeyValue { (tp, offset) => consumer.seek(tp, offset) }
}
def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
val startTimeMs = time.milliseconds
while (!recordIter.hasNext) {
recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
if (!recordIter.hasNext && (time.milliseconds - startTimeMs > timeoutMs)) {
throw new TimeoutException()
}
}
recordIter.next
}
def wakeup(): Unit = {
this.consumer.wakeup()
}
def cleanup(): Unit = {
resetUnconsumedOffsets()
this.consumer.close()
}
}
}
class DefaultMessageFormatter extends MessageFormatter {
var printTimestamp = false
var printKey = false
var printValue = true
var printPartition = false
var printOffset = false
var printHeaders = false
var keySeparator = utfBytes("\t")
var lineSeparator = utfBytes("\n")
var headersSeparator = utfBytes(",")
var nullLiteral = utfBytes("null")
var keyDeserializer: Option[Deserializer[_]] = None
var valueDeserializer: Option[Deserializer[_]] = None
var headersDeserializer: Option[Deserializer[_]] = None
override def configure(configs: Map[String, _]): Unit = {
getPropertyIfExists(configs, "print.timestamp", getBoolProperty).foreach(printTimestamp = _)
getPropertyIfExists(configs, "print.key", getBoolProperty).foreach(printKey = _)
getPropertyIfExists(configs, "print.offset", getBoolProperty).foreach(printOffset = _)
getPropertyIfExists(configs, "print.partition", getBoolProperty).foreach(printPartition = _)
getPropertyIfExists(configs, "print.headers", getBoolProperty).foreach(printHeaders = _)
getPropertyIfExists(configs, "print.value", getBoolProperty).foreach(printValue = _)
getPropertyIfExists(configs, "key.separator", getByteProperty).foreach(keySeparator = _)
getPropertyIfExists(configs, "line.separator", getByteProperty).foreach(lineSeparator = _)
getPropertyIfExists(configs, "headers.separator", getByteProperty).foreach(headersSeparator = _)
getPropertyIfExists(configs, "null.literal", getByteProperty).foreach(nullLiteral = _)
keyDeserializer = getPropertyIfExists(configs, "key.deserializer", getDeserializerProperty(true))
valueDeserializer = getPropertyIfExists(configs, "value.deserializer", getDeserializerProperty(false))
headersDeserializer = getPropertyIfExists(configs, "headers.deserializer", getDeserializerProperty(false))
}
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
def writeSeparator(columnSeparator: Boolean): Unit = {
if (columnSeparator)
output.write(keySeparator)
else
output.write(lineSeparator)
}
def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral)
val convertedBytes = deserializer
.map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString))
.getOrElse(nonNullBytes)
convertedBytes
}
import consumerRecord._
if (printTimestamp) {
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
output.write(utfBytes(s"$timestampType:$timestamp"))
else
output.write(utfBytes("NO_TIMESTAMP"))
writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printKey || printValue)
}
if (printPartition) {
output.write(utfBytes("Partition:"))
output.write(utfBytes(partition().toString))
writeSeparator(columnSeparator = printOffset || printHeaders || printKey || printValue)
}
if (printOffset) {
output.write(utfBytes("Offset:"))
output.write(utfBytes(offset().toString))
writeSeparator(columnSeparator = printHeaders || printKey || printValue)
}
if (printHeaders) {
val headersIt = headers().iterator.asScala
if (headersIt.hasNext) {
headersIt.foreach { header =>
output.write(utfBytes(header.key() + ":"))
output.write(deserialize(headersDeserializer, header.value(), topic))
if (headersIt.hasNext) {
output.write(headersSeparator)
}
}
} else {
output.write(utfBytes("NO_HEADERS"))
}
writeSeparator(columnSeparator = printKey || printValue)
}
if (printKey) {
output.write(deserialize(keyDeserializer, key, topic))
writeSeparator(columnSeparator = printValue)
}
if (printValue) {
output.write(deserialize(valueDeserializer, value, topic))
output.write(lineSeparator)
}
}
private def propertiesWithKeyPrefixStripped(prefix: String, configs: Map[String, _]): Map[String, _] = {
val newConfigs = collection.mutable.Map[String, Any]()
configs.asScala.foreach { case (key, value) =>
if (key.startsWith(prefix) && key.length > prefix.length)
newConfigs.put(key.substring(prefix.length), value)
}
newConfigs.asJava
}
private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8)
private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = {
utfBytes(configs.get(key).asInstanceOf[String])
}
private def getBoolProperty(configs: Map[String, _], key: String): Boolean = {
configs.get(key).asInstanceOf[String].trim.equalsIgnoreCase("true")
}
private def getDeserializerProperty(isKey: Boolean)(configs: Map[String, _], propertyName: String): Deserializer[_] = {
val deserializer = Class.forName(configs.get(propertyName).asInstanceOf[String]).getDeclaredConstructor().newInstance().asInstanceOf[Deserializer[_]]
val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", configs)
.asScala
.asJava
deserializer.configure(deserializerConfig, isKey)
deserializer
}
private def getPropertyIfExists[T](configs: Map[String, _], key: String, getter: (Map[String, _], String) => T): Option[T] = {
if (configs.containsKey(key))
Some(getter(configs, key))
else
None
}
}
class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
override def configure(configs: Map[String, _]): Unit = defaultWriter.configure(configs)
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
import consumerRecord._
defaultWriter.writeTo(consumerRecord, output)
logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " +
s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}")
}
}
class NoOpMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
}

View File

@ -69,8 +69,7 @@ object ToolsUtils {
/**
* This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
* It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`.
* Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]]
* and [[kafka.tools.ConsoleProducer]] are migrated.
* Can be removed once [[kafka.admin.ConsumerGroupCommand]] and [[kafka.tools.ConsoleProducer]] are migrated.
*
* @param parser Command line options parser.
* @param message Error message.

View File

@ -1,65 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.io.PrintStream
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.Deserializer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito._
class CustomDeserializer extends Deserializer[String] {
override def deserialize(topic: String, data: Array[Byte]): String = {
assertNotNull(topic, "topic must not be null")
new String(data)
}
override def deserialize(topic: String, headers: Headers, data: Array[Byte]): String = {
println("WITH HEADERS")
new String(data)
}
}
class CustomDeserializerTest {
@Test
def checkFormatterCallDeserializerWithHeaders(): Unit = {
val formatter = new DefaultMessageFormatter()
formatter.valueDeserializer = Some(new CustomDeserializer)
val output = TestUtils.grabConsoleOutput(formatter.writeTo(
new ConsumerRecord("topic_test", 1, 1L, "key".getBytes, "value".getBytes), mock(classOf[PrintStream])))
assertTrue(output.contains("WITH HEADERS"), "DefaultMessageFormatter should call `deserialize` method with headers.")
formatter.close()
}
@Test
def checkDeserializerTopicIsNotNull(): Unit = {
val formatter = new DefaultMessageFormatter()
formatter.keyDeserializer = Some(new CustomDeserializer)
formatter.writeTo(new ConsumerRecord("topic_test", 1, 1L, "key".getBytes, "value".getBytes),
mock(classOf[PrintStream]))
formatter.close()
}
}

View File

@ -1,234 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.io.{ByteArrayOutputStream, Closeable, PrintStream}
import java.nio.charset.StandardCharsets
import java.util
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.Deserializer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import java.util.Optional
import scala.jdk.CollectionConverters._
class DefaultMessageFormatterTest {
import DefaultMessageFormatterTest._
@ParameterizedTest
@MethodSource(Array("parameters"))
def testWriteRecord(name: String, record: ConsumerRecord[Array[Byte], Array[Byte]], properties: Map[String, String], expected: String): Unit = {
withResource(new ByteArrayOutputStream()) { baos =>
withResource(new PrintStream(baos)) { ps =>
val formatter = buildFormatter(properties)
formatter.writeTo(record, ps)
val actual = new String(baos.toByteArray(), StandardCharsets.UTF_8)
assertEquals(expected, actual)
}
}
}
}
object DefaultMessageFormatterTest {
def parameters: java.util.stream.Stream[Arguments] = {
Seq(
Arguments.of(
"print nothing",
consumerRecord(),
Map("print.value" -> "false"),
""),
Arguments.of(
"print key",
consumerRecord(),
Map("print.key" -> "true",
"print.value" -> "false"),
"someKey\n"),
Arguments.of(
"print value",
consumerRecord(),
Map(),
"someValue\n"),
Arguments.of(
"print empty timestamp",
consumerRecord(timestampType = TimestampType.NO_TIMESTAMP_TYPE),
Map("print.timestamp" -> "true",
"print.value" -> "false"),
"NO_TIMESTAMP\n"),
Arguments.of(
"print log append time timestamp",
consumerRecord(timestampType = TimestampType.LOG_APPEND_TIME),
Map("print.timestamp" -> "true",
"print.value" -> "false"),
"LogAppendTime:1234\n"),
Arguments.of(
"print create time timestamp",
consumerRecord(timestampType = TimestampType.CREATE_TIME),
Map("print.timestamp" -> "true",
"print.value" -> "false"),
"CreateTime:1234\n"),
Arguments.of(
"print partition",
consumerRecord(),
Map("print.partition" -> "true",
"print.value" -> "false"),
"Partition:9\n"),
Arguments.of(
"print offset",
consumerRecord(),
Map("print.offset" -> "true",
"print.value" -> "false"),
"Offset:9876\n"),
Arguments.of(
"print headers",
consumerRecord(),
Map("print.headers" -> "true",
"print.value" -> "false"),
"h1:v1,h2:v2\n"),
Arguments.of(
"print empty headers",
consumerRecord(headers = Nil),
Map("print.headers" -> "true",
"print.value" -> "false"),
"NO_HEADERS\n"),
Arguments.of(
"print all possible fields with default delimiters",
consumerRecord(),
Map("print.key" -> "true",
"print.timestamp" -> "true",
"print.partition" -> "true",
"print.offset" -> "true",
"print.headers" -> "true",
"print.value" -> "true"),
"CreateTime:1234\tPartition:9\tOffset:9876\th1:v1,h2:v2\tsomeKey\tsomeValue\n"),
Arguments.of(
"print all possible fields with custom delimiters",
consumerRecord(),
Map("key.separator" -> "|",
"line.separator" -> "^",
"headers.separator" -> "#",
"print.key" -> "true",
"print.timestamp" -> "true",
"print.partition" -> "true",
"print.offset" -> "true",
"print.headers" -> "true",
"print.value" -> "true"),
"CreateTime:1234|Partition:9|Offset:9876|h1:v1#h2:v2|someKey|someValue^"),
Arguments.of(
"print key with custom deserializer",
consumerRecord(),
Map("print.key" -> "true",
"print.headers" -> "true",
"print.value" -> "true",
"key.deserializer" -> "kafka.tools.UpperCaseDeserializer"),
"h1:v1,h2:v2\tSOMEKEY\tsomeValue\n"),
Arguments.of(
"print value with custom deserializer",
consumerRecord(),
Map("print.key" -> "true",
"print.headers" -> "true",
"print.value" -> "true",
"value.deserializer" -> "kafka.tools.UpperCaseDeserializer"),
"h1:v1,h2:v2\tsomeKey\tSOMEVALUE\n"),
Arguments.of(
"print headers with custom deserializer",
consumerRecord(),
Map("print.key" -> "true",
"print.headers" -> "true",
"print.value" -> "true",
"headers.deserializer" -> "kafka.tools.UpperCaseDeserializer"),
"h1:V1,h2:V2\tsomeKey\tsomeValue\n"),
Arguments.of(
"print key and value",
consumerRecord(),
Map("print.key" -> "true",
"print.value" -> "true"),
"someKey\tsomeValue\n"),
Arguments.of(
"print fields in the beginning, middle and the end",
consumerRecord(),
Map("print.key" -> "true",
"print.value" -> "true",
"print.partition" -> "true"),
"Partition:9\tsomeKey\tsomeValue\n"),
Arguments.of(
"null value without custom null literal",
consumerRecord(value = null),
Map("print.key" -> "true"),
"someKey\tnull\n"),
Arguments.of(
"null value with custom null literal",
consumerRecord(value = null),
Map("print.key" -> "true",
"null.literal" -> "NULL"),
"someKey\tNULL\n"),
).asJava.stream()
}
private def buildFormatter(propsToSet: Map[String, String]): DefaultMessageFormatter = {
val formatter = new DefaultMessageFormatter()
formatter.configure(propsToSet.asJava)
formatter
}
private def header(key: String, value: String) = {
new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8))
}
private def consumerRecord(key: String = "someKey",
value: String = "someValue",
headers: Iterable[Header] = Seq(header("h1", "v1"), header("h2", "v2")),
partition: Int = 9,
offset: Long = 9876,
timestamp: Long = 1234,
timestampType: TimestampType = TimestampType.CREATE_TIME) = {
new ConsumerRecord[Array[Byte], Array[Byte]](
"someTopic",
partition,
offset,
timestamp,
timestampType,
0,
0,
if (key == null) null else key.getBytes(StandardCharsets.UTF_8),
if (value == null) null else value.getBytes(StandardCharsets.UTF_8),
new RecordHeaders(headers.asJava),
Optional.empty[Integer])
}
private def withResource[Resource <: Closeable, Result](resource: Resource)(handler: Resource => Result): Result = {
try {
handler(resource)
} finally {
resource.close()
}
}
}
class UpperCaseDeserializer extends Deserializer[String] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}
override def deserialize(topic: String, data: Array[Byte]): String = new String(data, StandardCharsets.UTF_8).toUpperCase
override def close(): Unit = {}
}

View File

@ -1,676 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.io.{ByteArrayOutputStream, PrintStream}
import java.util.{HashMap, Optional, Map => JMap}
import java.time.Duration
import kafka.tools.ConsoleConsumer.ConsumerWrapper
import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.{MessageFormatter, TopicPartition}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.test.MockDeserializer
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers
import ArgumentMatchers._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.server.util.MockTime
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
import scala.jdk.CollectionConverters._
class ConsoleConsumerTest {
@BeforeEach
def setup(): Unit = {
ConsoleConsumer.messageCount = 0
}
@Test
def shouldThrowTimeoutExceptionWhenTimeoutIsReached(): Unit = {
val topic = "test"
val time = new MockTime
val timeoutMs = 1000
val mockConsumer = mock(classOf[Consumer[Array[Byte], Array[Byte]]])
when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer { _ =>
time.sleep(timeoutMs / 2 + 1)
ConsumerRecords.EMPTY
}
val consumer = new ConsumerWrapper(
topic = Some(topic),
partitionId = None,
offset = None,
includedTopics = None,
consumer = mockConsumer,
timeoutMs = timeoutMs,
time = time
)
assertThrows(classOf[TimeoutException], () => consumer.receive())
}
@Test
def shouldResetUnConsumedOffsetsBeforeExit(): Unit = {
val topic = "test"
val maxMessages: Int = 123
val totalMessages: Int = 700
val startOffset: java.lang.Long = 0L
val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.EARLIEST)
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
val consumer = new ConsumerWrapper(Some(topic), None, None, None, mockConsumer)
mockConsumer.rebalance(List(tp1, tp2).asJava)
mockConsumer.updateBeginningOffsets(Map(tp1 -> startOffset, tp2 -> startOffset).asJava)
0 until totalMessages foreach { i =>
// add all records, each partition should have half of `totalMessages`
mockConsumer.addRecord(new ConsumerRecord[Array[Byte], Array[Byte]](topic, i % 2, i / 2, "key".getBytes, "value".getBytes))
}
val formatter = mock(classOf[MessageFormatter])
ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, skipMessageOnError = false)
assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))
consumer.resetUnconsumedOffsets()
assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))
verify(formatter, times(maxMessages)).writeTo(any(), any())
}
@Test
def shouldLimitReadsToMaxMessageLimit(): Unit = {
val consumer = mock(classOf[ConsumerWrapper])
val formatter = mock(classOf[MessageFormatter])
val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())
val messageLimit: Int = 10
when(consumer.receive()).thenReturn(record)
ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true)
verify(consumer, times(messageLimit)).receive()
verify(formatter, times(messageLimit)).writeTo(any(), any())
consumer.cleanup()
}
@Test
def shouldStopWhenOutputCheckErrorFails(): Unit = {
val consumer = mock(classOf[ConsumerWrapper])
val formatter = mock(classOf[MessageFormatter])
val printStream = mock(classOf[PrintStream])
val record = new ConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())
when(consumer.receive()).thenReturn(record)
//Simulate an error on System.out after the first record has been printed
when(printStream.checkError()).thenReturn(true)
ConsoleConsumer.process(-1, formatter, consumer, printStream, true)
verify(formatter).writeTo(any(), ArgumentMatchers.eq(printStream))
verify(consumer).receive()
verify(printStream).checkError()
consumer.cleanup()
}
@Test
def shouldParseValidConsumerValidConfig(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(true, config.fromBeginning)
}
@Test
def shouldParseIncludeArgument(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--include", "includeTest*",
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("includeTest*", config.includedTopicsArg)
assertEquals(true, config.fromBeginning)
}
@Test
def shouldParseWhitelistArgument(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--whitelist", "whitelistTest*",
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("whitelistTest*", config.includedTopicsArg)
assertEquals(true, config.fromBeginning)
}
@Test
def shouldIgnoreWhitelistArgumentIfIncludeSpecified(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--include", "includeTest*",
"--whitelist", "whitelistTest*",
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("includeTest*", config.includedTopicsArg)
assertEquals(true, config.fromBeginning)
}
@Test
def shouldParseValidSimpleConsumerValidConfigWithNumericOffset(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--offset", "3")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(0, config.partitionArg.get)
assertEquals(3, config.offsetArg)
assertEquals(false, config.fromBeginning)
}
@Test
def shouldExitOnUnrecognizedNewConsumerOption(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
val args: Array[String] = Array(
"--new-consumer",
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning")
try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
finally Exit.resetExitProcedure()
}
@Test
def shouldParseValidSimpleConsumerValidConfigWithStringOffset(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--offset", "LatEst",
"--property", "print.value=false")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(0, config.partitionArg.get)
assertEquals(-1, config.offsetArg)
assertEquals(false, config.fromBeginning)
assertEquals(false, config.formatter.asInstanceOf[DefaultMessageFormatter].printValue)
}
@Test
def shouldParseValidConsumerConfigWithAutoOffsetResetLatest(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=latest")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
val consumerProperties = ConsoleConsumer.consumerProps(config)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(false, config.fromBeginning)
assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
}
@Test
def shouldParseValidConsumerConfigWithAutoOffsetResetEarliest(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=earliest")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
val consumerProperties = ConsoleConsumer.consumerProps(config)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(false, config.fromBeginning)
assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
}
@Test
def shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=earliest",
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
val consumerProperties = ConsoleConsumer.consumerProps(config)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(true, config.fromBeginning)
assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
}
@Test
def shouldParseValidConsumerConfigWithNoOffsetReset(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
val consumerProperties = ConsoleConsumer.consumerProps(config)
//Then
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(false, config.fromBeginning)
assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
}
@Test
def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=latest",
"--from-beginning")
try {
val config = new ConsoleConsumer.ConsumerConfig(args)
assertThrows(classOf[IllegalArgumentException], () => ConsoleConsumer.consumerProps(config))
}
finally Exit.resetExitProcedure()
}
@Test
def shouldParseConfigsFromFile(): Unit = {
val propsFile = TestUtils.tempPropertiesFile(Map("request.timeout.ms" -> "1000", "group.id" -> "group1"))
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer.config", propsFile.getAbsolutePath
)
val config = new ConsoleConsumer.ConsumerConfig(args)
assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms"))
assertEquals("group1", config.consumerProps.getProperty("group.id"))
}
@Test
def groupIdsProvidedInDifferentPlacesMustMatch(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
// different in all three places
var propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file"))
var args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--consumer-property", "group.id=group-from-properties",
"--consumer.config", propsFile.getAbsolutePath
)
assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
// the same in all three places
propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "test-group"))
args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group",
"--consumer-property", "group.id=test-group",
"--consumer.config", propsFile.getAbsolutePath
)
var config = new ConsoleConsumer.ConsumerConfig(args)
var props = ConsoleConsumer.consumerProps(config)
assertEquals("test-group", props.getProperty("group.id"))
// different via --consumer-property and --consumer.config
propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file"))
args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "group.id=group-from-properties",
"--consumer.config", propsFile.getAbsolutePath
)
assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
// different via --consumer-property and --group
args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--consumer-property", "group.id=group-from-properties"
)
assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
// different via --group and --consumer.config
propsFile = TestUtils.tempPropertiesFile(Map("group.id" -> "group-from-file"))
args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--consumer.config", propsFile.getAbsolutePath
)
assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
// via --group only
args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments"
)
config = new ConsoleConsumer.ConsumerConfig(args)
props = ConsoleConsumer.consumerProps(config)
assertEquals("group-from-arguments", props.getProperty("group.id"))
Exit.resetExitProcedure()
}
@Test
def testCustomPropertyShouldBePassedToConfigureMethod(): Unit = {
val args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--property", "print.key=true",
"--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
"--property", "key.deserializer.my-props=abc"
)
val config = new ConsoleConsumer.ConsumerConfig(args)
assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
val keyDeserializer = formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]
assertEquals(1, keyDeserializer.configs.size)
assertEquals("abc", keyDeserializer.configs.get("my-props"))
assertTrue(keyDeserializer.isKey)
}
@Test
def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
val propsFile = TestUtils.tempPropertiesFile(Map("key.deserializer.my-props" -> "abc", "print.key" -> "false"))
val args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--property", "print.key=true",
"--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
"--formatter-config", propsFile.getAbsolutePath
)
val config = new ConsoleConsumer.ConsumerConfig(args)
assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
val keyDeserializer = formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]
assertEquals(1, keyDeserializer.configs.size)
assertEquals("abc", keyDeserializer.configs.get("my-props"))
assertTrue(keyDeserializer.isKey)
}
@Test
def shouldParseGroupIdFromBeginningGivenTogether(): Unit = {
// Start from earliest
var args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group",
"--from-beginning")
var config = new ConsoleConsumer.ConsumerConfig(args)
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(-2, config.offsetArg)
assertEquals(true, config.fromBeginning)
// Start from latest
args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group"
)
config = new ConsoleConsumer.ConsumerConfig(args)
assertEquals("localhost:9092", config.bootstrapServer)
assertEquals("test", config.topicArg)
assertEquals(-1, config.offsetArg)
assertEquals(false, config.fromBeginning)
}
@Test
def shouldExitOnGroupIdAndPartitionGivenTogether(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group",
"--partition", "0")
try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
finally Exit.resetExitProcedure()
}
@Test
def shouldExitOnOffsetWithoutPartition(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--offset", "10")
try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
finally Exit.resetExitProcedure()
}
@Test
def testDefaultMessageFormatter(): Unit = {
val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
val formatter = new DefaultMessageFormatter()
val configs: JMap[String, String] = new HashMap()
formatter.configure(configs)
var out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("value\n", out.toString)
configs.put("print.key", "true")
formatter.configure(configs)
out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("key\tvalue\n", out.toString)
configs.put("print.partition", "true")
formatter.configure(configs)
out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("Partition:0\tkey\tvalue\n", out.toString)
configs.put("print.timestamp", "true")
formatter.configure(configs)
out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString)
configs.put("print.offset", "true")
formatter.configure(configs)
out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
out = new ByteArrayOutputStream()
val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes, "value".getBytes,
new RecordHeaders(), Optional.empty[Integer])
formatter.writeTo(record2, new PrintStream(out))
assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
formatter.close()
}
@Test
def testNoOpMessageFormatter(): Unit = {
val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
val formatter = new NoOpMessageFormatter()
formatter.configure(new HashMap())
val out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("", out.toString)
}
@Test
def shouldExitIfNoTopicOrFilterSpecified(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092")
try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
finally Exit.resetExitProcedure()
}
@Test
def shouldExitIfTopicAndIncludeSpecified(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--include", "includeTest*")
try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
finally Exit.resetExitProcedure()
}
@Test
def shouldExitIfTopicAndWhitelistSpecified(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--whitelist", "whitelistTest*")
try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
finally Exit.resetExitProcedure()
}
@Test
def testClientIdOverride(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning",
"--consumer-property", "client.id=consumer-1")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
val consumerProperties = ConsoleConsumer.consumerProps(config)
//Then
assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
}
@Test
def testDefaultClientId(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
val consumerProperties = ConsoleConsumer.consumerProps(config)
//Then
assertEquals("console-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
import kafka.tools.ConsoleConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@ -62,6 +61,8 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.consumer.ConsoleConsumer;
import org.apache.kafka.tools.consumer.ConsoleConsumerOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@ -1117,7 +1118,7 @@ public class KStreamAggregationIntegrationTest {
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages,
final boolean printTimestamp) {
final boolean printTimestamp) throws Exception {
final ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
final PrintStream originalStream = System.out;
try (final PrintStream newStream = new PrintStream(newConsole)) {
@ -1139,8 +1140,7 @@ public class KStreamAggregationIntegrationTest {
"--property", "key.deserializer.window.size.ms=500",
};
ConsoleConsumer.messageCount_$eq(0); //reset the message count
ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
ConsoleConsumer.run(new ConsoleConsumerOptions(args));
newStream.flush();
System.setOut(originalStream);
return newConsole.toString();

View File

@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
"""
@ -210,6 +210,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
# LoggingMessageFormatter was introduced after 0.9
if node.version > LATEST_0_9:
if node.version > LATEST_3_7:
cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter"
else:
cmd += " --formatter kafka.tools.LoggingMessageFormatter"
if self.enable_systest_events:

View File

@ -263,3 +263,7 @@ LATEST_3_5 = V_3_5_2
V_3_6_0 = KafkaVersion("3.6.0")
V_3_6_1 = KafkaVersion("3.6.1")
LATEST_3_6 = V_3_6_1
# 3.7.x version
V_3_7_0 = KafkaVersion("3.7.0")
LATEST_3_7 = V_3_7_0

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import java.io.PrintStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Consumer that dumps messages to standard out.
*/
public class ConsoleConsumer {
private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class);
private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
static int messageCount = 0;
public static void main(String[] args) throws Exception {
ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
try {
run(opts);
} catch (AuthenticationException ae) {
LOG.error("Authentication failed: terminating consumer process", ae);
Exit.exit(1);
} catch (Throwable t) {
LOG.error("Unknown error when running consumer: ", t);
Exit.exit(1);
}
}
public static void run(ConsoleConsumerOptions opts) {
messageCount = 0;
long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;
Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
ConsumerWrapper consumerWrapper = opts.partitionArg().isPresent()
? new ConsumerWrapper(Optional.of(opts.topicArg()), opts.partitionArg(), OptionalLong.of(opts.offsetArg()), Optional.empty(), consumer, timeoutMs)
: new ConsumerWrapper(Optional.of(opts.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(opts.includedTopicsArg()), consumer, timeoutMs);
addShutdownHook(consumerWrapper, opts);
try {
process(opts.maxMessages(), opts.formatter(), consumerWrapper, System.out, opts.skipMessageOnError());
} finally {
consumerWrapper.cleanup();
opts.formatter().close();
reportRecordCount();
SHUTDOWN_LATCH.countDown();
}
}
static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) {
Exit.addShutdownHook("consumer-shutdown-hook", () -> {
try {
consumer.wakeup();
SHUTDOWN_LATCH.await();
} catch (Throwable t) {
LOG.error("Exception while running shutdown hook: ", t);
}
if (conf.enableSystestEventsLogging()) {
System.out.println("shutdown_complete");
}
});
}
static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) {
while (messageCount < maxMessages || maxMessages == -1) {
ConsumerRecord<byte[], byte[]> msg;
try {
msg = consumer.receive();
} catch (WakeupException we) {
LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
// Consumer will be closed
return;
} catch (Throwable t) {
LOG.error("Error processing message, terminating consumer process: ", t);
// Consumer will be closed
return;
}
messageCount += 1;
try {
formatter.writeTo(new ConsumerRecord<>(msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(),
0, 0, msg.key(), msg.value(), msg.headers(), Optional.empty()), output);
} catch (Throwable t) {
if (skipMessageOnError) {
LOG.error("Error processing message, skipping this message: ", t);
} else {
// Consumer will be closed
throw t;
}
}
if (checkErr(output)) {
// Consumer will be closed
return;
}
}
}
static void reportRecordCount() {
System.err.println("Processed a total of " + messageCount + " messages");
}
static boolean checkErr(PrintStream output) {
boolean gotError = output.checkError();
if (gotError) {
// This means no one is listening to our output stream anymore, time to shut down
System.err.println("Unable to write to standard out, closing consumer.");
}
return gotError;
}
public static class ConsumerWrapper {
final Optional<String> topic;
final OptionalInt partitionId;
final OptionalLong offset;
final Optional<String> includedTopics;
final Consumer<byte[], byte[]> consumer;
final long timeoutMs;
final Time time = Time.SYSTEM;
Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
public ConsumerWrapper(Optional<String> topic,
OptionalInt partitionId,
OptionalLong offset,
Optional<String> includedTopics,
Consumer<byte[], byte[]> consumer,
long timeoutMs) {
this.topic = topic;
this.partitionId = partitionId;
this.offset = offset;
this.includedTopics = includedTopics;
this.consumer = consumer;
this.timeoutMs = timeoutMs;
if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) {
seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
// default to latest if no offset is provided
seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP);
} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
consumer.subscribe(Collections.singletonList(topic.get()));
} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) {
consumer.subscribe(Pattern.compile(includedTopics.get()));
} else {
throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
"Exactly one of 'topic' or 'include' must be provided. " +
"If 'topic' is provided, an optional 'partition' may also be provided. " +
"If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.");
}
}
private void seek(String topic, int partitionId, long offset) {
TopicPartition topicPartition = new TopicPartition(topic, partitionId);
consumer.assign(Collections.singletonList(topicPartition));
if (offset == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
consumer.seekToBeginning(Collections.singletonList(topicPartition));
} else if (offset == ListOffsetsRequest.LATEST_TIMESTAMP) {
consumer.seekToEnd(Collections.singletonList(topicPartition));
} else {
consumer.seek(topicPartition, offset);
}
}
void resetUnconsumedOffsets() {
Map<TopicPartition, Long> smallestUnconsumedOffsets = new HashMap<>();
while (recordIter.hasNext()) {
ConsumerRecord<byte[], byte[]> record = recordIter.next();
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
// avoid auto-committing offsets which haven't been consumed
smallestUnconsumedOffsets.putIfAbsent(tp, record.offset());
}
smallestUnconsumedOffsets.forEach(consumer::seek);
}
ConsumerRecord<byte[], byte[]> receive() {
long startTimeMs = time.milliseconds();
while (!recordIter.hasNext()) {
recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator();
if (!recordIter.hasNext() && (time.milliseconds() - startTimeMs > timeoutMs)) {
throw new TimeoutException();
}
}
return recordIter.next();
}
void wakeup() {
this.consumer.wakeup();
}
void cleanup() {
resetUnconsumedOffsets();
this.consumer.close();
}
}
}

View File

@ -0,0 +1,414 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import joptsimple.OptionException;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
public final class ConsoleConsumerOptions extends CommandDefaultOptions {
private static final Random RANDOM = new Random();
private final OptionSpec<String> topicOpt;
private final OptionSpec<String> whitelistOpt;
private final OptionSpec<String> includeOpt;
private final OptionSpec<Integer> partitionIdOpt;
private final OptionSpec<String> offsetOpt;
private final OptionSpec<String> messageFormatterOpt;
private final OptionSpec<String> messageFormatterArgOpt;
private final OptionSpec<String> messageFormatterConfigOpt;
private final OptionSpec<?> resetBeginningOpt;
private final OptionSpec<Integer> maxMessagesOpt;
private final OptionSpec<Integer> timeoutMsOpt;
private final OptionSpec<?> skipMessageOnErrorOpt;
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> keyDeserializerOpt;
private final OptionSpec<String> valueDeserializerOpt;
private final OptionSpec<?> enableSystestEventsLoggingOpt;
private final OptionSpec<String> isolationLevelOpt;
private final OptionSpec<String> groupIdOpt;
private final Properties consumerProps;
private final long offset;
private final MessageFormatter formatter;
public ConsoleConsumerOptions(String[] args) throws IOException {
super(args);
topicOpt = parser.accepts("topic", "The topic to consume on.")
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
whitelistOpt = parser.accepts("whitelist",
"DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(String.class);
includeOpt = parser.accepts("include",
"Regular expression specifying list of topics to include for consumption.")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(String.class);
partitionIdOpt = parser.accepts("partition",
"The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.")
.withRequiredArg()
.describedAs("partition")
.ofType(Integer.class);
offsetOpt = parser.accepts("offset", "The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end")
.withRequiredArg()
.describedAs("consume offset")
.ofType(String.class)
.defaultsTo("latest");
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
.withRequiredArg()
.describedAs("consumer_prop")
.ofType(String.class);
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
.withRequiredArg()
.describedAs("class")
.ofType(String.class)
.defaultsTo(DefaultMessageFormatter.class.getName());
messageFormatterArgOpt = parser.accepts("property",
"The properties to initialize the message formatter. Default properties include: \n" +
" print.timestamp=true|false\n" +
" print.key=true|false\n" +
" print.offset=true|false\n" +
" print.partition=true|false\n" +
" print.headers=true|false\n" +
" print.value=true|false\n" +
" key.separator=<key.separator>\n" +
" line.separator=<line.separator>\n" +
" headers.separator=<line.separator>\n" +
" null.literal=<null.literal>\n" +
" key.deserializer=<key.deserializer>\n" +
" value.deserializer=<value.deserializer>\n" +
" header.deserializer=<header.deserializer>\n" +
"\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.")
.withRequiredArg()
.describedAs("prop")
.ofType(String.class);
messageFormatterConfigOpt = parser.accepts("formatter-config", "Config properties file to initialize the message formatter. Note that " + messageFormatterArgOpt + " takes precedence over this config.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
"start with the earliest message present in the log rather than the latest message.");
maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
.withRequiredArg()
.describedAs("num_messages")
.ofType(Integer.class);
timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
.withRequiredArg()
.describedAs("timeout_ms")
.ofType(Integer.class);
skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.");
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.")
.withRequiredArg()
.describedAs("server to connect to")
.ofType(String.class);
keyDeserializerOpt = parser.accepts("key-deserializer")
.withRequiredArg()
.describedAs("deserializer for key")
.ofType(String.class);
valueDeserializerOpt = parser.accepts("value-deserializer")
.withRequiredArg()
.describedAs("deserializer for values")
.ofType(String.class);
enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)");
isolationLevelOpt = parser.accepts("isolation-level",
"Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " +
"to read all messages.")
.withRequiredArg()
.ofType(String.class)
.defaultsTo("read_uncommitted");
groupIdOpt = parser.accepts("group", "The consumer group id of the consumer.")
.withRequiredArg()
.describedAs("consumer group id")
.ofType(String.class);
try {
options = parser.parse(args);
} catch (OptionException oe) {
CommandLineUtils.printUsageAndExit(parser, oe.getMessage());
}
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.");
checkRequiredArgs();
Properties consumerPropsFromFile = options.has(consumerConfigOpt)
? Utils.loadProps(options.valueOf(consumerConfigOpt))
: new Properties();
Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt));
Set<String> groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps);
consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided);
offset = parseOffset();
formatter = buildFormatter();
}
private void checkRequiredArgs() {
List<String> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
topicOrFilterArgs.removeIf(Objects::isNull);
// user need to specify value for either --topic or one of the include filters options (--include or --whitelist)
if (topicOrFilterArgs.size() != 1) {
CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. " +
(options.has(whitelistOpt) ? "--whitelist is DEPRECATED use --include instead; ignored if --include specified." : ""));
}
if (partitionArg().isPresent()) {
if (!options.has(topicOpt)) {
CommandLineUtils.printUsageAndExit(parser, "The topic is required when partition is specified.");
}
if (fromBeginning() && options.has(offsetOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and offset cannot be specified together.");
}
} else if (options.has(offsetOpt)) {
CommandLineUtils.printUsageAndExit(parser, "The partition is required when offset is specified.");
}
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
}
private Set<String> checkConsumerGroup(Properties consumerPropsFromFile, Properties extraConsumerProps) {
// if the group id is provided in more than place (through different means) all values must be the same
Set<String> groupIdsProvided = new HashSet<>();
if (options.has(groupIdOpt)) {
groupIdsProvided.add(options.valueOf(groupIdOpt));
}
if (consumerPropsFromFile.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
groupIdsProvided.add((String) consumerPropsFromFile.get(ConsumerConfig.GROUP_ID_CONFIG));
}
if (extraConsumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
groupIdsProvided.add(extraConsumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
}
if (groupIdsProvided.size() > 1) {
CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', "
+ "via '--consumer-property', or via '--consumer.config') do not match. "
+ "Detected group ids: "
+ groupIdsProvided.stream().map(group -> "'" + group + "'").collect(Collectors.joining(", ")));
}
if (!groupIdsProvided.isEmpty() && partitionArg().isPresent()) {
CommandLineUtils.printUsageAndExit(parser, "Options group and partition cannot be specified together.");
}
return groupIdsProvided;
}
private Properties buildConsumerProps(Properties consumerPropsFromFile, Properties extraConsumerProps, Set<String> groupIdsProvided) {
Properties consumerProps = new Properties();
consumerProps.putAll(consumerPropsFromFile);
consumerProps.putAll(extraConsumerProps);
setAutoOffsetResetValue(consumerProps);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer());
if (consumerProps.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) {
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer");
}
CommandLineUtils.maybeMergeOptions(consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, options, isolationLevelOpt);
if (groupIdsProvided.isEmpty()) {
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-" + RANDOM.nextInt(100000));
// By default, avoid unnecessary expansion of the coordinator cache since
// the auto-generated group and its offsets is not intended to be used again
if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
} else {
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdsProvided.iterator().next());
}
return consumerProps;
}
/**
* Used to retrieve the correct value for the consumer parameter 'auto.offset.reset'.
* Order of priority is:
* 1. Explicitly set parameter via --consumer.property command line parameter
* 2. Explicit --from-beginning given -> 'earliest'
* 3. Default value of 'latest'
* In case both --from-beginning and an explicit value are specified an error is thrown if these
* are conflicting.
*/
private void setAutoOffsetResetValue(Properties props) {
String earliestConfigValue = "earliest";
String latestConfigValue = "latest";
if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
// auto.offset.reset parameter was specified on the command line
String autoResetOption = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
if (fromBeginning() && !earliestConfigValue.equals(autoResetOption)) {
// conflicting options - latest und earliest, throw an error
System.err.println("Can't simultaneously specify --from-beginning and 'auto.offset.reset=" + autoResetOption + "', " +
"please remove one option");
Exit.exit(1);
}
// nothing to do, checking for valid parameter values happens later and the specified
// value was already copied during .putall operation
} else {
// no explicit value for auto.offset.reset was specified
// if --from-beginning was specified use earliest, otherwise default to latest
String autoResetOption = fromBeginning() ? earliestConfigValue : latestConfigValue;
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoResetOption);
}
}
private long parseOffset() {
if (options.has(offsetOpt)) {
switch (options.valueOf(offsetOpt).toLowerCase(Locale.ROOT)) {
case "earliest":
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
case "latest":
return ListOffsetsRequest.LATEST_TIMESTAMP;
default:
String offsetString = options.valueOf(offsetOpt);
try {
long offset = Long.parseLong(offsetString);
if (offset < 0) {
invalidOffset(offsetString);
}
return offset;
} catch (NumberFormatException nfe) {
invalidOffset(offsetString);
}
}
} else if (fromBeginning()) {
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
}
return ListOffsetsRequest.LATEST_TIMESTAMP;
}
private void invalidOffset(String offset) {
CommandLineUtils.printUsageAndExit(parser, "The provided offset value '" + offset + "' is incorrect. Valid values are " +
"'earliest', 'latest', or a non-negative long.");
}
private MessageFormatter buildFormatter() {
MessageFormatter formatter = null;
try {
Class<?> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt));
formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance();
Properties formatterArgs = formatterArgs();
Map<String, String> formatterConfigs = new HashMap<>();
for (final String name : formatterArgs.stringPropertyNames()) {
formatterConfigs.put(name, formatterArgs.getProperty(name));
}
formatter.configure(formatterConfigs);
} catch (Exception e) {
CommandLineUtils.printUsageAndExit(parser, e.getMessage());
}
return formatter;
}
Properties consumerProps() {
return consumerProps;
}
boolean fromBeginning() {
return options.has(resetBeginningOpt);
}
long offsetArg() {
return offset;
}
boolean skipMessageOnError() {
return options.has(skipMessageOnErrorOpt);
}
OptionalInt partitionArg() {
if (options.has(partitionIdOpt)) {
return OptionalInt.of(options.valueOf(partitionIdOpt));
}
return OptionalInt.empty();
}
String topicArg() {
return options.valueOf(topicOpt);
}
int maxMessages() {
return options.has(maxMessagesOpt) ? options.valueOf(maxMessagesOpt) : -1;
}
int timeoutMs() {
return options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1;
}
boolean enableSystestEventsLogging() {
return options.has(enableSystestEventsLoggingOpt);
}
String bootstrapServer() {
return options.valueOf(bootstrapServerOpt);
}
String includedTopicsArg() {
return options.has(includeOpt)
? options.valueOf(includeOpt)
: options.valueOf(whitelistOpt);
}
Properties formatterArgs() throws IOException {
Properties formatterArgs = options.has(messageFormatterConfigOpt)
? Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
: new Properties();
String keyDeserializer = options.valueOf(keyDeserializerOpt);
if (keyDeserializer != null && !keyDeserializer.isEmpty()) {
formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
}
String valueDeserializer = options.valueOf(valueDeserializerOpt);
if (valueDeserializer != null && !valueDeserializer.isEmpty()) {
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
}
formatterArgs.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)));
return formatterArgs;
}
MessageFormatter formatter() {
return formatter;
}
}

View File

@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
class DefaultMessageFormatter implements MessageFormatter {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMessageFormatter.class);
private boolean printTimestamp = false;
private boolean printKey = false;
private boolean printValue = true;
private boolean printPartition = false;
private boolean printOffset = false;
private boolean printHeaders = false;
private byte[] keySeparator = utfBytes("\t");
private byte[] lineSeparator = utfBytes("\n");
private byte[] headersSeparator = utfBytes(",");
private byte[] nullLiteral = utfBytes("null");
private Optional<Deserializer<?>> keyDeserializer = Optional.empty();
private Optional<Deserializer<?>> valueDeserializer = Optional.empty();
private Optional<Deserializer<?>> headersDeserializer = Optional.empty();
@Override
public void configure(Map<String, ?> configs) {
if (configs.containsKey("print.timestamp")) {
printTimestamp = getBoolProperty(configs, "print.timestamp");
}
if (configs.containsKey("print.key")) {
printKey = getBoolProperty(configs, "print.key");
}
if (configs.containsKey("print.offset")) {
printOffset = getBoolProperty(configs, "print.offset");
}
if (configs.containsKey("print.partition")) {
printPartition = getBoolProperty(configs, "print.partition");
}
if (configs.containsKey("print.headers")) {
printHeaders = getBoolProperty(configs, "print.headers");
}
if (configs.containsKey("print.value")) {
printValue = getBoolProperty(configs, "print.value");
}
if (configs.containsKey("key.separator")) {
keySeparator = getByteProperty(configs, "key.separator");
}
if (configs.containsKey("line.separator")) {
lineSeparator = getByteProperty(configs, "line.separator");
}
if (configs.containsKey("headers.separator")) {
headersSeparator = getByteProperty(configs, "headers.separator");
}
if (configs.containsKey("null.literal")) {
nullLiteral = getByteProperty(configs, "null.literal");
}
keyDeserializer = getDeserializerProperty(configs, true, "key.deserializer");
valueDeserializer = getDeserializerProperty(configs, false, "value.deserializer");
headersDeserializer = getDeserializerProperty(configs, false, "headers.deserializer");
}
// for testing
public boolean printValue() {
return printValue;
}
// for testing
public Optional<Deserializer<?>> keyDeserializer() {
return keyDeserializer;
}
private void writeSeparator(PrintStream output, boolean columnSeparator) {
try {
if (columnSeparator) {
output.write(keySeparator);
} else {
output.write(lineSeparator);
}
} catch (IOException ioe) {
LOG.error("Unable to write the separator to the output", ioe);
}
}
private byte[] deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Optional<Deserializer<?>> deserializer, byte[] sourceBytes, String topic) {
byte[] nonNullBytes = sourceBytes != null ? sourceBytes : nullLiteral;
return deserializer.map(value -> utfBytes(value.deserialize(topic, consumerRecord.headers(), nonNullBytes).toString())).orElse(nonNullBytes);
}
@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
try {
if (printTimestamp) {
if (consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
output.print(consumerRecord.timestampType() + ":" + consumerRecord.timestamp());
} else {
output.print("NO_TIMESTAMP");
}
writeSeparator(output, printOffset || printPartition || printHeaders || printKey || printValue);
}
if (printPartition) {
output.print("Partition:");
output.print(consumerRecord.partition());
writeSeparator(output, printOffset || printHeaders || printKey || printValue);
}
if (printOffset) {
output.print("Offset:");
output.print(consumerRecord.offset());
writeSeparator(output, printHeaders || printKey || printValue);
}
if (printHeaders) {
Iterator<Header> headersIt = consumerRecord.headers().iterator();
if (!headersIt.hasNext()) {
output.print("NO_HEADERS");
} else {
while (headersIt.hasNext()) {
Header header = headersIt.next();
output.print(header.key() + ":");
output.write(deserialize(consumerRecord, headersDeserializer, header.value(), consumerRecord.topic()));
if (headersIt.hasNext()) {
output.write(headersSeparator);
}
}
}
writeSeparator(output, printKey || printValue);
}
if (printKey) {
output.write(deserialize(consumerRecord, keyDeserializer, consumerRecord.key(), consumerRecord.topic()));
writeSeparator(output, printValue);
}
if (printValue) {
output.write(deserialize(consumerRecord, valueDeserializer, consumerRecord.value(), consumerRecord.topic()));
output.write(lineSeparator);
}
} catch (IOException ioe) {
LOG.error("Unable to write the consumer record to the output", ioe);
}
}
private Map<String, ?> propertiesWithKeyPrefixStripped(String prefix, Map<String, ?> configs) {
final Map<String, Object> newConfigs = new HashMap<>();
for (Map.Entry<String, ?> entry : configs.entrySet()) {
if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
newConfigs.put(entry.getKey().substring(prefix.length()), entry.getValue());
}
}
return newConfigs;
}
private byte[] utfBytes(String str) {
return str.getBytes(StandardCharsets.UTF_8);
}
private byte[] getByteProperty(Map<String, ?> configs, String key) {
return utfBytes((String) configs.get(key));
}
private boolean getBoolProperty(Map<String, ?> configs, String key) {
return ((String) configs.get(key)).trim().equalsIgnoreCase("true");
}
private Optional<Deserializer<?>> getDeserializerProperty(Map<String, ?> configs, boolean isKey, String key) {
if (configs.containsKey(key)) {
String name = (String) configs.get(key);
try {
Deserializer<?> deserializer = (Deserializer<?>) Class.forName(name).getDeclaredConstructor().newInstance();
Map<String, ?> deserializerConfig = propertiesWithKeyPrefixStripped(key + ".", configs);
deserializer.configure(deserializerConfig, isKey);
return Optional.of(deserializer);
} catch (Exception e) {
LOG.error("Unable to instantiate a deserializer from " + name, e);
}
}
return Optional.empty();
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.record.TimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
class LoggingMessageFormatter implements MessageFormatter {
private static final Logger LOG = LoggerFactory.getLogger(LoggingMessageFormatter.class);
private final DefaultMessageFormatter defaultWriter = new DefaultMessageFormatter();
@Override
public void configure(Map<String, ?> configs) {
defaultWriter.configure(configs);
}
@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
defaultWriter.writeTo(consumerRecord, output);
String timestamp = consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE
? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + ", "
: "";
String key = "key:" + (consumerRecord.key() == null ? "null " : new String(consumerRecord.key(), StandardCharsets.UTF_8) + ", ");
String value = "value:" + (consumerRecord.value() == null ? "null" : new String(consumerRecord.value(), StandardCharsets.UTF_8));
LOG.info(timestamp + key + value);
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import java.io.PrintStream;
class NoOpMessageFormatter implements MessageFormatter {
@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
}
}

View File

@ -28,6 +28,8 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
@ -198,6 +200,14 @@ public class ToolsTestUtils {
.collect(Collectors.joining(","));
}
public static File tempPropertiesFile(Map<String, String> properties) throws IOException {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : properties.entrySet()) {
sb.append(entry.getKey() + "=" + entry.getValue() + System.lineSeparator());
}
return org.apache.kafka.test.TestUtils.tempFile(sb.toString());
}
public static class MockExitProcedure implements Exit.Procedure {
private boolean hasExited = false;
private int statusCode;

View File

@ -0,0 +1,621 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.MockDeserializer;
import org.apache.kafka.tools.ToolsTestUtils;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConsoleConsumerOptionsTest {
@Test
public void shouldParseValidConsumerValidConfig() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertTrue(config.fromBeginning());
assertFalse(config.enableSystestEventsLogging());
assertFalse(config.skipMessageOnError());
assertEquals(-1, config.maxMessages());
assertEquals(-1, config.timeoutMs());
}
@Test
public void shouldParseIncludeArgument() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--include", "includeTest*",
"--from-beginning"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("includeTest*", config.includedTopicsArg());
assertTrue(config.fromBeginning());
}
@Test
public void shouldParseWhitelistArgument() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--whitelist", "whitelistTest*",
"--from-beginning"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("whitelistTest*", config.includedTopicsArg());
assertTrue(config.fromBeginning());
}
@Test
public void shouldIgnoreWhitelistArgumentIfIncludeSpecified() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--include", "includeTest*",
"--whitelist", "whitelistTest*",
"--from-beginning"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("includeTest*", config.includedTopicsArg());
assertTrue(config.fromBeginning());
}
@Test
public void shouldParseValidSimpleConsumerValidConfigWithNumericOffset() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--offset", "3"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertTrue(config.partitionArg().isPresent());
assertEquals(0, config.partitionArg().getAsInt());
assertEquals(3, config.offsetArg());
assertFalse(config.fromBeginning());
}
@Test
public void shouldExitOnUnrecognizedNewConsumerOption() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--new-consumer",
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitIfPartitionButNoTopic() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--include", "test.*",
"--partition", "0"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitIfFromBeginningAndOffset() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--from-beginning",
"--offset", "123"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldParseValidSimpleConsumerValidConfigWithStringOffset() throws Exception {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--offset", "LatEst",
"--property", "print.value=false"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertTrue(config.partitionArg().isPresent());
assertEquals(0, config.partitionArg().getAsInt());
assertEquals(-1, config.offsetArg());
assertFalse(config.fromBeginning());
assertFalse(((DefaultMessageFormatter) config.formatter()).printValue());
}
@Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetLatest() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=latest"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertFalse(config.fromBeginning());
assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliest() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=earliest"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertFalse(config.fromBeginning());
assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=earliest",
"--from-beginning"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertTrue(config.fromBeginning());
assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@Test
public void shouldParseValidConsumerConfigWithNoOffsetReset() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertFalse(config.fromBeginning());
assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@Test
public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=latest",
"--from-beginning"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldParseConfigsFromFile() throws IOException {
Map<String, String> configs = new HashMap<>();
configs.put("request.timeout.ms", "1000");
configs.put("group.id", "group1");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer.config", propsFile.getAbsolutePath()
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("1000", config.consumerProps().get("request.timeout.ms"));
assertEquals("group1", config.consumerProps().get("group.id"));
}
@Test
public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
// different in all three places
File propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
final String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--consumer-property", "group.id=group-from-properties",
"--consumer.config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
// the same in all three places
propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "test-group"));
final String[] args1 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group",
"--consumer-property", "group.id=test-group",
"--consumer.config", propsFile.getAbsolutePath()
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
Properties props = config.consumerProps();
assertEquals("test-group", props.getProperty("group.id"));
// different via --consumer-property and --consumer.config
propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
final String[] args2 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "group.id=group-from-properties",
"--consumer.config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2));
// different via --consumer-property and --group
final String[] args3 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--consumer-property", "group.id=group-from-properties"
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3));
// different via --group and --consumer.config
propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
final String[] args4 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--consumer.config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4));
// via --group only
final String[] args5 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments"
};
config = new ConsoleConsumerOptions(args5);
props = config.consumerProps();
assertEquals("group-from-arguments", props.getProperty("group.id"));
Exit.resetExitProcedure();
}
@Test
public void testCustomPropertyShouldBePassedToConfigureMethod() throws Exception {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--property", "print.key=true",
"--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
"--property", "key.deserializer.my-props=abc"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertTrue(config.formatter() instanceof DefaultMessageFormatter);
assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props"));
DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter();
assertTrue(formatter.keyDeserializer().isPresent());
assertTrue(formatter.keyDeserializer().get() instanceof MockDeserializer);
MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get();
assertEquals(1, keyDeserializer.configs.size());
assertEquals("abc", keyDeserializer.configs.get("my-props"));
assertTrue(keyDeserializer.isKey);
}
@Test
public void testCustomConfigShouldBePassedToConfigureMethod() throws Exception {
Map<String, String> configs = new HashMap<>();
configs.put("key.deserializer.my-props", "abc");
configs.put("print.key", "false");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--property", "print.key=true",
"--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
"--formatter-config", propsFile.getAbsolutePath()
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertTrue(config.formatter() instanceof DefaultMessageFormatter);
assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props"));
DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter();
assertTrue(formatter.keyDeserializer().isPresent());
assertTrue(formatter.keyDeserializer().get() instanceof MockDeserializer);
MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get();
assertEquals(1, keyDeserializer.configs.size());
assertEquals("abc", keyDeserializer.configs.get("my-props"));
assertTrue(keyDeserializer.isKey);
}
@Test
public void shouldParseGroupIdFromBeginningGivenTogether() throws IOException {
// Start from earliest
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group",
"--from-beginning"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals(-2, config.offsetArg());
assertTrue(config.fromBeginning());
// Start from latest
args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group"
};
config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals(-1, config.offsetArg());
assertFalse(config.fromBeginning());
}
@Test
public void shouldExitOnGroupIdAndPartitionGivenTogether() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group",
"--partition", "0"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitOnOffsetWithoutPartition() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--offset", "10"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitIfNoTopicOrFilterSpecified() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitIfTopicAndIncludeSpecified() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--include", "includeTest*"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitIfTopicAndWhitelistSpecified() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--whitelist", "whitelistTest*"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void testClientIdOverride() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning",
"--consumer-property", "client.id=consumer-1"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
@Test
public void testDefaultClientId() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("console-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
@Test
public void testParseOffset() throws Exception {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
try {
final String[] badOffset = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--offset", "bad"
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(badOffset));
final String[] negativeOffset = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--offset", "-100"
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(negativeOffset));
final String[] earliestOffset = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--offset", "earliest"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(earliestOffset);
assertEquals(ListOffsetsRequest.EARLIEST_TIMESTAMP, config.offsetArg());
} finally {
Exit.resetExitProcedure();
}
}
}

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsoleConsumerTest {
@BeforeEach
public void setup() {
ConsoleConsumer.messageCount = 0;
}
@Test
public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() {
String topic = "test";
final Time time = new MockTime();
final int timeoutMs = 1000;
@SuppressWarnings("unchecked")
Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class);
when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer(invocation -> {
time.sleep(timeoutMs / 2 + 1);
return ConsumerRecords.EMPTY;
});
ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(topic),
OptionalInt.empty(),
OptionalLong.empty(),
Optional.empty(),
mockConsumer,
timeoutMs
);
assertThrows(TimeoutException.class, consumer::receive);
}
@Test
public void shouldResetUnConsumedOffsetsBeforeExit() {
String topic = "test";
int maxMessages = 123;
int totalMessages = 700;
long startOffset = 0L;
MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition tp1 = new TopicPartition(topic, 0);
TopicPartition tp2 = new TopicPartition(topic, 1);
ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(topic),
OptionalInt.empty(),
OptionalLong.empty(),
Optional.empty(),
mockConsumer,
1000L);
mockConsumer.rebalance(Arrays.asList(tp1, tp2));
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp1, startOffset);
offsets.put(tp2, startOffset);
mockConsumer.updateBeginningOffsets(offsets);
for (int i = 0; i < totalMessages; i++) {
// add all records, each partition should have half of `totalMessages`
mockConsumer.addRecord(new ConsumerRecord<>(topic, i % 2, i / 2, "key".getBytes(), "value".getBytes()));
}
MessageFormatter formatter = mock(MessageFormatter.class);
ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, false);
assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2));
consumer.resetUnconsumedOffsets();
assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2));
verify(formatter, times(maxMessages)).writeTo(any(), any());
consumer.cleanup();
}
@Test
public void shouldLimitReadsToMaxMessageLimit() {
ConsoleConsumer.ConsumerWrapper consumer = mock(ConsoleConsumer.ConsumerWrapper.class);
MessageFormatter formatter = mock(MessageFormatter.class);
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]);
int messageLimit = 10;
when(consumer.receive()).thenReturn(record);
ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true);
verify(consumer, times(messageLimit)).receive();
verify(formatter, times(messageLimit)).writeTo(any(), any());
consumer.cleanup();
}
@Test
public void shouldStopWhenOutputCheckErrorFails() {
ConsoleConsumer.ConsumerWrapper consumer = mock(ConsoleConsumer.ConsumerWrapper.class);
MessageFormatter formatter = mock(MessageFormatter.class);
PrintStream printStream = mock(PrintStream.class);
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]);
when(consumer.receive()).thenReturn(record);
//Simulate an error on System.out after the first record has been printed
when(printStream.checkError()).thenReturn(true);
ConsoleConsumer.process(-1, formatter, consumer, printStream, true);
verify(formatter).writeTo(any(), eq(printStream));
verify(consumer).receive();
verify(printStream).checkError();
consumer.cleanup();
}
@Test
@SuppressWarnings("unchecked")
public void shouldSeekWhenOffsetIsSet() {
Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class);
TopicPartition tp0 = new TopicPartition("test", 0);
ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(tp0.topic()),
OptionalInt.of(tp0.partition()),
OptionalLong.empty(),
Optional.empty(),
mockConsumer,
1000L);
verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
verify(mockConsumer).seekToEnd(eq(Collections.singletonList(tp0)));
consumer.cleanup();
reset(mockConsumer);
consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(tp0.topic()),
OptionalInt.of(tp0.partition()),
OptionalLong.of(123L),
Optional.empty(),
mockConsumer,
1000L);
verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
verify(mockConsumer).seek(eq(tp0), eq(123L));
consumer.cleanup();
reset(mockConsumer);
consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(tp0.topic()),
OptionalInt.of(tp0.partition()),
OptionalLong.of(ListOffsetsRequest.EARLIEST_TIMESTAMP),
Optional.empty(),
mockConsumer,
1000L);
verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
verify(mockConsumer).seekToBeginning(eq(Collections.singletonList(tp0)));
consumer.cleanup();
reset(mockConsumer);
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class DefaultMessageFormatterTest {
@Test
public void testDefaultMessageFormatter() {
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 0, 123, "key".getBytes(), "value".getBytes());
MessageFormatter formatter = new DefaultMessageFormatter();
Map<String, String> configs = new HashMap<>();
formatter.configure(configs);
ByteArrayOutputStream out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("value\n", out.toString());
configs.put("print.key", "true");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("key\tvalue\n", out.toString());
configs.put("print.partition", "true");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("Partition:0\tkey\tvalue\n", out.toString());
configs.put("print.timestamp", "true");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString());
configs.put("print.offset", "true");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString());
configs.put("print.headers", "true");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tNO_HEADERS\tkey\tvalue\n", out.toString());
RecordHeaders headers = new RecordHeaders();
headers.add("h1", "v1".getBytes());
headers.add("h2", "v2".getBytes());
record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), "value".getBytes(),
headers, Optional.empty());
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123\tPartition:0\tOffset:123\th1:v1,h2:v2\tkey\tvalue\n", out.toString());
configs.put("print.value", "false");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123\tPartition:0\tOffset:123\th1:v1,h2:v2\tkey\n", out.toString());
configs.put("key.separator", "<sep>");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>h1:v1,h2:v2<sep>key\n", out.toString());
configs.put("line.separator", "<end>");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>h1:v1,h2:v2<sep>key<end>", out.toString());
configs.put("headers.separator", "|");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>h1:v1|h2:v2<sep>key<end>", out.toString());
record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), "value".getBytes(),
headers, Optional.empty());
configs.put("key.deserializer", UpperCaseDeserializer.class.getName());
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>h1:v1|h2:v2<sep>KEY<end>", out.toString());
configs.put("headers.deserializer", UpperCaseDeserializer.class.getName());
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>h1:V1|h2:V2<sep>KEY<end>", out.toString());
record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), null,
headers, Optional.empty());
configs.put("print.value", "true");
configs.put("null.literal", "<null>");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>h1:V1|h2:V2<sep>KEY<sep><null><end>", out.toString());
formatter.close();
}
static class UpperCaseDeserializer implements Deserializer<String> {
@Override
public String deserialize(String topic, byte[] data) {
return new String(data, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.HashMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class NoOpMessageFormatterTest {
@Test
public void testNoOpMessageFormatter() {
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 0, 123, "key".getBytes(), "value".getBytes());
try (MessageFormatter formatter = new NoOpMessageFormatter()) {
formatter.configure(new HashMap<>());
ByteArrayOutputStream out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("", out.toString());
}
}
}