KAFKA-19623: Implement KIP-1147 for console producer/consumer/share-consumer. (#20479)

*What*
https://issues.apache.org/jira/browse/KAFKA-19623

- The PR implements KIP-1147

(https://cwiki.apache.org/confluence/display/KAFKA/KIP-1147%3A+Improve+consistency+of+command-line+arguments)
for the console tools i.e. `ConsoleProducer`, `ConsoleConsumer` and
`ConsoleShareConsumer`.

- Currently the previous names for the options are still usable but
there will be warning message stating those are deprecated and will be
removed in a future version.
- I have added unit tests and also manually verified using the console
tools that things are working as expected.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Jhen-Yung Hsu
 <jhenyunghsu@gmail.com>, Jimmy Wang
 <48462172+JimmyWang6@users.noreply.github.com>
This commit is contained in:
Shivsundar R 2025-09-17 10:28:20 -04:00 committed by GitHub
parent bbbc0cf793
commit 3bc50f937c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 930 additions and 187 deletions

View File

@ -147,7 +147,7 @@ Single Node
- To produce messages using client scripts (Ensure that java version >= 17): - To produce messages using client scripts (Ensure that java version >= 17):
``` ```
# Run from root of the repo # Run from root of the repo
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9093 --producer.config ./docker/examples/fixtures/client-secrets/client-ssl.properties $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9093 --command-config ./docker/examples/fixtures/client-secrets/client-ssl.properties
``` ```
- File Input: - File Input:
- Here ssl configs are provided via file input. - Here ssl configs are provided via file input.
@ -167,7 +167,7 @@ Single Node
- To produce messages using client scripts (Ensure that java version >= 17): - To produce messages using client scripts (Ensure that java version >= 17):
``` ```
# Run from root of the repo # Run from root of the repo
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9093 --producer.config ./docker/examples/fixtures/client-secrets/client-ssl.properties $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9093 --command-config ./docker/examples/fixtures/client-secrets/client-ssl.properties
``` ```
Multi Node Cluster Multi Node Cluster
@ -219,7 +219,7 @@ Multi Node Cluster
- To produce messages using client scripts (Ensure that java version >= 17): - To produce messages using client scripts (Ensure that java version >= 17):
``` ```
# Run from root of the repo # Run from root of the repo
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:29093 --producer.config ./docker/examples/fixtures/client-secrets/client-ssl.properties $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:29093 --command-config ./docker/examples/fixtures/client-secrets/client-ssl.properties
``` ```
- Isolated: - Isolated:
- Examples are present in `docker-compose-files/cluster/isolated` directory. - Examples are present in `docker-compose-files/cluster/isolated` directory.
@ -258,7 +258,7 @@ Multi Node Cluster
- To produce messages using client scripts (Ensure that java version >= 17): - To produce messages using client scripts (Ensure that java version >= 17):
``` ```
# Run from root of the repo # Run from root of the repo
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:29093 --producer.config ./docker/examples/fixtures/client-secrets/client-ssl.properties $ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:29093 --command-config ./docker/examples/fixtures/client-secrets/client-ssl.properties
``` ```
- Note that the examples are meant to be tried one at a time, make sure you close an example server before trying out the other to avoid conflicts. - Note that the examples are meant to be tried one at a time, make sure you close an example server before trying out the other to avoid conflicts.

View File

@ -65,7 +65,7 @@ class DockerSanityTest(unittest.TestCase):
subprocess.run(["bash", "-c", " ".join(command)]) subprocess.run(["bash", "-c", " ".join(command)])
def consume_message(self, topic, consumer_config): def consume_message(self, topic, consumer_config):
command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", "--topic", topic, "--property", "'print.key=true'", "--property", "'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"] command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", "--topic", topic, "--formatter-property", "'print.key=true'", "--formatter-property", "'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"]
command.extend(consumer_config) command.extend(consumer_config)
message = subprocess.check_output(["bash", "-c", " ".join(command)]) message = subprocess.check_output(["bash", "-c", " ".join(command)])
return message.decode("utf-8").strip() return message.decode("utf-8").strip()
@ -93,9 +93,9 @@ class DockerSanityTest(unittest.TestCase):
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e)) errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
return errors return errors
producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"] producer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "client.id=host"]
self.produce_message(constants.BROKER_METRICS_TEST_TOPIC, producer_config, "key", "message") self.produce_message(constants.BROKER_METRICS_TEST_TOPIC, producer_config, "key", "message")
consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"] consumer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "auto.offset.reset=earliest"]
message = self.consume_message(constants.BROKER_METRICS_TEST_TOPIC, consumer_config) message = self.consume_message(constants.BROKER_METRICS_TEST_TOPIC, consumer_config)
try: try:
self.assertEqual(message, "key:message") self.assertEqual(message, "key:message")
@ -129,13 +129,13 @@ class DockerSanityTest(unittest.TestCase):
return errors return errors
producer_config = ["--bootstrap-server", ssl_broker_port, producer_config = ["--bootstrap-server", ssl_broker_port,
"--producer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"] "--command-config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]
self.produce_message(topic, producer_config, "key", "message") self.produce_message(topic, producer_config, "key", "message")
consumer_config = [ consumer_config = [
"--bootstrap-server", ssl_broker_port, "--bootstrap-server", ssl_broker_port,
"--property", "auto.offset.reset=earliest", "--command-property", "auto.offset.reset=earliest",
"--consumer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", "--command-config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}",
] ]
message = self.consume_message(topic, consumer_config) message = self.consume_message(topic, consumer_config)
try: try:
@ -155,7 +155,7 @@ class DockerSanityTest(unittest.TestCase):
errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e)) errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e))
return errors return errors
producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"] producer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "client.id=host"]
self.produce_message(constants.BROKER_RESTART_TEST_TOPIC, producer_config, "key", "message") self.produce_message(constants.BROKER_RESTART_TEST_TOPIC, producer_config, "key", "message")
print("Stopping Container") print("Stopping Container")
@ -163,7 +163,7 @@ class DockerSanityTest(unittest.TestCase):
print("Resuming Container") print("Resuming Container")
self.resume_container() self.resume_container()
consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"] consumer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "auto.offset.reset=earliest"]
message = self.consume_message(constants.BROKER_RESTART_TEST_TOPIC, consumer_config) message = self.consume_message(constants.BROKER_RESTART_TEST_TOPIC, consumer_config)
try: try:
self.assertEqual(message, "key:message") self.assertEqual(message, "key:message")

View File

@ -510,8 +510,8 @@ ssl.key.password=test1234</code></pre>
</ol> </ol>
<br> <br>
Examples using console-producer and console-consumer: Examples using console-producer and console-consumer:
<pre><code class="language-bash">$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties <pre><code class="language-bash">$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --command-config client-ssl.properties
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</code></pre> $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --command-config client-ssl.properties</code></pre>
</li> </li>
</ol> </ol>
<h3 class="anchor-heading"><a id="security_sasl" class="anchor-link"></a><a href="#security_sasl">7.4 Authentication using SASL</a></h3> <h3 class="anchor-heading"><a id="security_sasl" class="anchor-link"></a><a href="#security_sasl">7.4 Authentication using SASL</a></h3>

View File

@ -1122,15 +1122,15 @@ public class KStreamAggregationIntegrationTest {
final String[] args = new String[] { final String[] args = new String[] {
"--bootstrap-server", CLUSTER.bootstrapServers(), "--bootstrap-server", CLUSTER.bootstrapServers(),
"--from-beginning", "--from-beginning",
"--property", "print.key=true", "--formatter-property", "print.key=true",
"--property", "print.timestamp=" + printTimestamp, "--formatter-property", "print.timestamp=" + printTimestamp,
"--topic", outputTopic, "--topic", outputTopic,
"--max-messages", String.valueOf(numMessages), "--max-messages", String.valueOf(numMessages),
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(), "--formatter-property", "key.deserializer=" + keyDeserializer.getClass().getName(),
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(), "--formatter-property", "value.deserializer=" + valueDeserializer.getClass().getName(),
"--property", "key.separator=" + keySeparator, "--formatter-property", "key.separator=" + keySeparator,
"--property", "key.deserializer." + TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName(), "--formatter-property", "key.deserializer." + TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName(),
"--property", "key.deserializer.window.size.ms=500", "--formatter-property", "key.deserializer.window.size.ms=500",
}; };
ConsoleConsumer.run(new ConsoleConsumerOptions(args)); ConsoleConsumer.run(new ConsoleConsumerOptions(args));

View File

@ -88,7 +88,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
jaas_override_variables A dict of variables to be used in the jaas.conf template file jaas_override_variables A dict of variables to be used in the jaas.conf template file
kafka_opts_override Override parameters of the KAFKA_OPTS environment variable kafka_opts_override Override parameters of the KAFKA_OPTS environment variable
client_prop_file_override Override client.properties file used by the consumer client_prop_file_override Override client.properties file used by the consumer
consumer_properties A dict of values to pass in as --consumer-property key=value consumer_properties A dict of values to pass in as --command-property key=value. For versions older than KAFKA_4_2_0, these will be passed as --consumer-property key=value
""" """
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleConsumer.PERSISTENT_ROOT) root=ConsoleConsumer.PERSISTENT_ROOT)
@ -163,8 +163,11 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
"export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j_config)s\"; " \ "export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j_config)s\"; " \
"export KAFKA_OPTS=%(kafka_opts)s; " \ "export KAFKA_OPTS=%(kafka_opts)s; " \
"%(console_consumer)s " \ "%(console_consumer)s " \
"--topic %(topic)s " \ "--topic %(topic)s " % args
"--consumer.config %(config_file)s " % args
version = get_version(node)
command_config_arg = "--command-config" if version.supports_command_config() else "--consumer.config"
cmd += "%s %s" % (command_config_arg, args['config_file'])
cmd += " --bootstrap-server %(broker_list)s" % args cmd += " --bootstrap-server %(broker_list)s" % args
cmd += " --isolation-level %s" % self.isolation_level cmd += " --isolation-level %s" % self.isolation_level
@ -176,14 +179,15 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
# This will be added in the properties file instead # This will be added in the properties file instead
cmd += " --timeout-ms %s" % self.consumer_timeout_ms cmd += " --timeout-ms %s" % self.consumer_timeout_ms
formatter_property_arg = "--formatter-property" if version.supports_formatter_property else "--property"
if self.print_timestamp: if self.print_timestamp:
cmd += " --property print.timestamp=true" cmd += " %s print.timestamp=true" % formatter_property_arg
if self.print_key: if self.print_key:
cmd += " --property print.key=true" cmd += " %s print.key=true" % formatter_property_arg
if self.print_partition: if self.print_partition:
cmd += " --property print.partition=true" cmd += " %s print.partition=true" % formatter_property_arg
# LoggingMessageFormatter was introduced after 0.9 # LoggingMessageFormatter was introduced after 0.9
if node.version > LATEST_3_7: if node.version > LATEST_3_7:
@ -194,9 +198,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
if self.enable_systest_events: if self.enable_systest_events:
cmd += " --enable-systest-events" cmd += " --enable-systest-events"
command_property_arg = "--command-property" if version.supports_command_property() else "--consumer-property"
if self.consumer_properties is not None: if self.consumer_properties is not None:
for k, v in self.consumer_properties.items(): for k, v in self.consumer_properties.items():
cmd += " --consumer-property %s=%s" % (k, v) cmd += " %s %s=%s" % (command_property_arg, k, v)
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd return cmd

View File

@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.version import DEV_BRANCH, LATEST_4_1 from kafkatest.version import DEV_BRANCH, LATEST_4_1, get_version
from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config_for_tools from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config_for_tools
""" """
@ -84,7 +84,7 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadSer
jaas_override_variables A dict of variables to be used in the jaas.conf template file jaas_override_variables A dict of variables to be used in the jaas.conf template file
kafka_opts_override Override parameters of the KAFKA_OPTS environment variable kafka_opts_override Override parameters of the KAFKA_OPTS environment variable
client_prop_file_override Override client.properties file used by the consumer client_prop_file_override Override client.properties file used by the consumer
share_consumer_properties A dict of values to pass in as --consumer-property key=value share_consumer_properties A dict of values to pass in as --command-property key=value. For versions older than KAFKA_4_2_0, these will be passed as --consumer-property key=value
""" """
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleShareConsumer.PERSISTENT_ROOT) root=ConsoleShareConsumer.PERSISTENT_ROOT)
@ -156,31 +156,36 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadSer
"export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j_config)s\"; " \ "export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j_config)s\"; " \
"export KAFKA_OPTS=%(kafka_opts)s; " \ "export KAFKA_OPTS=%(kafka_opts)s; " \
"%(console_share_consumer)s " \ "%(console_share_consumer)s " \
"--topic %(topic)s " \ "--topic %(topic)s " % args
"--consumer-config %(config_file)s " % args
version = get_version(node)
command_config_arg = "--command-config" if version.supports_command_config() else "--consumer-config"
cmd += "%s %s" % (command_config_arg, args['config_file'])
cmd += " --bootstrap-server %(broker_list)s" % args cmd += " --bootstrap-server %(broker_list)s" % args
if self.share_consumer_timeout_ms is not None: if self.share_consumer_timeout_ms is not None:
# This will be added in the properties file instead # This will be added in the properties file instead
cmd += " --timeout-ms %s" % self.share_consumer_timeout_ms cmd += " --timeout-ms %s" % self.share_consumer_timeout_ms
formatter_property_arg = "--formatter-property" if version.supports_formatter_property else "--property"
if self.print_timestamp: if self.print_timestamp:
cmd += " --property print.timestamp=true" cmd += " %s print.timestamp=true" % formatter_property_arg
if self.print_key: if self.print_key:
cmd += " --property print.key=true" cmd += " %s print.key=true" % formatter_property_arg
if self.print_partition: if self.print_partition:
cmd += " --property print.partition=true" cmd += " %s print.partition=true" % formatter_property_arg
cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter" cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter"
if self.enable_systest_events: if self.enable_systest_events:
cmd += " --enable-systest-events" cmd += " --enable-systest-events"
command_property_arg = "--command-property" if version.supports_command_property() else "--consumer-property"
if self.share_consumer_properties is not None: if self.share_consumer_properties is not None:
for k, v in self.share_consumer_properties.items(): for k, v in self.share_consumer_properties.items():
cmd += " --consumer-property %s=%s" % (k, v) cmd += " %s %s=%s" % (command_property_arg, k, v)
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd return cmd

View File

@ -104,6 +104,20 @@ class KafkaVersion(LooseVersion):
# - For older versions, continue using --producer.config or --consumer.config # - For older versions, continue using --producer.config or --consumer.config
return self >= V_4_2_0 return self >= V_4_2_0
def supports_command_property(self):
# According to KIP-1147, --producer-property and --consumer-property have been deprecated and will be removed in future versions
# For backward compatibility, we select the configuration based on node version:
# - For versions 4.2.0 and above, use --command-property
# - For older versions, continue using --producer-property or --consumer-property
return self >= V_4_2_0
def supports_formatter_property(self):
# According to KIP-1147, --property has been deprecated and will be removed in future versions
# For backward compatibility, we select the configuration based on node version:
# - For versions 4.2.0 and above, use --formatter-property
# - For older versions, continue using --property
return self >= V_4_2_0
def get_version(node=None): def get_version(node=None):
"""Return the version attached to the given node. """Return the version attached to the given node.
Default to DEV_BRANCH if node or node.version is undefined (aka None) Default to DEV_BRANCH if node or node.version is undefined (aka None)

View File

@ -131,8 +131,12 @@ public class ConsoleProducer {
private final OptionSpec<Integer> socketBufferSizeOpt; private final OptionSpec<Integer> socketBufferSizeOpt;
private final OptionSpec<String> propertyOpt; private final OptionSpec<String> propertyOpt;
private final OptionSpec<String> readerConfigOpt; private final OptionSpec<String> readerConfigOpt;
@Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> producerPropertyOpt; private final OptionSpec<String> producerPropertyOpt;
private OptionSpec<String> commandPropertyOpt;
@Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> producerConfigOpt; private final OptionSpec<String> producerConfigOpt;
private OptionSpec<String> commandConfigOpt;
public ConsoleProducerOptions(String[] args) { public ConsoleProducerOptions(String[] args) {
super(args); super(args);
@ -250,11 +254,20 @@ public class ConsoleProducer {
.withRequiredArg() .withRequiredArg()
.describedAs("config file") .describedAs("config file")
.ofType(String.class); .ofType(String.class);
producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ") producerPropertyOpt = parser.accepts("producer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the producer." +
"This option will be removed in a future version. Use --command-property instead.")
.withRequiredArg() .withRequiredArg()
.describedAs("producer_prop") .describedAs("producer_prop")
.ofType(String.class); .ofType(String.class);
producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.") commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the producer.")
.withRequiredArg()
.describedAs("producer_prop")
.ofType(String.class);
producerConfigOpt = parser.accepts("producer.config", "(DEPRECATED) Producer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config. This option will be removed in a future version. Use --command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "Producer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.")
.withRequiredArg() .withRequiredArg()
.describedAs("config file") .describedAs("config file")
.ofType(String.class); .ofType(String.class);
@ -273,6 +286,23 @@ public class ConsoleProducer {
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt); CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
if (options.has(commandConfigOpt) && options.has(producerConfigOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --command-config and --producer.config cannot be specified together.");
}
if (options.has(commandPropertyOpt) && options.has(producerPropertyOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --command-property and --producer-property cannot be specified together.");
}
if (options.has(producerPropertyOpt)) {
System.out.println("Warning: --producer-property is deprecated and will be removed in a future version. Use --command-property instead.");
commandPropertyOpt = producerPropertyOpt;
}
if (options.has(producerConfigOpt)) {
System.out.println("Warning: --producer.config is deprecated and will be removed in a future version. Use --command-config instead.");
commandConfigOpt = producerConfigOpt;
}
try { try {
ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt)); ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt));
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
@ -314,11 +344,11 @@ public class ConsoleProducer {
Properties producerProps() throws IOException { Properties producerProps() throws IOException {
Properties props = new Properties(); Properties props = new Properties();
if (options.has(producerConfigOpt)) { if (options.has(commandConfigOpt)) {
props.putAll(loadProps(options.valueOf(producerConfigOpt))); props.putAll(loadProps(options.valueOf(commandConfigOpt)));
} }
props.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt))); props.putAll(parseKeyValueArgs(options.valuesOf(commandPropertyOpt)));
props.put(BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOpt)); props.put(BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOpt));
props.put(COMPRESSION_TYPE_CONFIG, compressionCodec()); props.put(COMPRESSION_TYPE_CONFIG, compressionCodec());

View File

@ -48,7 +48,9 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
private final OptionSpec<Integer> partitionIdOpt; private final OptionSpec<Integer> partitionIdOpt;
private final OptionSpec<String> offsetOpt; private final OptionSpec<String> offsetOpt;
private final OptionSpec<String> messageFormatterOpt; private final OptionSpec<String> messageFormatterOpt;
private final OptionSpec<String> messageFormatterArgOpt; @Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> messageFormatterArgOptDeprecated;
private OptionSpec<String> messageFormatterArgOpt;
private final OptionSpec<String> messageFormatterConfigOpt; private final OptionSpec<String> messageFormatterConfigOpt;
private final OptionSpec<?> resetBeginningOpt; private final OptionSpec<?> resetBeginningOpt;
private final OptionSpec<Integer> maxMessagesOpt; private final OptionSpec<Integer> maxMessagesOpt;
@ -66,6 +68,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
private final long timeoutMs; private final long timeoutMs;
private final MessageFormatter formatter; private final MessageFormatter formatter;
@SuppressWarnings("deprecation")
public ConsoleConsumerOptions(String[] args) throws IOException { public ConsoleConsumerOptions(String[] args) throws IOException {
super(args); super(args);
topicOpt = parser.accepts("topic", "The topic to consume on.") topicOpt = parser.accepts("topic", "The topic to consume on.")
@ -87,11 +90,23 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
.describedAs("consume offset") .describedAs("consume offset")
.ofType(String.class) .ofType(String.class)
.defaultsTo("latest"); .defaultsTo("latest");
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") @Deprecated(since = "4.2", forRemoval = true)
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the consumer. " +
"This option will be removed in a future version. Use --command-property instead.")
.withRequiredArg() .withRequiredArg()
.describedAs("consumer_prop") .describedAs("consumer_prop")
.ofType(String.class); .ofType(String.class);
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.") OptionSpec<String> commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
.withRequiredArg()
.describedAs("consumer_prop")
.ofType(String.class);
@Deprecated(since = "4.2", forRemoval = true)
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config. " +
"This option will be removed in a future version. Use --command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
OptionSpec<String> commandConfigOpt = parser.accepts("command-config", "Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.")
.withRequiredArg() .withRequiredArg()
.describedAs("config file") .describedAs("config file")
.ofType(String.class); .ofType(String.class);
@ -100,7 +115,28 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
.describedAs("class") .describedAs("class")
.ofType(String.class) .ofType(String.class)
.defaultsTo(DefaultMessageFormatter.class.getName()); .defaultsTo(DefaultMessageFormatter.class.getName());
messageFormatterArgOpt = parser.accepts("property", messageFormatterArgOptDeprecated = parser.accepts("property",
"(DEPRECATED) The properties to initialize the message formatter. Default properties include: \n" +
" print.timestamp=true|false\n" +
" print.key=true|false\n" +
" print.offset=true|false\n" +
" print.epoch=true|false\n" +
" print.partition=true|false\n" +
" print.headers=true|false\n" +
" print.value=true|false\n" +
" key.separator=<key.separator>\n" +
" line.separator=<line.separator>\n" +
" headers.separator=<line.separator>\n" +
" null.literal=<null.literal>\n" +
" key.deserializer=<key.deserializer>\n" +
" value.deserializer=<value.deserializer>\n" +
" header.deserializer=<header.deserializer>\n" +
"\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers. " +
"\nThis option will be removed in a future version. Use --formatter-property instead.")
.withRequiredArg()
.describedAs("prop")
.ofType(String.class);
messageFormatterArgOpt = parser.accepts("formatter-property",
"The properties to initialize the message formatter. Default properties include: \n" + "The properties to initialize the message formatter. Default properties include: \n" +
" print.timestamp=true|false\n" + " print.timestamp=true|false\n" +
" print.key=true|false\n" + " print.key=true|false\n" +
@ -170,11 +206,25 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output."); CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.");
checkRequiredArgs(); checkRequiredArgs();
if (options.has(consumerPropertyOpt) && options.has(commandPropertyOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --consumer-property and --command-property cannot be specified together.");
}
if (options.has(consumerConfigOpt) && options.has(commandConfigOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --consumer.config and --command-config cannot be specified together.");
}
Properties consumerPropsFromFile = options.has(consumerConfigOpt) if (options.has(consumerPropertyOpt)) {
? Utils.loadProps(options.valueOf(consumerConfigOpt)) System.out.println("Option --consumer-property is deprecated and will be removed in a future version. Use --command-property instead.");
commandPropertyOpt = consumerPropertyOpt;
}
if (options.has(consumerConfigOpt)) {
System.out.println("Option --consumer.config is deprecated and will be removed in a future version. Use --command-config instead.");
commandConfigOpt = consumerConfigOpt;
}
Properties consumerPropsFromFile = options.has(commandConfigOpt)
? Utils.loadProps(options.valueOf(commandConfigOpt))
: new Properties(); : new Properties();
Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt)); Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(commandPropertyOpt));
Set<String> groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps); Set<String> groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps);
consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided); consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided);
offset = parseOffset(); offset = parseOffset();
@ -323,6 +373,13 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
Class<?> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)); Class<?> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt));
formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance(); formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance();
if (options.has(messageFormatterArgOpt) && options.has(messageFormatterArgOptDeprecated)) {
CommandLineUtils.printUsageAndExit(parser, "Options --property and --formatter-property cannot be specified together.");
}
if (options.has(messageFormatterArgOptDeprecated)) {
System.out.println("Option --property is deprecated and will be removed in a future version. Use --formatter-property instead.");
messageFormatterArgOpt = messageFormatterArgOptDeprecated;
}
Properties formatterArgs = formatterArgs(); Properties formatterArgs = formatterArgs();
Map<String, String> formatterConfigs = new HashMap<>(); Map<String, String> formatterConfigs = new HashMap<>();
for (final String name : formatterArgs.stringPropertyNames()) { for (final String name : formatterArgs.stringPropertyNames()) {

View File

@ -37,7 +37,9 @@ import joptsimple.OptionSpec;
public final class ConsoleShareConsumerOptions extends CommandDefaultOptions { public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
private final OptionSpec<String> messageFormatterOpt; private final OptionSpec<String> messageFormatterOpt;
private final OptionSpec<String> messageFormatterConfigOpt; private final OptionSpec<String> messageFormatterConfigOpt;
private final OptionSpec<String> messageFormatterArgOpt; @Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> messageFormatterArgOptDeprecated;
private OptionSpec<String> messageFormatterArgOpt;
private final OptionSpec<String> keyDeserializerOpt; private final OptionSpec<String> keyDeserializerOpt;
private final OptionSpec<String> valueDeserializerOpt; private final OptionSpec<String> valueDeserializerOpt;
private final OptionSpec<Integer> maxMessagesOpt; private final OptionSpec<Integer> maxMessagesOpt;
@ -52,17 +54,30 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
private final MessageFormatter formatter; private final MessageFormatter formatter;
private final OptionSpec<?> enableSystestEventsLoggingOpt; private final OptionSpec<?> enableSystestEventsLoggingOpt;
@SuppressWarnings("deprecation")
public ConsoleShareConsumerOptions(String[] args) throws IOException { public ConsoleShareConsumerOptions(String[] args) throws IOException {
super(args); super(args);
topicOpt = parser.accepts("topic", "The topic to consume from.") topicOpt = parser.accepts("topic", "The topic to consume from.")
.withRequiredArg() .withRequiredArg()
.describedAs("topic") .describedAs("topic")
.ofType(String.class); .ofType(String.class);
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") @Deprecated(since = "4.2", forRemoval = true)
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the consumer. " +
"This option will be removed in a future version. Use --command-property instead.")
.withRequiredArg() .withRequiredArg()
.describedAs("consumer_prop") .describedAs("consumer_prop")
.ofType(String.class); .ofType(String.class);
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.") OptionSpec<String> commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
.withRequiredArg()
.describedAs("consumer_prop")
.ofType(String.class);
@Deprecated(since = "4.2", forRemoval = true)
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer-config", "(DEPRECATED) Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config. " +
"This option will be removed in a future version. Use --command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
OptionSpec<String> commandConfigOpt = parser.accepts("command-config", "Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.")
.withRequiredArg() .withRequiredArg()
.describedAs("config file") .describedAs("config file")
.ofType(String.class); .ofType(String.class);
@ -71,7 +86,29 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
.describedAs("class") .describedAs("class")
.ofType(String.class) .ofType(String.class)
.defaultsTo(DefaultMessageFormatter.class.getName()); .defaultsTo(DefaultMessageFormatter.class.getName());
messageFormatterArgOpt = parser.accepts("property", messageFormatterArgOptDeprecated = parser.accepts("property",
"(DEPRECATED) The properties to initialize the message formatter. Default properties include: \n" +
" print.timestamp=true|false\n" +
" print.key=true|false\n" +
" print.offset=true|false\n" +
" print.delivery=true|false\n" +
" print.epoch=true|false\n" +
" print.partition=true|false\n" +
" print.headers=true|false\n" +
" print.value=true|false\n" +
" key.separator=<key.separator>\n" +
" line.separator=<line.separator>\n" +
" headers.separator=<line.separator>\n" +
" null.literal=<null.literal>\n" +
" key.deserializer=<key.deserializer>\n" +
" value.deserializer=<value.deserializer>\n" +
" header.deserializer=<header.deserializer>\n" +
"\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers. " +
"\nThis option will be removed in a future version. Use --formatter-property instead.")
.withRequiredArg()
.describedAs("prop")
.ofType(String.class);
messageFormatterArgOpt = parser.accepts("formatter-property",
"The properties to initialize the message formatter. Default properties include: \n" + "The properties to initialize the message formatter. Default properties include: \n" +
" print.timestamp=true|false\n" + " print.timestamp=true|false\n" +
" print.key=true|false\n" + " print.key=true|false\n" +
@ -141,10 +178,26 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
CommandLineUtils.printUsageAndExit(parser, "At most one of --reject and --release may be specified."); CommandLineUtils.printUsageAndExit(parser, "At most one of --reject and --release may be specified.");
} }
Properties consumerPropsFromFile = options.has(consumerConfigOpt) if (options.has(consumerPropertyOpt) && options.has(commandPropertyOpt)) {
? Utils.loadProps(options.valueOf(consumerConfigOpt)) CommandLineUtils.printUsageAndExit(parser, "Options --consumer-property and --command-property cannot be specified together.");
}
if (options.has(consumerConfigOpt) && options.has(commandConfigOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --consumer-config and --command-config cannot be specified together.");
}
if (options.has(consumerPropertyOpt)) {
System.out.println("Option --consumer-property is deprecated and will be removed in a future version. Use --command-property instead.");
commandPropertyOpt = consumerPropertyOpt;
}
if (options.has(consumerConfigOpt)) {
System.out.println("Option --consumer-config is deprecated and will be removed in a future version. Use --command-config instead.");
commandConfigOpt = consumerConfigOpt;
}
Properties consumerPropsFromFile = options.has(commandConfigOpt)
? Utils.loadProps(options.valueOf(commandConfigOpt))
: new Properties(); : new Properties();
Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt)); Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(commandPropertyOpt));
Set<String> groupIdsProvided = checkShareGroup(consumerPropsFromFile, extraConsumerProps); Set<String> groupIdsProvided = checkShareGroup(consumerPropsFromFile, extraConsumerProps);
consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided); consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided);
@ -203,6 +256,13 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
Class<?> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)); Class<?> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt));
formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance(); formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance();
if (options.has(messageFormatterArgOpt) && options.has(messageFormatterArgOptDeprecated)) {
CommandLineUtils.printUsageAndExit(parser, "Options --property and --formatter-property cannot be specified together.");
}
if (options.has(messageFormatterArgOptDeprecated)) {
System.out.println("Option --property is deprecated and will be removed in a future version. Use --formatter-property instead.");
messageFormatterArgOpt = messageFormatterArgOptDeprecated;
}
Properties formatterArgs = formatterArgs(); Properties formatterArgs = formatterArgs();
Map<String, String> formatterConfigs = new HashMap<>(); Map<String, String> formatterConfigs = new HashMap<>();
for (final String name : formatterArgs.stringPropertyNames()) { for (final String name : formatterArgs.stringPropertyNames()) {

View File

@ -34,6 +34,7 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -57,11 +58,16 @@ public class ConsoleProducerTest {
"--bootstrap-server", "localhost:1002", "--bootstrap-server", "localhost:1002",
"--topic", "t3", "--topic", "t3",
}; };
private static final String[] CLIENT_ID_OVERRIDE = new String[]{ private static final String[] CLIENT_ID_OVERRIDE_DEPRECATED = new String[]{
"--bootstrap-server", "localhost:1001", "--bootstrap-server", "localhost:1001",
"--topic", "t3", "--topic", "t3",
"--producer-property", "client.id=producer-1" "--producer-property", "client.id=producer-1"
}; };
private static final String[] CLIENT_ID_OVERRIDE = new String[]{
"--bootstrap-server", "localhost:1001",
"--topic", "t3",
"--command-property", "client.id=producer-1"
};
private static final String[] BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE = new String[]{ private static final String[] BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE = new String[]{
"--bootstrap-server", "localhost:1002", "--bootstrap-server", "localhost:1002",
"--topic", "t3", "--topic", "t3",
@ -151,8 +157,8 @@ public class ConsoleProducerTest {
} }
@Test @Test
public void testClientIdOverride() throws IOException { public void testClientIdOverrideDeprecated() throws IOException {
ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE); ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE_DEPRECATED);
ProducerConfig producerConfig = new ProducerConfig(opts.producerProps()); ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)); assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
@ -222,6 +228,107 @@ public class ConsoleProducerTest {
assertEquals(1, reader.closeCount()); assertEquals(1, reader.closeCount());
} }
@Test
public void shouldExitOnBothProducerPropertyAndCommandProperty() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--producer-property", "acks=all",
"--command-property", "batch.size=16384"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleProducerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitOnBothProducerConfigAndCommandConfig() throws IOException {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
Map<String, String> configs = new HashMap<>();
configs.put("acks", "all");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
Map<String, String> configs2 = new HashMap<>();
configs2.put("batch.size", "16384");
File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--producer.config", propsFile.getAbsolutePath(),
"--command-config", propsFile2.getAbsolutePath()
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleProducerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void testClientIdOverrideUsingCommandProperty() throws IOException {
ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE);
ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
}
@Test
public void testProducerConfigFromFileUsingCommandConfig() throws IOException {
Map<String, String> configs = new HashMap<>();
configs.put("acks", "all");
configs.put("batch.size", "32768");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-config", propsFile.getAbsolutePath()
};
ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
// "all" gets converted to "-1" internally by ProducerConfig
assertEquals("-1", producerConfig.getString(ProducerConfig.ACKS_CONFIG));
assertEquals(32768, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
}
@Test
public void testCommandPropertyOverridesConfig() throws IOException {
Map<String, String> configs = new HashMap<>();
configs.put("acks", "1");
configs.put("batch.size", "16384");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-config", propsFile.getAbsolutePath(),
"--command-property", "acks=all"
};
ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
// Command property should override the config file value
// "all" gets converted to "-1" internally by ProducerConfig
assertEquals("-1", producerConfig.getString(ProducerConfig.ACKS_CONFIG));
// Config file value should still be present
assertEquals(16384, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
}
public static class TestRecordReader implements RecordReader { public static class TestRecordReader implements RecordReader {
private int configureCount = 0; private int configureCount = 0;
private int closeCount = 0; private int closeCount = 0;

View File

@ -172,7 +172,7 @@ public class ConsoleConsumerOptionsTest {
} }
@Test @Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetLatest() throws IOException { public void shouldParseValidConsumerConfigWithAutoOffsetResetLatestDeprecated() throws IOException {
String[] args = new String[]{ String[] args = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
@ -189,7 +189,7 @@ public class ConsoleConsumerOptionsTest {
} }
@Test @Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliest() throws IOException { public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliestDeprecated() throws IOException {
String[] args = new String[]{ String[] args = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
@ -206,7 +206,7 @@ public class ConsoleConsumerOptionsTest {
} }
@Test @Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning() throws IOException { public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginningDeprecated() throws IOException {
String[] args = new String[]{ String[] args = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
@ -240,7 +240,7 @@ public class ConsoleConsumerOptionsTest {
} }
@Test @Test
public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning() { public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningDeprecated() {
Exit.setExitProcedure((code, message) -> { Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message); throw new IllegalArgumentException(message);
}); });
@ -259,7 +259,7 @@ public class ConsoleConsumerOptionsTest {
} }
@Test @Test
public void shouldParseConfigsFromFile() throws IOException { public void shouldParseConfigsFromFileDeprecated() throws IOException {
Map<String, String> configs = new HashMap<>(); Map<String, String> configs = new HashMap<>();
configs.put("request.timeout.ms", "1000"); configs.put("request.timeout.ms", "1000");
configs.put("group.id", "group1"); configs.put("group.id", "group1");
@ -276,80 +276,82 @@ public class ConsoleConsumerOptionsTest {
} }
@Test @Test
public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException { public void groupIdsProvidedInDifferentPlacesMustMatchDeprecated() throws IOException {
Exit.setExitProcedure((code, message) -> { Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message); throw new IllegalArgumentException(message);
}); });
try {
// different in all three places // different in all three places
File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args = new String[]{ final String[] args = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "group-from-arguments", "--group", "group-from-arguments",
"--consumer-property", "group.id=group-from-properties", "--consumer-property", "group.id=group-from-properties",
"--consumer.config", propsFile.getAbsolutePath() "--consumer.config", propsFile.getAbsolutePath()
}; };
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args)); assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
// the same in all three places // the same in all three places
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group")); propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group"));
final String[] args1 = new String[]{ final String[] args1 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "test-group", "--group", "test-group",
"--consumer-property", "group.id=test-group", "--consumer-property", "group.id=test-group",
"--consumer.config", propsFile.getAbsolutePath() "--consumer.config", propsFile.getAbsolutePath()
}; };
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1); ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
Properties props = config.consumerProps(); Properties props = config.consumerProps();
assertEquals("test-group", props.getProperty("group.id")); assertEquals("test-group", props.getProperty("group.id"));
// different via --consumer-property and --consumer.config // different via --consumer-property and --consumer.config
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args2 = new String[]{ final String[] args2 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--consumer-property", "group.id=group-from-properties", "--consumer-property", "group.id=group-from-properties",
"--consumer.config", propsFile.getAbsolutePath() "--consumer.config", propsFile.getAbsolutePath()
}; };
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2)); assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2));
// different via --consumer-property and --group // different via --consumer-property and --group
final String[] args3 = new String[]{ final String[] args3 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "group-from-arguments", "--group", "group-from-arguments",
"--consumer-property", "group.id=group-from-properties" "--consumer-property", "group.id=group-from-properties"
}; };
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3)); assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3));
// different via --group and --consumer.config // different via --group and --consumer.config
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args4 = new String[]{ final String[] args4 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "group-from-arguments", "--group", "group-from-arguments",
"--consumer.config", propsFile.getAbsolutePath() "--consumer.config", propsFile.getAbsolutePath()
}; };
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4)); assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4));
// via --group only // via --group only
final String[] args5 = new String[]{ final String[] args5 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "group-from-arguments" "--group", "group-from-arguments"
}; };
config = new ConsoleConsumerOptions(args5); config = new ConsoleConsumerOptions(args5);
props = config.consumerProps(); props = config.consumerProps();
assertEquals("group-from-arguments", props.getProperty("group.id")); assertEquals("group-from-arguments", props.getProperty("group.id"));
} finally {
Exit.resetExitProcedure(); Exit.resetExitProcedure();
}
} }
@Test @Test
@ -508,7 +510,7 @@ public class ConsoleConsumerOptionsTest {
} }
@Test @Test
public void testClientIdOverride() throws IOException { public void testClientIdOverrideDeprecated() throws IOException {
String[] args = new String[]{ String[] args = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
@ -618,4 +620,234 @@ public class ConsoleConsumerOptionsTest {
"--formatter", formatter, "--formatter", formatter,
}; };
} }
@Test
public void shouldExitOnBothConsumerPropertyAndCommandProperty() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "auto.offset.reset=latest",
"--command-property", "session.timeout.ms=10000"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitOnBothConsumerConfigAndCommandConfig() throws IOException {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
Map<String, String> configs = new HashMap<>();
configs.put("request.timeout.ms", "1000");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
Map<String, String> configs2 = new HashMap<>();
configs2.put("session.timeout.ms", "10000");
File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer.config", propsFile.getAbsolutePath(),
"--command-config", propsFile2.getAbsolutePath()
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetLatestUsingCommandProperty() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-property", "auto.offset.reset=latest"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg().orElse(""));
assertFalse(config.fromBeginning());
assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliestUsingCommandProperty() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-property", "auto.offset.reset=earliest"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg().orElse(""));
assertFalse(config.fromBeginning());
assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@Test
public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginningUsingCommandProperty() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-property", "auto.offset.reset=earliest",
"--from-beginning"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg().orElse(""));
assertTrue(config.fromBeginning());
assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@Test
public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningUsingCommandProperty() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-property", "auto.offset.reset=latest",
"--from-beginning"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldParseConfigsFromFileUsingCommandConfig() throws IOException {
Map<String, String> configs = new HashMap<>();
configs.put("request.timeout.ms", "1000");
configs.put("group.id", "group1");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-config", propsFile.getAbsolutePath()
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("1000", config.consumerProps().get("request.timeout.ms"));
assertEquals("group1", config.consumerProps().get("group.id"));
}
@Test
public void groupIdsProvidedInDifferentPlacesMustMatchUsingCommandConfig() throws IOException {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
try {
// different in all three places
File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--command-property", "group.id=group-from-properties",
"--command-config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args));
// the same in all three places
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group"));
final String[] args1 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group",
"--command-property", "group.id=test-group",
"--command-config", propsFile.getAbsolutePath()
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
Properties props = config.consumerProps();
assertEquals("test-group", props.getProperty("group.id"));
// different via --command-property and --command-config
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args2 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-property", "group.id=group-from-properties",
"--command-config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args2));
// different via --command-property and --group
final String[] args3 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--command-property", "group.id=group-from-properties"
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args3));
// different via --group and --command-config
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args4 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--command-config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleConsumerOptions(args4));
// via --group only
final String[] args5 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments"
};
config = new ConsoleConsumerOptions(args5);
props = config.consumerProps();
assertEquals("group-from-arguments", props.getProperty("group.id"));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void testClientIdOverrideUsingCommandProperty() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning",
"--command-property", "client.id=consumer-1"
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
} }

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockDeserializer;
import org.apache.kafka.tools.ToolsTestUtils; import org.apache.kafka.tools.ToolsTestUtils;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -32,7 +33,9 @@ import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConsoleShareConsumerOptionsTest { public class ConsoleShareConsumerOptionsTest {
@ -72,7 +75,7 @@ public class ConsoleShareConsumerOptionsTest {
} }
@Test @Test
public void shouldParseValidConsumerConfigWithSessionTimeout() throws IOException { public void shouldParseValidConsumerConfigWithSessionTimeoutDeprecated() throws IOException {
String[] args = new String[]{ String[] args = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
@ -88,7 +91,7 @@ public class ConsoleShareConsumerOptionsTest {
} }
@Test @Test
public void shouldParseConfigsFromFile() throws IOException { public void shouldParseConfigsFromFileDeprecated() throws IOException {
Map<String, String> configs = new HashMap<>(); Map<String, String> configs = new HashMap<>();
configs.put("request.timeout.ms", "1000"); configs.put("request.timeout.ms", "1000");
configs.put("group.id", "group1"); configs.put("group.id", "group1");
@ -109,80 +112,82 @@ public class ConsoleShareConsumerOptionsTest {
} }
@Test @Test
public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException { public void groupIdsProvidedInDifferentPlacesMustMatchDeprecated() throws IOException {
Exit.setExitProcedure((code, message) -> { Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message); throw new IllegalArgumentException(message);
}); });
// different in all three places try {
File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); // different in all three places
final String[] args = new String[]{ File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
"--bootstrap-server", "localhost:9092", final String[] args = new String[]{
"--topic", "test", "--bootstrap-server", "localhost:9092",
"--group", "group-from-arguments", "--topic", "test",
"--consumer-property", "group.id=group-from-properties", "--group", "group-from-arguments",
"--consumer-config", propsFile.getAbsolutePath() "--consumer-property", "group.id=group-from-properties",
}; "--consumer-config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args)); assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args));
// the same in all three places // the same in all three places
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group")); propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group"));
final String[] args1 = new String[]{ final String[] args1 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "test-group", "--group", "test-group",
"--consumer-property", "group.id=test-group", "--consumer-property", "group.id=test-group",
"--consumer-config", propsFile.getAbsolutePath() "--consumer-config", propsFile.getAbsolutePath()
}; };
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args1); ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args1);
Properties props = config.consumerProps(); Properties props = config.consumerProps();
assertEquals("test-group", props.getProperty("group.id")); assertEquals("test-group", props.getProperty("group.id"));
// different via --consumer-property and --consumer-config // different via --consumer-property and --consumer-config
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args2 = new String[]{ final String[] args2 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--consumer-property", "group.id=group-from-properties", "--consumer-property", "group.id=group-from-properties",
"--consumer-config", propsFile.getAbsolutePath() "--consumer-config", propsFile.getAbsolutePath()
}; };
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args2)); assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args2));
// different via --consumer-property and --group // different via --consumer-property and --group
final String[] args3 = new String[]{ final String[] args3 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "group-from-arguments", "--group", "group-from-arguments",
"--consumer-property", "group.id=group-from-properties" "--consumer-property", "group.id=group-from-properties"
}; };
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args3)); assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args3));
// different via --group and --consumer-config // different via --group and --consumer-config
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file")); propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args4 = new String[]{ final String[] args4 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "group-from-arguments", "--group", "group-from-arguments",
"--consumer-config", propsFile.getAbsolutePath() "--consumer-config", propsFile.getAbsolutePath()
}; };
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args4)); assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args4));
// via --group only // via --group only
final String[] args5 = new String[]{ final String[] args5 = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
"--group", "group-from-arguments" "--group", "group-from-arguments"
}; };
config = new ConsoleShareConsumerOptions(args5); config = new ConsoleShareConsumerOptions(args5);
props = config.consumerProps(); props = config.consumerProps();
assertEquals("group-from-arguments", props.getProperty("group.id")); assertEquals("group-from-arguments", props.getProperty("group.id"));
} finally {
Exit.resetExitProcedure(); Exit.resetExitProcedure();
}
} }
@Test @Test
@ -203,7 +208,7 @@ public class ConsoleShareConsumerOptionsTest {
} }
@Test @Test
public void testClientIdOverride() throws IOException { public void testClientIdOverrideDeprecated() throws IOException {
String[] args = new String[]{ String[] args = new String[]{
"--bootstrap-server", "localhost:9092", "--bootstrap-server", "localhost:9092",
"--topic", "test", "--topic", "test",
@ -216,6 +221,56 @@ public class ConsoleShareConsumerOptionsTest {
assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
} }
@Test
public void testCustomPropertyShouldBePassedToConfigureMethod() throws Exception {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--property", "print.key=true",
"--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
"--property", "key.deserializer.my-props=abc"
};
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
assertInstanceOf(DefaultMessageFormatter.class, config.formatter());
assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props"));
DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter();
assertTrue(formatter.keyDeserializer().isPresent());
assertInstanceOf(MockDeserializer.class, formatter.keyDeserializer().get());
MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get();
assertEquals(1, keyDeserializer.configs.size());
assertEquals("abc", keyDeserializer.configs.get("my-props"));
assertTrue(keyDeserializer.isKey);
}
@Test
public void testCustomConfigShouldBePassedToConfigureMethod() throws Exception {
Map<String, String> configs = new HashMap<>();
configs.put("key.deserializer.my-props", "abc");
configs.put("print.key", "false");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--property", "print.key=true",
"--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
"--formatter-config", propsFile.getAbsolutePath()
};
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
assertInstanceOf(DefaultMessageFormatter.class, config.formatter());
assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props"));
DefaultMessageFormatter formatter = (DefaultMessageFormatter) config.formatter();
assertTrue(formatter.keyDeserializer().isPresent());
assertInstanceOf(MockDeserializer.class, formatter.keyDeserializer().get());
MockDeserializer keyDeserializer = (MockDeserializer) formatter.keyDeserializer().get();
assertEquals(1, keyDeserializer.configs.size());
assertEquals("abc", keyDeserializer.configs.get("my-props"));
assertTrue(keyDeserializer.isKey);
}
@Test @Test
public void testDefaultClientId() throws IOException { public void testDefaultClientId() throws IOException {
String[] args = new String[]{ String[] args = new String[]{
@ -271,4 +326,182 @@ public class ConsoleShareConsumerOptionsTest {
Exit.resetExitProcedure(); Exit.resetExitProcedure();
} }
} }
@Test
public void shouldExitOnBothConsumerPropertyAndCommandProperty() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-property", "session.timeout.ms=10000",
"--command-property", "request.timeout.ms=30000"
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldExitOnBothConsumerConfigAndCommandConfig() throws IOException {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
Map<String, String> configs = new HashMap<>();
configs.put("request.timeout.ms", "1000");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
Map<String, String> configs2 = new HashMap<>();
configs2.put("session.timeout.ms", "10000");
File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--consumer-config", propsFile.getAbsolutePath(),
"--command-config", propsFile2.getAbsolutePath()
};
try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldParseValidConsumerConfigWithSessionTimeout() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-property", "session.timeout.ms=10000"
};
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("10000", consumerProperties.getProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG));
}
@Test
public void shouldParseConfigsFromFile() throws IOException {
Map<String, String> configs = new HashMap<>();
configs.put("request.timeout.ms", "1000");
configs.put("group.id", "group1");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-config", propsFile.getAbsolutePath()
};
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
// KafkaShareConsumer uses Utils.propsToMap to convert the properties to a map,
// so using the same method to check the map has the expected values
Map<String, Object> configMap = Utils.propsToMap(config.consumerProps());
assertEquals("1000", configMap.get("request.timeout.ms"));
assertEquals("group1", configMap.get("group.id"));
}
@Test
public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
try {
// different in all three places
File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--command-property", "group.id=group-from-properties",
"--command-config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args));
// the same in all three places
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "test-group"));
final String[] args1 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "test-group",
"--command-property", "group.id=test-group",
"--command-config", propsFile.getAbsolutePath()
};
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args1);
Properties props = config.consumerProps();
assertEquals("test-group", props.getProperty("group.id"));
// different via --command-property and --command-config
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args2 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-property", "group.id=group-from-properties",
"--command-config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args2));
// different via --command-property and --group
final String[] args3 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--command-property", "group.id=group-from-properties"
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args3));
// different via --group and --command-config
propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
final String[] args4 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments",
"--command-config", propsFile.getAbsolutePath()
};
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args4));
// via --group only
final String[] args5 = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--group", "group-from-arguments"
};
config = new ConsoleShareConsumerOptions(args5);
props = config.consumerProps();
assertEquals("group-from-arguments", props.getProperty("group.id"));
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void testClientIdOverride() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-property", "client.id=consumer-1"
};
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
Properties consumerProperties = config.consumerProps();
assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
} }