diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 485bcdf838f..8c5150ab3c8 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -95,6 +95,7 @@ public class StreamsResetter { private static OptionSpec applicationIdOption; private static OptionSpec inputTopicsOption; private static OptionSpec intermediateTopicsOption; + private static OptionSpec internalTopicsOption; private static OptionSpec toOffsetOption; private static OptionSpec toDatetimeOption; private static OptionSpec byDurationOption; @@ -114,7 +115,8 @@ public class StreamsResetter { + "intermediate topics (topics that are input and output topics, e.g., used by deprecated through() method).\n" + "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with " + "\"-\").\n" - + "You do not need to specify internal topics because the tool finds them automatically.\n" + + "The tool finds these internal topics automatically. If the topics flagged automatically for deletion by " + + "the dry-run are unsuitable, you can specify a subset with the \"--internal-topics\" option.\n" + "* This tool will not delete output topics (if you want to delete them, you need to do it yourself " + "with the bin/kafka-topics.sh command).\n" + "* This tool will not clean up the local state on the stream application instances (the persisted " @@ -126,7 +128,9 @@ public class StreamsResetter { + "members immediately. Make sure to stop all stream applications when this option is specified " + "to avoid unexpected disruptions.\n\n" + "*** Important! You will get wrong output if you don't clean up the local stores after running the " - + "reset tool!\n\n"; + + "reset tool!\n\n" + + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; private OptionSet options = null; private final List allTopics = new LinkedList<>(); @@ -166,7 +170,7 @@ public class StreamsResetter { final HashMap consumerConfig = new HashMap<>(config); consumerConfig.putAll(properties); exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun); - maybeDeleteInternalTopics(adminClient, dryRun); + exitCode |= maybeDeleteInternalTopics(adminClient, dryRun); } catch (final Throwable e) { exitCode = EXIT_CODE_ERROR; System.err.println("ERROR: " + e); @@ -224,6 +228,13 @@ public class StreamsResetter { .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); + internalTopicsOption = optionParser.accepts("internal-topics", "Comma-separated list of " + + "internal topics to delete. Must be a subset of the internal topics marked for deletion by the " + + "default behaviour (do a dry-run without this option to view these topics).") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); toOffsetOption = optionParser.accepts("to-offset", "Reset offsets to a specific offset.") .withRequiredArg() .ofType(Long.class); @@ -608,22 +619,35 @@ public class StreamsResetter { return options.valuesOf(intermediateTopicsOption).contains(topic); } - private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { - System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); - final List topicsToDelete = new ArrayList<>(); - for (final String listing : allTopics) { - if (isInternalTopic(listing)) { - if (!dryRun) { - topicsToDelete.add(listing); - } else { - System.out.println("Topic: " + listing); - } + private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { + final List inferredInternalTopics = allTopics.stream() + .filter(this::isInferredInternalTopic) + .collect(Collectors.toList()); + final List specifiedInternalTopics = options.valuesOf(internalTopicsOption); + final List topicsToDelete; + + if (!specifiedInternalTopics.isEmpty()) { + if (!inferredInternalTopics.containsAll(specifiedInternalTopics)) { + throw new IllegalArgumentException("Invalid topic specified in the " + + "--internal-topics option. " + + "Ensure that the topics specified are all internal topics. " + + "Do a dry run without the --internal-topics option to see the " + + "list of all internal topics that can be deleted."); } + + topicsToDelete = specifiedInternalTopics; + System.out.println("Deleting specified internal topics " + topicsToDelete); + } else { + topicsToDelete = inferredInternalTopics; + System.out.println("Deleting inferred internal topics " + topicsToDelete); } + if (!dryRun) { doDelete(topicsToDelete, adminClient); } + System.out.println("Done."); + return EXIT_CODE_SUCCESS; } // visible for testing @@ -647,7 +671,7 @@ public class StreamsResetter { } } - private boolean isInternalTopic(final String topicName) { + private boolean isInferredInternalTopic(final String topicName) { // Specified input/intermediate topics might be named like internal topics (by chance). // Even is this is not expected in general, we need to exclude those topics here // and don't consider them as internal topics even if they follow the same naming schema. @@ -657,7 +681,7 @@ public class StreamsResetter { } // visible for testing - public boolean matchesInternalTopicFormat(final String topicName) { + public static boolean matchesInternalTopicFormat(final String topicName) { return topicName.endsWith("-changelog") || topicName.endsWith("-repartition") || topicName.endsWith("-subscription-registration-topic") || topicName.endsWith("-subscription-response-topic") diff --git a/docs/streams/developer-guide/app-reset-tool.html b/docs/streams/developer-guide/app-reset-tool.html index 1b90af0cb80..d6d07c27c97 100644 --- a/docs/streams/developer-guide/app-reset-tool.html +++ b/docs/streams/developer-guide/app-reset-tool.html @@ -77,6 +77,7 @@

Step 1: Run the application reset tool

Invoke the application reset tool from the command line

+

Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them.

<path-to-kafka>/bin/kafka-streams-application-reset

The tool accepts the following parameters:

@@ -105,6 +106,11 @@ topics (topics used in the through() method). For these topics, the tool will skip to the end. +--internal-topics <String: list> Comma-separated list of internal topics + to delete. Must be a subset of the + internal topics marked for deletion by + the default behaviour (do a dry-run without + this option to view these topics). --shift-by <Long: number-of-offsets> Reset offsets shifting current offset by 'n', where 'n' can be positive or negative diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index 76c53b15443..e672ca1f8f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -60,6 +60,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; import static java.time.Duration.ofMillis; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; @@ -77,7 +78,7 @@ public abstract class AbstractResetIntegrationTest { abstract Map getClientSslConfig(); @Rule - public final TestName testName = new TestName(); + public final TestName testName = new TestName(); @AfterClass public static void afterClassCleanup() { @@ -205,6 +206,34 @@ public abstract class AbstractResetIntegrationTest { } } + @Test + public void testResetWhenInternalTopicsAreSpecified() throws Exception { + final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + + // RUN + streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig); + streams.start(); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); + + streams.close(); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); + + // RESET + streams.cleanUp(); + + final List internalTopics = cluster.getAllTopicsInCluster().stream() + .filter(topic -> StreamsResetter.matchesInternalTopicFormat(topic)) + .collect(Collectors.toList()); + cleanGlobal(false, + "--internal-topics", + String.join(",", internalTopics.subList(1, internalTopics.size())), + appID); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); + + assertInternalTopicsGotDeleted(internalTopics.get(0)); + } + @Test public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); @@ -358,7 +387,6 @@ public abstract class AbstractResetIntegrationTest { final String resetScenario, final String resetScenarioArg, final String appID) throws Exception { - // leaving --zookeeper arg here to ensure tool works if users add it final List parameterList = new ArrayList<>( Arrays.asList("--application-id", appID, "--bootstrap-servers", cluster.bootstrapServers(), @@ -405,11 +433,11 @@ public abstract class AbstractResetIntegrationTest { Assert.assertTrue(cleanResult); } - protected void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception { + protected void assertInternalTopicsGotDeleted(final String additionalExistingTopic) throws Exception { // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm - if (intermediateUserTopic != null) { + if (additionalExistingTopic != null) { cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, - Topic.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic); + Topic.GROUP_METADATA_TOPIC_NAME, additionalExistingTopic); } else { cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, Topic.GROUP_METADATA_TOPIC_NAME); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index f355afaa728..5c236e6ba80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -151,6 +151,38 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { Assert.assertEquals(1, exitCode); } + @Test + public void shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist() { + final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String[] parameters = new String[] { + "--application-id", appID, + "--bootstrap-servers", cluster.bootstrapServers(), + "--internal-topics", NON_EXISTING_TOPIC + }; + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT)); + + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + Assert.assertEquals(1, exitCode); + } + + @Test + public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal() { + final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String[] parameters = new String[] { + "--application-id", appID, + "--bootstrap-servers", cluster.bootstrapServers(), + "--internal-topics", INPUT_TOPIC + }; + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT)); + + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + Assert.assertEquals(1, exitCode); + } + @Test public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception { final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java index 0383c3c3797..d4f78410001 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -265,10 +265,10 @@ public class StreamsResetterTest { @Test public void shouldDetermineInternalTopicBasedOnTopicName1() { - assertTrue(streamsResetter.matchesInternalTopicFormat("appId-named-subscription-response-topic")); - assertTrue(streamsResetter.matchesInternalTopicFormat("appId-named-subscription-registration-topic")); - assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-12323232-topic")); - assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic")); + assertTrue(StreamsResetter.matchesInternalTopicFormat("appId-named-subscription-response-topic")); + assertTrue(StreamsResetter.matchesInternalTopicFormat("appId-named-subscription-registration-topic")); + assertTrue(StreamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-12323232-topic")); + assertTrue(StreamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic")); } @Test