diff --git a/build.gradle b/build.gradle
index 338bc9aa0cf..6f58553879b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2426,6 +2426,7 @@ project(':tools') {
implementation libs.slf4jApi
implementation libs.slf4jReload4j
implementation libs.joptSimple
+ implementation libs.re2j
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonJaxrsJsonProvider
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c71ffa4accc..c6ec12994fd 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -331,6 +331,7 @@
+
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 71af2a05ff5..33e483cbb94 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -52,6 +52,8 @@ import org.apache.kafka.server.util.CommandLineUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.re2j.Pattern;
+import com.google.re2j.PatternSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,6 +87,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import joptsimple.OptionException;
+import joptsimple.OptionSpec;
public class ConsumerGroupCommand {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommand.class);
@@ -94,10 +97,27 @@ public class ConsumerGroupCommand {
public static void main(String[] args) {
ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
try {
- // should have exactly one action
- long actions = Stream.of(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).filter(opts.options::has).count();
- if (actions != 1)
- CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets");
+ List> actions = List.of(
+ opts.listOpt,
+ opts.describeOpt,
+ opts.deleteOpt,
+ opts.resetOffsetsOpt,
+ opts.deleteOffsetsOpt,
+ opts.validateRegexOpt
+ );
+
+ // Should have exactly one action.
+ if (actions.stream().filter(opts.options::has).count() != 1) {
+ CommandLineUtils.printUsageAndExit(
+ opts.parser,
+ String.format(
+ "Command must include exactly one action: %s",
+ actions.stream().map(opt ->
+ "--" + opt.options().get(0)
+ ).collect(Collectors.joining(", "))
+ )
+ );
+ }
run(opts);
} catch (OptionException e) {
@@ -106,6 +126,11 @@ public class ConsumerGroupCommand {
}
static void run(ConsumerGroupCommandOptions opts) {
+ if (opts.options.has(opts.validateRegexOpt)) {
+ validateRegex(opts.options.valueOf(opts.validateRegexOpt));
+ return;
+ }
+
try (ConsumerGroupService consumerGroupService = new ConsumerGroupService(opts, Collections.emptyMap())) {
if (opts.options.has(opts.listOpt))
consumerGroupService.listGroups();
@@ -130,6 +155,15 @@ public class ConsumerGroupCommand {
}
}
+ static void validateRegex(String regex) {
+ try {
+ Pattern.compile(regex);
+ System.out.printf("The regular expression `%s` is valid.%n", regex);
+ } catch (PatternSyntaxException ex) {
+ System.out.printf("The regular expression `%s` is invalid: %s.%n", regex, ex.getDescription());
+ }
+ }
+
static Set groupStatesFromString(String input) {
Set parsedStates = Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet());
Set validStates = GroupState.groupStatesForType(GroupType.CONSUMER);
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index ec4914fa556..6289b3a3099 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -35,7 +35,7 @@ import static org.apache.kafka.tools.ToolsUtils.minus;
public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
- private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
+ private static final String BOOTSTRAP_SERVER_DOC = "The server(s) to connect to. REQUIRED for all options except for --validate-regex.";
private static final String GROUP_DOC = "The consumer group we wish to act on.";
private static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
@@ -84,6 +84,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
"Example: --bootstrap-server localhost:9092 --list --type classic,consumer" + NL +
"This option may be used with the '--list' option only.";
private static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics.";
+ private static final String VALIDATE_REGEX_DOC = "Validate that the syntax of the provided regular expression is valid according to the RE2 format.";
final OptionSpec bootstrapServerOpt;
final OptionSpec groupOpt;
@@ -113,6 +114,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
final OptionSpec offsetsOpt;
final OptionSpec stateOpt;
final OptionSpec typeOpt;
+ final OptionSpec validateRegexOpt;
final Set> allGroupSelectionScopeOpts;
final Set> allConsumerGroupLevelOpts;
@@ -196,6 +198,10 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
.availableIf(listOpt)
.withOptionalArg()
.ofType(String.class);
+ validateRegexOpt = parser.accepts("validate-regex", VALIDATE_REGEX_DOC)
+ .withRequiredArg()
+ .describedAs("regex")
+ .ofType(String.class);
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
allConsumerGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt));
@@ -210,7 +216,9 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
void checkArgs() {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.");
- CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
+ if (!options.has(validateRegexOpt)) {
+ CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
+ }
if (options.has(describeOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
new file mode 100644
index 00000000000..000a882347a
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.tools.ToolsTestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupCommandTest {
+ @Test
+ public void testValidateRegexCommandWithValidRegex() {
+ String output = ToolsTestUtils.grabConsoleOutput(
+ () -> ConsumerGroupCommand.main(List.of(
+ "--validate-regex",
+ "foo.*"
+ ).toArray(new String[0]))
+ );
+
+ assertEquals(
+ "The regular expression `foo.*` is valid.\n",
+ output
+ );
+ }
+
+ @Test
+ public void testValidateRegexCommandWithInvalidRegex() {
+ String output = ToolsTestUtils.grabConsoleOutput(
+ () -> ConsumerGroupCommand.main(List.of(
+ "--validate-regex",
+ "[foo.*"
+ ).toArray(new String[0]))
+ );
+
+ assertEquals(
+ "The regular expression `[foo.*` is invalid: missing closing ].\n",
+ output
+ );
+ }
+}