From db1c6f31a3f0ba1ef403b9906402afb507119ad0 Mon Sep 17 00:00:00 2001 From: Alieh Saeedi <107070585+aliehsaeedii@users.noreply.github.com> Date: Tue, 8 Jul 2025 15:28:56 +0200 Subject: [PATCH] KAFKA-18288: Fix Streams CLI describe (#20099) This PR includes the following fixes: - Streams CLI used to list and return the description of the first group which is a bug. With this fix, it returns the descriptions of the groups specified by the `--group` or `all-groups`. Integration test are added to verify the fix. - `timeoutOption` is missing in describe groups. This fix adds and tests it with short timeout. - `DescribeStreamsGroupsHandler` used to return an empty group in `DEAD` state when the group id was not found, but with this fix, it throws `GroupIdNotFoundException` --- .../DescribeStreamsGroupsHandler.java | 16 +- .../tools/streams/StreamsGroupCommand.java | 31 +-- .../streams/DescribeStreamsGroupTest.java | 217 +++++++++++++++--- .../streams/StreamsGroupCommandTest.java | 3 +- 4 files changed, 212 insertions(+), 55 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java index 8355a78b9d4..9c037d7dd46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java @@ -36,7 +36,6 @@ import org.slf4j.Logger; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -259,22 +258,9 @@ public class DescribeStreamsGroupsHandler extends AdminApiHandler.Batched groups = listStreamsGroups(); - if (!groups.isEmpty()) { - StreamsGroupDescription description = getDescribeGroup(groups.get(0)); - if (description == null) - return; - boolean verbose = opts.options.has(opts.verboseOpt); - if (opts.options.has(opts.membersOpt)) { - printMembers(description, verbose); - } else if (opts.options.has(opts.stateOpt)) { - printStates(description, verbose); - } else { - printOffsets(description, verbose); + List groupIds = opts.options.has(opts.allGroupsOpt) + ? new ArrayList<>(listStreamsGroups()) + : new ArrayList<>(opts.options.valuesOf(opts.groupOpt)); + if (!groupIds.isEmpty()) { + for (String groupId : groupIds) { + StreamsGroupDescription description = getDescribeGroup(groupId); + boolean verbose = opts.options.has(opts.verboseOpt); + if (opts.options.has(opts.membersOpt)) { + printMembers(description, verbose); + } else if (opts.options.has(opts.stateOpt)) { + printStates(description, verbose); + } else { + printOffsets(description, verbose); + } } } } StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException { - DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups(List.of(group)); + DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups( + List.of(group), + new DescribeStreamsGroupsOptions().timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())); Map descriptionMap = result.all().get(); return descriptionMap.get(group); } diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java index 0f3515b552b..05bf36cbc34 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java @@ -18,7 +18,9 @@ package org.apache.kafka.tools.streams; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.KafkaStreams; @@ -39,15 +41,24 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import joptsimple.OptionException; + import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(600) @Tag("integration") @@ -55,9 +66,13 @@ public class DescribeStreamsGroupTest { public static EmbeddedKafkaCluster cluster = null; static KafkaStreams streams; private static final String APP_ID = "streams-group-command-test"; + private static final String APP_ID_2 = "streams-group-command-test-2"; + private static final String INPUT_TOPIC = "customInputTopic"; private static final String OUTPUT_TOPIC = "customOutputTopic"; - + private static final String INPUT_TOPIC_2 = "customInputTopic2"; + private static final String OUTPUT_TOPIC_2 = "customOutputTopic2"; + private static String bootstrapServers; @BeforeAll public static void setup() throws Exception { // start the cluster and create the input topic @@ -66,20 +81,12 @@ public class DescribeStreamsGroupTest { cluster = new EmbeddedKafkaCluster(1, props); cluster.start(); cluster.createTopic(INPUT_TOPIC, 2, 1); + bootstrapServers = cluster.bootstrapServers(); // start kafka streams - Properties streamsProp = new Properties(); - streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); - streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); - streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); - streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); - - streams = new KafkaStreams(topology(), streamsProp); + Properties streamsProp = streamsProp(APP_ID); + streams = new KafkaStreams(topology(INPUT_TOPIC, OUTPUT_TOPIC), streamsProp); startApplicationAndWaitUntilRunning(streams); } @@ -90,6 +97,28 @@ public class DescribeStreamsGroupTest { cluster = null; } + @Test + public void testDescribeWithUnrecognizedOption() { + String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--describe", "--group", APP_ID}; + assertThrows(OptionException.class, () -> getStreamsGroupService(args)); + } + + @Test + public void testDescribeWithoutGroupOption() { + final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--describe"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [describe] takes one of these options: [all-groups], [group]")); + exited.set(true); + })); + try { + getStreamsGroupService(args); + } finally { + assertTrue(exited.get()); + } + } + @Test public void testDescribeStreamsGroup() throws Exception { final List expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG"); @@ -100,10 +129,10 @@ public class DescribeStreamsGroupTest { List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "0")); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--group", APP_ID), expectedHeader, expectedRows, List.of()); // --describe --offsets has the same output as --describe validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of()); } @Test @@ -116,12 +145,12 @@ public class DescribeStreamsGroupTest { List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "-", "-", "0", "0")); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of()); // --describe --offsets has the same output as --describe validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--offsets", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of()); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of()); } @Test @@ -131,7 +160,7 @@ public class DescribeStreamsGroupTest { // The coordinator is not deterministic, so we don't care about it. final List dontCares = List.of(1, 2); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares); } @Test @@ -142,9 +171,9 @@ public class DescribeStreamsGroupTest { final List dontCares = List.of(1, 2); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--state", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares); } @Test @@ -157,7 +186,7 @@ public class DescribeStreamsGroupTest { final List dontCares = List.of(1, 2, 3); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); } @Test @@ -170,21 +199,87 @@ public class DescribeStreamsGroupTest { final List dontCares = List.of(3, 6, 7); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); } - private static Topology topology() { + @Test + public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception { + KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2)); + startApplicationAndWaitUntilRunning(streams2); + + final List expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); + final Set> expectedRows1 = Set.of( + List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"), + List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];")); + final Set> expectedRows2 = Set.of( + List.of(APP_ID_2, "2", "0", "dont-care", "streams", "2", "", ""), + List.of(APP_ID_2, "2", "0", "", "streams", "2", "", "")); + final Map>> expectedRowsMap = new HashMap<>(); + expectedRowsMap.put(APP_ID, expectedRows1); + expectedRowsMap.put(APP_ID_2, expectedRows2); + + // The member and process names as well as client-id are not deterministic, so we don't care about them. + final List dontCares = List.of(3, 6, 7); + + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2), + expectedHeader, expectedRowsMap, dontCares); + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2), + expectedHeader, expectedRowsMap, dontCares); + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--all-groups"), + expectedHeader, expectedRowsMap, dontCares); + + streams2.close(); + streams2.cleanUp(); + } + + @Test + public void testDescribeNonExistingStreamsGroup() { + final String nonExistingGroup = "non-existing-group"; + final String errorMessage = String.format( + "Error: Executing streams group command failed due to org.apache.kafka.common.errors.GroupIdNotFoundException: Group %s not found.", + nonExistingGroup); + + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", nonExistingGroup), errorMessage); + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", nonExistingGroup), errorMessage); + } + + @Test + public void testDescribeStreamsGroupWithShortTimeout() { + List args = Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1"); + Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups()); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + + private static Topology topology(String inputTopic, String outputTopic) { final StreamsBuilder builder = new StreamsBuilder(); - builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())) + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count() - .toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); + .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); return builder.build(); } + private static Properties streamsProp(String appId) { + Properties streamsProp = new Properties(); + streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); + streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + return streamsProp; + } + private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) { StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args); return new StreamsGroupCommand.StreamsGroupService( @@ -193,6 +288,11 @@ public class DescribeStreamsGroupTest { ); } + private static void validateDescribeOutput(List args, String errorMessage) { + String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0]))); + assertEquals(errorMessage, output.trim()); + } + private static void validateDescribeOutput( List args, List expectedHeader, @@ -227,4 +327,69 @@ public class DescribeStreamsGroupTest { ); }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get())); } + + private static void validateDescribeOutput( + List args, + List expectedHeader, + Map>> expectedRows, + List dontCareIndices + ) throws InterruptedException { + final AtomicReference out = new AtomicReference<>(""); + TestUtils.waitForCondition(() -> { + String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0]))); + out.set(output); + + String[] lines = output.split("\n"); + if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{}; + + if (lines.length == 0) return false; + List header = Arrays.asList(lines[0].split("\\s+")); + if (!expectedHeader.equals(header)) return false; + + Map>> groupdescMap = splitOutputByGroup(lines); + + if (groupdescMap.size() != expectedRows.size()) return false; + + // clear the dontCare fields and then compare two sets + boolean compareResult = true; + for (Map.Entry>> entry : groupdescMap.entrySet()) { + String group = entry.getKey(); + Set> groupDesc = entry.getValue(); + if (!expectedRows.containsKey(group)) return false; + Set> expectedGroupDesc = expectedRows.get(group); + if (expectedGroupDesc.size() != groupDesc.size()) + compareResult = false; + for (List list : groupDesc) { + List listCloned = new ArrayList<>(list); + dontCareIndices.forEach(index -> listCloned.set(index, "")); + if (!expectedGroupDesc.contains(listCloned)) { + compareResult = false; + } + } + } + + return compareResult; + }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get())); + } + + private static Map>> splitOutputByGroup(String[] lines) { + Map>> groupdescMap = new HashMap<>(); + String headerLine = lines[0].replaceAll(" ", ""); + String groupName = lines[1].split("\\s+")[0]; + int j = 1; + for (int i = j; i < lines.length; i++) { + if (lines[i].replaceAll(" ", "").equals(headerLine) || i == lines.length - 1) { + if (i == lines.length - 1) i++; + Set> groupDesc = Arrays.stream(Arrays.copyOfRange(lines, j, i)) + .map(line -> Arrays.asList(line.split("\\s+"))) + .collect(Collectors.toSet()); + groupdescMap.put(groupName, groupDesc); + if (i + 1 < lines.length) { + j = i + 1; + groupName = lines[j].split("\\s+")[0]; + } + } + } + return groupdescMap; + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java index 6f7d199b3bd..bf1296dd235 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AdminClientTestUtils; import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions; import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult; import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions; import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult; import org.apache.kafka.clients.admin.DescribeTopicsOptions; import org.apache.kafka.clients.admin.DescribeTopicsResult; @@ -174,7 +175,7 @@ public class StreamsGroupCommandTest { null); resultMap.put(firstGroup, exp); when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap)); - when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result); + when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection(), any(DescribeStreamsGroupsOptions.class))).thenReturn(result); StreamsGroupCommandOptions streamsGroupCommandOptions = new StreamsGroupCommandOptions( new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--group", firstGroup, "--describe"});