From 8cfd6312644260717d67fd4dd87dc6794fde52bb Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Fri, 16 Aug 2024 15:26:54 +0530 Subject: [PATCH] KAFKA-16723 : Added kafka-console-share-consumer.sh tool. (#16860) Added kafka-console-share-consumer.sh which will start a share consumer on a share group. This tool helps to read data from Kafka topics using share groups and outputs it to standard output. Reviewers: Andrew Schofield , Apoorv Mittal , Manikumar Reddy --- bin/kafka-console-share-consumer.sh | 21 ++ bin/windows/kafka-console-share-consumer.bat | 20 ++ .../tools/consumer/ConsoleShareConsumer.java | 186 ++++++++++++ .../consumer/ConsoleShareConsumerOptions.java | 268 +++++++++++++++++ .../ConsoleShareConsumerOptionsTest.java | 271 ++++++++++++++++++ .../consumer/ConsoleShareConsumerTest.java | 134 +++++++++ 6 files changed, 900 insertions(+) create mode 100755 bin/kafka-console-share-consumer.sh create mode 100644 bin/windows/kafka-console-share-consumer.bat create mode 100644 tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java diff --git a/bin/kafka-console-share-consumer.sh b/bin/kafka-console-share-consumer.sh new file mode 100755 index 00000000000..6b544d091c2 --- /dev/null +++ b/bin/kafka-console-share-consumer.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then + export KAFKA_HEAP_OPTS="-Xmx512M" +fi + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.ConsoleShareConsumer "$@" \ No newline at end of file diff --git a/bin/windows/kafka-console-share-consumer.bat b/bin/windows/kafka-console-share-consumer.bat new file mode 100644 index 00000000000..c2a0cdce931 --- /dev/null +++ b/bin/windows/kafka-console-share-consumer.bat @@ -0,0 +1,20 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +SetLocal +set KAFKA_HEAP_OPTS=-Xmx512M +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.consumer.ConsoleShareConsumer %* +EndLocal diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java new file mode 100644 index 00000000000..1aaa7af5b84 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.Collections; +import java.util.Iterator; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; + + +/** + * Share Consumer that dumps messages to standard out. + */ +public class ConsoleShareConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(ConsoleShareConsumer.class); + private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1); + + static int messageCount = 0; + + public static void main(String[] args) throws Exception { + ConsoleShareConsumerOptions opts = new ConsoleShareConsumerOptions(args); + try { + run(opts); + } catch (AuthenticationException ae) { + LOG.error("Authentication failed: terminating consumer process", ae); + Exit.exit(1); + } catch (Throwable t) { + LOG.error("Unknown error when running consumer: ", t); + Exit.exit(1); + } + } + + public static void run(ConsoleShareConsumerOptions opts) { + messageCount = 0; + long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE; + + ShareConsumer consumer = new KafkaShareConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); + ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts.topicArg(), consumer, timeoutMs); + + addShutdownHook(consumerWrapper); + + try { + process(opts.maxMessages(), opts.formatter(), consumerWrapper, System.out, opts.rejectMessageOnError(), opts.acknowledgeType()); + } finally { + consumerWrapper.cleanup(); + opts.formatter().close(); + reportRecordCount(); + + SHUTDOWN_LATCH.countDown(); + } + } + + private static void addShutdownHook(ConsumerWrapper consumer) { + Exit.addShutdownHook("consumer-shutdown-hook", () -> { + try { + consumer.wakeup(); + SHUTDOWN_LATCH.await(); + } catch (Throwable t) { + LOG.error("Exception while running shutdown hook: ", t); + } + }); + } + + static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, + boolean rejectMessageOnError, AcknowledgeType acknowledgeType) { + while (messageCount < maxMessages || maxMessages == -1) { + ConsumerRecord msg; + try { + msg = consumer.receive(); + } catch (WakeupException we) { + LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate."); + // Consumer will be closed + return; + } catch (Throwable t) { + LOG.error("Error processing message, terminating consumer process: ", t); + // Consumer will be closed + return; + } + messageCount += 1; + try { + formatter.writeTo(new ConsumerRecord<>(msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(), + 0, 0, msg.key(), msg.value(), msg.headers(), Optional.empty()), output); + consumer.acknowledge(msg, acknowledgeType); + } catch (Throwable t) { + if (rejectMessageOnError) { + LOG.error("Error processing message, rejecting this message: ", t); + consumer.acknowledge(msg, AcknowledgeType.REJECT); + } else { + // Consumer will be closed + throw t; + } + } + if (checkErr(output)) { + // Consumer will be closed + return; + } + } + } + + private static void reportRecordCount() { + System.err.println("Processed a total of " + messageCount + " messages"); + } + + private static boolean checkErr(PrintStream output) { + boolean gotError = output.checkError(); + if (gotError) { + // This means no one is listening to our output stream anymore, time to shut down + System.err.println("Unable to write to standard out, closing consumer."); + } + return gotError; + } + + public static class ConsumerWrapper { + final String topic; + final ShareConsumer consumer; + final long timeoutMs; + final Time time = Time.SYSTEM; + + Iterator> recordIter = Collections.emptyIterator(); + + public ConsumerWrapper(String topic, + ShareConsumer consumer, + long timeoutMs) { + this.topic = topic; + this.consumer = consumer; + this.timeoutMs = timeoutMs; + + consumer.subscribe(Collections.singletonList(topic)); + } + + ConsumerRecord receive() { + long startTimeMs = time.milliseconds(); + while (!recordIter.hasNext()) { + recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator(); + if (!recordIter.hasNext() && (time.milliseconds() - startTimeMs > timeoutMs)) { + throw new TimeoutException(); + } + } + return recordIter.next(); + } + + void acknowledge(ConsumerRecord record, AcknowledgeType acknowledgeType) { + consumer.acknowledge(record, acknowledgeType); + } + + void wakeup() { + this.consumer.wakeup(); + } + + void cleanup() { + this.consumer.close(); + } + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java new file mode 100644 index 00000000000..2152bd701de --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; + +public final class ConsoleShareConsumerOptions extends CommandDefaultOptions { + private final OptionSpec messageFormatterOpt; + private final OptionSpec messageFormatterConfigOpt; + private final OptionSpec messageFormatterArgOpt; + private final OptionSpec keyDeserializerOpt; + private final OptionSpec valueDeserializerOpt; + private final OptionSpec maxMessagesOpt; + private final OptionSpec rejectMessageOnErrorOpt; + private final OptionSpec rejectOpt; + private final OptionSpec releaseOpt; + private final OptionSpec topicOpt; + private final OptionSpec timeoutMsOpt; + private final OptionSpec bootstrapServerOpt; + private final OptionSpec groupIdOpt; + private final Properties consumerProps; + private final MessageFormatter formatter; + + public ConsoleShareConsumerOptions(String[] args) throws IOException { + super(args); + topicOpt = parser.accepts("topic", "The topic to consume from.") + .withRequiredArg() + .describedAs("topic") + .ofType(String.class); + OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") + .withRequiredArg() + .describedAs("consumer_prop") + .ofType(String.class); + OptionSpec consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting Kafka messages for display.") + .withRequiredArg() + .describedAs("class") + .ofType(String.class) + .defaultsTo(DefaultMessageFormatter.class.getName()); + messageFormatterArgOpt = parser.accepts("property", + "The properties to initialize the message formatter. Default properties include: \n" + + " print.timestamp=true|false\n" + + " print.key=true|false\n" + + " print.offset=true|false\n" + + " print.partition=true|false\n" + + " print.headers=true|false\n" + + " print.value=true|false\n" + + " key.separator=\n" + + " line.separator=\n" + + " headers.separator=\n" + + " null.literal=\n" + + " key.deserializer=\n" + + " value.deserializer=\n" + + " header.deserializer=\n" + + "\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.") + .withRequiredArg() + .describedAs("prop") + .ofType(String.class); + messageFormatterConfigOpt = parser.accepts("formatter-config", "Config properties file to initialize the message formatter. Note that " + messageFormatterArgOpt + " takes precedence over this config.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.") + .withRequiredArg() + .describedAs("num_messages") + .ofType(Integer.class); + timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.") + .withRequiredArg() + .describedAs("timeout_ms") + .ofType(Integer.class); + rejectOpt = parser.accepts("reject", "If specified, messages are rejected as they are consumed."); + releaseOpt = parser.accepts("release", "If specified, messages are released as they are consumed."); + rejectMessageOnErrorOpt = parser.accepts("reject-message-on-error", "If there is an error when processing a message, " + + "reject it instead of halting."); + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.") + .withRequiredArg() + .describedAs("server to connect to") + .ofType(String.class); + keyDeserializerOpt = parser.accepts("key-deserializer", "The name of the class to use for deserializing keys.") + .withRequiredArg() + .describedAs("deserializer for key") + .ofType(String.class); + valueDeserializerOpt = parser.accepts("value-deserializer", "The name of the class to use for deserializing values.") + .withRequiredArg() + .describedAs("deserializer for values") + .ofType(String.class); + groupIdOpt = parser.accepts("group", "The share group id of the consumer.") + .withRequiredArg() + .describedAs("share group id") + .ofType(String.class); + + try { + options = parser.parse(args); + } catch (OptionException oe) { + CommandLineUtils.printUsageAndExit(parser, oe.getMessage()); + } + + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics using share groups and outputs it to standard output."); + + checkRequiredArgs(); + + if (options.has(rejectOpt) && options.has(releaseOpt)) { + CommandLineUtils.printUsageAndExit(parser, "At most one of --reject and --release may be specified."); + } + + Properties consumerPropsFromFile = options.has(consumerConfigOpt) + ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + : new Properties(); + Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt)); + + Set groupIdsProvided = checkShareGroup(consumerPropsFromFile, extraConsumerProps); + consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided); + formatter = buildFormatter(); + } + + private void checkRequiredArgs() { + if (!options.has(topicOpt)) { + CommandLineUtils.printUsageAndExit(parser, "--topic is a required argument"); + } + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); + } + + private Set checkShareGroup(Properties consumerPropsFromFile, Properties extraConsumerProps) { + // if the group id is provided in more than place (through different means) all values must be the same + Set groupIdsProvided = new HashSet<>(); + if (options.has(groupIdOpt)) { + groupIdsProvided.add(options.valueOf(groupIdOpt)); + } + + if (consumerPropsFromFile.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + groupIdsProvided.add((String) consumerPropsFromFile.get(ConsumerConfig.GROUP_ID_CONFIG)); + } + + if (extraConsumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + groupIdsProvided.add(extraConsumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + } + // The default value for group.id is "console-share-consumer" + if (groupIdsProvided.isEmpty()) { + groupIdsProvided.add("console-share-consumer"); + } else if (groupIdsProvided.size() > 1) { + CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', " + + "via '--consumer-property', or via '--consumer-config') do not match. " + + "Detected group ids: " + + groupIdsProvided.stream().map(group -> "'" + group + "'").collect(Collectors.joining(", "))); + } + return groupIdsProvided; + } + + private Properties buildConsumerProps(Properties consumerPropsFromFile, Properties extraConsumerProps, Set groupIdsProvided) { + Properties consumerProps = new Properties(consumerPropsFromFile); + consumerProps.putAll(extraConsumerProps); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer()); + if (consumerProps.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) { + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-share-consumer"); + } + + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdsProvided.iterator().next()); + return consumerProps; + } + + private MessageFormatter buildFormatter() { + MessageFormatter formatter = null; + try { + Class messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)); + formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance(); + + Properties formatterArgs = formatterArgs(); + Map formatterConfigs = new HashMap<>(); + for (final String name : formatterArgs.stringPropertyNames()) { + formatterConfigs.put(name, formatterArgs.getProperty(name)); + } + + formatter.configure(formatterConfigs); + + } catch (Exception e) { + CommandLineUtils.printUsageAndExit(parser, e.getMessage()); + } + return formatter; + } + + Properties consumerProps() { + return consumerProps; + } + + boolean rejectMessageOnError() { + return options.has(rejectMessageOnErrorOpt); + } + + AcknowledgeType acknowledgeType() { + if (options.has(rejectOpt)) { + return AcknowledgeType.REJECT; + } else if (options.has(releaseOpt)) { + return AcknowledgeType.RELEASE; + } else { + return AcknowledgeType.ACCEPT; + } + } + + String topicArg() { + return options.valueOf(topicOpt); + } + + int maxMessages() { + return options.has(maxMessagesOpt) ? options.valueOf(maxMessagesOpt) : -1; + } + + int timeoutMs() { + return options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1; + } + + String bootstrapServer() { + return options.valueOf(bootstrapServerOpt); + } + + Properties formatterArgs() throws IOException { + Properties formatterArgs = options.has(messageFormatterConfigOpt) + ? Utils.loadProps(options.valueOf(messageFormatterConfigOpt)) + : new Properties(); + String keyDeserializer = options.valueOf(keyDeserializerOpt); + if (keyDeserializer != null && !keyDeserializer.isEmpty()) { + formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer); + } + String valueDeserializer = options.valueOf(valueDeserializerOpt); + if (valueDeserializer != null && !valueDeserializer.isEmpty()) { + formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); + } + formatterArgs.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))); + + return formatterArgs; + } + + MessageFormatter formatter() { + return formatter; + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java new file mode 100644 index 00000000000..31db9bd82ab --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.tools.ToolsTestUtils; + +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ConsoleShareConsumerOptionsTest { + + @Test + public void shouldParseValidConsumerValidConfig() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertFalse(config.rejectMessageOnError()); + assertEquals(-1, config.maxMessages()); + assertEquals(-1, config.timeoutMs()); + } + + @Test + public void shouldExitOnUnrecognizedNewConsumerOption() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--new-consumer", + "--bootstrap-server", "localhost:9092", + "--topic", "test" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldParseValidConsumerConfigWithSessionTimeout() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "session.timeout.ms=10000" + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertEquals("10000", consumerProperties.getProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)); + } + + @Test + public void shouldParseConfigsFromFile() throws IOException { + Map configs = new HashMap<>(); + configs.put("request.timeout.ms", "1000"); + configs.put("group.id", "group1"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-config", propsFile.getAbsolutePath() + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + + assertEquals("1000", config.consumerProps().getProperty("request.timeout.ms")); + assertEquals("group1", config.consumerProps().getProperty("group.id")); + } + + @Test + public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + // different in all three places + File propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file")); + final String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties", + "--consumer-config", propsFile.getAbsolutePath() + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); + + // the same in all three places + propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "test-group")); + final String[] args1 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--consumer-property", "group.id=test-group", + "--consumer-config", propsFile.getAbsolutePath() + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args1); + Properties props = config.consumerProps(); + assertEquals("test-group", props.getProperty("group.id")); + + // different via --consumer-property and --consumer-config + propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file")); + final String[] args2 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "group.id=group-from-properties", + "--consumer-config", propsFile.getAbsolutePath() + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args2)); + + // different via --consumer-property and --group + final String[] args3 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties" + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args3)); + + // different via --group and --consumer-config + propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file")); + final String[] args4 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-config", propsFile.getAbsolutePath() + }; + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args4)); + + // via --group only + final String[] args5 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments" + }; + + config = new ConsoleShareConsumerOptions(args5); + props = config.consumerProps(); + assertEquals("group-from-arguments", props.getProperty("group.id")); + + Exit.resetExitProcedure(); + } + + @Test + public void shouldExitIfNoTopicSpecified() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void testClientIdOverride() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "client.id=consumer-1" + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testDefaultClientId() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("console-share-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testRejectOption() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--reject" + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + assertEquals(AcknowledgeType.REJECT, config.acknowledgeType()); + } + + @Test + public void testReleaseOption() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--release" + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + assertEquals(AcknowledgeType.RELEASE, config.acknowledgeType()); + } + + @Test + public void testRejectAndReleaseOption() throws IOException { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--reject", + "--release" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java new file mode 100644 index 00000000000..5e3f7a16db5 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.util.MockTime; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.PrintStream; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ConsoleShareConsumerTest { + + @BeforeEach + public void setup() { + ConsoleShareConsumer.messageCount = 0; + } + + @Test + public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() { + String topic = "test"; + final Time time = new MockTime(); + final int timeoutMs = 1000; + + @SuppressWarnings("unchecked") + ShareConsumer mockConsumer = mock(ShareConsumer.class); + + when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer(invocation -> { + time.sleep(timeoutMs / 2 + 1); + return ConsumerRecords.EMPTY; + }); + + ConsoleShareConsumer.ConsumerWrapper consumer = new ConsoleShareConsumer.ConsumerWrapper( + topic, + mockConsumer, + timeoutMs + ); + + assertThrows(TimeoutException.class, consumer::receive); + } + + @Test + public void shouldLimitReadsToMaxMessageLimit() { + ConsoleShareConsumer.ConsumerWrapper consumer = mock(ConsoleShareConsumer.ConsumerWrapper.class); + MessageFormatter formatter = mock(MessageFormatter.class); + ConsumerRecord record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]); + + int messageLimit = 10; + when(consumer.receive()).thenReturn(record); + + ConsoleShareConsumer.process(messageLimit, formatter, consumer, System.out, true, AcknowledgeType.ACCEPT); + + verify(consumer, times(messageLimit)).receive(); + verify(formatter, times(messageLimit)).writeTo(any(), any()); + + consumer.cleanup(); + } + + @Test + public void shouldStopWhenOutputCheckErrorFails() { + ConsoleShareConsumer.ConsumerWrapper consumer = mock(ConsoleShareConsumer.ConsumerWrapper.class); + MessageFormatter formatter = mock(MessageFormatter.class); + PrintStream printStream = mock(PrintStream.class); + + ConsumerRecord record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]); + + when(consumer.receive()).thenReturn(record); + //Simulate an error on System.out after the first record has been printed + when(printStream.checkError()).thenReturn(true); + + ConsoleShareConsumer.process(-1, formatter, consumer, printStream, true, AcknowledgeType.ACCEPT); + + verify(formatter).writeTo(any(), eq(printStream)); + verify(consumer).receive(); + verify(printStream).checkError(); + + consumer.cleanup(); + } + + @Test + public void testRejectMessageOnError() { + ConsoleShareConsumer.ConsumerWrapper consumer = mock(ConsoleShareConsumer.ConsumerWrapper.class); + MessageFormatter formatter = mock(MessageFormatter.class); + PrintStream printStream = mock(PrintStream.class); + + ConsumerRecord record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]); + + when(consumer.receive()).thenReturn(record); + + //Simulate an error on formatter.writeTo() call + doThrow(new RuntimeException()) + .when(formatter) + .writeTo(any(), any()); + + ConsoleShareConsumer.process(1, formatter, consumer, printStream, true, AcknowledgeType.ACCEPT); + + verify(formatter).writeTo(any(), eq(printStream)); + verify(consumer).receive(); + verify(consumer).acknowledge(record, AcknowledgeType.REJECT); + + consumer.cleanup(); + } +}