mirror of https://github.com/apache/kafka.git
KAFKA-16723 : Added kafka-console-share-consumer.sh tool. (#16860)
Added kafka-console-share-consumer.sh which will start a share consumer on a share group. This tool helps to read data from Kafka topics using share groups and outputs it to standard output. Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
parent
011d35237c
commit
8cfd631264
|
@ -0,0 +1,21 @@
|
|||
#!/bin/bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
|
||||
export KAFKA_HEAP_OPTS="-Xmx512M"
|
||||
fi
|
||||
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.ConsoleShareConsumer "$@"
|
|
@ -0,0 +1,20 @@
|
|||
@echo off
|
||||
rem Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
rem contributor license agreements. See the NOTICE file distributed with
|
||||
rem this work for additional information regarding copyright ownership.
|
||||
rem The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
rem (the "License"); you may not use this file except in compliance with
|
||||
rem the License. You may obtain a copy of the License at
|
||||
rem
|
||||
rem http://www.apache.org/licenses/LICENSE-2.0
|
||||
rem
|
||||
rem Unless required by applicable law or agreed to in writing, software
|
||||
rem distributed under the License is distributed on an "AS IS" BASIS,
|
||||
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
rem See the License for the specific language governing permissions and
|
||||
rem limitations under the License.
|
||||
|
||||
SetLocal
|
||||
set KAFKA_HEAP_OPTS=-Xmx512M
|
||||
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.consumer.ConsoleShareConsumer %*
|
||||
EndLocal
|
|
@ -0,0 +1,186 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.tools.consumer;
|
||||
|
||||
import org.apache.kafka.clients.consumer.AcknowledgeType;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
|
||||
import org.apache.kafka.clients.consumer.ShareConsumer;
|
||||
import org.apache.kafka.common.MessageFormatter;
|
||||
import org.apache.kafka.common.errors.AuthenticationException;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
import org.apache.kafka.common.errors.WakeupException;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
|
||||
/**
|
||||
* Share Consumer that dumps messages to standard out.
|
||||
*/
|
||||
public class ConsoleShareConsumer {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ConsoleShareConsumer.class);
|
||||
private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
|
||||
|
||||
static int messageCount = 0;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConsoleShareConsumerOptions opts = new ConsoleShareConsumerOptions(args);
|
||||
try {
|
||||
run(opts);
|
||||
} catch (AuthenticationException ae) {
|
||||
LOG.error("Authentication failed: terminating consumer process", ae);
|
||||
Exit.exit(1);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Unknown error when running consumer: ", t);
|
||||
Exit.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
public static void run(ConsoleShareConsumerOptions opts) {
|
||||
messageCount = 0;
|
||||
long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;
|
||||
|
||||
ShareConsumer<byte[], byte[]> consumer = new KafkaShareConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
|
||||
ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts.topicArg(), consumer, timeoutMs);
|
||||
|
||||
addShutdownHook(consumerWrapper);
|
||||
|
||||
try {
|
||||
process(opts.maxMessages(), opts.formatter(), consumerWrapper, System.out, opts.rejectMessageOnError(), opts.acknowledgeType());
|
||||
} finally {
|
||||
consumerWrapper.cleanup();
|
||||
opts.formatter().close();
|
||||
reportRecordCount();
|
||||
|
||||
SHUTDOWN_LATCH.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
private static void addShutdownHook(ConsumerWrapper consumer) {
|
||||
Exit.addShutdownHook("consumer-shutdown-hook", () -> {
|
||||
try {
|
||||
consumer.wakeup();
|
||||
SHUTDOWN_LATCH.await();
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Exception while running shutdown hook: ", t);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output,
|
||||
boolean rejectMessageOnError, AcknowledgeType acknowledgeType) {
|
||||
while (messageCount < maxMessages || maxMessages == -1) {
|
||||
ConsumerRecord<byte[], byte[]> msg;
|
||||
try {
|
||||
msg = consumer.receive();
|
||||
} catch (WakeupException we) {
|
||||
LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
|
||||
// Consumer will be closed
|
||||
return;
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Error processing message, terminating consumer process: ", t);
|
||||
// Consumer will be closed
|
||||
return;
|
||||
}
|
||||
messageCount += 1;
|
||||
try {
|
||||
formatter.writeTo(new ConsumerRecord<>(msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(),
|
||||
0, 0, msg.key(), msg.value(), msg.headers(), Optional.empty()), output);
|
||||
consumer.acknowledge(msg, acknowledgeType);
|
||||
} catch (Throwable t) {
|
||||
if (rejectMessageOnError) {
|
||||
LOG.error("Error processing message, rejecting this message: ", t);
|
||||
consumer.acknowledge(msg, AcknowledgeType.REJECT);
|
||||
} else {
|
||||
// Consumer will be closed
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
if (checkErr(output)) {
|
||||
// Consumer will be closed
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void reportRecordCount() {
|
||||
System.err.println("Processed a total of " + messageCount + " messages");
|
||||
}
|
||||
|
||||
private static boolean checkErr(PrintStream output) {
|
||||
boolean gotError = output.checkError();
|
||||
if (gotError) {
|
||||
// This means no one is listening to our output stream anymore, time to shut down
|
||||
System.err.println("Unable to write to standard out, closing consumer.");
|
||||
}
|
||||
return gotError;
|
||||
}
|
||||
|
||||
public static class ConsumerWrapper {
|
||||
final String topic;
|
||||
final ShareConsumer<byte[], byte[]> consumer;
|
||||
final long timeoutMs;
|
||||
final Time time = Time.SYSTEM;
|
||||
|
||||
Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
|
||||
|
||||
public ConsumerWrapper(String topic,
|
||||
ShareConsumer<byte[], byte[]> consumer,
|
||||
long timeoutMs) {
|
||||
this.topic = topic;
|
||||
this.consumer = consumer;
|
||||
this.timeoutMs = timeoutMs;
|
||||
|
||||
consumer.subscribe(Collections.singletonList(topic));
|
||||
}
|
||||
|
||||
ConsumerRecord<byte[], byte[]> receive() {
|
||||
long startTimeMs = time.milliseconds();
|
||||
while (!recordIter.hasNext()) {
|
||||
recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator();
|
||||
if (!recordIter.hasNext() && (time.milliseconds() - startTimeMs > timeoutMs)) {
|
||||
throw new TimeoutException();
|
||||
}
|
||||
}
|
||||
return recordIter.next();
|
||||
}
|
||||
|
||||
void acknowledge(ConsumerRecord<byte[], byte[]> record, AcknowledgeType acknowledgeType) {
|
||||
consumer.acknowledge(record, acknowledgeType);
|
||||
}
|
||||
|
||||
void wakeup() {
|
||||
this.consumer.wakeup();
|
||||
}
|
||||
|
||||
void cleanup() {
|
||||
this.consumer.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,268 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.tools.consumer;
|
||||
|
||||
import org.apache.kafka.clients.consumer.AcknowledgeType;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.MessageFormatter;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.server.util.CommandDefaultOptions;
|
||||
import org.apache.kafka.server.util.CommandLineUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import joptsimple.OptionException;
|
||||
import joptsimple.OptionSpec;
|
||||
|
||||
public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
|
||||
private final OptionSpec<String> messageFormatterOpt;
|
||||
private final OptionSpec<String> messageFormatterConfigOpt;
|
||||
private final OptionSpec<String> messageFormatterArgOpt;
|
||||
private final OptionSpec<String> keyDeserializerOpt;
|
||||
private final OptionSpec<String> valueDeserializerOpt;
|
||||
private final OptionSpec<Integer> maxMessagesOpt;
|
||||
private final OptionSpec<?> rejectMessageOnErrorOpt;
|
||||
private final OptionSpec<?> rejectOpt;
|
||||
private final OptionSpec<?> releaseOpt;
|
||||
private final OptionSpec<String> topicOpt;
|
||||
private final OptionSpec<Integer> timeoutMsOpt;
|
||||
private final OptionSpec<String> bootstrapServerOpt;
|
||||
private final OptionSpec<String> groupIdOpt;
|
||||
private final Properties consumerProps;
|
||||
private final MessageFormatter formatter;
|
||||
|
||||
public ConsoleShareConsumerOptions(String[] args) throws IOException {
|
||||
super(args);
|
||||
topicOpt = parser.accepts("topic", "The topic to consume from.")
|
||||
.withRequiredArg()
|
||||
.describedAs("topic")
|
||||
.ofType(String.class);
|
||||
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
|
||||
.withRequiredArg()
|
||||
.describedAs("consumer_prop")
|
||||
.ofType(String.class);
|
||||
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.")
|
||||
.withRequiredArg()
|
||||
.describedAs("config file")
|
||||
.ofType(String.class);
|
||||
messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting Kafka messages for display.")
|
||||
.withRequiredArg()
|
||||
.describedAs("class")
|
||||
.ofType(String.class)
|
||||
.defaultsTo(DefaultMessageFormatter.class.getName());
|
||||
messageFormatterArgOpt = parser.accepts("property",
|
||||
"The properties to initialize the message formatter. Default properties include: \n" +
|
||||
" print.timestamp=true|false\n" +
|
||||
" print.key=true|false\n" +
|
||||
" print.offset=true|false\n" +
|
||||
" print.partition=true|false\n" +
|
||||
" print.headers=true|false\n" +
|
||||
" print.value=true|false\n" +
|
||||
" key.separator=<key.separator>\n" +
|
||||
" line.separator=<line.separator>\n" +
|
||||
" headers.separator=<line.separator>\n" +
|
||||
" null.literal=<null.literal>\n" +
|
||||
" key.deserializer=<key.deserializer>\n" +
|
||||
" value.deserializer=<value.deserializer>\n" +
|
||||
" header.deserializer=<header.deserializer>\n" +
|
||||
"\nUsers can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.")
|
||||
.withRequiredArg()
|
||||
.describedAs("prop")
|
||||
.ofType(String.class);
|
||||
messageFormatterConfigOpt = parser.accepts("formatter-config", "Config properties file to initialize the message formatter. Note that " + messageFormatterArgOpt + " takes precedence over this config.")
|
||||
.withRequiredArg()
|
||||
.describedAs("config file")
|
||||
.ofType(String.class);
|
||||
maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num_messages")
|
||||
.ofType(Integer.class);
|
||||
timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
|
||||
.withRequiredArg()
|
||||
.describedAs("timeout_ms")
|
||||
.ofType(Integer.class);
|
||||
rejectOpt = parser.accepts("reject", "If specified, messages are rejected as they are consumed.");
|
||||
releaseOpt = parser.accepts("release", "If specified, messages are released as they are consumed.");
|
||||
rejectMessageOnErrorOpt = parser.accepts("reject-message-on-error", "If there is an error when processing a message, " +
|
||||
"reject it instead of halting.");
|
||||
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.")
|
||||
.withRequiredArg()
|
||||
.describedAs("server to connect to")
|
||||
.ofType(String.class);
|
||||
keyDeserializerOpt = parser.accepts("key-deserializer", "The name of the class to use for deserializing keys.")
|
||||
.withRequiredArg()
|
||||
.describedAs("deserializer for key")
|
||||
.ofType(String.class);
|
||||
valueDeserializerOpt = parser.accepts("value-deserializer", "The name of the class to use for deserializing values.")
|
||||
.withRequiredArg()
|
||||
.describedAs("deserializer for values")
|
||||
.ofType(String.class);
|
||||
groupIdOpt = parser.accepts("group", "The share group id of the consumer.")
|
||||
.withRequiredArg()
|
||||
.describedAs("share group id")
|
||||
.ofType(String.class);
|
||||
|
||||
try {
|
||||
options = parser.parse(args);
|
||||
} catch (OptionException oe) {
|
||||
CommandLineUtils.printUsageAndExit(parser, oe.getMessage());
|
||||
}
|
||||
|
||||
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics using share groups and outputs it to standard output.");
|
||||
|
||||
checkRequiredArgs();
|
||||
|
||||
if (options.has(rejectOpt) && options.has(releaseOpt)) {
|
||||
CommandLineUtils.printUsageAndExit(parser, "At most one of --reject and --release may be specified.");
|
||||
}
|
||||
|
||||
Properties consumerPropsFromFile = options.has(consumerConfigOpt)
|
||||
? Utils.loadProps(options.valueOf(consumerConfigOpt))
|
||||
: new Properties();
|
||||
Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt));
|
||||
|
||||
Set<String> groupIdsProvided = checkShareGroup(consumerPropsFromFile, extraConsumerProps);
|
||||
consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided);
|
||||
formatter = buildFormatter();
|
||||
}
|
||||
|
||||
private void checkRequiredArgs() {
|
||||
if (!options.has(topicOpt)) {
|
||||
CommandLineUtils.printUsageAndExit(parser, "--topic is a required argument");
|
||||
}
|
||||
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
|
||||
}
|
||||
|
||||
private Set<String> checkShareGroup(Properties consumerPropsFromFile, Properties extraConsumerProps) {
|
||||
// if the group id is provided in more than place (through different means) all values must be the same
|
||||
Set<String> groupIdsProvided = new HashSet<>();
|
||||
if (options.has(groupIdOpt)) {
|
||||
groupIdsProvided.add(options.valueOf(groupIdOpt));
|
||||
}
|
||||
|
||||
if (consumerPropsFromFile.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
|
||||
groupIdsProvided.add((String) consumerPropsFromFile.get(ConsumerConfig.GROUP_ID_CONFIG));
|
||||
}
|
||||
|
||||
if (extraConsumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
|
||||
groupIdsProvided.add(extraConsumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
|
||||
}
|
||||
// The default value for group.id is "console-share-consumer"
|
||||
if (groupIdsProvided.isEmpty()) {
|
||||
groupIdsProvided.add("console-share-consumer");
|
||||
} else if (groupIdsProvided.size() > 1) {
|
||||
CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', "
|
||||
+ "via '--consumer-property', or via '--consumer-config') do not match. "
|
||||
+ "Detected group ids: "
|
||||
+ groupIdsProvided.stream().map(group -> "'" + group + "'").collect(Collectors.joining(", ")));
|
||||
}
|
||||
return groupIdsProvided;
|
||||
}
|
||||
|
||||
private Properties buildConsumerProps(Properties consumerPropsFromFile, Properties extraConsumerProps, Set<String> groupIdsProvided) {
|
||||
Properties consumerProps = new Properties(consumerPropsFromFile);
|
||||
consumerProps.putAll(extraConsumerProps);
|
||||
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer());
|
||||
if (consumerProps.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) {
|
||||
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-share-consumer");
|
||||
}
|
||||
|
||||
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdsProvided.iterator().next());
|
||||
return consumerProps;
|
||||
}
|
||||
|
||||
private MessageFormatter buildFormatter() {
|
||||
MessageFormatter formatter = null;
|
||||
try {
|
||||
Class<?> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt));
|
||||
formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance();
|
||||
|
||||
Properties formatterArgs = formatterArgs();
|
||||
Map<String, String> formatterConfigs = new HashMap<>();
|
||||
for (final String name : formatterArgs.stringPropertyNames()) {
|
||||
formatterConfigs.put(name, formatterArgs.getProperty(name));
|
||||
}
|
||||
|
||||
formatter.configure(formatterConfigs);
|
||||
|
||||
} catch (Exception e) {
|
||||
CommandLineUtils.printUsageAndExit(parser, e.getMessage());
|
||||
}
|
||||
return formatter;
|
||||
}
|
||||
|
||||
Properties consumerProps() {
|
||||
return consumerProps;
|
||||
}
|
||||
|
||||
boolean rejectMessageOnError() {
|
||||
return options.has(rejectMessageOnErrorOpt);
|
||||
}
|
||||
|
||||
AcknowledgeType acknowledgeType() {
|
||||
if (options.has(rejectOpt)) {
|
||||
return AcknowledgeType.REJECT;
|
||||
} else if (options.has(releaseOpt)) {
|
||||
return AcknowledgeType.RELEASE;
|
||||
} else {
|
||||
return AcknowledgeType.ACCEPT;
|
||||
}
|
||||
}
|
||||
|
||||
String topicArg() {
|
||||
return options.valueOf(topicOpt);
|
||||
}
|
||||
|
||||
int maxMessages() {
|
||||
return options.has(maxMessagesOpt) ? options.valueOf(maxMessagesOpt) : -1;
|
||||
}
|
||||
|
||||
int timeoutMs() {
|
||||
return options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1;
|
||||
}
|
||||
|
||||
String bootstrapServer() {
|
||||
return options.valueOf(bootstrapServerOpt);
|
||||
}
|
||||
|
||||
Properties formatterArgs() throws IOException {
|
||||
Properties formatterArgs = options.has(messageFormatterConfigOpt)
|
||||
? Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
|
||||
: new Properties();
|
||||
String keyDeserializer = options.valueOf(keyDeserializerOpt);
|
||||
if (keyDeserializer != null && !keyDeserializer.isEmpty()) {
|
||||
formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
|
||||
}
|
||||
String valueDeserializer = options.valueOf(valueDeserializerOpt);
|
||||
if (valueDeserializer != null && !valueDeserializer.isEmpty()) {
|
||||
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
|
||||
}
|
||||
formatterArgs.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)));
|
||||
|
||||
return formatterArgs;
|
||||
}
|
||||
|
||||
MessageFormatter formatter() {
|
||||
return formatter;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,271 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.tools.consumer;
|
||||
|
||||
import org.apache.kafka.clients.consumer.AcknowledgeType;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.tools.ToolsTestUtils;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class ConsoleShareConsumerOptionsTest {
|
||||
|
||||
@Test
|
||||
public void shouldParseValidConsumerValidConfig() throws IOException {
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
};
|
||||
|
||||
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
|
||||
|
||||
assertEquals("localhost:9092", config.bootstrapServer());
|
||||
assertEquals("test", config.topicArg());
|
||||
assertFalse(config.rejectMessageOnError());
|
||||
assertEquals(-1, config.maxMessages());
|
||||
assertEquals(-1, config.timeoutMs());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldExitOnUnrecognizedNewConsumerOption() {
|
||||
Exit.setExitProcedure((code, message) -> {
|
||||
throw new IllegalArgumentException(message);
|
||||
});
|
||||
|
||||
String[] args = new String[]{
|
||||
"--new-consumer",
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test"
|
||||
};
|
||||
|
||||
try {
|
||||
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args));
|
||||
} finally {
|
||||
Exit.resetExitProcedure();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldParseValidConsumerConfigWithSessionTimeout() throws IOException {
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--consumer-property", "session.timeout.ms=10000"
|
||||
};
|
||||
|
||||
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
|
||||
Properties consumerProperties = config.consumerProps();
|
||||
|
||||
assertEquals("localhost:9092", config.bootstrapServer());
|
||||
assertEquals("test", config.topicArg());
|
||||
assertEquals("10000", consumerProperties.getProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldParseConfigsFromFile() throws IOException {
|
||||
Map<String, String> configs = new HashMap<>();
|
||||
configs.put("request.timeout.ms", "1000");
|
||||
configs.put("group.id", "group1");
|
||||
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--consumer-config", propsFile.getAbsolutePath()
|
||||
};
|
||||
|
||||
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
|
||||
|
||||
assertEquals("1000", config.consumerProps().getProperty("request.timeout.ms"));
|
||||
assertEquals("group1", config.consumerProps().getProperty("group.id"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException {
|
||||
Exit.setExitProcedure((code, message) -> {
|
||||
throw new IllegalArgumentException(message);
|
||||
});
|
||||
|
||||
// different in all three places
|
||||
File propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
|
||||
final String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--group", "group-from-arguments",
|
||||
"--consumer-property", "group.id=group-from-properties",
|
||||
"--consumer-config", propsFile.getAbsolutePath()
|
||||
};
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args));
|
||||
|
||||
// the same in all three places
|
||||
propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "test-group"));
|
||||
final String[] args1 = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--group", "test-group",
|
||||
"--consumer-property", "group.id=test-group",
|
||||
"--consumer-config", propsFile.getAbsolutePath()
|
||||
};
|
||||
|
||||
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args1);
|
||||
Properties props = config.consumerProps();
|
||||
assertEquals("test-group", props.getProperty("group.id"));
|
||||
|
||||
// different via --consumer-property and --consumer-config
|
||||
propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
|
||||
final String[] args2 = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--consumer-property", "group.id=group-from-properties",
|
||||
"--consumer-config", propsFile.getAbsolutePath()
|
||||
};
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args2));
|
||||
|
||||
// different via --consumer-property and --group
|
||||
final String[] args3 = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--group", "group-from-arguments",
|
||||
"--consumer-property", "group.id=group-from-properties"
|
||||
};
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args3));
|
||||
|
||||
// different via --group and --consumer-config
|
||||
propsFile = ToolsTestUtils.tempPropertiesFile(Collections.singletonMap("group.id", "group-from-file"));
|
||||
final String[] args4 = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--group", "group-from-arguments",
|
||||
"--consumer-config", propsFile.getAbsolutePath()
|
||||
};
|
||||
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args4));
|
||||
|
||||
// via --group only
|
||||
final String[] args5 = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--group", "group-from-arguments"
|
||||
};
|
||||
|
||||
config = new ConsoleShareConsumerOptions(args5);
|
||||
props = config.consumerProps();
|
||||
assertEquals("group-from-arguments", props.getProperty("group.id"));
|
||||
|
||||
Exit.resetExitProcedure();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldExitIfNoTopicSpecified() {
|
||||
Exit.setExitProcedure((code, message) -> {
|
||||
throw new IllegalArgumentException(message);
|
||||
});
|
||||
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092"
|
||||
};
|
||||
|
||||
try {
|
||||
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args));
|
||||
} finally {
|
||||
Exit.resetExitProcedure();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientIdOverride() throws IOException {
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--consumer-property", "client.id=consumer-1"
|
||||
};
|
||||
|
||||
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
|
||||
Properties consumerProperties = config.consumerProps();
|
||||
|
||||
assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultClientId() throws IOException {
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
};
|
||||
|
||||
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
|
||||
Properties consumerProperties = config.consumerProps();
|
||||
|
||||
assertEquals("console-share-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRejectOption() throws IOException {
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--reject"
|
||||
};
|
||||
|
||||
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
|
||||
assertEquals(AcknowledgeType.REJECT, config.acknowledgeType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReleaseOption() throws IOException {
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--release"
|
||||
};
|
||||
|
||||
ConsoleShareConsumerOptions config = new ConsoleShareConsumerOptions(args);
|
||||
assertEquals(AcknowledgeType.RELEASE, config.acknowledgeType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRejectAndReleaseOption() throws IOException {
|
||||
Exit.setExitProcedure((code, message) -> {
|
||||
throw new IllegalArgumentException(message);
|
||||
});
|
||||
String[] args = new String[]{
|
||||
"--bootstrap-server", "localhost:9092",
|
||||
"--topic", "test",
|
||||
"--reject",
|
||||
"--release"
|
||||
};
|
||||
|
||||
try {
|
||||
assertThrows(IllegalArgumentException.class, () -> new ConsoleShareConsumerOptions(args));
|
||||
} finally {
|
||||
Exit.resetExitProcedure();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.tools.consumer;
|
||||
|
||||
import org.apache.kafka.clients.consumer.AcknowledgeType;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.ShareConsumer;
|
||||
import org.apache.kafka.common.MessageFormatter;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.server.util.MockTime;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.time.Duration;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class ConsoleShareConsumerTest {
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
ConsoleShareConsumer.messageCount = 0;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() {
|
||||
String topic = "test";
|
||||
final Time time = new MockTime();
|
||||
final int timeoutMs = 1000;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ShareConsumer<byte[], byte[]> mockConsumer = mock(ShareConsumer.class);
|
||||
|
||||
when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer(invocation -> {
|
||||
time.sleep(timeoutMs / 2 + 1);
|
||||
return ConsumerRecords.EMPTY;
|
||||
});
|
||||
|
||||
ConsoleShareConsumer.ConsumerWrapper consumer = new ConsoleShareConsumer.ConsumerWrapper(
|
||||
topic,
|
||||
mockConsumer,
|
||||
timeoutMs
|
||||
);
|
||||
|
||||
assertThrows(TimeoutException.class, consumer::receive);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldLimitReadsToMaxMessageLimit() {
|
||||
ConsoleShareConsumer.ConsumerWrapper consumer = mock(ConsoleShareConsumer.ConsumerWrapper.class);
|
||||
MessageFormatter formatter = mock(MessageFormatter.class);
|
||||
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]);
|
||||
|
||||
int messageLimit = 10;
|
||||
when(consumer.receive()).thenReturn(record);
|
||||
|
||||
ConsoleShareConsumer.process(messageLimit, formatter, consumer, System.out, true, AcknowledgeType.ACCEPT);
|
||||
|
||||
verify(consumer, times(messageLimit)).receive();
|
||||
verify(formatter, times(messageLimit)).writeTo(any(), any());
|
||||
|
||||
consumer.cleanup();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldStopWhenOutputCheckErrorFails() {
|
||||
ConsoleShareConsumer.ConsumerWrapper consumer = mock(ConsoleShareConsumer.ConsumerWrapper.class);
|
||||
MessageFormatter formatter = mock(MessageFormatter.class);
|
||||
PrintStream printStream = mock(PrintStream.class);
|
||||
|
||||
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]);
|
||||
|
||||
when(consumer.receive()).thenReturn(record);
|
||||
//Simulate an error on System.out after the first record has been printed
|
||||
when(printStream.checkError()).thenReturn(true);
|
||||
|
||||
ConsoleShareConsumer.process(-1, formatter, consumer, printStream, true, AcknowledgeType.ACCEPT);
|
||||
|
||||
verify(formatter).writeTo(any(), eq(printStream));
|
||||
verify(consumer).receive();
|
||||
verify(printStream).checkError();
|
||||
|
||||
consumer.cleanup();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRejectMessageOnError() {
|
||||
ConsoleShareConsumer.ConsumerWrapper consumer = mock(ConsoleShareConsumer.ConsumerWrapper.class);
|
||||
MessageFormatter formatter = mock(MessageFormatter.class);
|
||||
PrintStream printStream = mock(PrintStream.class);
|
||||
|
||||
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("foo", 1, 1, new byte[0], new byte[0]);
|
||||
|
||||
when(consumer.receive()).thenReturn(record);
|
||||
|
||||
//Simulate an error on formatter.writeTo() call
|
||||
doThrow(new RuntimeException())
|
||||
.when(formatter)
|
||||
.writeTo(any(), any());
|
||||
|
||||
ConsoleShareConsumer.process(1, formatter, consumer, printStream, true, AcknowledgeType.ACCEPT);
|
||||
|
||||
verify(formatter).writeTo(any(), eq(printStream));
|
||||
verify(consumer).receive();
|
||||
verify(consumer).acknowledge(record, AcknowledgeType.REJECT);
|
||||
|
||||
consumer.cleanup();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue