From 3bc50f937cb3acd5b814587da5518d8fb5e5ec21 Mon Sep 17 00:00:00 2001 From: Shivsundar R Date: Wed, 17 Sep 2025 10:28:20 -0400 Subject: [PATCH] KAFKA-19623: Implement KIP-1147 for console producer/consumer/share-consumer. (#20479) *What* https://issues.apache.org/jira/browse/KAFKA-19623 - The PR implements KIP-1147 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1147%3A+Improve+consistency+of+command-line+arguments) for the console tools i.e. `ConsoleProducer`, `ConsoleConsumer` and `ConsoleShareConsumer`. - Currently the previous names for the options are still usable but there will be warning message stating those are deprecated and will be removed in a future version. - I have added unit tests and also manually verified using the console tools that things are working as expected. Reviewers: Andrew Schofield , Jhen-Yung Hsu , Jimmy Wang <48462172+JimmyWang6@users.noreply.github.com> --- docker/examples/README.md | 8 +- docker/test/docker_sanity_test.py | 16 +- docs/security.html | 4 +- .../KStreamAggregationIntegrationTest.java | 14 +- tests/kafkatest/services/console_consumer.py | 19 +- .../services/console_share_consumer.py | 21 +- tests/kafkatest/version.py | 14 + .../apache/kafka/tools/ConsoleProducer.java | 40 +- .../consumer/ConsoleConsumerOptions.java | 71 +++- .../consumer/ConsoleShareConsumerOptions.java | 74 +++- .../kafka/tools/ConsoleProducerTest.java | 113 +++++- .../consumer/ConsoleConsumerOptionsTest.java | 364 ++++++++++++++---- .../ConsoleShareConsumerOptionsTest.java | 359 ++++++++++++++--- 13 files changed, 930 insertions(+), 187 deletions(-) diff --git a/docker/examples/README.md b/docker/examples/README.md index e76247bdb54..162e27c711a 100644 --- a/docker/examples/README.md +++ b/docker/examples/README.md @@ -147,7 +147,7 @@ Single Node - To produce messages using client scripts (Ensure that java version >= 17): ``` # Run from root of the repo - $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9093 --producer.config ./docker/examples/fixtures/client-secrets/client-ssl.properties + $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9093 --command-config ./docker/examples/fixtures/client-secrets/client-ssl.properties ``` - File Input: - Here ssl configs are provided via file input. @@ -167,7 +167,7 @@ Single Node - To produce messages using client scripts (Ensure that java version >= 17): ``` # Run from root of the repo - $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9093 --producer.config ./docker/examples/fixtures/client-secrets/client-ssl.properties + $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9093 --command-config ./docker/examples/fixtures/client-secrets/client-ssl.properties ``` Multi Node Cluster @@ -219,7 +219,7 @@ Multi Node Cluster - To produce messages using client scripts (Ensure that java version >= 17): ``` # Run from root of the repo - $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:29093 --producer.config ./docker/examples/fixtures/client-secrets/client-ssl.properties + $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:29093 --command-config ./docker/examples/fixtures/client-secrets/client-ssl.properties ``` - Isolated: - Examples are present in `docker-compose-files/cluster/isolated` directory. @@ -258,7 +258,7 @@ Multi Node Cluster - To produce messages using client scripts (Ensure that java version >= 17): ``` # Run from root of the repo - $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:29093 --producer.config ./docker/examples/fixtures/client-secrets/client-ssl.properties + $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:29093 --command-config ./docker/examples/fixtures/client-secrets/client-ssl.properties ``` - Note that the examples are meant to be tried one at a time, make sure you close an example server before trying out the other to avoid conflicts. diff --git a/docker/test/docker_sanity_test.py b/docker/test/docker_sanity_test.py index d2135fb0295..0d21bf47fee 100644 --- a/docker/test/docker_sanity_test.py +++ b/docker/test/docker_sanity_test.py @@ -65,7 +65,7 @@ class DockerSanityTest(unittest.TestCase): subprocess.run(["bash", "-c", " ".join(command)]) def consume_message(self, topic, consumer_config): - command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", "--topic", topic, "--property", "'print.key=true'", "--property", "'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"] + command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", "--topic", topic, "--formatter-property", "'print.key=true'", "--formatter-property", "'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"] command.extend(consumer_config) message = subprocess.check_output(["bash", "-c", " ".join(command)]) return message.decode("utf-8").strip() @@ -93,9 +93,9 @@ class DockerSanityTest(unittest.TestCase): errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e)) return errors - producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"] + producer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "client.id=host"] self.produce_message(constants.BROKER_METRICS_TEST_TOPIC, producer_config, "key", "message") - consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"] + consumer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "auto.offset.reset=earliest"] message = self.consume_message(constants.BROKER_METRICS_TEST_TOPIC, consumer_config) try: self.assertEqual(message, "key:message") @@ -129,13 +129,13 @@ class DockerSanityTest(unittest.TestCase): return errors producer_config = ["--bootstrap-server", ssl_broker_port, - "--producer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"] + "--command-config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"] self.produce_message(topic, producer_config, "key", "message") consumer_config = [ "--bootstrap-server", ssl_broker_port, - "--property", "auto.offset.reset=earliest", - "--consumer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", + "--command-property", "auto.offset.reset=earliest", + "--command-config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", ] message = self.consume_message(topic, consumer_config) try: @@ -155,7 +155,7 @@ class DockerSanityTest(unittest.TestCase): errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e)) return errors - producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"] + producer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "client.id=host"] self.produce_message(constants.BROKER_RESTART_TEST_TOPIC, producer_config, "key", "message") print("Stopping Container") @@ -163,7 +163,7 @@ class DockerSanityTest(unittest.TestCase): print("Resuming Container") self.resume_container() - consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"] + consumer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "auto.offset.reset=earliest"] message = self.consume_message(constants.BROKER_RESTART_TEST_TOPIC, consumer_config) try: self.assertEqual(message, "key:message") diff --git a/docs/security.html b/docs/security.html index 5940fc3cda6..9364a05e40a 100644 --- a/docs/security.html +++ b/docs/security.html @@ -510,8 +510,8 @@ ssl.key.password=test1234
Examples using console-producer and console-consumer: -
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
-$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties
+
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --command-config client-ssl.properties
+$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --command-config client-ssl.properties

7.4 Authentication using SASL

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 7037ce1368d..aa283726134 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -1122,15 +1122,15 @@ public class KStreamAggregationIntegrationTest { final String[] args = new String[] { "--bootstrap-server", CLUSTER.bootstrapServers(), "--from-beginning", - "--property", "print.key=true", - "--property", "print.timestamp=" + printTimestamp, + "--formatter-property", "print.key=true", + "--formatter-property", "print.timestamp=" + printTimestamp, "--topic", outputTopic, "--max-messages", String.valueOf(numMessages), - "--property", "key.deserializer=" + keyDeserializer.getClass().getName(), - "--property", "value.deserializer=" + valueDeserializer.getClass().getName(), - "--property", "key.separator=" + keySeparator, - "--property", "key.deserializer." + TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName(), - "--property", "key.deserializer.window.size.ms=500", + "--formatter-property", "key.deserializer=" + keyDeserializer.getClass().getName(), + "--formatter-property", "value.deserializer=" + valueDeserializer.getClass().getName(), + "--formatter-property", "key.separator=" + keySeparator, + "--formatter-property", "key.deserializer." + TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName(), + "--formatter-property", "key.deserializer.window.size.ms=500", }; ConsoleConsumer.run(new ConsoleConsumerOptions(args)); diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 9755faa1969..0f95a0f7c79 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -88,7 +88,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) jaas_override_variables A dict of variables to be used in the jaas.conf template file kafka_opts_override Override parameters of the KAFKA_OPTS environment variable client_prop_file_override Override client.properties file used by the consumer - consumer_properties A dict of values to pass in as --consumer-property key=value + consumer_properties A dict of values to pass in as --command-property key=value. For versions older than KAFKA_4_2_0, these will be passed as --consumer-property key=value """ JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), root=ConsoleConsumer.PERSISTENT_ROOT) @@ -163,8 +163,11 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) "export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j_config)s\"; " \ "export KAFKA_OPTS=%(kafka_opts)s; " \ "%(console_consumer)s " \ - "--topic %(topic)s " \ - "--consumer.config %(config_file)s " % args + "--topic %(topic)s " % args + + version = get_version(node) + command_config_arg = "--command-config" if version.supports_command_config() else "--consumer.config" + cmd += "%s %s" % (command_config_arg, args['config_file']) cmd += " --bootstrap-server %(broker_list)s" % args cmd += " --isolation-level %s" % self.isolation_level @@ -176,14 +179,15 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) # This will be added in the properties file instead cmd += " --timeout-ms %s" % self.consumer_timeout_ms + formatter_property_arg = "--formatter-property" if version.supports_formatter_property else "--property" if self.print_timestamp: - cmd += " --property print.timestamp=true" + cmd += " %s print.timestamp=true" % formatter_property_arg if self.print_key: - cmd += " --property print.key=true" + cmd += " %s print.key=true" % formatter_property_arg if self.print_partition: - cmd += " --property print.partition=true" + cmd += " %s print.partition=true" % formatter_property_arg # LoggingMessageFormatter was introduced after 0.9 if node.version > LATEST_3_7: @@ -194,9 +198,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) if self.enable_systest_events: cmd += " --enable-systest-events" + command_property_arg = "--command-property" if version.supports_command_property() else "--consumer-property" if self.consumer_properties is not None: for k, v in self.consumer_properties.items(): - cmd += " --consumer-property %s=%s" % (k, v) + cmd += " %s %s=%s" % (command_property_arg, k, v) cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd diff --git a/tests/kafkatest/services/console_share_consumer.py b/tests/kafkatest/services/console_share_consumer.py index 03fbaeaf5a5..2d7da50fe30 100644 --- a/tests/kafkatest/services/console_share_consumer.py +++ b/tests/kafkatest/services/console_share_consumer.py @@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin -from kafkatest.version import DEV_BRANCH, LATEST_4_1 +from kafkatest.version import DEV_BRANCH, LATEST_4_1, get_version from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config_for_tools """ @@ -84,7 +84,7 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadSer jaas_override_variables A dict of variables to be used in the jaas.conf template file kafka_opts_override Override parameters of the KAFKA_OPTS environment variable client_prop_file_override Override client.properties file used by the consumer - share_consumer_properties A dict of values to pass in as --consumer-property key=value + share_consumer_properties A dict of values to pass in as --command-property key=value. For versions older than KAFKA_4_2_0, these will be passed as --consumer-property key=value """ JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), root=ConsoleShareConsumer.PERSISTENT_ROOT) @@ -156,31 +156,36 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadSer "export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j_config)s\"; " \ "export KAFKA_OPTS=%(kafka_opts)s; " \ "%(console_share_consumer)s " \ - "--topic %(topic)s " \ - "--consumer-config %(config_file)s " % args + "--topic %(topic)s " % args + + version = get_version(node) + command_config_arg = "--command-config" if version.supports_command_config() else "--consumer-config" + cmd += "%s %s" % (command_config_arg, args['config_file']) cmd += " --bootstrap-server %(broker_list)s" % args if self.share_consumer_timeout_ms is not None: # This will be added in the properties file instead cmd += " --timeout-ms %s" % self.share_consumer_timeout_ms + formatter_property_arg = "--formatter-property" if version.supports_formatter_property else "--property" if self.print_timestamp: - cmd += " --property print.timestamp=true" + cmd += " %s print.timestamp=true" % formatter_property_arg if self.print_key: - cmd += " --property print.key=true" + cmd += " %s print.key=true" % formatter_property_arg if self.print_partition: - cmd += " --property print.partition=true" + cmd += " %s print.partition=true" % formatter_property_arg cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter" if self.enable_systest_events: cmd += " --enable-systest-events" + command_property_arg = "--command-property" if version.supports_command_property() else "--consumer-property" if self.share_consumer_properties is not None: for k, v in self.share_consumer_properties.items(): - cmd += " --consumer-property %s=%s" % (k, v) + cmd += " %s %s=%s" % (command_property_arg, k, v) cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index e4d600ac403..8114d51861e 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -104,6 +104,20 @@ class KafkaVersion(LooseVersion): # - For older versions, continue using --producer.config or --consumer.config return self >= V_4_2_0 + def supports_command_property(self): + # According to KIP-1147, --producer-property and --consumer-property have been deprecated and will be removed in future versions + # For backward compatibility, we select the configuration based on node version: + # - For versions 4.2.0 and above, use --command-property + # - For older versions, continue using --producer-property or --consumer-property + return self >= V_4_2_0 + + def supports_formatter_property(self): + # According to KIP-1147, --property has been deprecated and will be removed in future versions + # For backward compatibility, we select the configuration based on node version: + # - For versions 4.2.0 and above, use --formatter-property + # - For older versions, continue using --property + return self >= V_4_2_0 + def get_version(node=None): """Return the version attached to the given node. Default to DEV_BRANCH if node or node.version is undefined (aka None) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java index 93e81bd5a43..3e9e34f4210 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java @@ -131,8 +131,12 @@ public class ConsoleProducer { private final OptionSpec socketBufferSizeOpt; private final OptionSpec propertyOpt; private final OptionSpec readerConfigOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec producerPropertyOpt; + private OptionSpec commandPropertyOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec producerConfigOpt; + private OptionSpec commandConfigOpt; public ConsoleProducerOptions(String[] args) { super(args); @@ -250,11 +254,20 @@ public class ConsoleProducer { .withRequiredArg() .describedAs("config file") .ofType(String.class); - producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ") + producerPropertyOpt = parser.accepts("producer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the producer." + + "This option will be removed in a future version. Use --command-property instead.") .withRequiredArg() .describedAs("producer_prop") .ofType(String.class); - producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.") + commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the producer.") + .withRequiredArg() + .describedAs("producer_prop") + .ofType(String.class); + producerConfigOpt = parser.accepts("producer.config", "(DEPRECATED) Producer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config. This option will be removed in a future version. Use --command-config instead.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Producer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.") .withRequiredArg() .describedAs("config file") .ofType(String.class); @@ -273,6 +286,23 @@ public class ConsoleProducer { CommandLineUtils.checkRequiredArgs(parser, options, topicOpt); + if (options.has(commandConfigOpt) && options.has(producerConfigOpt)) { + CommandLineUtils.printUsageAndExit(parser, "Options --command-config and --producer.config cannot be specified together."); + } + if (options.has(commandPropertyOpt) && options.has(producerPropertyOpt)) { + CommandLineUtils.printUsageAndExit(parser, "Options --command-property and --producer-property cannot be specified together."); + } + + if (options.has(producerPropertyOpt)) { + System.out.println("Warning: --producer-property is deprecated and will be removed in a future version. Use --command-property instead."); + commandPropertyOpt = producerPropertyOpt; + } + + if (options.has(producerConfigOpt)) { + System.out.println("Warning: --producer.config is deprecated and will be removed in a future version. Use --command-config instead."); + commandConfigOpt = producerConfigOpt; + } + try { ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt)); } catch (IllegalArgumentException e) { @@ -314,11 +344,11 @@ public class ConsoleProducer { Properties producerProps() throws IOException { Properties props = new Properties(); - if (options.has(producerConfigOpt)) { - props.putAll(loadProps(options.valueOf(producerConfigOpt))); + if (options.has(commandConfigOpt)) { + props.putAll(loadProps(options.valueOf(commandConfigOpt))); } - props.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt))); + props.putAll(parseKeyValueArgs(options.valuesOf(commandPropertyOpt))); props.put(BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOpt)); props.put(COMPRESSION_TYPE_CONFIG, compressionCodec()); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java index fe33bfe6c68..abe6322fd97 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java @@ -48,7 +48,9 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { private final OptionSpec partitionIdOpt; private final OptionSpec offsetOpt; private final OptionSpec messageFormatterOpt; - private final OptionSpec messageFormatterArgOpt; + @Deprecated(since = "4.2", forRemoval = true) + private final OptionSpec messageFormatterArgOptDeprecated; + private OptionSpec messageFormatterArgOpt; private final OptionSpec messageFormatterConfigOpt; private final OptionSpec resetBeginningOpt; private final OptionSpec maxMessagesOpt; @@ -66,6 +68,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { private final long timeoutMs; private final MessageFormatter formatter; + @SuppressWarnings("deprecation") public ConsoleConsumerOptions(String[] args) throws IOException { super(args); topicOpt = parser.accepts("topic", "The topic to consume on.") @@ -87,11 +90,23 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { .describedAs("consume offset") .ofType(String.class) .defaultsTo("latest"); - OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") + @Deprecated(since = "4.2", forRemoval = true) + OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the consumer. " + + "This option will be removed in a future version. Use --command-property instead.") .withRequiredArg() .describedAs("consumer_prop") .ofType(String.class); - OptionSpec consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.") + OptionSpec commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") + .withRequiredArg() + .describedAs("consumer_prop") + .ofType(String.class); + @Deprecated(since = "4.2", forRemoval = true) + OptionSpec consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config. " + + "This option will be removed in a future version. Use --command-config instead.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + OptionSpec commandConfigOpt = parser.accepts("command-config", "Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.") .withRequiredArg() .describedAs("config file") .ofType(String.class); @@ -100,7 +115,28 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { .describedAs("class") .ofType(String.class) .defaultsTo(DefaultMessageFormatter.class.getName()); - messageFormatterArgOpt = parser.accepts("property", + messageFormatterArgOptDeprecated = parser.accepts("property", + "(DEPRECATED) The properties to initialize the message formatter. Default properties include: \n" + + " print.timestamp=true|false\n" + + " print.key=true|false\n" + + " print.offset=true|false\n" + + " print.epoch=true|false\n" + + " print.partition=true|false\n" + + " print.headers=true|false\n" + + " print.value=true|false\n" + + " key.separator=\n" + + " line.separator=\n" + + " headers.separator=\n" + + " null.literal=\n" + + " key.deserializer=\n" + + " value.deserializer=\n" + + " header.deserializer=\n" + + "\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers. " + + "\nThis option will be removed in a future version. Use --formatter-property instead.") + .withRequiredArg() + .describedAs("prop") + .ofType(String.class); + messageFormatterArgOpt = parser.accepts("formatter-property", "The properties to initialize the message formatter. Default properties include: \n" + " print.timestamp=true|false\n" + " print.key=true|false\n" + @@ -170,11 +206,25 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output."); checkRequiredArgs(); + if (options.has(consumerPropertyOpt) && options.has(commandPropertyOpt)) { + CommandLineUtils.printUsageAndExit(parser, "Options --consumer-property and --command-property cannot be specified together."); + } + if (options.has(consumerConfigOpt) && options.has(commandConfigOpt)) { + CommandLineUtils.printUsageAndExit(parser, "Options --consumer.config and --command-config cannot be specified together."); + } - Properties consumerPropsFromFile = options.has(consumerConfigOpt) - ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + if (options.has(consumerPropertyOpt)) { + System.out.println("Option --consumer-property is deprecated and will be removed in a future version. Use --command-property instead."); + commandPropertyOpt = consumerPropertyOpt; + } + if (options.has(consumerConfigOpt)) { + System.out.println("Option --consumer.config is deprecated and will be removed in a future version. Use --command-config instead."); + commandConfigOpt = consumerConfigOpt; + } + Properties consumerPropsFromFile = options.has(commandConfigOpt) + ? Utils.loadProps(options.valueOf(commandConfigOpt)) : new Properties(); - Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt)); + Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(commandPropertyOpt)); Set groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps); consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided); offset = parseOffset(); @@ -323,6 +373,13 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { Class messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)); formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance(); + if (options.has(messageFormatterArgOpt) && options.has(messageFormatterArgOptDeprecated)) { + CommandLineUtils.printUsageAndExit(parser, "Options --property and --formatter-property cannot be specified together."); + } + if (options.has(messageFormatterArgOptDeprecated)) { + System.out.println("Option --property is deprecated and will be removed in a future version. Use --formatter-property instead."); + messageFormatterArgOpt = messageFormatterArgOptDeprecated; + } Properties formatterArgs = formatterArgs(); Map formatterConfigs = new HashMap<>(); for (final String name : formatterArgs.stringPropertyNames()) { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java index f00407b25eb..3472d07afa5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java @@ -37,7 +37,9 @@ import joptsimple.OptionSpec; public final class ConsoleShareConsumerOptions extends CommandDefaultOptions { private final OptionSpec messageFormatterOpt; private final OptionSpec messageFormatterConfigOpt; - private final OptionSpec messageFormatterArgOpt; + @Deprecated(since = "4.2", forRemoval = true) + private final OptionSpec messageFormatterArgOptDeprecated; + private OptionSpec messageFormatterArgOpt; private final OptionSpec keyDeserializerOpt; private final OptionSpec valueDeserializerOpt; private final OptionSpec maxMessagesOpt; @@ -52,17 +54,30 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions { private final MessageFormatter formatter; private final OptionSpec enableSystestEventsLoggingOpt; + @SuppressWarnings("deprecation") public ConsoleShareConsumerOptions(String[] args) throws IOException { super(args); topicOpt = parser.accepts("topic", "The topic to consume from.") .withRequiredArg() .describedAs("topic") .ofType(String.class); - OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") + @Deprecated(since = "4.2", forRemoval = true) + OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the consumer. " + + "This option will be removed in a future version. Use --command-property instead.") .withRequiredArg() .describedAs("consumer_prop") .ofType(String.class); - OptionSpec consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.") + OptionSpec commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") + .withRequiredArg() + .describedAs("consumer_prop") + .ofType(String.class); + @Deprecated(since = "4.2", forRemoval = true) + OptionSpec consumerConfigOpt = parser.accepts("consumer-config", "(DEPRECATED) Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config. " + + "This option will be removed in a future version. Use --command-config instead.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + OptionSpec commandConfigOpt = parser.accepts("command-config", "Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.") .withRequiredArg() .describedAs("config file") .ofType(String.class); @@ -71,7 +86,29 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions { .describedAs("class") .ofType(String.class) .defaultsTo(DefaultMessageFormatter.class.getName()); - messageFormatterArgOpt = parser.accepts("property", + messageFormatterArgOptDeprecated = parser.accepts("property", + "(DEPRECATED) The properties to initialize the message formatter. Default properties include: \n" + + " print.timestamp=true|false\n" + + " print.key=true|false\n" + + " print.offset=true|false\n" + + " print.delivery=true|false\n" + + " print.epoch=true|false\n" + + " print.partition=true|false\n" + + " print.headers=true|false\n" + + " print.value=true|false\n" + + " key.separator=\n" + + " line.separator=\n" + + " headers.separator=\n" + + " null.literal=\n" + + " key.deserializer=\n" + + " value.deserializer=\n" + + " header.deserializer=\n" + + "\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers. " + + "\nThis option will be removed in a future version. Use --formatter-property instead.") + .withRequiredArg() + .describedAs("prop") + .ofType(String.class); + messageFormatterArgOpt = parser.accepts("formatter-property", "The properties to initialize the message formatter. Default properties include: \n" + " print.timestamp=true|false\n" + " print.key=true|false\n" + @@ -141,10 +178,26 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions { CommandLineUtils.printUsageAndExit(parser, "At most one of --reject and --release may be specified."); } - Properties consumerPropsFromFile = options.has(consumerConfigOpt) - ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + if (options.has(consumerPropertyOpt) && options.has(commandPropertyOpt)) { + CommandLineUtils.printUsageAndExit(parser, "Options --consumer-property and --command-property cannot be specified together."); + } + if (options.has(consumerConfigOpt) && options.has(commandConfigOpt)) { + CommandLineUtils.printUsageAndExit(parser, "Options --consumer-config and --command-config cannot be specified together."); + } + + if (options.has(consumerPropertyOpt)) { + System.out.println("Option --consumer-property is deprecated and will be removed in a future version. Use --command-property instead."); + commandPropertyOpt = consumerPropertyOpt; + } + if (options.has(consumerConfigOpt)) { + System.out.println("Option --consumer-config is deprecated and will be removed in a future version. Use --command-config instead."); + commandConfigOpt = consumerConfigOpt; + } + + Properties consumerPropsFromFile = options.has(commandConfigOpt) + ? Utils.loadProps(options.valueOf(commandConfigOpt)) : new Properties(); - Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt)); + Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(commandPropertyOpt)); Set groupIdsProvided = checkShareGroup(consumerPropsFromFile, extraConsumerProps); consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided); @@ -203,6 +256,13 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions { Class messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)); formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance(); + if (options.has(messageFormatterArgOpt) && options.has(messageFormatterArgOptDeprecated)) { + CommandLineUtils.printUsageAndExit(parser, "Options --property and --formatter-property cannot be specified together."); + } + if (options.has(messageFormatterArgOptDeprecated)) { + System.out.println("Option --property is deprecated and will be removed in a future version. Use --formatter-property instead."); + messageFormatterArgOpt = messageFormatterArgOptDeprecated; + } Properties formatterArgs = formatterArgs(); Map formatterConfigs = new HashMap<>(); for (final String name : formatterArgs.stringPropertyNames()) { diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java index 6752aef29c7..e1a519ac6de 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -57,11 +58,16 @@ public class ConsoleProducerTest { "--bootstrap-server", "localhost:1002", "--topic", "t3", }; - private static final String[] CLIENT_ID_OVERRIDE = new String[]{ + private static final String[] CLIENT_ID_OVERRIDE_DEPRECATED = new String[]{ "--bootstrap-server", "localhost:1001", "--topic", "t3", "--producer-property", "client.id=producer-1" }; + private static final String[] CLIENT_ID_OVERRIDE = new String[]{ + "--bootstrap-server", "localhost:1001", + "--topic", "t3", + "--command-property", "client.id=producer-1" + }; private static final String[] BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE = new String[]{ "--bootstrap-server", "localhost:1002", "--topic", "t3", @@ -151,8 +157,8 @@ public class ConsoleProducerTest { } @Test - public void testClientIdOverride() throws IOException { - ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE); + public void testClientIdOverrideDeprecated() throws IOException { + ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE_DEPRECATED); ProducerConfig producerConfig = new ProducerConfig(opts.producerProps()); assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)); @@ -222,6 +228,107 @@ public class ConsoleProducerTest { assertEquals(1, reader.closeCount()); } + @Test + public void shouldExitOnBothProducerPropertyAndCommandProperty() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--producer-property", "acks=all", + "--command-property", "batch.size=16384" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleProducerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitOnBothProducerConfigAndCommandConfig() throws IOException { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + Map configs = new HashMap<>(); + configs.put("acks", "all"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + + Map configs2 = new HashMap<>(); + configs2.put("batch.size", "16384"); + File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--producer.config", propsFile.getAbsolutePath(), + "--command-config", propsFile2.getAbsolutePath() + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleProducerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void testClientIdOverrideUsingCommandProperty() throws IOException { + ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE); + ProducerConfig producerConfig = new ProducerConfig(opts.producerProps()); + + assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testProducerConfigFromFileUsingCommandConfig() throws IOException { + Map configs = new HashMap<>(); + configs.put("acks", "all"); + configs.put("batch.size", "32768"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-config", propsFile.getAbsolutePath() + }; + + ConsoleProducerOptions opts = new ConsoleProducerOptions(args); + ProducerConfig producerConfig = new ProducerConfig(opts.producerProps()); + + // "all" gets converted to "-1" internally by ProducerConfig + assertEquals("-1", producerConfig.getString(ProducerConfig.ACKS_CONFIG)); + assertEquals(32768, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); + } + + @Test + public void testCommandPropertyOverridesConfig() throws IOException { + Map configs = new HashMap<>(); + configs.put("acks", "1"); + configs.put("batch.size", "16384"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-config", propsFile.getAbsolutePath(), + "--command-property", "acks=all" + }; + + ConsoleProducerOptions opts = new ConsoleProducerOptions(args); + ProducerConfig producerConfig = new ProducerConfig(opts.producerProps()); + + // Command property should override the config file value + // "all" gets converted to "-1" internally by ProducerConfig + assertEquals("-1", producerConfig.getString(ProducerConfig.ACKS_CONFIG)); + // Config file value should still be present + assertEquals(16384, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)); + } + public static class TestRecordReader implements RecordReader { private int configureCount = 0; private int closeCount = 0; diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java index 02ded3d5ca5..4639ff63a85 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java @@ -172,7 +172,7 @@ public class ConsoleConsumerOptionsTest { } @Test - public void shouldParseValidConsumerConfigWithAutoOffsetResetLatest() throws IOException { + public void shouldParseValidConsumerConfigWithAutoOffsetResetLatestDeprecated() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", @@ -189,7 +189,7 @@ public class ConsoleConsumerOptionsTest { } @Test - public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliest() throws IOException { + public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliestDeprecated() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", @@ -206,7 +206,7 @@ public class ConsoleConsumerOptionsTest { } @Test - public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning() throws IOException { + public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginningDeprecated() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", @@ -240,7 +240,7 @@ public class ConsoleConsumerOptionsTest { } @Test - public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning() { + public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningDeprecated() { Exit.setExitProcedure((code, message) -> { throw new IllegalArgumentException(message); }); @@ -259,7 +259,7 @@ public class ConsoleConsumerOptionsTest { } @Test - public void shouldParseConfigsFromFile() throws IOException { + public void shouldParseConfigsFromFileDeprecated() throws IOException { Map configs = new HashMap<>(); configs.put("request.timeout.ms", "1000"); configs.put("group.id", "group1"); @@ -276,80 +276,82 @@ public class ConsoleConsumerOptionsTest { } @Test - public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException { + public void groupIdsProvidedInDifferentPlacesMustMatchDeprecated() throws IOException { Exit.setExitProcedure((code, message) -> { throw new IllegalArgumentException(message); }); + try { - // different in all three places - File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); - final String[] args = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer-property", "group.id=group-from-properties", - "--consumer.config", propsFile.getAbsolutePath() - }; + // different in all three places + File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties", + "--consumer.config", propsFile.getAbsolutePath() + }; - assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); - // the same in all three places - propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group")); - final String[] args1 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "test-group", - "--consumer-property", "group.id=test-group", - "--consumer.config", propsFile.getAbsolutePath() - }; + // the same in all three places + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group")); + final String[] args1 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--consumer-property", "group.id=test-group", + "--consumer.config", propsFile.getAbsolutePath() + }; - ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1); - Properties props = config.consumerProps(); - assertEquals("test-group", props.getProperty("group.id")); + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1); + Properties props = config.consumerProps(); + assertEquals("test-group", props.getProperty("group.id")); - // different via --consumer-property and --consumer.config - propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); - final String[] args2 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--consumer-property", "group.id=group-from-properties", - "--consumer.config", propsFile.getAbsolutePath() - }; + // different via --consumer-property and --consumer.config + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args2 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "group.id=group-from-properties", + "--consumer.config", propsFile.getAbsolutePath() + }; - assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2)); + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2)); - // different via --consumer-property and --group - final String[] args3 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer-property", "group.id=group-from-properties" - }; + // different via --consumer-property and --group + final String[] args3 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties" + }; - assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3)); + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3)); - // different via --group and --consumer.config - propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); - final String[] args4 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer.config", propsFile.getAbsolutePath() - }; - assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4)); + // different via --group and --consumer.config + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args4 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer.config", propsFile.getAbsolutePath() + }; + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4)); - // via --group only - final String[] args5 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments" - }; + // via --group only + final String[] args5 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments" + }; - config = new ConsoleConsumerOptions(args5); - props = config.consumerProps(); - assertEquals("group-from-arguments", props.getProperty("group.id")); - - Exit.resetExitProcedure(); + config = new ConsoleConsumerOptions(args5); + props = config.consumerProps(); + assertEquals("group-from-arguments", props.getProperty("group.id")); + } finally { + Exit.resetExitProcedure(); + } } @Test @@ -508,7 +510,7 @@ public class ConsoleConsumerOptionsTest { } @Test - public void testClientIdOverride() throws IOException { + public void testClientIdOverrideDeprecated() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", @@ -618,4 +620,234 @@ public class ConsoleConsumerOptionsTest { "--formatter", formatter, }; } + + @Test + public void shouldExitOnBothConsumerPropertyAndCommandProperty() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "auto.offset.reset=latest", + "--command-property", "session.timeout.ms=10000" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitOnBothConsumerConfigAndCommandConfig() throws IOException { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + Map configs = new HashMap<>(); + configs.put("request.timeout.ms", "1000"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + + Map configs2 = new HashMap<>(); + configs2.put("session.timeout.ms", "10000"); + File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer.config", propsFile.getAbsolutePath(), + "--command-config", propsFile2.getAbsolutePath() + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldParseValidConsumerConfigWithAutoOffsetResetLatestUsingCommandProperty() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-property", "auto.offset.reset=latest" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg().orElse("")); + assertFalse(config.fromBeginning()); + assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + } + + @Test + public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliestUsingCommandProperty() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-property", "auto.offset.reset=earliest" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg().orElse("")); + assertFalse(config.fromBeginning()); + assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + } + + @Test + public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginningUsingCommandProperty() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-property", "auto.offset.reset=earliest", + "--from-beginning" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg().orElse("")); + assertTrue(config.fromBeginning()); + assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + } + + @Test + public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningUsingCommandProperty() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-property", "auto.offset.reset=latest", + "--from-beginning" + }; + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldParseConfigsFromFileUsingCommandConfig() throws IOException { + Map configs = new HashMap<>(); + configs.put("request.timeout.ms", "1000"); + configs.put("group.id", "group1"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-config", propsFile.getAbsolutePath() + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + assertEquals("1000", config.consumerProps().get("request.timeout.ms")); + assertEquals("group1", config.consumerProps().get("group.id")); + } + + @Test + public void groupIdsProvidedInDifferentPlacesMustMatchUsingCommandConfig() throws IOException { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + try { + // different in all three places + File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--command-property", "group.id=group-from-properties", + "--command-config", propsFile.getAbsolutePath() + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); + + // the same in all three places + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group")); + final String[] args1 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--command-property", "group.id=test-group", + "--command-config", propsFile.getAbsolutePath() + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1); + Properties props = config.consumerProps(); + assertEquals("test-group", props.getProperty("group.id")); + + // different via --command-property and --command-config + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args2 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-property", "group.id=group-from-properties", + "--command-config", propsFile.getAbsolutePath() + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2)); + + // different via --command-property and --group + final String[] args3 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--command-property", "group.id=group-from-properties" + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3)); + + // different via --group and --command-config + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args4 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--command-config", propsFile.getAbsolutePath() + }; + assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4)); + + // via --group only + final String[] args5 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments" + }; + + config = new ConsoleConsumerOptions(args5); + props = config.consumerProps(); + assertEquals("group-from-arguments", props.getProperty("group.id")); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void testClientIdOverrideUsingCommandProperty() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning", + "--command-property", "client.id=consumer-1" + }; + + ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java index c2b7b73c1a5..fecf53dbbec 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.MockDeserializer; import org.apache.kafka.tools.ToolsTestUtils; import org.junit.jupiter.api.Test; @@ -32,7 +33,9 @@ import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ConsoleShareConsumerOptionsTest { @@ -72,7 +75,7 @@ public class ConsoleShareConsumerOptionsTest { } @Test - public void shouldParseValidConsumerConfigWithSessionTimeout() throws IOException { + public void shouldParseValidConsumerConfigWithSessionTimeoutDeprecated() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", @@ -88,7 +91,7 @@ public class ConsoleShareConsumerOptionsTest { } @Test - public void shouldParseConfigsFromFile() throws IOException { + public void shouldParseConfigsFromFileDeprecated() throws IOException { Map configs = new HashMap<>(); configs.put("request.timeout.ms", "1000"); configs.put("group.id", "group1"); @@ -109,80 +112,82 @@ public class ConsoleShareConsumerOptionsTest { } @Test - public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException { + public void groupIdsProvidedInDifferentPlacesMustMatchDeprecated() throws IOException { Exit.setExitProcedure((code, message) -> { throw new IllegalArgumentException(message); }); - // different in all three places - File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); - final String[] args = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer-property", "group.id=group-from-properties", - "--consumer-config", propsFile.getAbsolutePath() - }; + try { + // different in all three places + File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties", + "--consumer-config", propsFile.getAbsolutePath() + }; - assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); - // the same in all three places - propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group")); - final String[] args1 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "test-group", - "--consumer-property", "group.id=test-group", - "--consumer-config", propsFile.getAbsolutePath() - }; + // the same in all three places + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group")); + final String[] args1 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--consumer-property", "group.id=test-group", + "--consumer-config", propsFile.getAbsolutePath() + }; - ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args1); - Properties props = config.consumerProps(); - assertEquals("test-group", props.getProperty("group.id")); + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args1); + Properties props = config.consumerProps(); + assertEquals("test-group", props.getProperty("group.id")); - // different via --consumer-property and --consumer-config - propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); - final String[] args2 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--consumer-property", "group.id=group-from-properties", - "--consumer-config", propsFile.getAbsolutePath() - }; + // different via --consumer-property and --consumer-config + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args2 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "group.id=group-from-properties", + "--consumer-config", propsFile.getAbsolutePath() + }; - assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args2)); + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args2)); - // different via --consumer-property and --group - final String[] args3 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer-property", "group.id=group-from-properties" - }; + // different via --consumer-property and --group + final String[] args3 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties" + }; - assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args3)); + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args3)); - // different via --group and --consumer-config - propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); - final String[] args4 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments", - "--consumer-config", propsFile.getAbsolutePath() - }; - assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args4)); + // different via --group and --consumer-config + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args4 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-config", propsFile.getAbsolutePath() + }; + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args4)); - // via --group only - final String[] args5 = new String[]{ - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--group", "group-from-arguments" - }; + // via --group only + final String[] args5 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments" + }; - config = new ConsoleShareConsumerOptions(args5); - props = config.consumerProps(); - assertEquals("group-from-arguments", props.getProperty("group.id")); - - Exit.resetExitProcedure(); + config = new ConsoleShareConsumerOptions(args5); + props = config.consumerProps(); + assertEquals("group-from-arguments", props.getProperty("group.id")); + } finally { + Exit.resetExitProcedure(); + } } @Test @@ -203,7 +208,7 @@ public class ConsoleShareConsumerOptionsTest { } @Test - public void testClientIdOverride() throws IOException { + public void testClientIdOverrideDeprecated() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", @@ -216,6 +221,56 @@ public class ConsoleShareConsumerOptionsTest { assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); } + @Test + public void testCustomPropertyShouldBePassedToConfigureMethod() throws Exception { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--property", "print.key=true", + "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer", + "--property", "key.deserializer.my-props=abc" + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + + assertInstanceOf(DefaultMessageFormatter.class, config.formatter()); + assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props")); + DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter(); + assertTrue(formatter.keyDeserializer().isPresent()); + assertInstanceOf(MockDeserializer.class, formatter.keyDeserializer().get()); + MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get(); + assertEquals(1, keyDeserializer.configs.size()); + assertEquals("abc", keyDeserializer.configs.get("my-props")); + assertTrue(keyDeserializer.isKey); + } + + @Test + public void testCustomConfigShouldBePassedToConfigureMethod() throws Exception { + Map configs = new HashMap<>(); + configs.put("key.deserializer.my-props", "abc"); + configs.put("print.key", "false"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--property", "print.key=true", + "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer", + "--formatter-config", propsFile.getAbsolutePath() + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + + assertInstanceOf(DefaultMessageFormatter.class, config.formatter()); + assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props")); + DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter(); + assertTrue(formatter.keyDeserializer().isPresent()); + assertInstanceOf(MockDeserializer.class, formatter.keyDeserializer().get()); + MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get(); + assertEquals(1, keyDeserializer.configs.size()); + assertEquals("abc", keyDeserializer.configs.get("my-props")); + assertTrue(keyDeserializer.isKey); + } + @Test public void testDefaultClientId() throws IOException { String[] args = new String[]{ @@ -271,4 +326,182 @@ public class ConsoleShareConsumerOptionsTest { Exit.resetExitProcedure(); } } + + @Test + public void shouldExitOnBothConsumerPropertyAndCommandProperty() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "session.timeout.ms=10000", + "--command-property", "request.timeout.ms=30000" + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldExitOnBothConsumerConfigAndCommandConfig() throws IOException { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + Map configs = new HashMap<>(); + configs.put("request.timeout.ms", "1000"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + + Map configs2 = new HashMap<>(); + configs2.put("session.timeout.ms", "10000"); + File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-config", propsFile.getAbsolutePath(), + "--command-config", propsFile2.getAbsolutePath() + }; + + try { + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldParseValidConsumerConfigWithSessionTimeout() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-property", "session.timeout.ms=10000" + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("localhost:9092", config.bootstrapServer()); + assertEquals("test", config.topicArg()); + assertEquals("10000", consumerProperties.getProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)); + } + + @Test + public void shouldParseConfigsFromFile() throws IOException { + Map configs = new HashMap<>(); + configs.put("request.timeout.ms", "1000"); + configs.put("group.id", "group1"); + File propsFile = ToolsTestUtils.tempPropertiesFile(configs); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-config", propsFile.getAbsolutePath() + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + + // KafkaShareConsumer uses Utils.propsToMap to convert the properties to a map, + // so using the same method to check the map has the expected values + Map configMap = Utils.propsToMap(config.consumerProps()); + assertEquals("1000", configMap.get("request.timeout.ms")); + assertEquals("group1", configMap.get("group.id")); + } + + @Test + public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + try { + // different in all three places + File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--command-property", "group.id=group-from-properties", + "--command-config", propsFile.getAbsolutePath() + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); + + // the same in all three places + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group")); + final String[] args1 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--command-property", "group.id=test-group", + "--command-config", propsFile.getAbsolutePath() + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args1); + Properties props = config.consumerProps(); + assertEquals("test-group", props.getProperty("group.id")); + + // different via --command-property and --command-config + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args2 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-property", "group.id=group-from-properties", + "--command-config", propsFile.getAbsolutePath() + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args2)); + + // different via --command-property and --group + final String[] args3 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--command-property", "group.id=group-from-properties" + }; + + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args3)); + + // different via --group and --command-config + propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); + final String[] args4 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--command-config", propsFile.getAbsolutePath() + }; + assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args4)); + + // via --group only + final String[] args5 = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments" + }; + + config = new ConsoleShareConsumerOptions(args5); + props = config.consumerProps(); + assertEquals("group-from-arguments", props.getProperty("group.id")); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void testClientIdOverride() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--command-property", "client.id=consumer-1" + }; + + ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args); + Properties consumerProperties = config.consumerProps(); + + assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } }