KAFKA-5520: KIP-171; Extend Consumer Group Reset Offset for Stream Application

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application

Merge changes from KIP-198

Ref: https://github.com/apache/kafka/pull/3831

Author: Jorge Quilcate Otoya <quilcate.jorge@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>
Author: Matthias J. Sax <matthias@confluent.io>
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Author: Guozhang Wang <wangguoz@gmail.com>
Author: Apurva Mehta <apurva@confluent.io>
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Author: Jason Gustafson <jason@confluent.io>
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Author: Bill Bejeck <bill@confluent.io>
Author: Dong Lin <lindong28@gmail.com>
Author: Soenke Liebau <soenke.liebau@opencore.com>
Author: Colin P. Mccabe <cmccabe@confluent.io>
Author: Damian Guy <damian.guy@gmail.com>
Author: Xavier Léauté <xl+github@xvrl.net>
Author: Maytee Chinavanichkit <maytee.chinavanichkit@linecorp.com>
Author: Joel Hamill <git config --global user.email>
Author: Paolo Patierno <ppatierno@live.com>
Author: siva santhalingam <siva.santhalingam@gmail.com>
Author: Tommy Becker <tobecker@tivo.com>
Author: Mickael Maison <mickael.maison@gmail.com>
Author: Onur Karaman <okaraman@linkedin.com>
Author: tedyu <yuzhihong@gmail.com>
Author: Xin Li <Xin.Li@trivago.com>
Author: Magnus Edenhill <magnus@edenhill.se>
Author: Manjula K <manjula@kafka-summit.org>
Author: Hugo Louro <hmclouro@gmail.com>
Author: Jeff Widman <jeff@jeffwidman.com>
Author: bartdevylder <bartdevylder@gmail.com>
Author: Ewen Cheslack-Postava <me@ewencp.org>
Author: Jacek Laskowski <jacek@japila.pl>
Author: Tom Bentley <tbentley@redhat.com>
Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4159 from jeqo/feature/kip-171
This commit is contained in:
Jorge Quilcate Otoya 2017-12-06 11:38:38 -08:00 committed by Guozhang Wang
parent fd8f182cc4
commit 30f08d158a
9 changed files with 835 additions and 101 deletions

View File

@ -60,6 +60,9 @@
<allow pkg="kafka.consumer" />
<allow pkg="joptsimple" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow class="javax.xml.datatype.Duration" />
<allow class="javax.xml.datatype.DatatypeFactory" />
<allow class="javax.xml.datatype.DatatypeConfigurationException" />
</subpackage>
<subpackage name="coordinator">

View File

@ -233,6 +233,10 @@
<allow pkg="kafka.admin" />
</subpackage>
<subpackage name="tools">
<allow pkg="kafka.tools" />
</subpackage>
<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>

View File

@ -529,8 +529,8 @@ object ConsumerGroupCommand extends Logging {
case "Empty" | "Dead" =>
val partitionsToReset = getPartitionsToReset(groupId)
val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
val execute = opts.options.has(opts.executeOpt)
if (execute)
val dryRun = opts.options.has(opts.dryRunOpt)
if (!dryRun)
getConsumer().commitSync(preparedOffsets.asJava)
preparedOffsets
case currentState =>
@ -727,7 +727,7 @@ object ConsumerGroupCommand extends Logging {
"Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl +
"Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. One scenario must be choose" + nl +
"To define the scope use: --all-topics or --topic. . One scope must be choose, unless you use '--from-file' scenario"
val ExecuteDoc = "Execute operation. Supported operations: reset-offsets."
val DryRunDoc = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets."
val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets."
val ResetToOffsetDoc = "Reset offsets to a specific offset."
val ResetFromFileDoc = "Reset offsets to values defined in CSV file."
@ -770,7 +770,7 @@ object ConsumerGroupCommand extends Logging {
.describedAs("command config property file")
.ofType(classOf[String])
val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc)
val executeOpt = parser.accepts("execute", ExecuteDoc)
val dryRunOpt = parser.accepts("dry-run", DryRunDoc)
val exportOpt = parser.accepts("export", ExportDoc)
val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc)
.withRequiredArg()

View File

@ -21,11 +21,14 @@ import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.utils.CommandLineUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
@ -33,8 +36,14 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@ -74,12 +83,19 @@ public class StreamsResetter {
private static OptionSpec<String> applicationIdOption;
private static OptionSpec<String> inputTopicsOption;
private static OptionSpec<String> intermediateTopicsOption;
private static OptionSpec<Long> toOffsetOption;
private static OptionSpec<String> toDatetimeOption;
private static OptionSpec<String> byDurationOption;
private static OptionSpecBuilder toEarliestOption;
private static OptionSpecBuilder toLatestOption;
private static OptionSpec<String> fromFileOption;
private static OptionSpec<Long> shiftByOption;
private static OptionSpecBuilder dryRunOption;
private static OptionSpec<String> commandConfigOption;
private OptionSet options = null;
private final List<String> allTopics = new LinkedList<>();
private boolean dryRun = false;
public int run(final String[] args) {
return run(args, new Properties());
@ -93,7 +109,7 @@ public class StreamsResetter {
try {
parseArguments(args);
dryRun = options.has(dryRunOption);
final boolean dryRun = options.has(dryRunOption);
final String groupId = options.valueOf(applicationIdOption);
final Properties properties = new Properties();
@ -114,8 +130,8 @@ public class StreamsResetter {
final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
consumerConfig.putAll(properties);
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig);
maybeDeleteInternalTopics(kafkaAdminClient);
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
maybeDeleteInternalTopics(kafkaAdminClient, dryRun);
} catch (final Throwable e) {
exitCode = EXIT_CODE_ERROR;
@ -169,6 +185,24 @@ public class StreamsResetter {
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
toOffsetOption = optionParser.accepts("to-offset", "Reset offsets to a specific offset.")
.withRequiredArg()
.ofType(Long.class);
toDatetimeOption = optionParser.accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'")
.withRequiredArg()
.ofType(String.class);
byDurationOption = optionParser.accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'")
.withRequiredArg()
.ofType(String.class);
toEarliestOption = optionParser.accepts("to-earliest", "Reset offsets to earliest offset.");
toLatestOption = optionParser.accepts("to-latest", "Reset offsets to latest offset.");
fromFileOption = optionParser.accepts("from-file", "Reset offsets to values defined in CSV file.")
.withRequiredArg()
.ofType(String.class);
shiftByOption = optionParser.accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative")
.withRequiredArg()
.describedAs("number-of-offsets")
.ofType(Long.class);
commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.")
.withRequiredArg()
.ofType(String.class)
@ -184,17 +218,33 @@ public class StreamsResetter {
printHelp(optionParser);
throw e;
}
scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
allScenarioOptions.$plus(toOffsetOption);
allScenarioOptions.$plus(toDatetimeOption);
allScenarioOptions.$plus(byDurationOption);
allScenarioOptions.$plus(toEarliestOption);
allScenarioOptions.$plus(toLatestOption);
allScenarioOptions.$plus(fromFileOption);
allScenarioOptions.$plus(shiftByOption);
CommandLineUtils.checkInvalidArgs(optionParser, options, toOffsetOption, allScenarioOptions.$minus(toOffsetOption));
CommandLineUtils.checkInvalidArgs(optionParser, options, toDatetimeOption, allScenarioOptions.$minus(toDatetimeOption));
CommandLineUtils.checkInvalidArgs(optionParser, options, byDurationOption, allScenarioOptions.$minus(byDurationOption));
CommandLineUtils.checkInvalidArgs(optionParser, options, toEarliestOption, allScenarioOptions.$minus(toEarliestOption));
CommandLineUtils.checkInvalidArgs(optionParser, options, toLatestOption, allScenarioOptions.$minus(toLatestOption));
CommandLineUtils.checkInvalidArgs(optionParser, options, fromFileOption, allScenarioOptions.$minus(fromFileOption));
CommandLineUtils.checkInvalidArgs(optionParser, options, shiftByOption, allScenarioOptions.$minus(shiftByOption));
}
private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig) {
private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig, final boolean dryRun) throws Exception {
final List<String> inputTopics = options.valuesOf(inputTopicsOption);
final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
final List<String> notFoundInputTopics = new ArrayList<>();
final List<String> notFoundIntermediateTopics = new ArrayList<>();
String groupId = options.valueOf(applicationIdOption);
final String groupId = options.valueOf(applicationIdOption);
if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
System.out.println("No input or intermediate topics specified. Skipping seek.");
@ -202,7 +252,7 @@ public class StreamsResetter {
}
if (inputTopics.size() != 0) {
System.out.println("Seek-to-beginning for input topics " + inputTopics);
System.out.println("Reset-offsets for input topics " + inputTopics);
}
if (intermediateTopics.size() != 0) {
System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
@ -215,6 +265,7 @@ public class StreamsResetter {
notFoundInputTopics.add(topic);
} else {
topicsToSubscribe.add(topic);
}
}
for (final String topic : intermediateTopics) {
@ -249,9 +300,9 @@ public class StreamsResetter {
}
}
maybeSeekToBeginning(client, inputTopicPartitions);
maybeReset(groupId, client, inputTopicPartitions);
maybeSeekToEnd(client, intermediateTopicPartitions);
maybeSeekToEnd(groupId, client, intermediateTopicPartitions);
if (!dryRun) {
for (final TopicPartition p : partitions) {
@ -274,49 +325,225 @@ public class StreamsResetter {
}
}
} catch (final RuntimeException e) {
} catch (final Exception e) {
System.err.println("ERROR: Resetting offsets failed.");
throw e;
}
System.out.println("Done.");
}
private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client,
// visible for testing
public void maybeSeekToEnd(final String groupId,
final Consumer<byte[], byte[]> client,
final Set<TopicPartition> intermediateTopicPartitions) {
final String groupId = options.valueOf(applicationIdOption);
final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
if (intermediateTopicPartitions.size() > 0) {
System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
for (final String topic : intermediateTopics) {
if (allTopics.contains(topic)) {
System.out.println("Topic: " + topic);
for (final TopicPartition topicPartition : intermediateTopicPartitions) {
if (allTopics.contains(topicPartition.topic())) {
System.out.println("Topic: " + topicPartition.topic());
}
}
if (!dryRun) {
client.seekToEnd(intermediateTopicPartitions);
client.seekToEnd(intermediateTopicPartitions);
}
}
private void maybeReset(final String groupId,
final Consumer<byte[], byte[]> client,
final Set<TopicPartition> inputTopicPartitions)
throws Exception {
if (inputTopicPartitions.size() > 0) {
System.out.println("Following input topics offsets will be reset to (for consumer group " + groupId + ")");
if (options.has(toOffsetOption)) {
resetOffsetsTo(client, inputTopicPartitions, options.valueOf(toOffsetOption));
} else if (options.has(toEarliestOption)) {
client.seekToBeginning(inputTopicPartitions);
} else if (options.has(toLatestOption)) {
client.seekToEnd(inputTopicPartitions);
} else if (options.has(shiftByOption)) {
shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
} else if (options.has(toDatetimeOption)) {
final String ts = options.valueOf(toDatetimeOption);
final Long timestamp = getDateTime(ts);
resetToDatetime(client, inputTopicPartitions, timestamp);
} else if (options.has(byDurationOption)) {
final String duration = options.valueOf(byDurationOption);
final Duration durationParsed = DatatypeFactory.newInstance().newDuration(duration);
resetByDuration(client, inputTopicPartitions, durationParsed);
} else if (options.has(fromFileOption)) {
final String resetPlanPath = options.valueOf(fromFileOption);
final Map<TopicPartition, Long> topicPartitionsAndOffset = getTopicPartitionOffsetFromResetPlan(resetPlanPath);
resetOffsetsFromResetPlan(client, inputTopicPartitions, topicPartitionsAndOffset);
} else {
client.seekToBeginning(inputTopicPartitions);
}
for (final TopicPartition p : inputTopicPartitions) {
final Long position = client.position(p);
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + position);
}
}
}
private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
final Set<TopicPartition> inputTopicPartitions) {
// visible for testing
public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
final List<String> inputTopics = options.valuesOf(inputTopicsOption);
final String groupId = options.valueOf(applicationIdOption);
final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
if (inputTopicPartitions.size() > 0) {
System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")");
for (final String topic : inputTopics) {
if (allTopics.contains(topic)) {
System.out.println("Topic: " + topic);
}
for (final TopicPartition topicPartition : inputTopicPartitions) {
final Long offset = validatedTopicPartitionsAndOffset.get(topicPartition);
client.seek(topicPartition, offset);
}
}
private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(String resetPlanPath) throws IOException, ParseException {
final String resetPlanCsv = Utils.readFileAsString(resetPlanPath);
return parseResetPlan(resetPlanCsv);
}
private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Duration duration) throws DatatypeConfigurationException {
final Date now = new Date();
duration.negate().addTo(now);
final Long timestamp = now.getTime();
final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
topicPartitionsAndTimes.put(topicPartition, timestamp);
}
final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
for (final TopicPartition topicPartition : inputTopicPartitions) {
final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
client.seek(topicPartition, offset);
}
}
private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long timestamp) {
final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
topicPartitionsAndTimes.put(topicPartition, timestamp);
}
final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
for (final TopicPartition topicPartition : inputTopicPartitions) {
final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
client.seek(topicPartition, offset);
}
}
// visible for testing
public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long shiftBy) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
final Long position = client.position(topicPartition);
final Long offset = position + shiftBy;
topicPartitionsAndOffset.put(topicPartition, offset);
}
final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
for (final TopicPartition topicPartition : inputTopicPartitions) {
client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
}
}
// visible for testing
public void resetOffsetsTo(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long offset) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
topicPartitionsAndOffset.put(topicPartition, offset);
}
final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
for (final TopicPartition topicPartition : inputTopicPartitions) {
client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
}
}
// visible for testing
public Long getDateTime(String timestamp) throws ParseException {
final String[] timestampParts = timestamp.split("T");
if (timestampParts.length < 2) {
throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
}
final String secondPart = timestampParts[1];
if (secondPart == null || secondPart.isEmpty()) {
throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
}
if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) {
timestamp = timestamp + "Z";
}
try {
final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp);
return date.getTime();
} catch (ParseException e) {
final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp);
return date.getTime();
}
}
private Map<TopicPartition, Long> parseResetPlan(final String resetPlanCsv) throws ParseException {
final Map<TopicPartition, Long> topicPartitionAndOffset = new HashMap<>();
if (resetPlanCsv == null || resetPlanCsv.isEmpty()) {
throw new ParseException("Error parsing reset plan CSV file. It is empty,", 0);
}
final String[] resetPlanCsvParts = resetPlanCsv.split("\n");
for (final String line : resetPlanCsvParts) {
final String[] lineParts = line.split(",");
if (lineParts.length != 3) {
throw new ParseException("Reset plan CSV file is not following the format `TOPIC,PARTITION,OFFSET`.", 0);
}
if (!dryRun) {
client.seekToBeginning(inputTopicPartitions);
final String topic = lineParts[0];
final int partition = Integer.parseInt(lineParts[1]);
final long offset = Long.parseLong(lineParts[2]);
final TopicPartition topicPartition = new TopicPartition(topic, partition);
topicPartitionAndOffset.put(topicPartition, offset);
}
return topicPartitionAndOffset;
}
private Map<TopicPartition, Long> checkOffsetRange(final Map<TopicPartition, Long> inputTopicPartitionsAndOffset,
final Map<TopicPartition, Long> beginningOffsets,
final Map<TopicPartition, Long> endOffsets) {
final Map<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> topicPartitionAndOffset : inputTopicPartitionsAndOffset.entrySet()) {
final Long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
final Long offset = topicPartitionAndOffset.getValue();
if (offset < endOffset) {
final Long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
if (offset > beginningOffset) {
validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), offset);
} else {
System.out.println("New offset (" + offset + ") is lower than earliest offset. Value will be set to " + beginningOffset);
validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), beginningOffset);
}
} else {
System.out.println("New offset (" + offset + ") is higher than latest offset. Value will be set to " + endOffset);
validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), endOffset);
}
}
return validatedTopicPartitionsOffsets;
}
private boolean isInputTopic(final String topic) {
@ -327,7 +554,7 @@ public class StreamsResetter {
return options.valuesOf(intermediateTopicsOption).contains(topic);
}
private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient) {
private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient, final boolean dryRun) {
System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
List<String> topicsToDelete = new ArrayList<>();
@ -346,8 +573,9 @@ public class StreamsResetter {
System.out.println("Done.");
}
private void doDelete(final List<String> topicsToDelete,
final KafkaAdminClient adminClient) {
// visible for testing
public void doDelete(final List<String> topicsToDelete,
final AdminClient adminClient) {
boolean hasDeleteErrors = false;
final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();

View File

@ -84,7 +84,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsNewConsumerExistingTopic(): Unit = {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic1, "--to-offset", "50", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic1, "--to-offset", "50")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = new KafkaConsumerGroupService(opts)
@ -112,7 +112,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--dry-run")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -133,7 +133,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
executor.shutdown()
val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute")
val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime))
val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
val consumerGroupCommand1 = createConsumerGroupService(opts1)
@ -157,7 +157,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--dry-run")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -177,7 +177,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
executor.shutdown()
val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint))
val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
val consumerGroupCommand1 = createConsumerGroupService(opts1)
@ -218,7 +218,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
private def invokeGetDateTimeMethod(format: SimpleDateFormat) {
val checkpoint = new Date()
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint))
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
consumerGroupCommand.getDateTime
@ -226,7 +226,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsByDuration() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -246,7 +246,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsByDurationToEarliest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -265,7 +265,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToEarliest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -284,7 +284,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToLatest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -306,7 +306,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToCurrentOffset() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -347,7 +347,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToSpecificOffset() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -367,7 +367,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsShiftPlus() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -388,7 +388,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsShiftMinus() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -410,7 +410,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsShiftByLowerThanEarliest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -431,7 +431,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsShiftByHigherThanLatest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -452,7 +452,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToEarliestOnOneTopic() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -471,7 +471,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToEarliestOnOneTopicAndPartition() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest", "--execute")
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -494,7 +494,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
"--group", group,
"--topic", topic1,
"--topic", topic2,
"--to-earliest", "--execute")
"--to-earliest")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -521,7 +521,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
"--group", group,
"--topic", String.format("%s:1", topic1),
"--topic", String.format("%s:1", topic2),
"--to-earliest", "--execute")
"--to-earliest")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@ -563,7 +563,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consume all messages and save reset offsets plan to file")
val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath)
val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run")
val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec)
val consumerGroupCommandExec = createConsumerGroupService(optsExec)

View File

@ -50,6 +50,10 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@ -176,7 +180,7 @@ abstract class AbstractResetIntegrationTest {
// RESET
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
streams.cleanUp();
cleanGlobal(null, sslConfig);
cleanGlobal(sslConfig, false, null, null);
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
@ -194,7 +198,7 @@ abstract class AbstractResetIntegrationTest {
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(null, sslConfig);
cleanGlobal(sslConfig, false, null, null);
}
void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
@ -248,7 +252,7 @@ abstract class AbstractResetIntegrationTest {
// RESET
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
streams.cleanUp();
cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
cleanGlobal(sslConfig, true, null, null);
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
@ -263,8 +267,7 @@ abstract class AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC_2_RERUN,
40
);
40);
streams.close();
assertThat(resultRerun, equalTo(result));
@ -272,11 +275,194 @@ abstract class AbstractResetIntegrationTest {
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
cleanGlobal(sslConfig, true, null, null);
cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
}
void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
final Properties sslConfig = getClientSslConfig();
final Properties streamsConfiguration = prepareTest();
final Properties resultTopicConsumerConfig = new Properties();
if (sslConfig != null) {
resultTopicConsumerConfig.putAll(sslConfig);
}
resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
bootstrapServers,
APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
LongDeserializer.class,
LongDeserializer.class));
// RUN
KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
10);
streams.close();
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
streams.cleanUp();
cleanGlobal(sslConfig, false, "--from-file", resetFile.getAbsolutePath());
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
resetFile.deleteOnExit();
// RE-RUN
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
5);
streams.close();
result.remove(0);
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(sslConfig, false, null, null);
}
void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
final Properties sslConfig = getClientSslConfig();
final Properties streamsConfiguration = prepareTest();
final Properties resultTopicConsumerConfig = new Properties();
if (sslConfig != null) {
resultTopicConsumerConfig.putAll(sslConfig);
}
resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
bootstrapServers,
APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
LongDeserializer.class,
LongDeserializer.class));
// RUN
KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
10);
streams.close();
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
streams.cleanUp();
final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
final Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DATE, -1);
cleanGlobal(sslConfig, false, "--to-datetime", format.format(calendar.getTime()));
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
resetFile.deleteOnExit();
// RE-RUN
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
10);
streams.close();
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(sslConfig, false, null, null);
}
void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
final Properties sslConfig = getClientSslConfig();
final Properties streamsConfiguration = prepareTest();
final Properties resultTopicConsumerConfig = new Properties();
if (sslConfig != null) {
resultTopicConsumerConfig.putAll(sslConfig);
}
resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
bootstrapServers,
APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
LongDeserializer.class,
LongDeserializer.class));
// RUN
KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
10);
streams.close();
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
streams.cleanUp();
cleanGlobal(sslConfig, false, "--by-duration", "PT1M");
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
resetFile.deleteOnExit();
// RE-RUN
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
10);
streams.close();
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(sslConfig, false, null, null);
}
private Properties prepareTest() throws IOException {
Properties streamsConfiguration = getClientSslConfig();
if (streamsConfiguration == null) {
@ -302,6 +488,10 @@ abstract class AbstractResetIntegrationTest {
private void prepareInputData() throws Exception {
cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
add10InputElements();
}
private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
Properties producerConfig = getClientSslConfig();
if (producerConfig == null) {
producerConfig = new Properties();
@ -379,40 +569,39 @@ abstract class AbstractResetIntegrationTest {
return builder.build();
}
private void cleanGlobal(final String intermediateUserTopic, final Properties sslConfig) throws Exception {
private void cleanGlobal(final Properties sslConfig,
final boolean withIntermediateTopics,
final String resetScenario,
final String resetScenarioArg) throws Exception {
// leaving --zookeeper arg here to ensure tool works if users add it
final String[] parameters;
if (intermediateUserTopic != null) {
parameters = new String[]{
"--application-id", APP_ID + testNo,
final List<String> parameterList = new ArrayList<>(
Arrays.asList("--application-id", APP_ID + testNo,
"--bootstrap-servers", bootstrapServers,
"--input-topics", INPUT_TOPIC,
"--intermediate-topics", INTERMEDIATE_USER_TOPIC,
"--zookeeper", "localhost:2181"
};
} else {
if (sslConfig != null) {
final File configFile = TestUtils.tempFile();
final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n");
writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n");
writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n");
writer.close();
parameters = new String[]{
"--application-id", APP_ID + testNo,
"--bootstrap-servers", bootstrapServers,
"--input-topics", INPUT_TOPIC,
"--config-file", configFile.getAbsolutePath()
};
} else {
parameters = new String[]{
"--application-id", APP_ID + testNo,
"--bootstrap-servers", bootstrapServers,
"--input-topics", INPUT_TOPIC
};
}
"--input-topics", INPUT_TOPIC));
if (withIntermediateTopics) {
parameterList.add("--intermediate-topics");
parameterList.add(INTERMEDIATE_USER_TOPIC);
}
if (sslConfig != null) {
final File configFile = TestUtils.tempFile();
final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n");
writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n");
writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n");
writer.close();
parameterList.add("--config-file");
parameterList.add(configFile.getAbsolutePath());
}
if (resetScenario != null) {
parameterList.add(resetScenario);
}
if (resetScenarioArg != null) {
parameterList.add(resetScenarioArg);
}
final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
final Properties cleanUpConfig = new Properties();
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);

View File

@ -35,6 +35,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER;
static {
final Properties props = new Properties();
// we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
@ -55,13 +56,29 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
beforePrepareTest();
}
@Test
public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
}
@Test
public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic();
}
@Test
public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
super.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic();
}
@Test
public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
super.testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic();
}
@Test
public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
super.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic();
}
}

View File

@ -51,6 +51,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER;
static {
final Properties props = new Properties();
// we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
@ -89,5 +90,4 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
}
}

View File

@ -0,0 +1,293 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tools;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.junit.Before;
import org.junit.Test;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
*
*/
public class StreamsResetterTest {
private static final String TOPIC = "topic1";
private final StreamsResetter streamsResetter = new StreamsResetter();
private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final TopicPartition topicPartition = new TopicPartition(TOPIC, 0);
private final Set<TopicPartition> inputTopicPartitions = new HashSet<>(Collections.singletonList(topicPartition));
@Before
public void setUp() {
consumer.assign(Collections.singletonList(topicPartition));
consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0L, new byte[] {}, new byte[] {}));
consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1L, new byte[] {}, new byte[] {}));
consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 2L, new byte[] {}, new byte[] {}));
consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 3L, new byte[] {}, new byte[] {}));
consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 4L, new byte[] {}, new byte[] {}));
}
@Test
public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 4L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(3, records.count());
}
@Test
public void testResetToSpecificOffsetWhenBeforeBeginningOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 4L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 3L);
consumer.updateBeginningOffsets(beginningOffsets);
streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(2, records.count());
}
@Test
public void testResetToSpecificOffsetWhenAfterEndOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 3L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(2, records.count());
}
@Test
public void testShiftOffsetByWhenBetweenBeginningAndEndOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 4L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(2, records.count());
}
@Test
public void testShiftOffsetByWhenBeforeBeginningOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 4L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(5, records.count());
}
@Test
public void testShiftOffsetByWhenAfterEndOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 3L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(2, records.count());
}
@Test
public void testResetUsingPlanWhenBetweenBeginningAndEndOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 4L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>();
topicPartitionsAndOffset.put(topicPartition, 3L);
streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(2, records.count());
}
@Test
public void testResetUsingPlanWhenBeforeBeginningOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 4L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 3L);
consumer.updateBeginningOffsets(beginningOffsets);
final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>();
topicPartitionsAndOffset.put(topicPartition, 1L);
streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(2, records.count());
}
@Test
public void testResetUsingPlanWhenAfterEndOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 3L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>();
topicPartitionsAndOffset.put(topicPartition, 5L);
streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(2, records.count());
}
@Test
public void shouldSeekToEndOffset() {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, 3L);
consumer.updateEndOffsets(endOffsets);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
intermediateTopicPartitions.add(topicPartition);
streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
assertEquals(2, records.count());
}
@Test
public void shouldDeleteTopic() {
Cluster cluster = createCluster(1);
try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
env.kafkaClient().setNode(cluster.controller());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().prepareResponse(new DeleteTopicsResponse(Collections.singletonMap(TOPIC, Errors.NONE)));
streamsResetter.doDelete(Collections.singletonList(topicPartition.topic()), env.adminClient());
}
}
private Cluster createCluster(int numNodes) {
HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i != numNodes; ++i) {
nodes.put(i, new Node(i, "localhost", 8121 + i));
}
return new Cluster("mockClusterId", nodes.values(),
Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
}
@Test
public void shouldAcceptValidDateFormats() throws ParseException {
//check valid formats
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
}
@Test
public void shouldThrowOnInvalidDateFormat() throws ParseException {
//check some invalid formats
try {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
fail("Call to getDateTime should fail");
} catch (final Exception e) {
e.printStackTrace();
}
try {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"));
fail("Call to getDateTime should fail");
} catch (final Exception e) {
e.printStackTrace();
}
}
private void invokeGetDateTimeMethod(final SimpleDateFormat format) throws ParseException {
final Date checkpoint = new Date();
final StreamsResetter streamsResetter = new StreamsResetter();
final String formattedCheckpoint = format.format(checkpoint);
streamsResetter.getDateTime(formattedCheckpoint);
}
}