KAFKA-17125 Add integration test for StreamsGroup in Admin API (#18911)

Integration test for both `--list` and `--describe` commands.
This commit is contained in:
Alieh Saeedi 2025-02-21 16:27:00 +01:00 committed by GitHub
parent 31545eb614
commit 1b02873511
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 462 additions and 48 deletions

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@ -28,6 +29,7 @@ import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
@ -38,6 +40,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -169,17 +172,19 @@ public class StreamsGroupCommand {
}
public void describeGroups() throws ExecutionException, InterruptedException {
String group = opts.options.valueOf(opts.groupOpt);
StreamsGroupDescription description = getDescribeGroup(group);
if (description == null)
return;
boolean verbose = opts.options.has(opts.verboseOpt);
if (opts.options.has(opts.membersOpt)) {
printMembers(description, verbose);
} else if (opts.options.has(opts.stateOpt)) {
printStates(description, verbose);
} else {
printOffsets(description, verbose);
List<String> groups = listStreamsGroups();
if (!groups.isEmpty()) {
StreamsGroupDescription description = getDescribeGroup(groups.get(0));
if (description == null)
return;
boolean verbose = opts.options.has(opts.verboseOpt);
if (opts.options.has(opts.membersOpt)) {
printMembers(description, verbose);
} else if (opts.options.has(opts.stateOpt)) {
printStates(description, verbose);
} else {
printOffsets(description, verbose);
}
}
}
@ -201,43 +206,52 @@ public class StreamsGroupCommand {
}
if (!verbose) {
String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
for (StreamsGroupMemberDescription member : members) {
System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID");
System.out.printf(fmt, description.groupId(), member.memberId(), member.processId(), member.clientId());
printTasks(member.assignment(), false);
System.out.println();
System.out.printf(fmt, description.groupId(), member.memberId(), member.processId(), member.clientId(), getTasksForPrinting(member.assignment(), Optional.empty()));
}
} else {
String fmt = "%" + -groupLen + "s %s %-15s%" + -maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
String fmt = "%" + -groupLen + "s %-25s %-15s%" + -maxMemberIdLen + "s %-15s %-15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
for (StreamsGroupMemberDescription member : members) {
System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID");
System.out.printf(fmt, description.groupId(), description.targetAssignmentEpoch(), description.topologyEpoch(), member.memberId(),
member.isClassic() ? "classic" : "streams", member.memberEpoch(), member.processId(), member.clientId());
printTasks(member.assignment(), false);
printTasks(member.targetAssignment(), true);
System.out.println();
member.isClassic() ? "classic" : "streams", member.memberEpoch(), member.processId(), member.clientId(), getTasksForPrinting(member.assignment(), Optional.of(member.targetAssignment())));
}
}
}
}
private void printTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) {
System.out.printf("%s%n", taskType + ": " + tasks.stream().map(taskId -> taskId.subtopologyId() + ": [" + taskId.partitions()).collect(Collectors.joining(",")) + "] ");
private String prepareTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) {
if (tasks.isEmpty()) {
return "";
}
StringBuilder builder = new StringBuilder(taskType).append(": ");
for (StreamsGroupMemberAssignment.TaskIds taskIds : tasks) {
builder.append(taskIds.subtopologyId()).append(":[");
builder.append(taskIds.partitions().stream().map(String::valueOf).collect(Collectors.joining(",")));
builder.append("]; ");
}
return builder.toString();
}
private void printTasks(StreamsGroupMemberAssignment assignment, boolean isTarget) {
String typePrefix = isTarget ? "TARGET-" : "";
printTaskType(assignment.activeTasks(), typePrefix + "ACTIVE-TASKS:");
printTaskType(assignment.standbyTasks(), typePrefix + "STANDBY-TASKS:");
printTaskType(assignment.warmupTasks(), typePrefix + "WARMUP-TASKS:");
private String getTasksForPrinting(StreamsGroupMemberAssignment assignment, Optional<StreamsGroupMemberAssignment> targetAssignment) {
StringBuilder builder = new StringBuilder();
builder.append(prepareTaskType(assignment.activeTasks(), "ACTIVE"))
.append(prepareTaskType(assignment.standbyTasks(), "STANDBY"))
.append(prepareTaskType(assignment.warmupTasks(), "WARMUP"));
targetAssignment.ifPresent(target -> builder.append(prepareTaskType(target.activeTasks(), "TARGET-ACTIVE"))
.append(prepareTaskType(target.standbyTasks(), "TARGET-STANDBY"))
.append(prepareTaskType(target.warmupTasks(), "TARGET-WARMUP")));
return builder.toString();
}
private void printStates(StreamsGroupDescription description, boolean verbose) {
maybePrintEmptyGroupState(description.groupId(), description.groupState(), 1);
int groupLen = Math.max(15, description.groupId().length());
String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
int coordinatorLen = Math.max(25, coordinator.length());
if (!verbose) {
@ -245,14 +259,14 @@ public class StreamsGroupCommand {
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS");
System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size());
} else {
String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %-15s %-15s %s%n";
String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %-15s %-25s %s\n";
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.groupEpoch(), description.targetAssignmentEpoch(), description.members().size());
}
}
private void printOffsets(StreamsGroupDescription description, boolean verbose) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> offsets = getOffsets(description.members(), description);
Map<TopicPartition, OffsetsInfo> offsets = getOffsets(description);
if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), offsets.size())) {
int groupLen = Math.max(15, description.groupId().length());
int maxTopicLen = 15;
@ -261,22 +275,25 @@ public class StreamsGroupCommand {
}
if (!verbose) {
String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s%n";
String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s%n";
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) {
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
for (Map.Entry<TopicPartition, OffsetsInfo> offset : offsets.entrySet()) {
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue().lag);
}
} else {
String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %s%n";
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "OFFSET-LAG");
for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) {
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue());
String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %-15s%n";
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
for (Map.Entry<TopicPartition, OffsetsInfo> offset : offsets.entrySet()) {
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(),
offset.getValue().currentOffset.map(Object::toString).orElse("-"), offset.getValue().leaderEpoch.map(Object::toString).orElse("-"),
offset.getValue().logEndOffset, offset.getValue().lag);
}
}
}
}
Map<TopicPartition, Long> getOffsets(Collection<StreamsGroupMemberDescription> members, StreamsGroupDescription description) throws ExecutionException, InterruptedException {
Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description) throws ExecutionException, InterruptedException {
final Collection<StreamsGroupMemberDescription> members = description.members();
Set<TopicPartition> allTp = new HashSet<>();
for (StreamsGroupMemberDescription memberDescription : members) {
allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks(), description));
@ -291,14 +308,31 @@ public class StreamsGroupCommand {
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get();
Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommittedOffsets(description.groupId());
Map<TopicPartition, Long> lag = new HashMap<>();
Map<TopicPartition, OffsetsInfo> output = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset());
final Optional<Long> currentOffset = committedOffsets.containsKey(tp.getKey()) ? Optional.of(committedOffsets.get(tp.getKey()).offset()) : Optional.empty();
final Optional<Integer> leaderEpoch = committedOffsets.containsKey(tp.getKey()) ? committedOffsets.get(tp.getKey()).leaderEpoch() : Optional.empty();
final long lag = currentOffset.map(current -> latestResult.get(tp.getKey()).offset() - current).orElseGet(() -> latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset());
output.put(tp.getKey(),
new OffsetsInfo(
currentOffset,
leaderEpoch,
latestResult.get(tp.getKey()).offset(),
lag));
}
return lag;
return output;
}
Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
try {
return adminClient.listConsumerGroupOffsets(
Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
/**
* Prints a summary of the state for situations where the group is empty or dead.
@ -347,4 +381,7 @@ public class StreamsGroupCommand {
return Admin.create(props);
}
}
record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer> leaderEpoch, Long logEndOffset, Long lag) {
}
}

View File

@ -60,6 +60,11 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
public final OptionSpec<Void> verboseOpt;
public static StreamsGroupCommandOptions fromArgs(String[] args) {
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
opts.checkArgs();
return opts;
}
public StreamsGroupCommandOptions(String[] args) {
super(args);
@ -104,9 +109,6 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
if (options.has(describeOpt)) {
if (!options.has(groupOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes the option: " + groupOpt);
List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,

View File

@ -0,0 +1,366 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import joptsimple.OptionException;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
@Timeout(600)
@Tag("integration")
public class StreamsGroupCommandTest {
public static EmbeddedKafkaCluster cluster = null;
static KafkaStreams streams;
private static final String APP_ID = "streams-group-command-test";
private static final String INPUT_TOPIC = "customInputTopic";
private static final String OUTPUT_TOPIC = "customOutputTopic";
@BeforeAll
public static void setup() throws Exception {
// start the cluster and create the input topic
final Properties props = new Properties();
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
cluster = new EmbeddedKafkaCluster(1, props);
cluster.start();
cluster.createTopic(INPUT_TOPIC, 2, 1);
// start kafka streams
Properties streamsProp = new Properties();
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
streams = new KafkaStreams(topology(), streamsProp);
startApplicationAndWaitUntilRunning(streams);
}
@AfterAll
public static void closeCluster() {
streams.close();
cluster.stop();
cluster = null;
}
@Test
public void testListStreamsGroupWithoutFilters() throws Exception {
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list"})) {
Set<String> expectedGroups = new HashSet<>(Collections.singleton(APP_ID));
final AtomicReference<Set> foundGroups = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
foundGroups.set(new HashSet<>(service.listStreamsGroups()));
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show streams groups " + expectedGroups + ", but found " + foundGroups.get() + ".");
}
}
@Test
public void testListWithUnrecognizedNewOption() throws Exception {
String[] cgcArgs = new String[]{"--new-option", "--bootstrap-server", cluster.bootstrapServers(), "--list"};
Assertions.assertThrows(OptionException.class, () -> getStreamsGroupService(cgcArgs));
}
@Test
public void testListStreamsGroupWithStates() throws Exception {
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"})) {
Set<GroupListing> expectedListing = Set.of(
new GroupListing(
APP_ID,
Optional.of(GroupType.STREAMS),
"streams",
Optional.of(GroupState.STABLE))
);
final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Collections.emptySet())));
return Objects.equals(expectedListing, foundListing.get());
}, "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
}
}
@Test
public void testListStreamsGroupWithSpecifiedStates() throws Exception {
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"})) {
Set<GroupListing> expectedListing = Set.of(
new GroupListing(
APP_ID,
Optional.of(GroupType.STREAMS),
"streams",
Optional.of(GroupState.STABLE))
);
final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Collections.emptySet())));
return Objects.equals(expectedListing, foundListing.get());
}, "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
}
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) {
Set<GroupListing> expectedListing = Collections.emptySet();
final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Collections.singleton(GroupState.PREPARING_REBALANCE))));
return Objects.equals(expectedListing, foundListing.get());
}, "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
}
}
@Test
public void testListStreamsGroupOutput() throws Exception {
validateListOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list"),
Collections.emptyList(),
Set.of(Collections.singletonList(APP_ID))
);
validateListOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"),
Arrays.asList("GROUP", "STATE"),
Set.of(Arrays.asList(APP_ID, "Stable"))
);
validateListOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "Stable"),
Arrays.asList("GROUP", "STATE"),
Set.of(Arrays.asList(APP_ID, "Stable"))
);
// Check case-insensitivity in state filter.
validateListOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"),
Arrays.asList("GROUP", "STATE"),
Set.of(Arrays.asList(APP_ID, "Stable"))
);
}
@Test
public void testDescribeStreamsGroup() throws Exception {
final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
final Set<List<String>> expectedRows = Set.of(
List.of(APP_ID, INPUT_TOPIC, "0", "0"),
List.of(APP_ID, INPUT_TOPIC, "1", "0"),
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "0"),
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "0"));
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe"), expectedHeader, expectedRows, List.of());
// --describe --offsets has the same output as --describe
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets"), expectedHeader, expectedRows, List.of());
}
@Test
public void testDescribeStreamsGroupWithVerboseOption() throws Exception {
final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
final Set<List<String>> expectedRows = Set.of(
List.of(APP_ID, INPUT_TOPIC, "0", "-", "-", "0", "0"),
List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"),
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "-", "-", "0", "0"),
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "-", "-", "0", "0"));
// The state-store-topic name is not deterministic, so we don't care about topic names.
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose"), expectedHeader, expectedRows, List.of());
// --describe --offsets has the same output as --describe
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets", "--verbose"), expectedHeader, expectedRows, List.of());
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--offsets"), expectedHeader, expectedRows, List.of());
}
@Test
public void testDescribeStreamsGroupWithStateOption() throws Exception {
final List<String> expectedHeader = Arrays.asList("GROUP", "COORDINATOR", "(ID)", "STATE", "#MEMBERS");
final Set<List<String>> expectedRows = Set.of(Arrays.asList(APP_ID, "", "", "Stable", "2"));
// The coordinator is not deterministic, so we don't care about it.
final List<Integer> dontCares = List.of(1, 2);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state"), expectedHeader, expectedRows, dontCares);
}
@Test
public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws Exception {
final List<String> expectedHeader = Arrays.asList("GROUP", "COORDINATOR", "(ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
final Set<List<String>> expectedRows = Set.of(Arrays.asList(APP_ID, "", "", "Stable", "3", "3", "2"));
// The coordinator is not deterministic, so we don't care about it.
final List<Integer> dontCares = List.of(1, 2);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state", "--verbose"), expectedHeader, expectedRows, dontCares);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--state"), expectedHeader, expectedRows, dontCares);
}
@Test
public void testDescribeStreamsGroupWithMembersOption() throws Exception {
final List<String> expectedHeader = List.of("GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows = Set.of(
List.of(APP_ID, "", "", "", "ACTIVE:","0:[0,1];"),
List.of(APP_ID, "", "", "", "ACTIVE:","1:[0,1];"));
// The member and process names as well as client-id are not deterministic, so we don't care about them.
final List<Integer> dontCares = List.of(1, 2, 3);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members"), expectedHeader, expectedRows, dontCares);
}
@Test
public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Exception {
final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows = Set.of(
List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:","0:[0,1];", "TARGET-ACTIVE:","0:[0,1];"),
List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:","1:[0,1];", "TARGET-ACTIVE:","1:[0,1];"));
// The member and process names as well as client-id are not deterministic, so we don't care about them.
final List<Integer> dontCares = List.of(3, 6, 7);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose"), expectedHeader, expectedRows, dontCares);
validateDescribeOutput(
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members"), expectedHeader, expectedRows, dontCares);
}
private static Topology topology() {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count()
.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
}
private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args);
return new StreamsGroupCommand.StreamsGroupService(
opts,
Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
);
}
private static void validateListOutput(
List<String> args,
List<String> expectedHeader,
Set<List<String>> expectedRows
) throws InterruptedException {
final AtomicReference<String> out = new AtomicReference<>("");
TestUtils.waitForCondition(() -> {
String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
out.set(output);
String[] lines = output.split("\n");
if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{};
if (!expectedHeader.isEmpty() && lines.length > 0) {
List<String> header = Arrays.asList(lines[0].split("\\s+"));
if (!expectedHeader.equals(header)) return false;
}
Set<List<String>> groups = Arrays.stream(lines, expectedHeader.isEmpty() ? 0 : 1, lines.length)
.map(line -> Arrays.asList(line.split("\\s+")))
.collect(Collectors.toSet());
return expectedRows.equals(groups);
}, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
}
private static void validateDescribeOutput(
List<String> args,
List<String> expectedHeader,
Set<List<String>> expectedRows,
List<Integer> dontCareIndices
) throws InterruptedException {
final AtomicReference<String> out = new AtomicReference<>("");
TestUtils.waitForCondition(() -> {
String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
out.set(output);
String[] lines = output.split("\n");
if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{};
if (lines.length == 0) return false;
List<String> header = Arrays.asList(lines[0].split("\\s+"));
if (!expectedHeader.equals(header)) return false;
Set<List<String>> groupDesc = Arrays.stream(Arrays.copyOfRange(lines, 1, lines.length))
.map(line -> Arrays.asList(line.split("\\s+")))
.collect(Collectors.toSet());
if (groupDesc.size() != expectedRows.size()) return false;
// clear the dontCare fields and then compare two sets
return expectedRows
.equals(
groupDesc.stream()
.map(list -> {
List<String> listCloned = new ArrayList<>(list);
dontCareIndices.forEach(index -> listCloned.set(index, ""));
return listCloned;
}).collect(Collectors.toSet())
);
}, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
}
}

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@ -28,6 +29,7 @@ import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
@ -177,6 +179,13 @@ public class StreamsGroupCommandUnitTest {
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset);
ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class);
Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new HashMap<>();
committedOffsetsMap.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(12, Optional.of(0), ""));
when(adminClient.listConsumerGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
StreamsGroupMemberDescription description = new StreamsGroupMemberDescription("foo", 0, Optional.empty(),
Optional.empty(), "bar", "baz", 0, "qux",
Optional.empty(), Map.of(), List.of(), List.of(),
@ -193,9 +202,9 @@ public class StreamsGroupCommandUnitTest {
new Node(0, "host", 0),
null);
StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient);
Map<TopicPartition, Long> lags = service.getOffsets(List.of(description), x);
Map<TopicPartition, StreamsGroupCommand.OffsetsInfo> lags = service.getOffsets(x);
assertEquals(1, lags.size());
assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
assertEquals(new StreamsGroupCommand.OffsetsInfo(Optional.of(12L), Optional.of(0), 30L, 18L), lags.get(new TopicPartition("topic1", 0)));
service.close();
}