mirror of https://github.com/apache/kafka.git
KAFKA-6435: KIP-623 Add internal topics option to streamResetter (#8923)
Allow user to specify subset of internal topics to clean up with application reset tool Reviewers: Boyang Chen <boyang@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Walker Carlson <wcarlson@confluent.io>
This commit is contained in:
parent
bcfe910356
commit
0ea440b2af
|
@ -95,6 +95,7 @@ public class StreamsResetter {
|
|||
private static OptionSpec<String> applicationIdOption;
|
||||
private static OptionSpec<String> inputTopicsOption;
|
||||
private static OptionSpec<String> intermediateTopicsOption;
|
||||
private static OptionSpec<String> internalTopicsOption;
|
||||
private static OptionSpec<Long> toOffsetOption;
|
||||
private static OptionSpec<String> toDatetimeOption;
|
||||
private static OptionSpec<String> byDurationOption;
|
||||
|
@ -114,7 +115,8 @@ public class StreamsResetter {
|
|||
+ "intermediate topics (topics that are input and output topics, e.g., used by deprecated through() method).\n"
|
||||
+ "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "
|
||||
+ "\"<application.id>-\").\n"
|
||||
+ "You do not need to specify internal topics because the tool finds them automatically.\n"
|
||||
+ "The tool finds these internal topics automatically. If the topics flagged automatically for deletion by "
|
||||
+ "the dry-run are unsuitable, you can specify a subset with the \"--internal-topics\" option.\n"
|
||||
+ "* This tool will not delete output topics (if you want to delete them, you need to do it yourself "
|
||||
+ "with the bin/kafka-topics.sh command).\n"
|
||||
+ "* This tool will not clean up the local state on the stream application instances (the persisted "
|
||||
|
@ -126,7 +128,9 @@ public class StreamsResetter {
|
|||
+ "members immediately. Make sure to stop all stream applications when this option is specified "
|
||||
+ "to avoid unexpected disruptions.\n\n"
|
||||
+ "*** Important! You will get wrong output if you don't clean up the local stores after running the "
|
||||
+ "reset tool!\n\n";
|
||||
+ "reset tool!\n\n"
|
||||
+ "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that "
|
||||
+ "you run this once with \"--dry-run\" to preview your changes before making them.\n\n";
|
||||
|
||||
private OptionSet options = null;
|
||||
private final List<String> allTopics = new LinkedList<>();
|
||||
|
@ -166,7 +170,7 @@ public class StreamsResetter {
|
|||
final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
|
||||
consumerConfig.putAll(properties);
|
||||
exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
|
||||
maybeDeleteInternalTopics(adminClient, dryRun);
|
||||
exitCode |= maybeDeleteInternalTopics(adminClient, dryRun);
|
||||
} catch (final Throwable e) {
|
||||
exitCode = EXIT_CODE_ERROR;
|
||||
System.err.println("ERROR: " + e);
|
||||
|
@ -224,6 +228,13 @@ public class StreamsResetter {
|
|||
.ofType(String.class)
|
||||
.withValuesSeparatedBy(',')
|
||||
.describedAs("list");
|
||||
internalTopicsOption = optionParser.accepts("internal-topics", "Comma-separated list of "
|
||||
+ "internal topics to delete. Must be a subset of the internal topics marked for deletion by the "
|
||||
+ "default behaviour (do a dry-run without this option to view these topics).")
|
||||
.withRequiredArg()
|
||||
.ofType(String.class)
|
||||
.withValuesSeparatedBy(',')
|
||||
.describedAs("list");
|
||||
toOffsetOption = optionParser.accepts("to-offset", "Reset offsets to a specific offset.")
|
||||
.withRequiredArg()
|
||||
.ofType(Long.class);
|
||||
|
@ -608,22 +619,35 @@ public class StreamsResetter {
|
|||
return options.valuesOf(intermediateTopicsOption).contains(topic);
|
||||
}
|
||||
|
||||
private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
|
||||
System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
|
||||
final List<String> topicsToDelete = new ArrayList<>();
|
||||
for (final String listing : allTopics) {
|
||||
if (isInternalTopic(listing)) {
|
||||
if (!dryRun) {
|
||||
topicsToDelete.add(listing);
|
||||
} else {
|
||||
System.out.println("Topic: " + listing);
|
||||
}
|
||||
private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
|
||||
final List<String> inferredInternalTopics = allTopics.stream()
|
||||
.filter(this::isInferredInternalTopic)
|
||||
.collect(Collectors.toList());
|
||||
final List<String> specifiedInternalTopics = options.valuesOf(internalTopicsOption);
|
||||
final List<String> topicsToDelete;
|
||||
|
||||
if (!specifiedInternalTopics.isEmpty()) {
|
||||
if (!inferredInternalTopics.containsAll(specifiedInternalTopics)) {
|
||||
throw new IllegalArgumentException("Invalid topic specified in the "
|
||||
+ "--internal-topics option. "
|
||||
+ "Ensure that the topics specified are all internal topics. "
|
||||
+ "Do a dry run without the --internal-topics option to see the "
|
||||
+ "list of all internal topics that can be deleted.");
|
||||
}
|
||||
|
||||
topicsToDelete = specifiedInternalTopics;
|
||||
System.out.println("Deleting specified internal topics " + topicsToDelete);
|
||||
} else {
|
||||
topicsToDelete = inferredInternalTopics;
|
||||
System.out.println("Deleting inferred internal topics " + topicsToDelete);
|
||||
}
|
||||
|
||||
if (!dryRun) {
|
||||
doDelete(topicsToDelete, adminClient);
|
||||
}
|
||||
|
||||
System.out.println("Done.");
|
||||
return EXIT_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
|
@ -647,7 +671,7 @@ public class StreamsResetter {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isInternalTopic(final String topicName) {
|
||||
private boolean isInferredInternalTopic(final String topicName) {
|
||||
// Specified input/intermediate topics might be named like internal topics (by chance).
|
||||
// Even is this is not expected in general, we need to exclude those topics here
|
||||
// and don't consider them as internal topics even if they follow the same naming schema.
|
||||
|
@ -657,7 +681,7 @@ public class StreamsResetter {
|
|||
}
|
||||
|
||||
// visible for testing
|
||||
public boolean matchesInternalTopicFormat(final String topicName) {
|
||||
public static boolean matchesInternalTopicFormat(final String topicName) {
|
||||
return topicName.endsWith("-changelog") || topicName.endsWith("-repartition")
|
||||
|| topicName.endsWith("-subscription-registration-topic")
|
||||
|| topicName.endsWith("-subscription-response-topic")
|
||||
|
|
|
@ -77,6 +77,7 @@
|
|||
<div class="section" id="step-1-run-the-application-reset-tool">
|
||||
<h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2>
|
||||
<p>Invoke the application reset tool from the command line</p>
|
||||
<p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with <code class="docutils literal"><span class="pre">--dry-run</span></code> to preview your changes before making them.</p>
|
||||
<div class="highlight-bash"><div class="highlight"><pre><span></span><code><path-to-kafka>/bin/kafka-streams-application-reset</code></pre></div>
|
||||
</div>
|
||||
<p>The tool accepts the following parameters:</p>
|
||||
|
@ -105,6 +106,11 @@
|
|||
topics <span class="o">(</span>topics used in the through<span class="o">()</span>
|
||||
method<span class="o">)</span>. For these topics, the tool
|
||||
will skip to the end.
|
||||
--internal-topics <String: list> Comma-separated list of internal topics
|
||||
to delete. Must be a subset of the
|
||||
internal topics marked for deletion by
|
||||
the default behaviour (do a dry-run without
|
||||
this option to view these topics).
|
||||
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset by
|
||||
'n', where 'n' can be positive or
|
||||
negative
|
||||
|
|
|
@ -60,6 +60,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.time.Duration.ofMillis;
|
||||
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
|
||||
|
@ -77,7 +78,7 @@ public abstract class AbstractResetIntegrationTest {
|
|||
abstract Map<String, Object> getClientSslConfig();
|
||||
|
||||
@Rule
|
||||
public final TestName testName = new TestName();
|
||||
public final TestName testName = new TestName();
|
||||
|
||||
@AfterClass
|
||||
public static void afterClassCleanup() {
|
||||
|
@ -205,6 +206,34 @@ public abstract class AbstractResetIntegrationTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResetWhenInternalTopicsAreSpecified() throws Exception {
|
||||
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
|
||||
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
|
||||
|
||||
// RUN
|
||||
streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig);
|
||||
streams.start();
|
||||
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
|
||||
|
||||
streams.close();
|
||||
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
|
||||
|
||||
// RESET
|
||||
streams.cleanUp();
|
||||
|
||||
final List<String> internalTopics = cluster.getAllTopicsInCluster().stream()
|
||||
.filter(topic -> StreamsResetter.matchesInternalTopicFormat(topic))
|
||||
.collect(Collectors.toList());
|
||||
cleanGlobal(false,
|
||||
"--internal-topics",
|
||||
String.join(",", internalTopics.subList(1, internalTopics.size())),
|
||||
appID);
|
||||
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
|
||||
|
||||
assertInternalTopicsGotDeleted(internalTopics.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
|
||||
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
|
||||
|
@ -358,7 +387,6 @@ public abstract class AbstractResetIntegrationTest {
|
|||
final String resetScenario,
|
||||
final String resetScenarioArg,
|
||||
final String appID) throws Exception {
|
||||
// leaving --zookeeper arg here to ensure tool works if users add it
|
||||
final List<String> parameterList = new ArrayList<>(
|
||||
Arrays.asList("--application-id", appID,
|
||||
"--bootstrap-servers", cluster.bootstrapServers(),
|
||||
|
@ -405,11 +433,11 @@ public abstract class AbstractResetIntegrationTest {
|
|||
Assert.assertTrue(cleanResult);
|
||||
}
|
||||
|
||||
protected void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
|
||||
protected void assertInternalTopicsGotDeleted(final String additionalExistingTopic) throws Exception {
|
||||
// do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
|
||||
if (intermediateUserTopic != null) {
|
||||
if (additionalExistingTopic != null) {
|
||||
cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
|
||||
Topic.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic);
|
||||
Topic.GROUP_METADATA_TOPIC_NAME, additionalExistingTopic);
|
||||
} else {
|
||||
cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
|
||||
Topic.GROUP_METADATA_TOPIC_NAME);
|
||||
|
|
|
@ -151,6 +151,38 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
|
|||
Assert.assertEquals(1, exitCode);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist() {
|
||||
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
|
||||
final String[] parameters = new String[] {
|
||||
"--application-id", appID,
|
||||
"--bootstrap-servers", cluster.bootstrapServers(),
|
||||
"--internal-topics", NON_EXISTING_TOPIC
|
||||
};
|
||||
final Properties cleanUpConfig = new Properties();
|
||||
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
|
||||
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
|
||||
|
||||
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
|
||||
Assert.assertEquals(1, exitCode);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal() {
|
||||
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
|
||||
final String[] parameters = new String[] {
|
||||
"--application-id", appID,
|
||||
"--bootstrap-servers", cluster.bootstrapServers(),
|
||||
"--internal-topics", INPUT_TOPIC
|
||||
};
|
||||
final Properties cleanUpConfig = new Properties();
|
||||
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
|
||||
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
|
||||
|
||||
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
|
||||
Assert.assertEquals(1, exitCode);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
|
||||
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
|
||||
|
|
|
@ -265,10 +265,10 @@ public class StreamsResetterTest {
|
|||
|
||||
@Test
|
||||
public void shouldDetermineInternalTopicBasedOnTopicName1() {
|
||||
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-named-subscription-response-topic"));
|
||||
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-named-subscription-registration-topic"));
|
||||
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-12323232-topic"));
|
||||
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
|
||||
assertTrue(StreamsResetter.matchesInternalTopicFormat("appId-named-subscription-response-topic"));
|
||||
assertTrue(StreamsResetter.matchesInternalTopicFormat("appId-named-subscription-registration-topic"));
|
||||
assertTrue(StreamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-12323232-topic"));
|
||||
assertTrue(StreamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue