diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 55acea53eea..5fe66a59988 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; @@ -528,6 +529,15 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons .metavar("TOPIC") .help("Consumes messages from this topic."); + parser.addArgument("--group-protocol") + .action(store()) + .required(false) + .type(String.class) + .setDefault(GroupProtocol.CLASSIC.name) + .metavar("GROUP_PROTOCOL") + .dest("groupProtocol") + .help(String.format("Group protocol (must be one of %s)", Utils.join(GroupProtocol.values(), ", "))); + parser.addArgument("--group-id") .action(store()) .required(true) @@ -617,6 +627,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons } } + consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, res.getString("groupProtocol")); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId")); String groupInstanceId = res.getString("groupInstanceId");