KAFKA-18925: Add streams groups support to Admin.listGroups (#19155)

Add support so that Admin.listGroups can represent
streams groups and their states.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-03-11 15:48:07 +01:00 committed by GitHub
parent 278a93c45d
commit 6551e87815
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 200 additions and 11 deletions

View File

@ -32,17 +32,18 @@ import java.util.stream.Collectors;
 * The following table shows the correspondence between the group states and types.
 * <table>
 * <thead>
 * <tr><th>State</th><th>Classic group</th><th>Consumer group</th><th>Share group</th><th>Streams group</th></tr>
 * </thead>
 * <tbody>
 * <tr><td>UNKNOWN</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
 * <tr><td>PREPARING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td><td></td></tr>
 * <tr><td>COMPLETING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td><td></td></tr>
 * <tr><td>STABLE</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
 * <tr><td>DEAD</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
 * <tr><td>EMPTY</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
 * <tr><td>ASSIGNING</td><td></td><td>Yes</td><td></td><td>Yes</td></tr>
 * <tr><td>RECONCILING</td><td></td><td>Yes</td><td></td><td>Yes</td></tr>
 * <tr><td>NOT_READY</td><td></td><td></td><td></td><td>Yes</td></tr>
 * </tbody>
 * </table>
 */
@ -55,7 +56,8 @@ public enum GroupState {
DEAD("Dead"), DEAD("Dead"),
EMPTY("Empty"), EMPTY("Empty"),
ASSIGNING("Assigning"), ASSIGNING("Assigning"),
RECONCILING("Reconciling"); RECONCILING("Reconciling"),
NOT_READY("NotReady");
private static final Map<String, GroupState> NAME_TO_ENUM = Arrays.stream(values()) private static final Map<String, GroupState> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity())); .collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));
@ -79,6 +81,8 @@ public enum GroupState {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY); return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY);
} else if (type == GroupType.CONSUMER) { } else if (type == GroupType.CONSUMER) {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING); return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING);
} else if (type == GroupType.STREAMS) {
return Set.of(STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING, NOT_READY);
} else if (type == GroupType.SHARE) { } else if (type == GroupType.SHARE) {
return Set.of(STABLE, DEAD, EMPTY); return Set.of(STABLE, DEAD, EMPTY);
} else { } else {

View File

@ -26,7 +26,8 @@ public enum GroupType {
UNKNOWN("Unknown"), UNKNOWN("Unknown"),
CONSUMER("Consumer"), CONSUMER("Consumer"),
CLASSIC("Classic"), CLASSIC("Classic"),
SHARE("Share"); SHARE("Share"),
STREAMS("Streams");
private static final Map<String, GroupType> NAME_TO_ENUM = Arrays.stream(values()) private static final Map<String, GroupType> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT), Function.identity())); .collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT), Function.identity()));

View File

@ -140,6 +140,7 @@ import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData; import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData.ListedGroup;
import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
@ -5991,6 +5992,189 @@ public class KafkaAdminClientTest {
} }
} }
@Test
public void testListStreamsGroups() throws Exception {
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
        AdminClientConfig.RETRIES_CONFIG, "2")) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

        // Empty metadata response should be retried.
        env.kafkaClient().prepareResponse(
            RequestTestUtils.metadataResponse(
                Collections.emptyList(),
                env.cluster().clusterResource().clusterId(),
                -1,
                Collections.emptyList()));

        env.kafkaClient().prepareResponse(
            RequestTestUtils.metadataResponse(
                env.cluster().nodes(),
                env.cluster().clusterResource().clusterId(),
                env.cluster().controller().id(),
                Collections.emptyList()));

        // Node 0 returns a single streams group.
        // Use the imported ListedGroup consistently rather than mixing it with the
        // fully-qualified ListGroupsResponseData.ListedGroup form.
        env.kafkaClient().prepareResponseFrom(
            new ListGroupsResponse(
                new ListGroupsResponseData()
                    .setErrorCode(Errors.NONE.code())
                    .setGroups(singletonList(
                        new ListedGroup()
                            .setGroupId("streams-group-1")
                            .setGroupType(GroupType.STREAMS.toString())
                            .setGroupState("Stable")
                    ))),
            env.cluster().nodeById(0));

        // Node 1 first answers with retriable errors; the request is retried
        // against the same node until it succeeds with two groups.
        env.kafkaClient().prepareResponseFrom(
            new ListGroupsResponse(
                new ListGroupsResponseData()
                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                    .setGroups(Collections.emptyList())
            ),
            env.cluster().nodeById(1));
        env.kafkaClient().prepareResponseFrom(
            new ListGroupsResponse(
                new ListGroupsResponseData()
                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
                    .setGroups(Collections.emptyList())
            ),
            env.cluster().nodeById(1));
        env.kafkaClient().prepareResponseFrom(
            new ListGroupsResponse(
                new ListGroupsResponseData()
                    .setErrorCode(Errors.NONE.code())
                    .setGroups(Arrays.asList(
                        new ListedGroup()
                            .setGroupId("streams-group-2")
                            .setGroupType(GroupType.STREAMS.toString())
                            .setGroupState("Stable"),
                        new ListedGroup()
                            .setGroupId("streams-group-3")
                            .setGroupType(GroupType.STREAMS.toString())
                            .setGroupState("Stable")
                    ))),
            env.cluster().nodeById(1));

        // Node 2 returns one more streams group.
        env.kafkaClient().prepareResponseFrom(
            new ListGroupsResponse(
                new ListGroupsResponseData()
                    .setErrorCode(Errors.NONE.code())
                    .setGroups(singletonList(
                        new ListedGroup()
                            .setGroupId("streams-group-4")
                            .setGroupType(GroupType.STREAMS.toString())
                            .setGroupState("Stable")
                    ))),
            env.cluster().nodeById(2));

        // Node 3 fails fatally: the error shows up in all() and errors(),
        // but the groups from the other nodes are still returned by valid().
        env.kafkaClient().prepareResponseFrom(
            new ListGroupsResponse(
                new ListGroupsResponseData()
                    .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
                    .setGroups(Collections.emptyList())),
            env.cluster().nodeById(3));

        final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
        TestUtils.assertFutureThrows(UnknownServerException.class, result.all());

        Collection<GroupListing> listings = result.valid().get();
        assertEquals(4, listings.size());

        // Every listed streams group carries a state.
        Set<String> groupIds = new HashSet<>();
        for (GroupListing listing : listings) {
            groupIds.add(listing.groupId());
            assertTrue(listing.groupState().isPresent());
        }

        assertEquals(Set.of("streams-group-1", "streams-group-2", "streams-group-3", "streams-group-4"), groupIds);
        assertEquals(1, result.errors().get().size());
    }
}
@Test
public void testListStreamsGroupsMetadataFailure() throws Exception {
    final Time mockTime = new MockTime();
    final Cluster mockedCluster = mockCluster(3, 0);
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockTime, mockedCluster,
        AdminClientConfig.RETRIES_CONFIG, "0")) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

        // A metadata response without any brokers leaves the admin client with
        // nowhere to send the ListGroups requests; with retries disabled the
        // listing must fail immediately.
        env.kafkaClient().prepareResponse(
            RequestTestUtils.metadataResponse(
                Collections.emptyList(),
                env.cluster().clusterResource().clusterId(),
                -1,
                Collections.emptyList()));

        final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
        TestUtils.assertFutureThrows(KafkaException.class, result.all());
    }
}
@Test
public void testListStreamsGroupsWithStates() throws Exception {
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));

        // Two streams groups in different states; use the imported ListedGroup
        // consistently instead of the fully-qualified form.
        env.kafkaClient().prepareResponseFrom(
            new ListGroupsResponse(new ListGroupsResponseData()
                .setErrorCode(Errors.NONE.code())
                .setGroups(Arrays.asList(
                    new ListedGroup()
                        .setGroupId("streams-group-1")
                        .setGroupType(GroupType.STREAMS.toString())
                        .setProtocolType("streams")
                        .setGroupState("Stable"),
                    new ListedGroup()
                        .setGroupId("streams-group-2")
                        .setGroupType(GroupType.STREAMS.toString())
                        .setProtocolType("streams")
                        .setGroupState("NotReady")))),
            env.cluster().nodeById(0));

        final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
        Collection<GroupListing> listings = result.valid().get();
        assertEquals(2, listings.size());

        // The broker-reported state strings must be mapped onto GroupState,
        // including the streams-only NOT_READY state.
        List<GroupListing> expected = List.of(
            new GroupListing("streams-group-1", Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)),
            new GroupListing("streams-group-2", Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY)));
        assertEquals(expected, listings);
        assertEquals(0, result.errors().get().size());
    }
}
@Test
public void testListStreamsGroupsWithStatesOlderBrokerVersion() {
    // Advertise a broker that only supports ListGroups up to version 4, which
    // predates streams-group support.
    ApiVersion maxListGroupsV4 = new ApiVersion()
        .setApiKey(ApiKeys.LIST_GROUPS.id)
        .setMinVersion((short) 0)
        .setMaxVersion((short) 4);
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(maxListGroupsV4)));
        env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));

        env.kafkaClient().prepareResponseFrom(
            new ListGroupsResponse(new ListGroupsResponseData()
                .setErrorCode(Errors.NONE.code())
                .setGroups(Collections.singletonList(
                    new ListGroupsResponseData.ListedGroup()
                        .setGroupId("streams-group-1")))),
            env.cluster().nodeById(0));

        // Listing streams groups requires ListGroups >= v5, so the request
        // must fail with UnsupportedVersionException on this broker.
        ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
        TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
    }
}
@Test @Test
public void testDescribeShareGroups() throws Exception { public void testDescribeShareGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {