KAFKA-14734: Use CommandDefaultOptions in StreamsResetter (#13983)

This PR adds CommandDefaultOptions usage like in the other joptsimple based tools. It also moves the associated unit test class from streams to tools module as discussed in #13127 (comment)

Reviewers:  Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>, Sagar Rao <sagarmeansocean@gmail.com>
This commit is contained in:
Federico Valeri 2023-07-20 12:45:05 +02:00 committed by GitHub
parent 583f708006
commit 334c41d604
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 293 additions and 254 deletions

View File

@ -378,10 +378,6 @@
<allow pkg="kafka.admin" />
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.tools" />
</subpackage>
<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>

View File

@ -427,7 +427,7 @@ public abstract class AbstractResetIntegrationTest {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
return new StreamsResetter().run(parameters, cleanUpConfig) == 0;
return new StreamsResetter().execute(parameters, cleanUpConfig) == 0;
}
protected void cleanGlobal(final boolean withIntermediateTopics,

View File

@ -117,7 +117,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
streams.close();
@ -135,7 +135,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
}
@ -151,7 +151,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
}
@ -167,7 +167,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
}
@ -183,7 +183,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
}

View File

@ -17,8 +17,6 @@
package org.apache.kafka.tools;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import org.apache.kafka.clients.admin.Admin;
@ -38,6 +36,7 @@ import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
@ -89,26 +88,6 @@ import java.util.stream.Collectors;
public class StreamsResetter {
private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_ERROR = 1;
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:9092";
private static OptionSpec<String> bootstrapServersOption;
private static OptionSpec<String> bootstrapServerOption;
private static OptionSpec<String> applicationIdOption;
private static OptionSpec<String> inputTopicsOption;
private static OptionSpec<String> intermediateTopicsOption;
private static OptionSpec<String> internalTopicsOption;
private static OptionSpec<Long> toOffsetOption;
private static OptionSpec<String> toDatetimeOption;
private static OptionSpec<String> byDurationOption;
private static OptionSpecBuilder toEarliestOption;
private static OptionSpecBuilder toLatestOption;
private static OptionSpec<String> fromFileOption;
private static OptionSpec<Long> shiftByOption;
private static OptionSpecBuilder dryRunOption;
private static OptionSpec<Void> helpOption;
private static OptionSpec<Void> versionOption;
private static OptionSpec<String> commandConfigOption;
private static OptionSpecBuilder forceOption;
private final static String USAGE = "This tool helps to quickly reset an application in order to reprocess "
+ "its data from scratch.\n"
@ -133,77 +112,69 @@ public class StreamsResetter {
+ "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that "
+ "you run this once with \"--dry-run\" to preview your changes before making them.\n\n";
private OptionSet options = null;
private final List<String> allTopics = new LinkedList<>();
public int run(final String[] args) {
return run(args, new Properties());
public static void main(final String[] args) {
Exit.exit(new StreamsResetter().execute(args));
}
public int run(final String[] args,
final Properties config) {
int exitCode;
public int execute(final String[] args) {
return execute(args, new Properties());
}
Admin adminClient = null;
public int execute(final String[] args, final Properties config) {
try {
parseArguments(args);
StreamsResetterOptions options = new StreamsResetterOptions(args);
final boolean dryRun = options.has(dryRunOption);
final String groupId = options.valueOf(applicationIdOption);
final Properties properties = new Properties();
if (options.has(commandConfigOption)) {
properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
String groupId = options.applicationId();
Properties properties = new Properties();
if (options.hasCommandConfig()) {
properties.putAll(Utils.loadProps(options.commandConfig()));
}
String bootstrapServerValue = BOOTSTRAP_SERVER_DEFAULT;
if (options.has(bootstrapServerOption))
bootstrapServerValue = options.valueOf(bootstrapServerOption);
else if (options.has(bootstrapServersOption))
bootstrapServerValue = options.valueOf(bootstrapServersOption);
String bootstrapServerValue = "localhost:9092";
if (options.hasBootstrapServer()) {
bootstrapServerValue = options.bootstrapServer();
} else if (options.hasBootstrapServers()) {
bootstrapServerValue = options.bootstrapServers();
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerValue);
adminClient = Admin.create(properties);
maybeDeleteActiveConsumers(groupId, adminClient);
try (Admin adminClient = Admin.create(properties)) {
maybeDeleteActiveConsumers(groupId, adminClient, options);
allTopics.clear();
allTopics.addAll(adminClient.listTopics().names().get(60, TimeUnit.SECONDS));
allTopics.clear();
allTopics.addAll(adminClient.listTopics().names().get(60, TimeUnit.SECONDS));
if (dryRun) {
System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
if (options.hasDryRun()) {
System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
}
final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
consumerConfig.putAll(properties);
int exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, options);
exitCode |= maybeDeleteInternalTopics(adminClient, options);
return exitCode;
}
final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
consumerConfig.putAll(properties);
exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
exitCode |= maybeDeleteInternalTopics(adminClient, dryRun);
} catch (final Throwable e) {
exitCode = EXIT_CODE_ERROR;
} catch (Throwable e) {
System.err.println("ERROR: " + e);
e.printStackTrace(System.err);
} finally {
if (adminClient != null) {
adminClient.close(Duration.ofSeconds(60));
}
return EXIT_CODE_ERROR;
}
return exitCode;
}
private void maybeDeleteActiveConsumers(final String groupId,
final Admin adminClient)
final Admin adminClient,
final StreamsResetterOptions options)
throws ExecutionException, InterruptedException {
final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
Collections.singleton(groupId),
new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
final List<MemberDescription> members =
new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
if (!members.isEmpty()) {
if (options.has(forceOption)) {
if (options.hasForce()) {
System.out.println("Force deleting all active members in the group: " + groupId);
adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();
} else {
@ -215,125 +186,16 @@ public class StreamsResetter {
}
}
private void parseArguments(final String[] args) {
final OptionParser optionParser = new OptionParser(false);
applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id).")
.withRequiredArg()
.ofType(String.class)
.describedAs("id")
.required();
bootstrapServersOption = optionParser.accepts("bootstrap-servers", "DEPRECATED: Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
.withRequiredArg()
.ofType(String.class)
.describedAs("urls");
bootstrapServerOption = optionParser.accepts("bootstrap-server", "REQUIRED unless --bootstrap-servers(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2. (default: localhost:9092)")
.withRequiredArg()
.ofType(String.class)
.describedAs("server to connect to");
inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool by default will reset the offset to the earliest available offset. "
+ "Reset to other offset position by appending other reset offset option, ex: --input-topics foo --shift-by 5")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics that are input and output topics, "
+ "e.g., used in the deprecated through() method). For these topics, the tool will skip to the end.")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
internalTopicsOption = optionParser.accepts("internal-topics", "Comma-separated list of "
+ "internal topics to delete. Must be a subset of the internal topics marked for deletion by the "
+ "default behaviour (do a dry-run without this option to view these topics).")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
toOffsetOption = optionParser.accepts("to-offset", "Reset offsets to a specific offset.")
.withRequiredArg()
.ofType(Long.class);
toDatetimeOption = optionParser.accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'")
.withRequiredArg()
.ofType(String.class);
byDurationOption = optionParser.accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'")
.withRequiredArg()
.ofType(String.class);
toEarliestOption = optionParser.accepts("to-earliest", "Reset offsets to earliest offset.");
toLatestOption = optionParser.accepts("to-latest", "Reset offsets to latest offset.");
fromFileOption = optionParser.accepts("from-file", "Reset offsets to values defined in CSV file.")
.withRequiredArg()
.ofType(String.class);
shiftByOption = optionParser.accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative")
.withRequiredArg()
.describedAs("number-of-offsets")
.ofType(Long.class);
commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.")
.withRequiredArg()
.ofType(String.class)
.describedAs("file name");
forceOption = optionParser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). " +
"Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.");
dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
helpOption = optionParser.accepts("help", "Print usage information.").forHelp();
versionOption = optionParser.accepts("version", "Print version information and exit.").forHelp();
try {
options = optionParser.parse(args);
if (args.length == 0 || options.has(helpOption)) {
CommandLineUtils.printUsageAndExit(optionParser, USAGE);
}
if (options.has(versionOption)) {
CommandLineUtils.printVersionAndExit();
}
} catch (final OptionException e) {
CommandLineUtils.printUsageAndExit(optionParser, e.getMessage());
}
final Set<OptionSpec<?>> allScenarioOptions = new HashSet<>();
allScenarioOptions.add(toOffsetOption);
allScenarioOptions.add(toDatetimeOption);
allScenarioOptions.add(byDurationOption);
allScenarioOptions.add(toEarliestOption);
allScenarioOptions.add(toLatestOption);
allScenarioOptions.add(fromFileOption);
allScenarioOptions.add(shiftByOption);
checkInvalidArgs(optionParser, options, allScenarioOptions, toOffsetOption);
checkInvalidArgs(optionParser, options, allScenarioOptions, toDatetimeOption);
checkInvalidArgs(optionParser, options, allScenarioOptions, byDurationOption);
checkInvalidArgs(optionParser, options, allScenarioOptions, toEarliestOption);
checkInvalidArgs(optionParser, options, allScenarioOptions, toLatestOption);
checkInvalidArgs(optionParser, options, allScenarioOptions, fromFileOption);
checkInvalidArgs(optionParser, options, allScenarioOptions, shiftByOption);
}
private <T> void checkInvalidArgs(final OptionParser optionParser,
final OptionSet options,
final Set<OptionSpec<?>> allOptions,
final OptionSpec<T> option) {
final Set<OptionSpec<?>> invalidOptions = new HashSet<>(allOptions);
invalidOptions.remove(option);
CommandLineUtils.checkInvalidArgs(
optionParser,
options,
option,
invalidOptions);
}
private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map<Object, Object> consumerConfig,
final boolean dryRun)
final StreamsResetterOptions options)
throws IOException, ParseException {
final List<String> inputTopics = options.valuesOf(inputTopicsOption);
final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
final List<String> inputTopics = options.inputTopicsOption();
final List<String> intermediateTopics = options.intermediateTopicsOption();
int topicNotFound = EXIT_CODE_SUCCESS;
final List<String> notFoundInputTopics = new ArrayList<>();
final List<String> notFoundIntermediateTopics = new ArrayList<>();
final String groupId = options.valueOf(applicationIdOption);
if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
System.out.println("No input or intermediate topics specified. Skipping seek.");
return EXIT_CODE_SUCCESS;
@ -387,7 +249,7 @@ public class StreamsResetter {
final Properties config = new Properties();
config.putAll(consumerConfig);
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.applicationId());
config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try (final KafkaConsumer<byte[], byte[]> client =
@ -404,20 +266,20 @@ public class StreamsResetter {
for (final TopicPartition p : partitions) {
final String topic = p.topic();
if (isInputTopic(topic)) {
if (options.isInputTopic(topic)) {
inputTopicPartitions.add(p);
} else if (isIntermediateTopic(topic)) {
} else if (options.isIntermediateTopic(topic)) {
intermediateTopicPartitions.add(p);
} else {
System.err.println("Skipping invalid partition: " + p);
}
}
maybeReset(groupId, client, inputTopicPartitions);
maybeReset(client, inputTopicPartitions, options);
maybeSeekToEnd(groupId, client, intermediateTopicPartitions);
maybeSeekToEnd(options.applicationId(), client, intermediateTopicPartitions);
if (!dryRun) {
if (!options.hasDryRun()) {
for (final TopicPartition p : partitions) {
client.position(p);
}
@ -446,30 +308,29 @@ public class StreamsResetter {
}
}
private void maybeReset(final String groupId,
final Consumer<byte[], byte[]> client,
final Set<TopicPartition> inputTopicPartitions)
private void maybeReset(final Consumer<byte[], byte[]> client,
final Set<TopicPartition> inputTopicPartitions,
final StreamsResetterOptions options)
throws IOException, ParseException {
if (inputTopicPartitions.size() > 0) {
System.out.println("Following input topics offsets will be reset to (for consumer group " + groupId + ")");
if (options.has(toOffsetOption)) {
resetOffsetsTo(client, inputTopicPartitions, options.valueOf(toOffsetOption));
} else if (options.has(toEarliestOption)) {
System.out.println("Following input topics offsets will be reset to (for consumer group " + options.applicationId() + ")");
if (options.hasToOffset()) {
resetOffsetsTo(client, inputTopicPartitions, options.toOffset());
} else if (options.hasToEarliest()) {
client.seekToBeginning(inputTopicPartitions);
} else if (options.has(toLatestOption)) {
} else if (options.hasToLatest()) {
client.seekToEnd(inputTopicPartitions);
} else if (options.has(shiftByOption)) {
shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
} else if (options.has(toDatetimeOption)) {
final String ts = options.valueOf(toDatetimeOption);
} else if (options.hasShiftBy()) {
shiftOffsetsBy(client, inputTopicPartitions, options.shiftBy());
} else if (options.hasToDatetime()) {
final String ts = options.toDatetime();
final long timestamp = Utils.getDateTime(ts);
resetToDatetime(client, inputTopicPartitions, timestamp);
} else if (options.has(byDurationOption)) {
final String duration = options.valueOf(byDurationOption);
} else if (options.hasByDuration()) {
final String duration = options.byDuration();
resetByDuration(client, inputTopicPartitions, Duration.parse(duration));
} else if (options.has(fromFileOption)) {
final String resetPlanPath = options.valueOf(fromFileOption);
} else if (options.hasFromFile()) {
final String resetPlanPath = options.fromFile();
final Map<TopicPartition, Long> topicPartitionsAndOffset =
getTopicPartitionOffsetFromResetPlan(resetPlanPath);
resetOffsetsFromResetPlan(client, inputTopicPartitions, topicPartitionsAndOffset);
@ -500,7 +361,6 @@ public class StreamsResetter {
private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(final String resetPlanPath)
throws IOException, ParseException {
final String resetPlanCsv = Utils.readFileAsString(resetPlanPath);
return parseResetPlan(resetPlanCsv);
}
@ -578,7 +438,6 @@ public class StreamsResetter {
}
}
private Map<TopicPartition, Long> parseResetPlan(final String resetPlanCsv) throws ParseException {
final Map<TopicPartition, Long> topicPartitionAndOffset = new HashMap<>();
if (resetPlanCsv == null || resetPlanCsv.isEmpty()) {
@ -625,19 +484,11 @@ public class StreamsResetter {
return validatedTopicPartitionsOffsets;
}
private boolean isInputTopic(final String topic) {
return options.valuesOf(inputTopicsOption).contains(topic);
}
private boolean isIntermediateTopic(final String topic) {
return options.valuesOf(intermediateTopicsOption).contains(topic);
}
private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
private int maybeDeleteInternalTopics(final Admin adminClient, final StreamsResetterOptions options) {
final List<String> inferredInternalTopics = allTopics.stream()
.filter(this::isInferredInternalTopic)
.filter(options::isInferredInternalTopic)
.collect(Collectors.toList());
final List<String> specifiedInternalTopics = options.valuesOf(internalTopicsOption);
final List<String> specifiedInternalTopics = options.internalTopics();
final List<String> topicsToDelete;
if (!specifiedInternalTopics.isEmpty()) {
@ -656,7 +507,7 @@ public class StreamsResetter {
System.out.println("Deleting inferred internal topics " + topicsToDelete);
}
if (!dryRun) {
if (!options.hasDryRun()) {
doDelete(topicsToDelete, adminClient);
}
@ -665,8 +516,7 @@ public class StreamsResetter {
}
// visible for testing
public void doDelete(final List<String> topicsToDelete,
final Admin adminClient) {
public void doDelete(final List<String> topicsToDelete, final Admin adminClient) {
boolean hasDeleteErrors = false;
final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.topicNameValues();
@ -685,15 +535,6 @@ public class StreamsResetter {
}
}
private boolean isInferredInternalTopic(final String topicName) {
// Specified input/intermediate topics might be named like internal topics (by chance).
// Even is this is not expected in general, we need to exclude those topics here
// and don't consider them as internal topics even if they follow the same naming schema.
// Cf. https://issues.apache.org/jira/browse/KAFKA-7930
return !isInputTopic(topicName) && !isIntermediateTopic(topicName) && topicName.startsWith(options.valueOf(applicationIdOption) + "-")
&& matchesInternalTopicFormat(topicName);
}
// visible for testing
public static boolean matchesInternalTopicFormat(final String topicName) {
return topicName.endsWith("-changelog") || topicName.endsWith("-repartition")
@ -703,8 +544,216 @@ public class StreamsResetter {
|| topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
}
public static void main(final String[] args) {
Exit.exit(new StreamsResetter().run(args));
}
private static class StreamsResetterOptions extends CommandDefaultOptions {
private final OptionSpec<String> bootstrapServersOption;
private final OptionSpec<String> bootstrapServerOption;
private final OptionSpec<String> applicationIdOption;
private final OptionSpec<String> inputTopicsOption;
private final OptionSpec<String> intermediateTopicsOption;
private final OptionSpec<String> internalTopicsOption;
private final OptionSpec<Long> toOffsetOption;
private final OptionSpec<String> toDatetimeOption;
private final OptionSpec<String> byDurationOption;
private final OptionSpecBuilder toEarliestOption;
private final OptionSpecBuilder toLatestOption;
private final OptionSpec<String> fromFileOption;
private final OptionSpec<Long> shiftByOption;
private final OptionSpecBuilder dryRunOption;
private final OptionSpec<String> commandConfigOption;
private final OptionSpecBuilder forceOption;
public StreamsResetterOptions(String[] args) {
super(args);
applicationIdOption = parser.accepts("application-id", "The Kafka Streams application ID (application.id).")
.withRequiredArg()
.ofType(String.class)
.describedAs("id")
.required();
bootstrapServersOption = parser.accepts("bootstrap-servers", "DEPRECATED: Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
.withRequiredArg()
.ofType(String.class)
.describedAs("urls");
bootstrapServerOption = parser.accepts("bootstrap-server", "REQUIRED unless --bootstrap-servers(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2. (default: localhost:9092)")
.withRequiredArg()
.ofType(String.class)
.describedAs("server to connect to");
inputTopicsOption = parser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool by default will reset the offset to the earliest available offset. "
+ "Reset to other offset position by appending other reset offset option, ex: --input-topics foo --shift-by 5")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
intermediateTopicsOption = parser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics that are input and output topics, "
+ "e.g., used in the deprecated through() method). For these topics, the tool will skip to the end.")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
internalTopicsOption = parser.accepts("internal-topics", "Comma-separated list of "
+ "internal topics to delete. Must be a subset of the internal topics marked for deletion by the "
+ "default behaviour (do a dry-run without this option to view these topics).")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
toOffsetOption = parser.accepts("to-offset", "Reset offsets to a specific offset.")
.withRequiredArg()
.ofType(Long.class);
toDatetimeOption = parser.accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'")
.withRequiredArg()
.ofType(String.class);
byDurationOption = parser.accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'")
.withRequiredArg()
.ofType(String.class);
toEarliestOption = parser.accepts("to-earliest", "Reset offsets to earliest offset.");
toLatestOption = parser.accepts("to-latest", "Reset offsets to latest offset.");
fromFileOption = parser.accepts("from-file", "Reset offsets to values defined in CSV file.")
.withRequiredArg()
.ofType(String.class);
shiftByOption = parser.accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative")
.withRequiredArg()
.describedAs("number-of-offsets")
.ofType(Long.class);
commandConfigOption = parser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.")
.withRequiredArg()
.ofType(String.class)
.describedAs("file name");
forceOption = parser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). " +
"Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.");
dryRunOption = parser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
try {
options = parser.parse(args);
if (CommandLineUtils.isPrintHelpNeeded(this)) {
CommandLineUtils.printUsageAndExit(parser, USAGE);
}
if (CommandLineUtils.isPrintVersionNeeded(this)) {
CommandLineUtils.printVersionAndExit();
}
CommandLineUtils.checkInvalidArgs(parser, options, toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, toLatestOption, fromFileOption, shiftByOption);
CommandLineUtils.checkInvalidArgs(parser, options, toDatetimeOption, toOffsetOption, byDurationOption, toEarliestOption, toLatestOption, fromFileOption, shiftByOption);
CommandLineUtils.checkInvalidArgs(parser, options, byDurationOption, toOffsetOption, toDatetimeOption, toEarliestOption, toLatestOption, fromFileOption, shiftByOption);
CommandLineUtils.checkInvalidArgs(parser, options, toEarliestOption, toOffsetOption, toDatetimeOption, byDurationOption, toLatestOption, fromFileOption, shiftByOption);
CommandLineUtils.checkInvalidArgs(parser, options, toLatestOption, toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, fromFileOption, shiftByOption);
CommandLineUtils.checkInvalidArgs(parser, options, fromFileOption, toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, toLatestOption, shiftByOption);
CommandLineUtils.checkInvalidArgs(parser, options, shiftByOption, toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, toLatestOption, fromFileOption);
} catch (final OptionException e) {
CommandLineUtils.printUsageAndExit(parser, e.getMessage());
}
}
public boolean hasDryRun() {
return options.has(dryRunOption);
}
public String applicationId() {
return options.valueOf(applicationIdOption);
}
public boolean hasCommandConfig() {
return options.has(commandConfigOption);
}
public String commandConfig() {
return options.valueOf(commandConfigOption);
}
public boolean hasBootstrapServer() {
return options.has(bootstrapServerOption);
}
public String bootstrapServer() {
return options.valueOf(bootstrapServerOption);
}
public boolean hasBootstrapServers() {
return options.has(bootstrapServersOption);
}
public String bootstrapServers() {
return options.valueOf(bootstrapServersOption);
}
public boolean hasForce() {
return options.has(forceOption);
}
public List<String> inputTopicsOption() {
return options.valuesOf(inputTopicsOption);
}
public List<String> intermediateTopicsOption() {
return options.valuesOf(intermediateTopicsOption);
}
public boolean hasToOffset() {
return options.has(toOffsetOption);
}
public long toOffset() {
return options.valueOf(toOffsetOption);
}
public boolean hasToEarliest() {
return options.has(toEarliestOption);
}
public boolean hasToLatest() {
return options.has(toLatestOption);
}
public boolean hasShiftBy() {
return options.has(shiftByOption);
}
public long shiftBy() {
return options.valueOf(shiftByOption);
}
public boolean hasToDatetime() {
return options.has(toDatetimeOption);
}
public String toDatetime() {
return options.valueOf(toDatetimeOption);
}
public boolean hasByDuration() {
return options.has(byDurationOption);
}
public String byDuration() {
return options.valueOf(byDurationOption);
}
public boolean hasFromFile() {
return options.has(fromFileOption);
}
public String fromFile() {
return options.valueOf(fromFileOption);
}
public boolean isInputTopic(String topic) {
return options.valuesOf(inputTopicsOption).contains(topic);
}
public boolean isIntermediateTopic(String topic) {
return options.valuesOf(intermediateTopicsOption).contains(topic);
}
private boolean isInferredInternalTopic(final String topicName) {
// Specified input/intermediate topics might be named like internal topics (by chance).
// Even is this is not expected in general, we need to exclude those topics here
// and don't consider them as internal topics even if they follow the same naming schema.
// Cf. https://issues.apache.org/jira/browse/KAFKA-7930
return !isInputTopic(topicName) && !isIntermediateTopic(topicName) && topicName.startsWith(options.valueOf(applicationIdOption) + "-")
&& matchesInternalTopicFormat(topicName);
}
public List<String> internalTopics() {
return options.valuesOf(internalTopicsOption);
}
}
}

View File

@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tools;
package org.apache.kafka.tools;
import org.apache.kafka.tools.StreamsResetter;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -28,10 +27,9 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.time.Instant;
@ -41,24 +39,22 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 600, unit = TimeUnit.SECONDS)
public class StreamsResetterTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final String TOPIC = "topic1";
private final StreamsResetter streamsResetter = new StreamsResetter();
private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final TopicPartition topicPartition = new TopicPartition(TOPIC, 0);
private final Set<TopicPartition> inputTopicPartitions = new HashSet<>(Collections.singletonList(topicPartition));
@Before
public void setUp() {
@BeforeEach
public void beforeEach() {
consumer.assign(Collections.singletonList(topicPartition));
consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0L, new byte[] {}, new byte[] {}));
consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1L, new byte[] {}, new byte[] {}));
consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 2L, new byte[] {}, new byte[] {}));
@ -309,7 +305,6 @@ public class StreamsResetterTest {
}
private static class EmptyPartitionConsumer<K, V> extends MockConsumer<K, V> {
public EmptyPartitionConsumer(final OffsetResetStrategy offsetResetStrategy) {
super(offsetResetStrategy);
}
@ -321,5 +316,4 @@ public class StreamsResetterTest {
return topicPartitionToOffsetAndTimestamp;
}
}
}