KAFKA-18288: Fix Streams CLI describe (#20099)
CI / build (push) Waiting to run Details

This PR includes the following fixes:

- The Streams CLI used to list all groups but return only the description of
the first group, which was a bug. With this fix, it returns the descriptions
of the groups specified by `--group` or `--all-groups`. Integration tests
are added to verify the fix.
- `timeoutOption` was missing from describe groups. This fix adds it and
tests it with a short timeout.
- `DescribeStreamsGroupsHandler` used to return an empty group in `DEAD`
state when the group id was not found, but with this fix, it throws
`GroupIdNotFoundException`.
This commit is contained in:
Alieh Saeedi 2025-07-08 15:28:56 +02:00 committed by GitHub
parent a88fd01e74
commit db1c6f31a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 212 additions and 55 deletions

View File

@ -36,7 +36,6 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -259,22 +258,9 @@ public class DescribeStreamsGroupsHandler extends AdminApiHandler.Batched<Coordi
break;
case GROUP_ID_NOT_FOUND:
// In order to maintain compatibility with describeConsumerGroups, an unknown group ID is
// reported as a DEAD streams group, and the admin client operation did not fail
log.debug("`DescribeStreamsGroups` request for group id {} failed because the group does not exist. {}",
groupId.idValue, errorMsg != null ? errorMsg : "");
final StreamsGroupDescription streamsGroupDescription =
new StreamsGroupDescription(
groupId.idValue,
-1,
-1,
-1,
Collections.emptySet(),
Collections.emptySet(),
GroupState.DEAD,
coordinator,
validAclOperations(describedGroup.authorizedOperations()));
completed.put(groupId, streamsGroupDescription);
failed.put(groupId, error.exception(errorMsg));
break;
default:

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
@ -242,24 +243,28 @@ public class StreamsGroupCommand {
}
public void describeGroups() throws ExecutionException, InterruptedException {
List<String> groups = listStreamsGroups();
if (!groups.isEmpty()) {
StreamsGroupDescription description = getDescribeGroup(groups.get(0));
if (description == null)
return;
boolean verbose = opts.options.has(opts.verboseOpt);
if (opts.options.has(opts.membersOpt)) {
printMembers(description, verbose);
} else if (opts.options.has(opts.stateOpt)) {
printStates(description, verbose);
} else {
printOffsets(description, verbose);
List<String> groupIds = opts.options.has(opts.allGroupsOpt)
? new ArrayList<>(listStreamsGroups())
: new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
if (!groupIds.isEmpty()) {
for (String groupId : groupIds) {
StreamsGroupDescription description = getDescribeGroup(groupId);
boolean verbose = opts.options.has(opts.verboseOpt);
if (opts.options.has(opts.membersOpt)) {
printMembers(description, verbose);
} else if (opts.options.has(opts.stateOpt)) {
printStates(description, verbose);
} else {
printOffsets(description, verbose);
}
}
}
}
StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException {
DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups(List.of(group));
DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups(
List.of(group),
new DescribeStreamsGroupsOptions().timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()));
Map<String, StreamsGroupDescription> descriptionMap = result.all().get();
return descriptionMap.get(group);
}

View File

@ -18,7 +18,9 @@ package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
@ -39,15 +41,24 @@ import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import joptsimple.OptionException;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
@ -55,9 +66,13 @@ public class DescribeStreamsGroupTest {
public static EmbeddedKafkaCluster cluster = null;
static KafkaStreams streams;
private static final String APP_ID = "streams-group-command-test";
private static final String APP_ID_2 = "streams-group-command-test-2";
private static final String INPUT_TOPIC = "customInputTopic";
private static final String OUTPUT_TOPIC = "customOutputTopic";
private static final String INPUT_TOPIC_2 = "customInputTopic2";
private static final String OUTPUT_TOPIC_2 = "customOutputTopic2";
private static String bootstrapServers;
@BeforeAll
public static void setup() throws Exception {
// start the cluster and create the input topic
@ -66,20 +81,12 @@ public class DescribeStreamsGroupTest {
cluster = new EmbeddedKafkaCluster(1, props);
cluster.start();
cluster.createTopic(INPUT_TOPIC, 2, 1);
bootstrapServers = cluster.bootstrapServers();
// start kafka streams
Properties streamsProp = new Properties();
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
streams = new KafkaStreams(topology(), streamsProp);
Properties streamsProp = streamsProp(APP_ID);
streams = new KafkaStreams(topology(INPUT_TOPIC, OUTPUT_TOPIC), streamsProp);
startApplicationAndWaitUntilRunning(streams);
}
@ -90,6 +97,28 @@ public class DescribeStreamsGroupTest {
cluster = null;
}
@Test
public void testDescribeWithUnrecognizedOption() {
    // An unknown flag must be rejected by the option parser before any admin call is made.
    final String[] args = {"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--describe", "--group", APP_ID};
    assertThrows(OptionException.class, () -> getStreamsGroupService(args));
}
@Test
public void testDescribeWithoutGroupOption() {
    // --describe requires either --group or --all-groups; the CLI should exit with a
    // non-zero status and an explanatory message.
    final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--describe"};
    AtomicBoolean exited = new AtomicBoolean(false);
    Exit.setExitProcedure(((statusCode, message) -> {
        assertNotEquals(0, statusCode);
        assertTrue(message.contains("Option [describe] takes one of these options: [all-groups], [group]"));
        exited.set(true);
    }));
    try {
        getStreamsGroupService(args);
    } finally {
        // Restore the default exit procedure so the override cannot leak into other
        // tests in this class (the original code never reset it).
        Exit.resetExitProcedure();
        assertTrue(exited.get());
    }
}
@Test
public void testDescribeStreamsGroup() throws Exception {
final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
@ -100,10 +129,10 @@ public class DescribeStreamsGroupTest {
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "0"));
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--group", APP_ID), expectedHeader, expectedRows, List.of());
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--group", APP_ID), expectedHeader, expectedRows, List.of());
// --describe --offsets has the same output as --describe
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of());
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of());
}
@Test
@ -116,12 +145,12 @@ public class DescribeStreamsGroupTest {
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "-", "-", "0", "0"));
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of());
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of());
// --describe --offsets has the same output as --describe
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of());
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--offsets", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of());
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of());
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of());
}
@Test
@ -131,7 +160,7 @@ public class DescribeStreamsGroupTest {
// The coordinator is not deterministic, so we don't care about it.
final List<Integer> dontCares = List.of(1, 2);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
}
@Test
@ -142,9 +171,9 @@ public class DescribeStreamsGroupTest {
final List<Integer> dontCares = List.of(1, 2);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--state", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
}
@Test
@ -157,7 +186,7 @@ public class DescribeStreamsGroupTest {
final List<Integer> dontCares = List.of(1, 2, 3);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
}
@Test
@ -170,21 +199,87 @@ public class DescribeStreamsGroupTest {
final List<Integer> dontCares = List.of(3, 6, 7);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares);
}
private static Topology topology() {
@Test
public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception {
    // Start a second Streams application so describe has two groups to report.
    KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
    try {
        startApplicationAndWaitUntilRunning(streams2);
        final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
        final Set<List<String>> expectedRows1 = Set.of(
            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
        final Set<List<String>> expectedRows2 = Set.of(
            List.of(APP_ID_2, "2", "0", "dont-care", "streams", "2", "", ""),
            List.of(APP_ID_2, "2", "0", "", "streams", "2", "", ""));
        final Map<String, Set<List<String>>> expectedRowsMap = new HashMap<>();
        expectedRowsMap.put(APP_ID, expectedRows1);
        expectedRowsMap.put(APP_ID_2, expectedRows2);
        // The member and process names as well as client-id are not deterministic, so we don't care about them.
        final List<Integer> dontCares = List.of(3, 6, 7);
        validateDescribeOutput(
            Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
            expectedHeader, expectedRowsMap, dontCares);
        validateDescribeOutput(
            Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2),
            expectedHeader, expectedRowsMap, dontCares);
        validateDescribeOutput(
            Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--all-groups"),
            expectedHeader, expectedRowsMap, dontCares);
    } finally {
        // Always stop the second application, even when an assertion above fails,
        // so it cannot leak into subsequent tests (the original code closed it
        // only on the success path).
        streams2.close();
        streams2.cleanUp();
    }
}
@Test
public void testDescribeNonExistingStreamsGroup() {
    // Describing an unknown group id must surface the GroupIdNotFoundException message
    // on the console, regardless of option ordering.
    final String missingGroup = "non-existing-group";
    final String expectedError = String.format(
        "Error: Executing streams group command failed due to org.apache.kafka.common.errors.GroupIdNotFoundException: Group %s not found.",
        missingGroup);
    validateDescribeOutput(
        List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", missingGroup), expectedError);
    validateDescribeOutput(
        List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", missingGroup), expectedError);
}
@Test
public void testDescribeStreamsGroupWithShortTimeout() {
    // A 1 ms admin timeout cannot complete the describe round trip, so the future
    // must fail with a TimeoutException wrapped in an ExecutionException.
    final String[] args = {"--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1"};
    final Throwable failure = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args).describeGroups());
    assertEquals(TimeoutException.class, failure.getCause().getClass());
}
private static Topology topology(String inputTopic, String outputTopic) {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count()
.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
}
/**
 * Builds the Streams configuration used by the test applications: earliest
 * offset reset, string serdes, a temporary state directory, two stream
 * threads, and the "streams" group protocol.
 *
 * @param appId application id (and therefore group id) for the instance
 * @return a fully populated {@link Properties} for {@link KafkaStreams}
 */
private static Properties streamsProp(String appId) {
    final Properties config = new Properties();
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
    config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
    return config;
}
private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args);
return new StreamsGroupCommand.StreamsGroupService(
@ -193,6 +288,11 @@ public class DescribeStreamsGroupTest {
);
}
// Runs the streams group command once with the given arguments and asserts that
// its entire console output (trimmed) equals the expected error message.
private static void validateDescribeOutput(List<String> args, String errorMessage) {
String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
assertEquals(errorMessage, output.trim());
}
private static void validateDescribeOutput(
List<String> args,
List<String> expectedHeader,
@ -227,4 +327,69 @@ public class DescribeStreamsGroupTest {
);
}, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
}
/**
 * Repeatedly runs the streams group command with the given arguments until its
 * console output matches the expected header and, per group, the expected set
 * of rows. Columns listed in {@code dontCareIndices} are blanked before
 * comparison because their values are not deterministic.
 *
 * @param args             full CLI argument list for {@code StreamsGroupCommand}
 * @param expectedHeader   the header tokens expected on the first output line
 * @param expectedRows     expected row sets keyed by group id
 * @param dontCareIndices  column indices to ignore when comparing rows
 * @throws InterruptedException if the wait is interrupted
 */
private static void validateDescribeOutput(
List<String> args,
List<String> expectedHeader,
Map<String, Set<List<String>>> expectedRows,
List<Integer> dontCareIndices
) throws InterruptedException {
    final AtomicReference<String> out = new AtomicReference<>("");
    TestUtils.waitForCondition(() -> {
        String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
        out.set(output);
        String[] lines = output.split("\n");
        if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{};
        if (lines.length == 0) return false;
        List<String> header = Arrays.asList(lines[0].split("\\s+"));
        if (!expectedHeader.equals(header)) return false;
        Map<String, Set<List<String>>> groupDescMap = splitOutputByGroup(lines);
        if (groupDescMap.size() != expectedRows.size()) return false;
        // Compare group by group; return false as soon as any mismatch is found
        // (the original accumulated a flag and kept iterating — same result).
        for (Map.Entry<String, Set<List<String>>> entry : groupDescMap.entrySet()) {
            Set<List<String>> expectedGroupDesc = expectedRows.get(entry.getKey());
            if (expectedGroupDesc == null) return false;
            Set<List<String>> groupDesc = entry.getValue();
            if (expectedGroupDesc.size() != groupDesc.size()) return false;
            for (List<String> row : groupDesc) {
                // Blank out the non-deterministic columns before set membership check.
                List<String> normalized = new ArrayList<>(row);
                dontCareIndices.forEach(index -> normalized.set(index, ""));
                if (!expectedGroupDesc.contains(normalized)) return false;
            }
        }
        return true;
    }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
}
// Splits the command's console output into per-group row sets. The output is
// assumed to repeat the header line before each group's rows; lines[0] is the
// first header and lines[1] the first data row.
// NOTE(review): if the very last line happens to equal the header, that header
// line is included in the final group's rows (the i == lines.length - 1 branch
// advances i past it) — acceptable for these tests, but worth confirming.
private static Map<String, Set<List<String>>> splitOutputByGroup(String[] lines) {
Map<String, Set<List<String>>> groupdescMap = new HashMap<>();
// Header compared with all spaces stripped, since column widths vary per group.
String headerLine = lines[0].replaceAll(" ", "");
// The group id is the first token of each data row.
String groupName = lines[1].split("\\s+")[0];
int j = 1;
for (int i = j; i < lines.length; i++) {
// A repeated header (or the end of the output) terminates the current group.
if (lines[i].replaceAll(" ", "").equals(headerLine) || i == lines.length - 1) {
// Include the final line itself when we hit the end of the output.
if (i == lines.length - 1) i++;
Set<List<String>> groupDesc = Arrays.stream(Arrays.copyOfRange(lines, j, i))
.map(line -> Arrays.asList(line.split("\\s+")))
.collect(Collectors.toSet());
groupdescMap.put(groupName, groupDesc);
// Start the next group after the repeated header, if any output remains.
if (i + 1 < lines.length) {
j = i + 1;
groupName = lines[j].split("\\s+")[0];
}
}
}
return groupdescMap;
}
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
@ -174,7 +175,7 @@ public class StreamsGroupCommandTest {
null);
resultMap.put(firstGroup, exp);
when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection(), any(DescribeStreamsGroupsOptions.class))).thenReturn(result);
StreamsGroupCommandOptions streamsGroupCommandOptions = new StreamsGroupCommandOptions(
new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--group", firstGroup, "--describe"});