KAFKA-18761: [2/N] List share group offsets with state and auth (#19328)

This PR approaches completion of Admin.listShareGroupOffsets() and
kafka-share-groups.sh --describe --offsets.

Prior to this patch, kafka-share-groups.sh was only able to describe the
offsets for partitions which were assigned to active members. Now, the
Admin.listShareGroupOffsets() uses the persister's knowledge of the
share-partitions which have initialised state. Then, it uses this list
to obtain a complete set of offset information.

The PR also implements the topic-based authorisation checking. If
Admin.listShareGroupOffsets() is called with a list of topic-partitions
specified, the authz checking is performed on the supplied list,
returning errors for any topics to which the client is not authorised.
If Admin.listShareGroupOffsets() is called without a list of
topic-partitions specified, the list of topics is discovered from the
persister as described above, and then the response is filtered down to
only show the topics to which the client is authorised. This is
consistent with other similar RPCs in the Kafka protocol, such as
OffsetFetch.

Reviewers: David Arthur <mumrah@gmail.com>, Sushant Mahajan <smahajan@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2025-04-04 13:25:19 +01:00 committed by GitHub
parent 98c0f3024d
commit d4d9f11816
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 917 additions and 79 deletions

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -47,9 +46,10 @@ public class ListShareGroupOffsetsSpec {
/**
* Returns the topic partitions whose offsets are to be listed for a share group.
* {@code null} indicates that offsets of all partitions of the group are to be listed.
*/
public Collection<TopicPartition> topicPartitions() {
return topicPartitions == null ? List.of() : topicPartitions;
return topicPartitions;
}
@Override

View File

@ -37,7 +37,6 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -84,19 +83,23 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
DescribeShareGroupOffsetsRequestGroup requestGroup = new DescribeShareGroupOffsetsRequestGroup()
.setGroupId(groupId);
Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
spec.topicPartitions().forEach(tp -> topicPartitionMap.computeIfAbsent(tp.topic(), t -> new LinkedList<>()).add(tp.partition()));
if (spec.topicPartitions() != null) {
Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
spec.topicPartitions().forEach(tp -> topicPartitionMap.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition()));
Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics = new HashMap<>();
for (TopicPartition tp : spec.topicPartitions()) {
requestTopics.computeIfAbsent(tp.topic(), t ->
new DescribeShareGroupOffsetsRequestTopic()
.setTopicName(tp.topic())
.setPartitions(new LinkedList<>()))
.partitions()
.add(tp.partition());
Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics = new HashMap<>();
for (TopicPartition tp : spec.topicPartitions()) {
requestTopics.computeIfAbsent(tp.topic(), t ->
new DescribeShareGroupOffsetsRequestTopic()
.setTopicName(tp.topic())
.setPartitions(new ArrayList<>()))
.partitions()
.add(tp.partition());
}
requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
} else {
requestGroup.setTopics(null);
}
requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
groups.add(requestGroup);
});
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()

View File

@ -26,8 +26,8 @@
"about": "The groups to describe offsets for.", "fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
"about": "The topics to describe offsets for.", "fields": [
{ "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+", "nullableVersions": "0+",
"about": "The topics to describe offsets for, or null for all topic-partitions.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",

View File

@ -10559,9 +10559,7 @@ public class KafkaAdminClientTest {
TopicPartition myTopicPartition4 = new TopicPartition("my_topic_1", 4);
TopicPartition myTopicPartition5 = new TopicPartition("my_topic_2", 6);
ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec().topicPartitions(
List.of(myTopicPartition0, myTopicPartition1, myTopicPartition2, myTopicPartition3, myTopicPartition4, myTopicPartition5)
);
ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec();
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
groupSpecs.put(GROUP_ID, groupSpec);

View File

@ -3508,6 +3508,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]](groups.size)
groups.forEach { groupDescribeOffsets =>
val isAllPartitions = groupDescribeOffsets.topics == null
if (!isShareGroupProtocolEnabled) {
futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsets.groupId)
@ -3516,6 +3517,11 @@ class KafkaApis(val requestChannel: RequestChannel,
futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsets.groupId)
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
} else if (isAllPartitions) {
futures += describeShareGroupAllOffsetsForGroup(
request.context,
groupDescribeOffsets
)
} else if (groupDescribeOffsets.topics.isEmpty) {
futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsets.groupId))
@ -3535,19 +3541,76 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
private def describeShareGroupAllOffsetsForGroup(requestContext: RequestContext,
groupDescribeOffsetsRequest: DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
): CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] = {
groupCoordinator.describeShareGroupOffsets(
groupCoordinator.describeShareGroupAllOffsets(
requestContext,
groupDescribeOffsetsRequest
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] { (groupDescribeOffsetsResponse, exception) =>
if (exception != null) {
val error = Errors.forException(exception)
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsetsRequest.groupId)
.setErrorCode(Errors.forException(exception).code)
.setErrorCode(error.code)
.setErrorMessage(error.message)
} else {
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
requestContext,
DESCRIBE,
TOPIC,
groupDescribeOffsetsResponse.topics.asScala
)(_.topicName)
groupDescribeOffsetsResponse.setTopics(authorizedOffsets.asJava)
}
}
}
private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
groupDescribeOffsetsRequest: DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
): CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] = {
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
val (authorizedTopics, unauthorizedTopics) = authHelper.partitionSeqByAuthorized(
requestContext,
DESCRIBE,
TOPIC,
groupDescribeOffsetsRequest.topics.asScala
)(_.topicName)
groupCoordinator.describeShareGroupOffsets(
requestContext,
new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId(groupDescribeOffsetsRequest.groupId)
.setTopics(authorizedTopics.asJava)
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] { (groupDescribeOffsetsResponse, exception) =>
if (exception != null) {
val error = Errors.forException(exception)
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsetsRequest.groupId)
.setErrorCode(error.code)
.setErrorMessage(error.message)
} else if (groupDescribeOffsetsResponse.errorCode() != Errors.NONE.code) {
groupDescribeOffsetsResponse
} else {
val topics = new util.ArrayList[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic](
groupDescribeOffsetsResponse.topics.size + unauthorizedTopics.size
)
topics.addAll(groupDescribeOffsetsResponse.topics)
unauthorizedTopics.foreach { topic =>
val topicResponse = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName)
.setTopicId(Uuid.ZERO_UUID)
topic.partitions().forEach { partitionIndex =>
topicResponse.partitions.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionIndex)
.setStartOffset(-1)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
}
topics.add(topicResponse)
}
groupDescribeOffsetsResponse.setTopics(topics)
}
}
}

View File

@ -11134,7 +11134,7 @@ class KafkaApisTest extends Logging {
}
@Test
def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = {
def testDescribeShareGroupOffsetsRequestGroupAuthorizationFailed(): Unit = {
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroups(
util.List.of(new DescribeShareGroupOffsetsRequestGroup().setGroupId("group").setTopics(
util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
@ -11163,6 +11163,34 @@ class KafkaApisTest extends Logging {
)
}
@Test
def testDescribeShareGroupAllOffsetsRequestGroupAuthorizationFailed(): Unit = {
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroups(
util.List.of(new DescribeShareGroupOffsetsRequestGroup().setGroupId("group").setTopics(null))
)
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(util.List.of(AuthorizationResult.DENIED))
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Some(authorizer),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
response.data.groups.forEach(
group => group.topics.forEach(
topic => topic.partitions.forEach(
partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, partition.errorCode)
)
)
)
}
@Test
def testDescribeShareGroupOffsetsRequestSuccess(): Unit = {
val topicName1 = "topic-1"
@ -11279,6 +11307,453 @@ class KafkaApisTest extends Logging {
assertEquals(describeShareGroupOffsetsResponse, response.data)
}
@Test
def testDescribeShareGroupOffsetsRequestTopicAuthorizationFailed(): Unit = {
val topicName1 = "topic-1"
val topicId1 = Uuid.randomUuid
val topicName2 = "topic-2"
val topicId2 = Uuid.randomUuid
val topicName3 = "topic-3"
val topicId3 = Uuid.randomUuid
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
val describeShareGroupOffsetsRequestGroup1 = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(
util.List.of(
new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1, 2, 3)),
new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName2).setPartitions(util.List.of(10, 20)),
)
)
val describeShareGroupOffsetsRequestGroup2 = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(
util.List.of(
new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName3).setPartitions(util.List.of(0)),
)
)
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData()
.setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1, describeShareGroupOffsetsRequestGroup2))
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
// The group coordinator will only be asked for information about topics which are authorized
val futureGroup1 = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupOffsets(
requestChannelRequest.context,
new DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(
util.List.of(
new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1, 2, 3)),
)
)
)).thenReturn(futureGroup1)
val futureGroup2 = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupOffsets(
requestChannelRequest.context,
new DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(
util.List.of(
)
)
)).thenReturn(futureGroup2)
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
"group1" -> AuthorizationResult.ALLOWED,
"group2" -> AuthorizationResult.ALLOWED,
topicName1 -> AuthorizationResult.ALLOWED,
topicName2 -> AuthorizationResult.DENIED,
topicName3 -> AuthorizationResult.DENIED
)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument(1, classOf[util.List[Action]])
actions.asScala.map { action =>
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
}.asJava
}
kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
// These are the responses to the KafkaApis request, complete with authorization errors
val describeShareGroupOffsetsResponseGroup1 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group1")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setTopicId(topicId1)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
)),
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(Uuid.ZERO_UUID)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
))
))
val describeShareGroupOffsetsResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group2")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName3)
.setTopicId(Uuid.ZERO_UUID)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
))
))
val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
.setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, describeShareGroupOffsetsResponseGroup2))
// And these are the responses to the topics which were authorized
val describeShareGroupOffsetsGroupCoordinatorResponseGroup1 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group1")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setTopicId(topicId1)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
))
))
val describeShareGroupOffsetsGroupCoordinatorResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group2")
.setTopics(util.List.of())
futureGroup1.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup1)
futureGroup2.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup2)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(describeShareGroupOffsetsResponse, response.data)
}
@Test
def testDescribeShareGroupAllOffsetsRequestTopicAuthorizationFailed(): Unit = {
val topicName1 = "topic-1"
val topicId1 = Uuid.randomUuid
val topicName2 = "topic-2"
val topicId2 = Uuid.randomUuid
val topicName3 = "topic-3"
val topicId3 = Uuid.randomUuid
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
val describeShareGroupOffsetsRequestGroup1 = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
val describeShareGroupOffsetsRequestGroup2 = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData()
.setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1, describeShareGroupOffsetsRequestGroup2))
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
// The group coordinator is being asked for information about all topics, not just those which are authorized
val futureGroup1 = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupAllOffsets(
requestChannelRequest.context,
new DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
)).thenReturn(futureGroup1)
val futureGroup2 = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupAllOffsets(
requestChannelRequest.context,
new DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
)).thenReturn(futureGroup2)
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
"group1" -> AuthorizationResult.ALLOWED,
"group2" -> AuthorizationResult.ALLOWED,
topicName1 -> AuthorizationResult.ALLOWED,
topicName2 -> AuthorizationResult.DENIED,
topicName3 -> AuthorizationResult.DENIED
)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument(1, classOf[util.List[Action]])
actions.asScala.map { action =>
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
}.asJava
}
kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
// These are the responses to the KafkaApis request, with unauthorized topics filtered out
val describeShareGroupOffsetsResponseGroup1 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group1")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setTopicId(topicId1)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
))
))
val describeShareGroupOffsetsResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group2")
.setTopics(util.List.of())
// And these are the responses from the group coordinator for all topics, even those which are not authorized
val describeShareGroupOffsetsGroupCoordinatorResponseGroup1 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group1")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setTopicId(topicId1)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
)),
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(topicId2)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
))
))
val describeShareGroupOffsetsGroupCoordinatorResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group2")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName3)
.setTopicId(topicId3)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
))
))
val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
.setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, describeShareGroupOffsetsResponseGroup2))
futureGroup1.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup1)
futureGroup2.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup2)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(describeShareGroupOffsetsResponse, response.data)
}
@Test
def testDescribeShareGroupAllOffsetsRequestSuccess(): Unit = {
val topicName1 = "topic-1"
val topicId1 = Uuid.randomUuid
val topicName2 = "topic-2"
val topicId2 = Uuid.randomUuid
val topicName3 = "topic-3"
val topicId3 = Uuid.randomUuid
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
val describeShareGroupOffsetsRequestGroup1 = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
val describeShareGroupOffsetsRequestGroup2 = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData()
.setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1, describeShareGroupOffsetsRequestGroup2))
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
val futureGroup1 = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupAllOffsets(
requestChannelRequest.context,
describeShareGroupOffsetsRequestGroup1
)).thenReturn(futureGroup1)
val futureGroup2 = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupAllOffsets(
requestChannelRequest.context,
describeShareGroupOffsetsRequestGroup2
)).thenReturn(futureGroup2)
kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val describeShareGroupOffsetsResponseGroup1 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group1")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setTopicId(topicId1)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
)),
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(topicId2)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
))
))
val describeShareGroupOffsetsResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group2")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName3)
.setTopicId(topicId3)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
))
))
val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
.setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, describeShareGroupOffsetsResponseGroup2))
futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(describeShareGroupOffsetsResponse, response.data)
}
@Test
def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)

View File

@ -297,6 +297,20 @@ public interface GroupCoordinator {
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request
);
/**
* Describe all Share Group Offsets for a given group.
*
* @param context The request context
* @param request The DescribeShareGroupOffsetsRequestGroup request.
*
* @return A future yielding the results.
* The error codes of the response are set to indicate the errors occurred during the execution.
*/
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupAllOffsets(
RequestContext context,
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request
);
/**
* Commit offsets for a given Group.
*

View File

@ -1386,13 +1386,16 @@ public class GroupCoordinatorService implements GroupCoordinator {
).toList()
));
} else {
// If the topic does not exist, the start offset is returned as -1 (uninitialized offset).
// This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
// It's treated as absence of data, rather than an error, unlike TOPIC_AUTHORIZATION_ERROR for example.
describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
.setTopicId(Uuid.ZERO_UUID)
.setPartitions(topic.partitions().stream().map(
partition -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
).toList()));
}
});
@ -1408,6 +1411,67 @@ public class GroupCoordinatorService implements GroupCoordinator {
ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
.setGroupId(requestData.groupId())
.setTopics(readStateSummaryData);
return readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
}
/**
* See {@link GroupCoordinator#describeShareGroupAllOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
*/
@Override
public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupAllOffsets(
RequestContext context,
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData
) {
if (!isActive.get()) {
return CompletableFuture.completedFuture(
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), Errors.COORDINATOR_NOT_AVAILABLE));
}
if (metadataImage == null) {
return CompletableFuture.completedFuture(
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), Errors.COORDINATOR_NOT_AVAILABLE));
}
return runtime.scheduleReadOperation(
"share-group-initialized-partitions",
topicPartitionFor(requestData.groupId()),
(coordinator, offset) -> coordinator.initializedShareGroupPartitions(requestData.groupId())
).thenCompose(topicPartitionMap -> {
Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList = new ArrayList<>(topicPartitionMap.size());
ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
.setGroupId(requestData.groupId());
topicPartitionMap.forEach((topicId, partitionSet) -> {
String topicName = metadataImage.topics().topicIdToNameView().get(topicId);
if (topicName != null) {
requestTopicIdToNameMapping.put(topicId, topicName);
readSummaryRequestData.topics().add(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId)
.setPartitions(
partitionSet.stream().map(
partitionIndex -> new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex)
).toList()
));
}
});
return readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
});
}
private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> readShareGroupStateSummary(
ReadShareGroupStateSummaryRequestData readSummaryRequestData,
Map<Uuid, String> requestTopicIdToNameMapping,
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList
) {
// If the request for the persister is empty, just complete the operation right away.
if (readSummaryRequestData.topics().isEmpty()) {
return CompletableFuture.completedFuture(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(readSummaryRequestData.groupId())
.setTopics(describeShareGroupOffsetsResponseTopicList));
}
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future = new CompletableFuture<>();
persister.readSummary(ReadShareGroupStateSummaryParameters.from(readSummaryRequestData))
.whenComplete((result, error) -> {
@ -1421,6 +1485,10 @@ public class GroupCoordinatorService implements GroupCoordinator {
future.completeExceptionally(new IllegalStateException("Result is null for the read state summary"));
return;
}
// Return -1 (uninitialized offset) for the situation where the persister returned an error.
// This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
// It's treated as absence of data, rather than an error.
result.topicsData().forEach(topicData ->
describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicId(topicData.topicId())
@ -1428,15 +1496,13 @@ public class GroupCoordinatorService implements GroupCoordinator {
.setPartitions(topicData.partitions().stream().map(
partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setStartOffset(partitionData.startOffset())
.setErrorMessage(Errors.forCode(partitionData.errorCode()).message())
.setErrorCode(partitionData.errorCode())
.setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
).toList())
));
future.complete(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(requestData.groupId())
.setGroupId(readSummaryRequestData.groupId())
.setTopics(describeShareGroupOffsetsResponseTopicList));
});
return future;

View File

@ -400,7 +400,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* @param context The request context.
* @param request The actual StreamsGroupHeartbeat request.
*
* @return A result containing the StreamsGroupHeartbeat response, a list of internal topics to be created and
* @return A Result containing the StreamsGroupHeartbeat response, a list of internal topics to be created and
* a list of records to update the state machine.
*/
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
@ -466,6 +466,19 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return groupMetadataManager.reconcileShareGroupStateInitializingState(offset);
}
/**
* Returns the set of share-partitions whose share-group state has been initialized in the persister.
*
* @param groupId The group id corresponding to the share group whose share partitions have been initialized.
*
* @return A map representing the initialized share-partitions for the share group.
*/
public Map<Uuid, Set<Integer>> initializedShareGroupPartitions(
String groupId
) {
return groupMetadataManager.initializedShareGroupPartitions(groupId);
}
/**
* Handles a JoinGroup request.
*

View File

@ -3037,7 +3037,7 @@ public class GroupMetadataManager {
}
return member;
}
/**
* Gets or subscribes a static consumer group member. This method also replaces the
* previous static member if allowed.
@ -4115,7 +4115,8 @@ public class GroupMetadataManager {
return shareGroupFenceMember(group, member, response);
}
/** Fences a member from a consumer group and maybe downgrade the consumer group to a classic group.
/**
* Fences a member from a consumer group and maybe downgrade the consumer group to a classic group.
*
* @param group The group.
* @param member The member.
@ -5028,7 +5029,7 @@ public class GroupMetadataManager {
}
private Map<Uuid, Map.Entry<String, Set<Integer>>> attachTopicName(Map<Uuid, Set<Integer>> initMap) {
TopicsImage topicsImage = metadataImage.topics();
TopicsImage topicsImage = metadataImage.topics();
Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : initMap.entrySet()) {
Uuid topicId = entry.getKey();
@ -5038,6 +5039,28 @@ public class GroupMetadataManager {
return Collections.unmodifiableMap(finalMap);
}
/**
* Returns the set of share partitions whose state has been initialized.
*
* @param groupId The group id corresponding to the share group whose share partitions have been initialized.
*
* @return A map representing the initialized share-partitions for the share group.
*/
public Map<Uuid, Set<Integer>> initializedShareGroupPartitions(
String groupId
) {
Map<Uuid, Set<Integer>> resultMap = new HashMap<>();
ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId);
if (currentMap != null) {
currentMap.initializedTopics().forEach((topicId, partitions) -> {
resultMap.put(topicId, new HashSet<>(partitions));
});
}
return resultMap;
}
/**
* Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of
* the consumer group. It updates the subscription part of the member or
@ -8289,7 +8312,7 @@ public class GroupMetadataManager {
private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
return Map.of("group.streams.num.standby.replicas", "0");
}
/**
* Generate a classic group heartbeat key for the timer.
*

View File

@ -122,6 +122,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@ -2845,9 +2846,7 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
.setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
.setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE))))
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET))))
);
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
@ -2890,9 +2889,7 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setStartOffset(21)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(Errors.NONE.message()))))
.setStartOffset(21))))
);
ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
@ -2902,9 +2899,7 @@ public class GroupCoordinatorServiceTest {
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(21)
.setStateEpoch(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(Errors.NONE.message())))
.setStateEpoch(1)))
)
);
@ -2944,10 +2939,10 @@ public class GroupCoordinatorServiceTest {
.setTopics(
List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName("badtopic")
.setTopicId(Uuid.ZERO_UUID)
.setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET))))
);
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
@ -3097,6 +3092,196 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
@Test
public void testDescribeShareGroupAllOffsets() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
MetadataImage image = new MetadataImageBuilder()
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();
service.onNewMetadataImage(image, null);
int partition = 1;
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-initialized-partitions"),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition))));
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
.setTopics(null);
ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition)))));
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId("share-group-id")
.setTopics(
List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setStartOffset(21))))
);
ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
.setResults(
List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(21)
.setStateEpoch(1)))
)
);
ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
when(persister.readSummary(
ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDescribeShareGroupAllOffsetsThrowsError() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
MetadataImage image = new MetadataImageBuilder()
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();
service.onNewMetadataImage(image, null);
int partition = 1;
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-initialized-partitions"),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition))));
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
.setTopics(null);
when(persister.readSummary(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate read state summary request")));
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertFutureThrows(Exception.class, future, "Unable to validate read state summary request");
}
@Test
public void testDescribeShareGroupAllOffsetsNullResult() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
MetadataImage image = new MetadataImageBuilder()
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();
service.onNewMetadataImage(image, null);
int partition = 1;
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-initialized-partitions"),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition))));
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
.setTopics(null);
when(persister.readSummary(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.completedFuture(null));
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertFutureThrows(IllegalStateException.class, future, "Result is null for the read state summary");
}
@Test
public void testDescribeShareGroupAllOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
.setTopics(null);
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId("share-group-id")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDescribeShareGroupAllOffsetsMetadataImageNull() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build(true);
// Forcing a null Metadata Image
service.onNewMetadataImage(null, null);
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
.setTopics(null);
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId("share-group-id")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testPersisterInitializeSuccess() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();

View File

@ -20579,6 +20579,8 @@ public class GroupMetadataManagerTest {
2,
true
);
assertEquals(Map.of(t1Uuid, Set.of(0, 1), t2Uuid, Set.of(0, 1)), context.groupMetadataManager.initializedShareGroupPartitions(groupId));
}
@Test
@ -20713,6 +20715,8 @@ public class GroupMetadataManagerTest {
assertNull(result.response());
assertEquals(List.of(), result.records());
assertEquals(Map.of(), context.groupMetadataManager.initializedShareGroupPartitions(groupId));
}
@Test
@ -20788,6 +20792,8 @@ public class GroupMetadataManagerTest {
t3Name, new TopicMetadata(t3Id, t3Name, 3)
))
);
assertEquals(Map.of(t2Id, Set.of(0, 1, 2)), context.groupMetadataManager.initializedShareGroupPartitions(groupId));
}
@Test

View File

@ -315,12 +315,7 @@ public class ShareGroupCommand {
TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
shareGroups.forEach((groupId, shareGroup) -> {
Set<TopicPartition> allTp = new HashSet<>();
for (ShareMemberDescription memberDescription : shareGroup.members()) {
allTp.addAll(memberDescription.assignment().topicPartitions());
}
ListShareGroupOffsetsSpec offsetsSpec = new ListShareGroupOffsetsSpec().topicPartitions(allTp);
ListShareGroupOffsetsSpec offsetsSpec = new ListShareGroupOffsetsSpec();
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
groupSpecs.put(groupId, offsetsSpec);
@ -349,37 +344,34 @@ public class ShareGroupCommand {
private void printOffsets(TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> offsets, boolean verbose) {
offsets.forEach((groupId, tuple) -> {
ShareGroupDescription description = tuple.getKey();
Collection<SharePartitionOffsetInformation> offsetsInfo = tuple.getValue();
if (maybePrintEmptyGroupState(groupId, description.groupState(), offsetsInfo.size())) {
String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
if (verbose) {
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET");
} else {
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET");
}
for (SharePartitionOffsetInformation info : offsetsInfo) {
if (verbose) {
System.out.printf(fmt,
groupId,
info.topic,
info.partition,
MISSING_COLUMN_VALUE, // Temporary
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
);
} else {
System.out.printf(fmt,
groupId,
info.topic,
info.partition,
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
);
}
}
System.out.println();
if (verbose) {
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET");
} else {
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET");
}
for (SharePartitionOffsetInformation info : offsetsInfo) {
if (verbose) {
System.out.printf(fmt,
groupId,
info.topic,
info.partition,
MISSING_COLUMN_VALUE, // Temporary
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
);
} else {
System.out.printf(fmt,
groupId,
info.topic,
info.partition,
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
);
}
}
System.out.println();
});
}