From 7ea32a0e938c22119f11908aa419aaf0ffd9b6d8 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 14 Jul 2025 22:52:35 -0500 Subject: [PATCH] KAFKA-19459: List internal topics for the user (#20157) For the Kafka Stream group commands, if delete topic requests fail due to version mismatch, user will have to remove the topics manually by first retrieving the relevant internal topics. To assist the user, the internal topic names are now included as part of the error message, so that the user could delete the internal topics associated with this application directly. Reviewers: TengYao Chi , Alieh Saeedi --- .../tools/streams/StreamsGroupCommand.java | 18 +++++++++++++++--- .../tools/streams/DeleteStreamsGroupTest.java | 13 +++++++++++-- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java index 30de2900830..7ac61e9df9e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -496,7 +496,8 @@ public class StreamsGroupCommand { if (e.getCause() instanceof UnknownTopicOrPartitionException) { printError("Deleting internal topics for group '" + groupId + "' failed because the topics do not exist.", Optional.empty()); } else if (e.getCause() instanceof UnsupportedVersionException) { - printError("Deleting internal topics is not supported by the broker version. " + + printError("Deleting internal topics is not supported by the broker version.\n" + + "Internal topics: (" + String.join(",", internalTopics) + ").\n" + "Use 'kafka-topics.sh' to delete the group's internal topics.", Optional.of(e.getCause())); } else { printError("Deleting internal topics for group '" + groupId + "' failed due to " + e.getMessage(), Optional.of(e)); @@ -830,8 +831,19 @@ public class StreamsGroupCommand { } } catch (InterruptedException | ExecutionException e) { if (e.getCause() instanceof UnsupportedVersionException) { - printError("Retrieving internal topics is not supported by the broker version. " + - "Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause())); + try { + // Retrieve internal topic list if possible, and add the list of topic names to error message + Set allTopics = adminClient.listTopics().names().get(); + List internalTopics = allTopics.stream() + .filter(topic -> groupIds.stream().anyMatch(groupId -> isInferredInternalTopic(topic, groupId))) + .collect(Collectors.toList()); + printError("Retrieving internal topics is not supported by the broker version.\n" + + "Internal topics: (" + String.join(",", internalTopics) + ").\n" + + "Use 'kafka-topics.sh' to delete the group's internal topics.", Optional.of(e.getCause())); + } catch (InterruptedException | ExecutionException ex) { + printError("Retrieving internal topics is not supported by the broker version. " + + "Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause())); + } } else { printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e)); } diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java index 5e00cd5dae3..aa89d8796de 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java @@ -405,11 +405,20 @@ public class DeleteStreamsGroupTest { updateStreamsGroupProtocol((short) 0); final Map result = new HashMap<>(); String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups())); + System.out.println(output); assertTrue(output.contains("Deletion of requested streams groups ('" + appId + "') was successful."), "The streams group could not be deleted as expected"); - assertTrue(output.contains("Retrieving internal topics is not supported by the broker version. " + - "Use 'kafka-topics.sh' to list and delete the group's internal topics.")); + assertTrue(output.contains("Retrieving internal topics is not supported by the broker version.")); + assertTrue(output.contains("Use 'kafka-topics.sh' to delete the group's internal topics.")); + // Validate the list of internal topics in error message + assertTrue(output.contains("Internal topics:")); + System.out.println(output); + assertTrue( + output.matches("(?s).*" + APP_ID_PREFIX + "[a-zA-Z0-9\\-]+-(aggregated_value-changelog|repartition|changelog).*"), + "The internal topic name does not match the expected format. Output: " + output + ); + assertEquals(1, result.size()); assertTrue(result.containsKey(appId)); assertNull(result.get(appId), "The streams group could not be deleted as expected");