KAFKA-18090: Add ShareMemberDescription and Assignment (#17975)

Introduce ShareMemberDescription and ShareMemberAssignment as distinct classes for share groups. Although the correspondence with consumer groups is fairly close, the concepts are likely to diverge over time and separating these concepts now makes sense.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Andrew Schofield 2024-11-29 04:50:01 +00:00 committed by GitHub
parent b8c8e0c713
commit e7bbcdb251
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 206 additions and 40 deletions

View File

@ -21,7 +21,7 @@ import java.util.Objects;
import java.util.Optional;
/**
* A detailed description of a single group instance in the cluster.
* A detailed description of a single group member in the cluster.
*/
public class MemberDescription {
private final String memberId;

View File

@ -35,20 +35,20 @@ import java.util.stream.Collectors;
@InterfaceStability.Evolving
public class ShareGroupDescription {
private final String groupId;
private final Collection<MemberDescription> members;
private final Collection<ShareMemberDescription> members;
private final GroupState groupState;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;
public ShareGroupDescription(String groupId,
Collection<MemberDescription> members,
Collection<ShareMemberDescription> members,
GroupState groupState,
Node coordinator) {
this(groupId, members, groupState, coordinator, Collections.emptySet());
}
public ShareGroupDescription(String groupId,
Collection<MemberDescription> members,
Collection<ShareMemberDescription> members,
GroupState groupState,
Node coordinator,
Set<AclOperation> authorizedOperations) {
@ -86,7 +86,7 @@ public class ShareGroupDescription {
/**
* A list of the members of the share group.
*/
public Collection<MemberDescription> members() {
public Collection<ShareMemberDescription> members() {
return members;
}
@ -114,7 +114,7 @@ public class ShareGroupDescription {
@Override
public String toString() {
return "(groupId=" + groupId +
", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) +
", members=" + members.stream().map(ShareMemberDescription::toString).collect(Collectors.joining(",")) +
", groupState=" + groupState +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* A description of the assignments of a specific share group member.
*/
@InterfaceStability.Evolving
public class ShareMemberAssignment {
private final Set<TopicPartition> topicPartitions;
/**
* Creates an instance with the specified parameters.
*
* @param topicPartitions List of topic partitions
*/
public ShareMemberAssignment(Set<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions == null ? Collections.emptySet() : Set.copyOf(topicPartitions);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareMemberAssignment that = (ShareMemberAssignment) o;
return Objects.equals(topicPartitions, that.topicPartitions);
}
@Override
public int hashCode() {
return topicPartitions != null ? topicPartitions.hashCode() : 0;
}
/**
* The topic partitions assigned to a group member.
*/
public Set<TopicPartition> topicPartitions() {
return topicPartitions;
}
@Override
public String toString() {
return "(topicPartitions=" + topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",")) + ")";
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collections;
import java.util.Objects;
/**
* A detailed description of a single share group member in the cluster.
*/
@InterfaceStability.Evolving
public class ShareMemberDescription {
private final String memberId;
private final String clientId;
private final String host;
private final ShareMemberAssignment assignment;
public ShareMemberDescription(
String memberId,
String clientId,
String host,
ShareMemberAssignment assignment
) {
this.memberId = memberId == null ? "" : memberId;
this.clientId = clientId == null ? "" : clientId;
this.host = host == null ? "" : host;
this.assignment = assignment == null ?
new ShareMemberAssignment(Collections.emptySet()) : assignment;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareMemberDescription that = (ShareMemberDescription) o;
return memberId.equals(that.memberId) &&
clientId.equals(that.clientId) &&
host.equals(that.host) &&
assignment.equals(that.assignment);
}
@Override
public int hashCode() {
return Objects.hash(memberId, clientId, host, assignment);
}
/**
* The consumer id of the group member.
*/
public String consumerId() {
return memberId;
}
/**
* The client id of the group member.
*/
public String clientId() {
return clientId;
}
/**
* The host where the group member is running.
*/
public String host() {
return host;
}
/**
* The assignment of the group member.
*/
public ShareMemberAssignment assignment() {
return assignment;
}
@Override
public String toString() {
return "(memberId=" + memberId +
", clientId=" + clientId +
", host=" + host +
", assignment=" + assignment + ")";
}
}

View File

@ -16,9 +16,9 @@
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@ -114,15 +114,15 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
continue;
}
final List<MemberDescription> memberDescriptions = new ArrayList<>(describedGroup.members().size());
final List<ShareMemberDescription> memberDescriptions = new ArrayList<>(describedGroup.members().size());
final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
describedGroup.members().forEach(groupMember ->
memberDescriptions.add(new MemberDescription(
memberDescriptions.add(new ShareMemberDescription(
groupMember.memberId(),
groupMember.clientId(),
groupMember.clientHost(),
new MemberAssignment(convertAssignment(groupMember.assignment()))
new ShareMemberAssignment(convertAssignment(groupMember.assignment()))
))
);

View File

@ -4974,11 +4974,11 @@ public class KafkaAdminClientTest {
expectedTopicPartitions.add(1, new TopicPartition("my_topic", 1));
expectedTopicPartitions.add(2, new TopicPartition("my_topic", 2));
List<MemberDescription> expectedMemberDescriptions = new ArrayList<>();
expectedMemberDescriptions.add(convertToMemberDescriptions(memberOne,
new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
expectedMemberDescriptions.add(convertToMemberDescriptions(memberTwo,
new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
List<ShareMemberDescription> expectedMemberDescriptions = new ArrayList<>();
expectedMemberDescriptions.add(convertToShareMemberDescriptions(memberOne,
new ShareMemberAssignment(new HashSet<>(expectedTopicPartitions))));
expectedMemberDescriptions.add(convertToShareMemberDescriptions(memberTwo,
new ShareMemberAssignment(new HashSet<>(expectedTopicPartitions))));
data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setGroupState(GroupState.STABLE.toString())
@ -8549,13 +8549,12 @@ public class KafkaAdminClientTest {
assignment);
}
private static MemberDescription convertToMemberDescriptions(ShareGroupDescribeResponseData.Member member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
Optional.empty(),
member.clientId(),
member.clientHost(),
assignment);
private static ShareMemberDescription convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member,
ShareMemberAssignment assignment) {
return new ShareMemberDescription(member.memberId(),
member.clientId(),
member.clientHost(),
assignment);
}
@Test

View File

@ -23,9 +23,9 @@ import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberDescription;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
@ -36,7 +36,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -70,7 +69,7 @@ public class ShareGroupCommand {
}
public static void run(ShareGroupCommandOptions opts) {
try (ShareGroupService shareGroupService = new ShareGroupService(opts, Collections.emptyMap())) {
try (ShareGroupService shareGroupService = new ShareGroupService(opts, Map.of())) {
if (opts.options.has(opts.listOpt)) {
shareGroupService.listGroups();
} else if (opts.options.has(opts.describeOpt)) {
@ -128,7 +127,7 @@ public class ShareGroupCommand {
if (opts.options.has(opts.stateOpt)) {
String stateValue = opts.options.valueOf(opts.stateOpt);
Set<GroupState> states = (stateValue == null || stateValue.isEmpty())
? Collections.emptySet()
? Set.of()
: groupStatesFromString(stateValue);
List<GroupListing> listings = listShareGroupsInStates(states);
@ -201,7 +200,7 @@ public class ShareGroupCommand {
}
ShareGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException {
DescribeShareGroupsResult result = adminClient.describeShareGroups(Collections.singletonList(group));
DescribeShareGroupsResult result = adminClient.describeShareGroups(List.of(group));
Map<String, ShareGroupDescription> descriptionMap = result.all().get();
if (descriptionMap.containsKey(group)) {
return descriptionMap.get(group);
@ -209,9 +208,9 @@ public class ShareGroupCommand {
return null;
}
Map<TopicPartition, Long> getOffsets(Collection<MemberDescription> members) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> getOffsets(Collection<ShareMemberDescription> members) throws ExecutionException, InterruptedException {
Set<TopicPartition> allTp = new HashSet<>();
for (MemberDescription memberDescription : members) {
for (ShareMemberDescription memberDescription : members) {
allTp.addAll(memberDescription.assignment().topicPartitions());
}
// fetch latest and earliest offsets
@ -269,9 +268,9 @@ public class ShareGroupCommand {
private void printMembers(ShareGroupDescription description) {
int groupLen = Math.max(15, description.groupId().length());
int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
Collection<MemberDescription> members = description.members();
Collection<ShareMemberDescription> members = description.members();
if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), description.members().size())) {
for (MemberDescription member : members) {
for (ShareMemberDescription member : members) {
maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length());
maxHostLen = Math.max(maxHostLen, member.host().length());
maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length());
@ -279,7 +278,7 @@ public class ShareGroupCommand {
String fmt = "%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT");
for (MemberDescription member : members) {
for (ShareMemberDescription member : members) {
System.out.printf(fmt, description.groupId(), member.consumerId(), member.host(), member.clientId(),
member.assignment().topicPartitions().stream().map(part -> part.topic() + ":" + part.partition()).collect(Collectors.joining(",")));
}

View File

@ -23,10 +23,10 @@ import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
@ -42,6 +42,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -92,8 +93,8 @@ public class ShareGroupCommandTest {
Map<String, ShareGroupDescription> resultMap = new HashMap<>();
ShareGroupDescription exp = new ShareGroupDescription(
firstGroup,
Collections.singletonList(new MemberDescription("memid1", "clId1", "host1", new MemberAssignment(
Collections.singleton(new TopicPartition("topic1", 0))
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0))
))),
GroupState.STABLE,
new Node(0, "host1", 9090));
@ -123,10 +124,10 @@ public class ShareGroupCommandTest {
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset);
MemberDescription description = new MemberDescription("", "", "",
new MemberAssignment(Collections.singleton(new TopicPartition("topic1", 0))));
ShareMemberDescription description = new ShareMemberDescription("", "", "",
new ShareMemberAssignment(Set.of(new TopicPartition("topic1", 0))));
ShareGroupService service = new ShareGroupService(null, adminClient);
Map<TopicPartition, Long> lags = service.getOffsets(Collections.singletonList(description));
Map<TopicPartition, Long> lags = service.getOffsets(List.of(description));
assertEquals(1, lags.size());
assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
service.close();