mirror of https://github.com/apache/kafka.git
KAFKA-17598; Command line validation tool for RE2J regex (#18031)
This patch introduces the `--validate-regex` argument to the `kafka-consumer-group` command line tool as defined in KIP-848. The new argument allows the verification of RE2 regular expressions. Reviewers: Andrew Schofield <aschofield@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
parent
8fde6dedea
commit
c3506834e9
|
@ -2426,6 +2426,7 @@ project(':tools') {
|
||||||
implementation libs.slf4jApi
|
implementation libs.slf4jApi
|
||||||
implementation libs.slf4jReload4j
|
implementation libs.slf4jReload4j
|
||||||
implementation libs.joptSimple
|
implementation libs.joptSimple
|
||||||
|
implementation libs.re2j
|
||||||
|
|
||||||
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
|
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
|
||||||
implementation libs.jacksonJaxrsJsonProvider
|
implementation libs.jacksonJaxrsJsonProvider
|
||||||
|
|
|
@ -331,6 +331,7 @@
|
||||||
<allow pkg="org.apache.kafka.tools"/>
|
<allow pkg="org.apache.kafka.tools"/>
|
||||||
<allow pkg="org.apache.kafka.server.config" />
|
<allow pkg="org.apache.kafka.server.config" />
|
||||||
<allow pkg="scala"/>
|
<allow pkg="scala"/>
|
||||||
|
<allow pkg="com.google.re2j"/>
|
||||||
<subpackage name="share">
|
<subpackage name="share">
|
||||||
<allow pkg="org.apache.kafka.server.share" />
|
<allow pkg="org.apache.kafka.server.share" />
|
||||||
<allow pkg="org.apache.kafka.coordinator.share" />
|
<allow pkg="org.apache.kafka.coordinator.share" />
|
||||||
|
|
|
@ -52,6 +52,8 @@ import org.apache.kafka.server.util.CommandLineUtils;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectReader;
|
import com.fasterxml.jackson.databind.ObjectReader;
|
||||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||||
|
import com.google.re2j.Pattern;
|
||||||
|
import com.google.re2j.PatternSyntaxException;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -85,6 +87,7 @@ import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import joptsimple.OptionException;
|
import joptsimple.OptionException;
|
||||||
|
import joptsimple.OptionSpec;
|
||||||
|
|
||||||
public class ConsumerGroupCommand {
|
public class ConsumerGroupCommand {
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommand.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommand.class);
|
||||||
|
@ -94,10 +97,27 @@ public class ConsumerGroupCommand {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
|
ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
|
||||||
try {
|
try {
|
||||||
// should have exactly one action
|
List<OptionSpec<?>> actions = List.of(
|
||||||
long actions = Stream.of(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).filter(opts.options::has).count();
|
opts.listOpt,
|
||||||
if (actions != 1)
|
opts.describeOpt,
|
||||||
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets");
|
opts.deleteOpt,
|
||||||
|
opts.resetOffsetsOpt,
|
||||||
|
opts.deleteOffsetsOpt,
|
||||||
|
opts.validateRegexOpt
|
||||||
|
);
|
||||||
|
|
||||||
|
// Should have exactly one action.
|
||||||
|
if (actions.stream().filter(opts.options::has).count() != 1) {
|
||||||
|
CommandLineUtils.printUsageAndExit(
|
||||||
|
opts.parser,
|
||||||
|
String.format(
|
||||||
|
"Command must include exactly one action: %s",
|
||||||
|
actions.stream().map(opt ->
|
||||||
|
"--" + opt.options().get(0)
|
||||||
|
).collect(Collectors.joining(", "))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
run(opts);
|
run(opts);
|
||||||
} catch (OptionException e) {
|
} catch (OptionException e) {
|
||||||
|
@ -106,6 +126,11 @@ public class ConsumerGroupCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void run(ConsumerGroupCommandOptions opts) {
|
static void run(ConsumerGroupCommandOptions opts) {
|
||||||
|
if (opts.options.has(opts.validateRegexOpt)) {
|
||||||
|
validateRegex(opts.options.valueOf(opts.validateRegexOpt));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try (ConsumerGroupService consumerGroupService = new ConsumerGroupService(opts, Collections.emptyMap())) {
|
try (ConsumerGroupService consumerGroupService = new ConsumerGroupService(opts, Collections.emptyMap())) {
|
||||||
if (opts.options.has(opts.listOpt))
|
if (opts.options.has(opts.listOpt))
|
||||||
consumerGroupService.listGroups();
|
consumerGroupService.listGroups();
|
||||||
|
@ -130,6 +155,15 @@ public class ConsumerGroupCommand {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void validateRegex(String regex) {
|
||||||
|
try {
|
||||||
|
Pattern.compile(regex);
|
||||||
|
System.out.printf("The regular expression `%s` is valid.%n", regex);
|
||||||
|
} catch (PatternSyntaxException ex) {
|
||||||
|
System.out.printf("The regular expression `%s` is invalid: %s.%n", regex, ex.getDescription());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static Set<GroupState> groupStatesFromString(String input) {
|
static Set<GroupState> groupStatesFromString(String input) {
|
||||||
Set<GroupState> parsedStates = Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet());
|
Set<GroupState> parsedStates = Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet());
|
||||||
Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.CONSUMER);
|
Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.CONSUMER);
|
||||||
|
|
|
@ -35,7 +35,7 @@ import static org.apache.kafka.tools.ToolsUtils.minus;
|
||||||
public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
|
||||||
|
|
||||||
private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
|
private static final String BOOTSTRAP_SERVER_DOC = "The server(s) to connect to. REQUIRED for all options except for --validate-regex.";
|
||||||
private static final String GROUP_DOC = "The consumer group we wish to act on.";
|
private static final String GROUP_DOC = "The consumer group we wish to act on.";
|
||||||
private static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
|
private static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
|
||||||
"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
|
"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
|
||||||
|
@ -84,6 +84,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
"Example: --bootstrap-server localhost:9092 --list --type classic,consumer" + NL +
|
"Example: --bootstrap-server localhost:9092 --list --type classic,consumer" + NL +
|
||||||
"This option may be used with the '--list' option only.";
|
"This option may be used with the '--list' option only.";
|
||||||
private static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics.";
|
private static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics.";
|
||||||
|
private static final String VALIDATE_REGEX_DOC = "Validate that the syntax of the provided regular expression is valid according to the RE2 format.";
|
||||||
|
|
||||||
final OptionSpec<String> bootstrapServerOpt;
|
final OptionSpec<String> bootstrapServerOpt;
|
||||||
final OptionSpec<String> groupOpt;
|
final OptionSpec<String> groupOpt;
|
||||||
|
@ -113,6 +114,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
final OptionSpec<Void> offsetsOpt;
|
final OptionSpec<Void> offsetsOpt;
|
||||||
final OptionSpec<String> stateOpt;
|
final OptionSpec<String> stateOpt;
|
||||||
final OptionSpec<String> typeOpt;
|
final OptionSpec<String> typeOpt;
|
||||||
|
final OptionSpec<String> validateRegexOpt;
|
||||||
|
|
||||||
final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
|
final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
|
||||||
final Set<OptionSpec<?>> allConsumerGroupLevelOpts;
|
final Set<OptionSpec<?>> allConsumerGroupLevelOpts;
|
||||||
|
@ -196,6 +198,10 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
.availableIf(listOpt)
|
.availableIf(listOpt)
|
||||||
.withOptionalArg()
|
.withOptionalArg()
|
||||||
.ofType(String.class);
|
.ofType(String.class);
|
||||||
|
validateRegexOpt = parser.accepts("validate-regex", VALIDATE_REGEX_DOC)
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("regex")
|
||||||
|
.ofType(String.class);
|
||||||
|
|
||||||
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
|
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
|
||||||
allConsumerGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt));
|
allConsumerGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt));
|
||||||
|
@ -210,7 +216,9 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
void checkArgs() {
|
void checkArgs() {
|
||||||
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.");
|
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.");
|
||||||
|
|
||||||
|
if (!options.has(validateRegexOpt)) {
|
||||||
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
|
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
|
||||||
|
}
|
||||||
|
|
||||||
if (options.has(describeOpt)) {
|
if (options.has(describeOpt)) {
|
||||||
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.tools.consumer.group;
|
||||||
|
|
||||||
|
import org.apache.kafka.tools.ToolsTestUtils;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
public class ConsumerGroupCommandTest {
|
||||||
|
@Test
|
||||||
|
public void testValidateRegexCommandWithValidRegex() {
|
||||||
|
String output = ToolsTestUtils.grabConsoleOutput(
|
||||||
|
() -> ConsumerGroupCommand.main(List.of(
|
||||||
|
"--validate-regex",
|
||||||
|
"foo.*"
|
||||||
|
).toArray(new String[0]))
|
||||||
|
);
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
"The regular expression `foo.*` is valid.\n",
|
||||||
|
output
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateRegexCommandWithInvalidRegex() {
|
||||||
|
String output = ToolsTestUtils.grabConsoleOutput(
|
||||||
|
() -> ConsumerGroupCommand.main(List.of(
|
||||||
|
"--validate-regex",
|
||||||
|
"[foo.*"
|
||||||
|
).toArray(new String[0]))
|
||||||
|
);
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
"The regular expression `[foo.*` is invalid: missing closing ].\n",
|
||||||
|
output
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue