mirror of https://github.com/apache/kafka.git
For the Kafka Stream group commands, if delete topic requests fail due to version mismatch, user will have to remove the topics manually by first retrieving the relevant internal topics. To assist the user, the internal topic names are now included as part of the error message, so that the user could delete the internal topics associated with this application directly. Reviewers: TengYao Chi <frankvicky@apache.org>, Alieh Saeedi <asaeedi@confluent.io>
This commit is contained in:
parent
dd82542493
commit
7ea32a0e93
|
@ -496,7 +496,8 @@ public class StreamsGroupCommand {
|
||||||
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
|
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
|
||||||
printError("Deleting internal topics for group '" + groupId + "' failed because the topics do not exist.", Optional.empty());
|
printError("Deleting internal topics for group '" + groupId + "' failed because the topics do not exist.", Optional.empty());
|
||||||
} else if (e.getCause() instanceof UnsupportedVersionException) {
|
} else if (e.getCause() instanceof UnsupportedVersionException) {
|
||||||
printError("Deleting internal topics is not supported by the broker version. " +
|
printError("Deleting internal topics is not supported by the broker version.\n" +
|
||||||
|
"Internal topics: (" + String.join(",", internalTopics) + ").\n" +
|
||||||
"Use 'kafka-topics.sh' to delete the group's internal topics.", Optional.of(e.getCause()));
|
"Use 'kafka-topics.sh' to delete the group's internal topics.", Optional.of(e.getCause()));
|
||||||
} else {
|
} else {
|
||||||
printError("Deleting internal topics for group '" + groupId + "' failed due to " + e.getMessage(), Optional.of(e));
|
printError("Deleting internal topics for group '" + groupId + "' failed due to " + e.getMessage(), Optional.of(e));
|
||||||
|
@ -830,8 +831,19 @@ public class StreamsGroupCommand {
|
||||||
}
|
}
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
if (e.getCause() instanceof UnsupportedVersionException) {
|
if (e.getCause() instanceof UnsupportedVersionException) {
|
||||||
|
try {
|
||||||
|
// Retrieve internal topic list if possible, and add the list of topic names to error message
|
||||||
|
Set<String> allTopics = adminClient.listTopics().names().get();
|
||||||
|
List<String> internalTopics = allTopics.stream()
|
||||||
|
.filter(topic -> groupIds.stream().anyMatch(groupId -> isInferredInternalTopic(topic, groupId)))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
printError("Retrieving internal topics is not supported by the broker version.\n" +
|
||||||
|
"Internal topics: (" + String.join(",", internalTopics) + ").\n" +
|
||||||
|
"Use 'kafka-topics.sh' to delete the group's internal topics.", Optional.of(e.getCause()));
|
||||||
|
} catch (InterruptedException | ExecutionException ex) {
|
||||||
printError("Retrieving internal topics is not supported by the broker version. " +
|
printError("Retrieving internal topics is not supported by the broker version. " +
|
||||||
"Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause()));
|
"Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause()));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e));
|
printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e));
|
||||||
}
|
}
|
||||||
|
|
|
@ -405,11 +405,20 @@ public class DeleteStreamsGroupTest {
|
||||||
updateStreamsGroupProtocol((short) 0);
|
updateStreamsGroupProtocol((short) 0);
|
||||||
final Map<String, Throwable> result = new HashMap<>();
|
final Map<String, Throwable> result = new HashMap<>();
|
||||||
String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
|
String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
|
||||||
|
System.out.println(output);
|
||||||
|
|
||||||
assertTrue(output.contains("Deletion of requested streams groups ('" + appId + "') was successful."),
|
assertTrue(output.contains("Deletion of requested streams groups ('" + appId + "') was successful."),
|
||||||
"The streams group could not be deleted as expected");
|
"The streams group could not be deleted as expected");
|
||||||
assertTrue(output.contains("Retrieving internal topics is not supported by the broker version. " +
|
assertTrue(output.contains("Retrieving internal topics is not supported by the broker version."));
|
||||||
"Use 'kafka-topics.sh' to list and delete the group's internal topics."));
|
assertTrue(output.contains("Use 'kafka-topics.sh' to delete the group's internal topics."));
|
||||||
|
// Validate the list of internal topics in error message
|
||||||
|
assertTrue(output.contains("Internal topics:"));
|
||||||
|
System.out.println(output);
|
||||||
|
assertTrue(
|
||||||
|
output.matches("(?s).*" + APP_ID_PREFIX + "[a-zA-Z0-9\\-]+-(aggregated_value-changelog|repartition|changelog).*"),
|
||||||
|
"The internal topic name does not match the expected format. Output: " + output
|
||||||
|
);
|
||||||
|
|
||||||
assertEquals(1, result.size());
|
assertEquals(1, result.size());
|
||||||
assertTrue(result.containsKey(appId));
|
assertTrue(result.containsKey(appId));
|
||||||
assertNull(result.get(appId), "The streams group could not be deleted as expected");
|
assertNull(result.get(appId), "The streams group could not be deleted as expected");
|
||||||
|
|
Loading…
Reference in New Issue