From 30f08d158a97e0d83b59e5910fbb0a84c5c6d14f Mon Sep 17 00:00:00 2001 From: Jorge Quilcate Otoya Date: Wed, 6 Dec 2017 11:38:38 -0800 Subject: [PATCH] KAFKA-5520: KIP-171; Extend Consumer Group Reset Offset for Stream Application MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application Merge changes from KIP-198 Ref: https://github.com/apache/kafka/pull/3831 Author: Jorge Quilcate Otoya Author: Ismael Juma Author: Matthias J. Sax Author: Manikumar Reddy Author: Guozhang Wang Author: Apurva Mehta Author: Rajini Sivaram Author: Jason Gustafson Author: Vahid Hashemian Author: Bill Bejeck Author: Dong Lin Author: Soenke Liebau Author: Colin P. Mccabe Author: Damian Guy Author: Xavier Léauté Author: Maytee Chinavanichkit Author: Joel Hamill Author: Paolo Patierno Author: siva santhalingam Author: Tommy Becker Author: Mickael Maison Author: Onur Karaman Author: tedyu Author: Xin Li Author: Magnus Edenhill Author: Manjula K Author: Hugo Louro Author: Jeff Widman Author: bartdevylder Author: Ewen Cheslack-Postava Author: Jacek Laskowski Author: Tom Bentley Author: Konstantine Karantasis Reviewers: Matthias J. Sax , Guozhang Wang Closes #4159 from jeqo/feature/kip-171 --- checkstyle/import-control-core.xml | 3 + checkstyle/import-control.xml | 4 + .../kafka/admin/ConsumerGroupCommand.scala | 8 +- .../scala/kafka/tools/StreamsResetter.java | 300 +++++++++++++++--- .../admin/ResetConsumerGroupOffsetTest.scala | 42 +-- .../AbstractResetIntegrationTest.java | 263 ++++++++++++--- .../integration/ResetIntegrationTest.java | 21 +- .../ResetIntegrationWithSslTest.java | 2 +- .../streams/tools/StreamsResetterTest.java | 293 +++++++++++++++++ 9 files changed, 835 insertions(+), 101 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index bf06a19de34..48659867000 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -60,6 +60,9 @@ + + + diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 403cae2b7c2..a2b508e7b2a 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -233,6 +233,10 @@ + + + + diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index d71f062e3b4..9e35ebcd9b0 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -529,8 +529,8 @@ object ConsumerGroupCommand extends Logging { case "Empty" | "Dead" => val partitionsToReset = getPartitionsToReset(groupId) val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset) - val execute = opts.options.has(opts.executeOpt) - if (execute) + val dryRun = opts.options.has(opts.dryRunOpt) + if (!dryRun) getConsumer().commitSync(preparedOffsets.asJava) preparedOffsets case currentState => @@ -727,7 +727,7 @@ object ConsumerGroupCommand extends Logging { "Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl + "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. 
One scenario must be choose" + nl + "To define the scope use: --all-topics or --topic. . One scope must be choose, unless you use '--from-file' scenario" - val ExecuteDoc = "Execute operation. Supported operations: reset-offsets." + val DryRunDoc = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets." val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets." val ResetToOffsetDoc = "Reset offsets to a specific offset." val ResetFromFileDoc = "Reset offsets to values defined in CSV file." @@ -770,7 +770,7 @@ object ConsumerGroupCommand extends Logging { .describedAs("command config property file") .ofType(classOf[String]) val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc) - val executeOpt = parser.accepts("execute", ExecuteDoc) + val dryRunOpt = parser.accepts("dry-run", DryRunDoc) val exportOpt = parser.accepts("export", ExportDoc) val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc) .withRequiredArg() diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 55392588bb9..4851a948399 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -21,11 +21,14 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; import joptsimple.OptionSpecBuilder; +import kafka.utils.CommandLineUtils; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; @@ -33,8 +36,14 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.datatype.DatatypeFactory; +import javax.xml.datatype.Duration; import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -74,12 +83,19 @@ public class StreamsResetter { private static OptionSpec applicationIdOption; private static OptionSpec inputTopicsOption; private static OptionSpec intermediateTopicsOption; + private static OptionSpec toOffsetOption; + private static OptionSpec toDatetimeOption; + private static OptionSpec byDurationOption; + private static OptionSpecBuilder toEarliestOption; + private static OptionSpecBuilder toLatestOption; + private static OptionSpec fromFileOption; + private static OptionSpec shiftByOption; private static OptionSpecBuilder dryRunOption; private static OptionSpec commandConfigOption; private OptionSet options = null; private final List allTopics = new LinkedList<>(); - private boolean dryRun = false; + public int run(final String[] args) { return run(args, new Properties()); @@ -93,7 +109,7 @@ public class StreamsResetter { try { parseArguments(args); - dryRun = options.has(dryRunOption); + final boolean dryRun = 
options.has(dryRunOption); final String groupId = options.valueOf(applicationIdOption); final Properties properties = new Properties(); @@ -114,8 +130,8 @@ public class StreamsResetter { final HashMap consumerConfig = new HashMap<>(config); consumerConfig.putAll(properties); - maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig); - maybeDeleteInternalTopics(kafkaAdminClient); + maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun); + maybeDeleteInternalTopics(kafkaAdminClient, dryRun); } catch (final Throwable e) { exitCode = EXIT_CODE_ERROR; @@ -169,6 +185,24 @@ public class StreamsResetter { .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); + toOffsetOption = optionParser.accepts("to-offset", "Reset offsets to a specific offset.") + .withRequiredArg() + .ofType(Long.class); + toDatetimeOption = optionParser.accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'") + .withRequiredArg() + .ofType(String.class); + byDurationOption = optionParser.accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'") + .withRequiredArg() + .ofType(String.class); + toEarliestOption = optionParser.accepts("to-earliest", "Reset offsets to earliest offset."); + toLatestOption = optionParser.accepts("to-latest", "Reset offsets to latest offset."); + fromFileOption = optionParser.accepts("from-file", "Reset offsets to values defined in CSV file.") + .withRequiredArg() + .ofType(String.class); + shiftByOption = optionParser.accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative") + .withRequiredArg() + .describedAs("number-of-offsets") + .ofType(Long.class); commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.") .withRequiredArg() .ofType(String.class) @@ -184,17 +218,33 @@ public class StreamsResetter { printHelp(optionParser); throw e; } + + scala.collection.immutable.HashSet> allScenarioOptions = new scala.collection.immutable.HashSet<>(); + allScenarioOptions.$plus(toOffsetOption); + allScenarioOptions.$plus(toDatetimeOption); + allScenarioOptions.$plus(byDurationOption); + allScenarioOptions.$plus(toEarliestOption); + allScenarioOptions.$plus(toLatestOption); + allScenarioOptions.$plus(fromFileOption); + allScenarioOptions.$plus(shiftByOption); + + CommandLineUtils.checkInvalidArgs(optionParser, options, toOffsetOption, allScenarioOptions.$minus(toOffsetOption)); + CommandLineUtils.checkInvalidArgs(optionParser, options, toDatetimeOption, allScenarioOptions.$minus(toDatetimeOption)); + CommandLineUtils.checkInvalidArgs(optionParser, options, byDurationOption, allScenarioOptions.$minus(byDurationOption)); + CommandLineUtils.checkInvalidArgs(optionParser, options, toEarliestOption, allScenarioOptions.$minus(toEarliestOption)); + CommandLineUtils.checkInvalidArgs(optionParser, options, toLatestOption, allScenarioOptions.$minus(toLatestOption)); + CommandLineUtils.checkInvalidArgs(optionParser, options, fromFileOption, allScenarioOptions.$minus(fromFileOption)); + CommandLineUtils.checkInvalidArgs(optionParser, options, shiftByOption, allScenarioOptions.$minus(shiftByOption)); } - private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig) { + private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig, final boolean dryRun) throws Exception { final List inputTopics 
= options.valuesOf(inputTopicsOption); final List intermediateTopics = options.valuesOf(intermediateTopicsOption); - final List notFoundInputTopics = new ArrayList<>(); final List notFoundIntermediateTopics = new ArrayList<>(); - String groupId = options.valueOf(applicationIdOption); + final String groupId = options.valueOf(applicationIdOption); if (inputTopics.size() == 0 && intermediateTopics.size() == 0) { System.out.println("No input or intermediate topics specified. Skipping seek."); @@ -202,7 +252,7 @@ public class StreamsResetter { } if (inputTopics.size() != 0) { - System.out.println("Seek-to-beginning for input topics " + inputTopics); + System.out.println("Reset-offsets for input topics " + inputTopics); } if (intermediateTopics.size() != 0) { System.out.println("Seek-to-end for intermediate topics " + intermediateTopics); @@ -215,6 +265,7 @@ public class StreamsResetter { notFoundInputTopics.add(topic); } else { topicsToSubscribe.add(topic); + } } for (final String topic : intermediateTopics) { @@ -249,9 +300,9 @@ public class StreamsResetter { } } - maybeSeekToBeginning(client, inputTopicPartitions); + maybeReset(groupId, client, inputTopicPartitions); - maybeSeekToEnd(client, intermediateTopicPartitions); + maybeSeekToEnd(groupId, client, intermediateTopicPartitions); if (!dryRun) { for (final TopicPartition p : partitions) { @@ -274,49 +325,225 @@ public class StreamsResetter { } } - } catch (final RuntimeException e) { + } catch (final Exception e) { System.err.println("ERROR: Resetting offsets failed."); throw e; } System.out.println("Done."); } - private void maybeSeekToEnd(final KafkaConsumer client, + // visible for testing + public void maybeSeekToEnd(final String groupId, + final Consumer client, final Set intermediateTopicPartitions) { - - final String groupId = options.valueOf(applicationIdOption); - final List intermediateTopics = options.valuesOf(intermediateTopicsOption); - if (intermediateTopicPartitions.size() > 0) { System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")"); - for (final String topic : intermediateTopics) { - if (allTopics.contains(topic)) { - System.out.println("Topic: " + topic); + for (final TopicPartition topicPartition : intermediateTopicPartitions) { + if (allTopics.contains(topicPartition.topic())) { + System.out.println("Topic: " + topicPartition.topic()); } } - if (!dryRun) { - client.seekToEnd(intermediateTopicPartitions); + + client.seekToEnd(intermediateTopicPartitions); + } + } + + private void maybeReset(final String groupId, + final Consumer client, + final Set inputTopicPartitions) + throws Exception { + + if (inputTopicPartitions.size() > 0) { + System.out.println("Following input topics offsets will be reset to (for consumer group " + groupId + ")"); + if (options.has(toOffsetOption)) { + resetOffsetsTo(client, inputTopicPartitions, options.valueOf(toOffsetOption)); + } else if (options.has(toEarliestOption)) { + client.seekToBeginning(inputTopicPartitions); + } else if (options.has(toLatestOption)) { + client.seekToEnd(inputTopicPartitions); + } else if (options.has(shiftByOption)) { + shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption)); + } else if (options.has(toDatetimeOption)) { + final String ts = options.valueOf(toDatetimeOption); + final Long timestamp = getDateTime(ts); + resetToDatetime(client, inputTopicPartitions, timestamp); + } else if (options.has(byDurationOption)) { + final String duration = options.valueOf(byDurationOption); + 
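// Assumed note (not in the original patch): "--by-duration" takes an ISO-8601 duration such as "PT1M";
// DatatypeFactory.newInstance() below may throw DatatypeConfigurationException, which escapes through
// this method's "throws Exception" clause.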
final Duration durationParsed = DatatypeFactory.newInstance().newDuration(duration); + resetByDuration(client, inputTopicPartitions, durationParsed); + } else if (options.has(fromFileOption)) { + final String resetPlanPath = options.valueOf(fromFileOption); + final Map topicPartitionsAndOffset = getTopicPartitionOffsetFromResetPlan(resetPlanPath); + resetOffsetsFromResetPlan(client, inputTopicPartitions, topicPartitionsAndOffset); + } else { + client.seekToBeginning(inputTopicPartitions); + } + + for (final TopicPartition p : inputTopicPartitions) { + final Long position = client.position(p); + System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + position); } } } - private void maybeSeekToBeginning(final KafkaConsumer client, - final Set inputTopicPartitions) { + // visible for testing + public void resetOffsetsFromResetPlan(Consumer client, Set inputTopicPartitions, Map topicPartitionsAndOffset) { + final Map endOffsets = client.endOffsets(inputTopicPartitions); + final Map beginningOffsets = client.beginningOffsets(inputTopicPartitions); - final List inputTopics = options.valuesOf(inputTopicsOption); - final String groupId = options.valueOf(applicationIdOption); + final Map validatedTopicPartitionsAndOffset = + checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets); - if (inputTopicPartitions.size() > 0) { - System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")"); - for (final String topic : inputTopics) { - if (allTopics.contains(topic)) { - System.out.println("Topic: " + topic); - } + for (final TopicPartition topicPartition : inputTopicPartitions) { + final Long offset = validatedTopicPartitionsAndOffset.get(topicPartition); + client.seek(topicPartition, offset); + } + } + + private Map getTopicPartitionOffsetFromResetPlan(String resetPlanPath) throws IOException, ParseException { + final String resetPlanCsv = Utils.readFileAsString(resetPlanPath); + return parseResetPlan(resetPlanCsv); + } + + private void resetByDuration(Consumer client, Set inputTopicPartitions, Duration duration) throws DatatypeConfigurationException { + final Date now = new Date(); + duration.negate().addTo(now); + final Long timestamp = now.getTime(); + + final Map topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size()); + for (final TopicPartition topicPartition : inputTopicPartitions) { + topicPartitionsAndTimes.put(topicPartition, timestamp); + } + + final Map topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes); + + for (final TopicPartition topicPartition : inputTopicPartitions) { + final Long offset = topicPartitionsAndOffset.get(topicPartition).offset(); + client.seek(topicPartition, offset); + } + } + + private void resetToDatetime(Consumer client, Set inputTopicPartitions, Long timestamp) { + final Map topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size()); + for (final TopicPartition topicPartition : inputTopicPartitions) { + topicPartitionsAndTimes.put(topicPartition, timestamp); + } + + final Map topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes); + + for (final TopicPartition topicPartition : inputTopicPartitions) { + final Long offset = topicPartitionsAndOffset.get(topicPartition).offset(); + client.seek(topicPartition, offset); + } + } + + // visible for testing + public void shiftOffsetsBy(Consumer client, Set inputTopicPartitions, Long shiftBy) { + final Map endOffsets = 
client.endOffsets(inputTopicPartitions); + final Map beginningOffsets = client.beginningOffsets(inputTopicPartitions); + + final Map topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size()); + for (final TopicPartition topicPartition : inputTopicPartitions) { + final Long position = client.position(topicPartition); + final Long offset = position + shiftBy; + topicPartitionsAndOffset.put(topicPartition, offset); + } + + final Map validatedTopicPartitionsAndOffset = + checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets); + + for (final TopicPartition topicPartition : inputTopicPartitions) { + client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition)); + } + } + + // visible for testing + public void resetOffsetsTo(Consumer client, Set inputTopicPartitions, Long offset) { + final Map endOffsets = client.endOffsets(inputTopicPartitions); + final Map beginningOffsets = client.beginningOffsets(inputTopicPartitions); + + final Map topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size()); + for (final TopicPartition topicPartition : inputTopicPartitions) { + topicPartitionsAndOffset.put(topicPartition, offset); + } + + final Map validatedTopicPartitionsAndOffset = + checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets); + + for (final TopicPartition topicPartition : inputTopicPartitions) { + client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition)); + } + } + + // visible for testing + public Long getDateTime(String timestamp) throws ParseException { + final String[] timestampParts = timestamp.split("T"); + if (timestampParts.length < 2) { + throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length()); + } + + final String secondPart = timestampParts[1]; + if (secondPart == null || secondPart.isEmpty()) { + throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length()); + } + + if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) { + timestamp = timestamp + "Z"; + } + + try { + final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp); + return date.getTime(); + } catch (ParseException e) { + final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp); + return date.getTime(); + } + } + + private Map parseResetPlan(final String resetPlanCsv) throws ParseException { + final Map topicPartitionAndOffset = new HashMap<>(); + if (resetPlanCsv == null || resetPlanCsv.isEmpty()) { + throw new ParseException("Error parsing reset plan CSV file. 
It is empty,", 0); + } + + final String[] resetPlanCsvParts = resetPlanCsv.split("\n"); + + for (final String line : resetPlanCsvParts) { + final String[] lineParts = line.split(","); + if (lineParts.length != 3) { + throw new ParseException("Reset plan CSV file is not following the format `TOPIC,PARTITION,OFFSET`.", 0); } - if (!dryRun) { - client.seekToBeginning(inputTopicPartitions); + final String topic = lineParts[0]; + final int partition = Integer.parseInt(lineParts[1]); + final long offset = Long.parseLong(lineParts[2]); + final TopicPartition topicPartition = new TopicPartition(topic, partition); + topicPartitionAndOffset.put(topicPartition, offset); + } + + return topicPartitionAndOffset; + } + + private Map checkOffsetRange(final Map inputTopicPartitionsAndOffset, + final Map beginningOffsets, + final Map endOffsets) { + final Map validatedTopicPartitionsOffsets = new HashMap<>(); + for (final Map.Entry topicPartitionAndOffset : inputTopicPartitionsAndOffset.entrySet()) { + final Long endOffset = endOffsets.get(topicPartitionAndOffset.getKey()); + final Long offset = topicPartitionAndOffset.getValue(); + if (offset < endOffset) { + final Long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey()); + if (offset > beginningOffset) { + validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), offset); + } else { + System.out.println("New offset (" + offset + ") is lower than earliest offset. Value will be set to " + beginningOffset); + validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), beginningOffset); + } + } else { + System.out.println("New offset (" + offset + ") is higher than latest offset. Value will be set to " + endOffset); + validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), endOffset); } } + return validatedTopicPartitionsOffsets; } private boolean isInputTopic(final String topic) { @@ -327,7 +554,7 @@ public class StreamsResetter { return options.valuesOf(intermediateTopicsOption).contains(topic); } - private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient) { + private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient, final boolean dryRun) { System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); List topicsToDelete = new ArrayList<>(); @@ -346,8 +573,9 @@ public class StreamsResetter { System.out.println("Done."); } - private void doDelete(final List topicsToDelete, - final KafkaAdminClient adminClient) { + // visible for testing + public void doDelete(final List topicsToDelete, + final AdminClient adminClient) { boolean hasDeleteErrors = false; final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete); final Map> results = deleteTopicsResult.values(); diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index 82274871af1..f26d3c45b1e 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -84,7 +84,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsNewConsumerExistingTopic(): Unit = { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic1, "--to-offset", "50", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", "new.group", "--topic", topic1, "--to-offset", "50") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = new KafkaConsumerGroupService(opts) @@ -112,7 +112,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--dry-run") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -133,7 +133,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { executor.shutdown() - val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute") + val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime)) val opts1 = new ConsumerGroupCommandOptions(cgcArgs1) val consumerGroupCommand1 = createConsumerGroupService(opts1) @@ -157,7 +157,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000) - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--dry-run") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -177,7 +177,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { executor.shutdown() - val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute") + val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint)) val opts1 = new ConsumerGroupCommandOptions(cgcArgs1) val consumerGroupCommand1 = createConsumerGroupService(opts1) @@ -218,7 +218,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { private def invokeGetDateTimeMethod(format: SimpleDateFormat) { val checkpoint = new Date() - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint)) val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) consumerGroupCommand.getDateTime @@ -226,7 +226,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsByDuration() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -246,7 +246,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def 
testResetOffsetsByDurationToEarliest() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -265,7 +265,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToEarliest() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -284,7 +284,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToLatest() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -306,7 +306,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToCurrentOffset() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -347,7 +347,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToSpecificOffset() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -367,7 +367,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsShiftPlus() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -388,7 +388,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsShiftMinus() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -410,7 +410,7 @@ class ResetConsumerGroupOffsetTest 
extends KafkaServerTestHarness { @Test def testResetOffsetsShiftByLowerThanEarliest() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -431,7 +431,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsShiftByHigherThanLatest() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -452,7 +452,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToEarliestOnOneTopic() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -471,7 +471,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToEarliestOnOneTopicAndPartition() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -494,7 +494,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { "--group", group, "--topic", topic1, "--topic", topic2, - "--to-earliest", "--execute") + "--to-earliest") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -521,7 +521,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { "--group", group, "--topic", String.format("%s:1", topic1), "--topic", String.format("%s:1", topic2), - "--to-earliest", "--execute") + "--to-earliest") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -563,7 +563,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { }, "Expected the consume all messages and save reset offsets plan to file") - val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath) + val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run") val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec) val consumerGroupCommandExec = createConsumerGroupService(optsExec) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index adb887c8e94..9131007b6bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -50,6 +50,10 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -176,7 +180,7 @@ abstract class AbstractResetIntegrationTest { // RESET streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); streams.cleanUp(); - cleanGlobal(null, sslConfig); + cleanGlobal(sslConfig, false, null, null); TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); @@ -194,7 +198,7 @@ abstract class AbstractResetIntegrationTest { TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(null, sslConfig); + cleanGlobal(sslConfig, false, null, null); } void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { @@ -248,7 +252,7 @@ abstract class AbstractResetIntegrationTest { // RESET streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); streams.cleanUp(); - cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig); + cleanGlobal(sslConfig, true, null, null); TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); @@ -263,8 +267,7 @@ abstract class AbstractResetIntegrationTest { final List> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, - 40 - ); + 40); streams.close(); assertThat(resultRerun, equalTo(result)); @@ -272,11 +275,194 @@ abstract class AbstractResetIntegrationTest { TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig); + cleanGlobal(sslConfig, true, null, null); cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); } + void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception { + final Properties sslConfig = getClientSslConfig(); + final Properties streamsConfiguration = prepareTest(); + + final Properties resultTopicConsumerConfig = new Properties(); + if (sslConfig != null) { + resultTopicConsumerConfig.putAll(sslConfig); + } + resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( + bootstrapServers, + APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, + LongDeserializer.class, + LongDeserializer.class)); + + // RUN + KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams.start(); + final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + 
resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + + streams.close(); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + + // RESET + final File resetFile = File.createTempFile("reset", ".csv"); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) { + writer.write(INPUT_TOPIC + ",0,1"); + writer.close(); + } + + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams.cleanUp(); + + cleanGlobal(sslConfig, false, "--from-file", resetFile.getAbsolutePath()); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + + assertInternalTopicsGotDeleted(null); + + resetFile.deleteOnExit(); + + // RE-RUN + streams.start(); + final List> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 5); + streams.close(); + + result.remove(0); + assertThat(resultRerun, equalTo(result)); + + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + cleanGlobal(sslConfig, false, null, null); + } + + void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception { + final Properties sslConfig = getClientSslConfig(); + final Properties streamsConfiguration = prepareTest(); + + final Properties resultTopicConsumerConfig = new Properties(); + if (sslConfig != null) { + resultTopicConsumerConfig.putAll(sslConfig); + } + resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( + bootstrapServers, + APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, + LongDeserializer.class, + LongDeserializer.class)); + + // RUN + KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams.start(); + final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + + streams.close(); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + + // RESET + final File resetFile = File.createTempFile("reset", ".csv"); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) { + writer.write(INPUT_TOPIC + ",0,1"); + writer.close(); + } + + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams.cleanUp(); + + + final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); + final Calendar calendar = Calendar.getInstance(); + calendar.add(Calendar.DATE, -1); + + cleanGlobal(sslConfig, false, "--to-datetime", format.format(calendar.getTime())); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + + assertInternalTopicsGotDeleted(null); + + resetFile.deleteOnExit(); + + // RE-RUN + streams.start(); + final List> resultRerun = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + streams.close(); + + assertThat(resultRerun, equalTo(result)); + + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + cleanGlobal(sslConfig, false, null, null); + } + + void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception { + final Properties sslConfig = getClientSslConfig(); + final Properties streamsConfiguration = prepareTest(); + + final Properties resultTopicConsumerConfig = new Properties(); + if (sslConfig != null) { + resultTopicConsumerConfig.putAll(sslConfig); + } + resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( + bootstrapServers, + APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, + LongDeserializer.class, + LongDeserializer.class)); + + // RUN + KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams.start(); + final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + + streams.close(); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + + // RESET + final File resetFile = File.createTempFile("reset", ".csv"); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) { + writer.write(INPUT_TOPIC + ",0,1"); + writer.close(); + } + + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams.cleanUp(); + cleanGlobal(sslConfig, false, "--by-duration", "PT1M"); + + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + + assertInternalTopicsGotDeleted(null); + + resetFile.deleteOnExit(); + + // RE-RUN + streams.start(); + final List> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + streams.close(); + + assertThat(resultRerun, equalTo(result)); + + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + cleanGlobal(sslConfig, false, null, null); + } + private Properties prepareTest() throws IOException { Properties streamsConfiguration = getClientSslConfig(); if (streamsConfiguration == null) { @@ -302,6 +488,10 @@ abstract class AbstractResetIntegrationTest { private void prepareInputData() throws Exception { cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); + add10InputElements(); + } + + private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException { Properties producerConfig = getClientSslConfig(); if (producerConfig == null) { producerConfig = new Properties(); @@ -379,40 +569,39 @@ abstract class AbstractResetIntegrationTest { return builder.build(); } - private void cleanGlobal(final String intermediateUserTopic, final Properties sslConfig) throws Exception { + private void cleanGlobal(final Properties sslConfig, + 
final boolean withIntermediateTopics, + final String resetScenario, + final String resetScenarioArg) throws Exception { // leaving --zookeeper arg here to ensure tool works if users add it - final String[] parameters; - if (intermediateUserTopic != null) { - parameters = new String[]{ - "--application-id", APP_ID + testNo, + final List parameterList = new ArrayList<>( + Arrays.asList("--application-id", APP_ID + testNo, "--bootstrap-servers", bootstrapServers, - "--input-topics", INPUT_TOPIC, - "--intermediate-topics", INTERMEDIATE_USER_TOPIC, - "--zookeeper", "localhost:2181" - }; - } else { - if (sslConfig != null) { - final File configFile = TestUtils.tempFile(); - final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile)); - writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n"); - writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n"); - writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n"); - writer.close(); - - parameters = new String[]{ - "--application-id", APP_ID + testNo, - "--bootstrap-servers", bootstrapServers, - "--input-topics", INPUT_TOPIC, - "--config-file", configFile.getAbsolutePath() - }; - } else { - parameters = new String[]{ - "--application-id", APP_ID + testNo, - "--bootstrap-servers", bootstrapServers, - "--input-topics", INPUT_TOPIC - }; - } + "--input-topics", INPUT_TOPIC)); + if (withIntermediateTopics) { + parameterList.add("--intermediate-topics"); + parameterList.add(INTERMEDIATE_USER_TOPIC); } + if (sslConfig != null) { + final File configFile = TestUtils.tempFile(); + final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile)); + writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n"); + writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n"); + writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n"); + writer.close(); + + parameterList.add("--config-file"); + parameterList.add(configFile.getAbsolutePath()); + } + if (resetScenario != null) { + parameterList.add(resetScenario); + } + if (resetScenarioArg != null) { + parameterList.add(resetScenarioArg); + } + + final String[] parameters = parameterList.toArray(new String[parameterList.size()]); + final Properties cleanUpConfig = new Properties(); cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index a72bad41b5a..d781d95c9c6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -35,6 +35,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER; + static { final Properties props = new Properties(); // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable @@ -55,13 +56,29 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { beforePrepareTest(); } + + @Test + public void 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { + super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(); + } + @Test public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(); } @Test - public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { - super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(); + public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception { + super.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(); + } + + @Test + public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception { + super.testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic(); + } + + @Test + public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception { + super.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java index e58b18c948f..abf4c384b2e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java @@ -51,6 +51,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER; + static { final Properties props = new Properties(); // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable @@ -89,5 +90,4 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(); } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java new file mode 100644 index 00000000000..c6b0f5f8252 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.tools; + +import kafka.tools.StreamsResetter; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.DeleteTopicsResponse; +import org.junit.Before; +import org.junit.Test; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * + */ +public class StreamsResetterTest { + + private static final String TOPIC = "topic1"; + private final StreamsResetter streamsResetter = new StreamsResetter(); + private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final TopicPartition topicPartition = new TopicPartition(TOPIC, 0); + private final Set inputTopicPartitions = new HashSet<>(Collections.singletonList(topicPartition)); + + @Before + public void setUp() { + consumer.assign(Collections.singletonList(topicPartition)); + + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0L, new byte[] {}, new byte[] {})); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1L, new byte[] {}, new byte[] {})); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 2L, new byte[] {}, new byte[] {})); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 3L, new byte[] {}, new byte[] {})); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 4L, new byte[] {}, new byte[] {})); + } + + @Test + public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 4L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + + streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(3, records.count()); + } + + @Test + public void testResetToSpecificOffsetWhenBeforeBeginningOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 4L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 3L); + consumer.updateBeginningOffsets(beginningOffsets); + + streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(2, records.count()); + } + + @Test + public void testResetToSpecificOffsetWhenAfterEndOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 3L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + + streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L); + + final ConsumerRecords records 
= consumer.poll(500); + assertEquals(2, records.count()); + } + + @Test + public void testShiftOffsetByWhenBetweenBeginningAndEndOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 4L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + + streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(2, records.count()); + } + + @Test + public void testShiftOffsetByWhenBeforeBeginningOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 4L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + + streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(5, records.count()); + } + + @Test + public void testShiftOffsetByWhenAfterEndOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 3L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + + streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(2, records.count()); + } + + @Test + public void testResetUsingPlanWhenBetweenBeginningAndEndOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 4L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + + final Map topicPartitionsAndOffset = new HashMap<>(); + topicPartitionsAndOffset.put(topicPartition, 3L); + streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(2, records.count()); + } + + @Test + public void testResetUsingPlanWhenBeforeBeginningOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 4L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 3L); + consumer.updateBeginningOffsets(beginningOffsets); + + final Map topicPartitionsAndOffset = new HashMap<>(); + topicPartitionsAndOffset.put(topicPartition, 1L); + streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(2, records.count()); + } + + @Test + public void testResetUsingPlanWhenAfterEndOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 3L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + + final Map topicPartitionsAndOffset = new HashMap<>(); + topicPartitionsAndOffset.put(topicPartition, 5L); + streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(2, records.count()); + } + + @Test + public void 
shouldSeekToEndOffset() { + final Map endOffsets = new HashMap<>(); + endOffsets.put(topicPartition, 3L); + consumer.updateEndOffsets(endOffsets); + + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + + final Set intermediateTopicPartitions = new HashSet<>(); + intermediateTopicPartitions.add(topicPartition); + streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions); + + final ConsumerRecords records = consumer.poll(500); + assertEquals(2, records.count()); + } + + @Test + public void shouldDeleteTopic() { + Cluster cluster = createCluster(1); + try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { + env.kafkaClient().setNode(cluster.controller()); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + env.kafkaClient().prepareResponse(new DeleteTopicsResponse(Collections.singletonMap(TOPIC, Errors.NONE))); + streamsResetter.doDelete(Collections.singletonList(topicPartition.topic()), env.adminClient()); + } + } + + private Cluster createCluster(int numNodes) { + HashMap nodes = new HashMap<>(); + for (int i = 0; i != numNodes; ++i) { + nodes.put(i, new Node(i, "localhost", 8121 + i)); + } + return new Cluster("mockClusterId", nodes.values(), + Collections.emptySet(), Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + } + + @Test + public void shouldAcceptValidDateFormats() throws ParseException { + //check valid formats + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")); + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")); + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")); + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")); + } + + @Test + public void shouldThrowOnInvalidDateFormat() throws ParseException { + //check some invalid formats + try { + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")); + fail("Call to getDateTime should fail"); + } catch (final Exception e) { + e.printStackTrace(); + } + + try { + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")); + fail("Call to getDateTime should fail"); + } catch (final Exception e) { + e.printStackTrace(); + } + } + + private void invokeGetDateTimeMethod(final SimpleDateFormat format) throws ParseException { + final Date checkpoint = new Date(); + final StreamsResetter streamsResetter = new StreamsResetter(); + final String formattedCheckpoint = format.format(checkpoint); + streamsResetter.getDateTime(formattedCheckpoint); + } +} \ No newline at end of file
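
Usage illustration (not part of the patch): a minimal sketch of driving the extended reset tool programmatically with one of the new scenario flags. The application id, broker address, topic name, and timestamp below are placeholder values; only flags registered in StreamsResetter#parseArguments ("--application-id", "--bootstrap-servers", "--input-topics", "--to-datetime", "--dry-run", etc.) are used, and the timestamp follows the 'YYYY-MM-DDTHH:mm:SS.sss' form accepted by getDateTime(). For "--from-file", the referenced CSV uses one "TOPIC,PARTITION,OFFSET" line per partition, as parsed by parseResetPlan().

import kafka.tools.StreamsResetter;

public class StreamsResetterUsageExample {
    public static void main(final String[] args) {
        // Placeholder application id, broker list, and topic; replace with real values.
        final String[] parameters = new String[]{
            "--application-id", "my-streams-app",
            "--bootstrap-servers", "localhost:9092",
            "--input-topics", "my-input-topic",
            // Exactly one scenario option may be given; combinations are rejected
            // via CommandLineUtils.checkInvalidArgs in parseArguments().
            "--to-datetime", "2017-12-01T00:00:00.000",
            // Add "--dry-run" to only print the offsets that would be applied.
        };
        final int exitCode = new StreamsResetter().run(parameters);
        System.out.println("StreamsResetter exit code: " + exitCode);
    }
}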