diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java index 30de2900830..7ac61e9df9e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -496,7 +496,8 @@ public class StreamsGroupCommand { if (e.getCause() instanceof UnknownTopicOrPartitionException) { printError("Deleting internal topics for group '" + groupId + "' failed because the topics do not exist.", Optional.empty()); } else if (e.getCause() instanceof UnsupportedVersionException) { - printError("Deleting internal topics is not supported by the broker version. " + + printError("Deleting internal topics is not supported by the broker version.\n" + + "Internal topics: (" + String.join(",", internalTopics) + ").\n" + "Use 'kafka-topics.sh' to delete the group's internal topics.", Optional.of(e.getCause())); } else { printError("Deleting internal topics for group '" + groupId + "' failed due to " + e.getMessage(), Optional.of(e)); @@ -830,8 +831,19 @@ public class StreamsGroupCommand { } } catch (InterruptedException | ExecutionException e) { if (e.getCause() instanceof UnsupportedVersionException) { - printError("Retrieving internal topics is not supported by the broker version. " + - "Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause())); + try { + // Retrieve internal topic list if possible, and add the list of topic names to error message + Set allTopics = adminClient.listTopics().names().get(); + List internalTopics = allTopics.stream() + .filter(topic -> groupIds.stream().anyMatch(groupId -> isInferredInternalTopic(topic, groupId))) + .collect(Collectors.toList()); + printError("Retrieving internal topics is not supported by the broker version.\n" + + "Internal topics: (" + String.join(",", internalTopics) + ").\n" + + "Use 'kafka-topics.sh' to delete the group's internal topics.", Optional.of(e.getCause())); + } catch (InterruptedException | ExecutionException ex) { + printError("Retrieving internal topics is not supported by the broker version. " + + "Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause())); + } } else { printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e)); } diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java index 5e00cd5dae3..aa89d8796de 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java @@ -405,11 +405,20 @@ public class DeleteStreamsGroupTest { updateStreamsGroupProtocol((short) 0); final Map result = new HashMap<>(); String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups())); + System.out.println(output); assertTrue(output.contains("Deletion of requested streams groups ('" + appId + "') was successful."), "The streams group could not be deleted as expected"); - assertTrue(output.contains("Retrieving internal topics is not supported by the broker version. " + - "Use 'kafka-topics.sh' to list and delete the group's internal topics.")); + assertTrue(output.contains("Retrieving internal topics is not supported by the broker version.")); + assertTrue(output.contains("Use 'kafka-topics.sh' to delete the group's internal topics.")); + // Validate the list of internal topics in error message + assertTrue(output.contains("Internal topics:")); + System.out.println(output); + assertTrue( + output.matches("(?s).*" + APP_ID_PREFIX + "[a-zA-Z0-9\\-]+-(aggregated_value-changelog|repartition|changelog).*"), + "The internal topic name does not match the expected format. Output: " + output + ); + assertEquals(1, result.size()); assertTrue(result.containsKey(appId)); assertNull(result.get(appId), "The streams group could not be deleted as expected");