KAFKA-16720: Support multiple groups in DescribeShareGroupOffsets RPC (#18834)

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Andrew Schofield 2025-02-13 18:27:05 +00:00 committed by GitHub
parent 9cb271f1e1
commit 952113e8e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 692 additions and 329 deletions

View File

@ -37,6 +37,8 @@ public class ListShareGroupOffsetsSpec {
/** /**
* Set the topic partitions whose offsets are to be listed for a share group. * Set the topic partitions whose offsets are to be listed for a share group.
*
* @param topicPartitions List of topic partitions to include
*/ */
public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) { public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions; this.topicPartitions = topicPartitions;

View File

@ -22,19 +22,22 @@ import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest; import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse; import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -47,10 +50,9 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
private final Map<String, ListShareGroupOffsetsSpec> groupSpecs; private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
private final Logger log; private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; private final CoordinatorStrategy lookupStrategy;
public ListShareGroupOffsetsHandler( public ListShareGroupOffsetsHandler(Map<String, ListShareGroupOffsetsSpec> groupSpecs,
Map<String, ListShareGroupOffsetsSpec> groupSpecs,
LogContext logContext) { LogContext logContext) {
this.groupSpecs = groupSpecs; this.groupSpecs = groupSpecs;
this.log = logContext.logger(ListShareGroupOffsetsHandler.class); this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
@ -73,28 +75,32 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
@Override @Override
public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) { public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
List<String> groupIds = keys.stream().map(key -> { validateKeys(keys);
if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
throw new IllegalArgumentException("Invalid group coordinator key " + key + List<DescribeShareGroupOffsetsRequestGroup> groups = new ArrayList<>(keys.size());
" when building `DescribeShareGroupOffsets` request"); keys.forEach(coordinatorKey -> {
} String groupId = coordinatorKey.idValue;
return key.idValue;
}).collect(Collectors.toList());
// The DescribeShareGroupOffsetsRequest only includes a single group ID at this point, which is likely a mistake to be fixing a follow-on PR.
String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
if (groupId == null) {
throw new IllegalArgumentException("Missing group id in request");
}
ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId); ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics = DescribeShareGroupOffsetsRequestGroup requestGroup = new DescribeShareGroupOffsetsRequestGroup()
spec.topicPartitions().stream().map( .setGroupId(groupId);
topicPartition -> new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(topicPartition.topic()) Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
.setPartitions(List.of(topicPartition.partition())) spec.topicPartitions().forEach(tp -> topicPartitionMap.computeIfAbsent(tp.topic(), t -> new LinkedList<>()).add(tp.partition()));
).collect(Collectors.toList());
Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics = new HashMap<>();
for (TopicPartition tp : spec.topicPartitions()) {
requestTopics.computeIfAbsent(tp.topic(), t ->
new DescribeShareGroupOffsetsRequestTopic()
.setTopicName(tp.topic())
.setPartitions(new LinkedList<>()))
.partitions()
.add(tp.partition());
}
requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
groups.add(requestGroup);
});
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
.setGroupId(groupId) .setGroups(groups);
.setTopics(topics);
return new DescribeShareGroupOffsetsRequest.Builder(data, true); return new DescribeShareGroupOffsetsRequest.Builder(data, true);
} }
@ -102,27 +108,41 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(Node coordinator, public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(Node coordinator,
Set<CoordinatorKey> groupIds, Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse) { AbstractResponse abstractResponse) {
validateKeys(groupIds);
final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse; final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse;
final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new HashMap<>(); final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final List<CoordinatorKey> unmapped = new ArrayList<>();
for (CoordinatorKey groupId : groupIds) { for (CoordinatorKey coordinatorKey : groupIds) {
Map<TopicPartition, Long> data = new HashMap<>(); String groupId = coordinatorKey.idValue;
response.data().responses().stream().map( if (response.hasGroupError(groupId)) {
describedTopic -> handleGroupError(coordinatorKey, response.groupError(groupId), failed, unmapped);
describedTopic.partitions().stream().map( } else {
partition -> { Map<TopicPartition, Long> groupOffsetsListing = new HashMap<>();
if (partition.errorCode() == Errors.NONE.code()) response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> {
data.put(new TopicPartition(describedTopic.topicName(), partition.partitionIndex()), partition.startOffset()); for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) {
else for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) {
log.error("Skipping return offset for topic {} partition {} due to error {}.", describedTopic.topicName(), partition.partitionIndex(), Errors.forCode(partition.errorCode())); TopicPartition tp = new TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex());
return data; if (partitionResponse.errorCode() == Errors.NONE.code()) {
// Negative offset indicates there is no start offset for this partition
if (partitionResponse.startOffset() < 0) {
groupOffsetsListing.put(tp, null);
} else {
groupOffsetsListing.put(tp, partitionResponse.startOffset());
} }
).collect(Collectors.toList()) } else {
).collect(Collectors.toList()); log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage());
completed.put(groupId, data);
} }
return new ApiResult<>(completed, failed, Collections.emptyList()); }
}
});
completed.put(coordinatorKey, groupOffsetsListing);
}
}
return new ApiResult<>(completed, failed, unmapped);
} }
private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) { private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
@ -130,4 +150,46 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
.map(CoordinatorKey::byGroupId) .map(CoordinatorKey::byGroupId)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
} }
private void validateKeys(Set<CoordinatorKey> groupIds) {
Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
if (!keys.containsAll(groupIds)) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
" (expected one of " + keys + ")");
}
}
private void handleGroupError(CoordinatorKey groupId,
Throwable exception,
Map<CoordinatorKey, Throwable> failed,
List<CoordinatorKey> groupsToUnmap) {
Errors error = Errors.forException(exception);
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
case UNKNOWN_MEMBER_ID:
case STALE_MEMBER_EPOCH:
log.debug("`DescribeShareGroupOffsets` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, exception);
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`DescribeShareGroupOffsets` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DescribeShareGroupOffsets` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
default:
log.error("`DescribeShareGroupOffsets` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
failed.put(groupId, exception);
}
}
} }

View File

@ -18,13 +18,13 @@
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -60,19 +60,21 @@ public class DescribeShareGroupOffsetsRequest extends AbstractRequest {
this.data = data; this.data = data;
} }
public List<String> groupIds() {
return data.groups()
.stream()
.map(DescribeShareGroupOffsetsRequestGroup::groupId)
.collect(Collectors.toList());
}
public List<DescribeShareGroupOffsetsRequestGroup> groups() {
return data.groups();
}
@Override @Override
public DescribeShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Throwable e) { public DescribeShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> results = new ArrayList<>(); // Sets the exception as the group-level error code for all groups, with empty partitions data for all groups
data.topics().forEach( return new DescribeShareGroupOffsetsResponse(throttleTimeMs, groupIds(), e);
topicResult -> results.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicResult.topicName())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData)
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new DescribeShareGroupOffsetsResponse(new DescribeShareGroupOffsetsResponseData()
.setResponses(results));
} }
@Override @Override
@ -87,23 +89,10 @@ public class DescribeShareGroupOffsetsRequest extends AbstractRequest {
); );
} }
public static List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> getErrorDescribeShareGroupOffsets( public static DescribeShareGroupOffsetsResponseGroup getErrorDescribedGroup(String groupId, Errors error) {
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics, return new DescribeShareGroupOffsetsResponseGroup()
Errors error .setGroupId(groupId)
) {
return topics.stream()
.map(
requestTopic -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(requestTopic.topicName())
.setPartitions(
requestTopic.partitions().stream().map(
partition -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(error.code()) .setErrorCode(error.code())
.setErrorMessage(error.message()) .setErrorMessage(error.message());
.setStartOffset(0)
).collect(Collectors.toList())
)
).collect(Collectors.toList());
} }
} }

View File

@ -18,20 +18,56 @@
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
public class DescribeShareGroupOffsetsResponse extends AbstractResponse { public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
private final DescribeShareGroupOffsetsResponseData data; private final DescribeShareGroupOffsetsResponseData data;
private final Map<String, Throwable> groupLevelErrors = new HashMap<>();
public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) { public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS); super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
this.data = data; this.data = data;
for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
if (group.errorCode() != Errors.NONE.code()) {
this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()).exception(group.errorMessage()));
}
}
}
// Builds a response with the same group-level error for all groups and empty topics lists for all groups
public DescribeShareGroupOffsetsResponse(int throttleTimeMs,
List<String> groupIds,
Throwable allGroupsException) {
super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
short errorCode = Errors.forException(allGroupsException).code();
List<DescribeShareGroupOffsetsResponseGroup> groupList = new ArrayList<>();
groupIds.forEach(groupId -> {
groupList.add(new DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupId)
.setErrorCode(errorCode)
.setErrorMessage(errorCode == Errors.UNKNOWN_SERVER_ERROR.code() ? Errors.forCode(errorCode).message() : allGroupsException.getMessage()));
groupLevelErrors.put(groupId, allGroupsException);
});
this.data = new DescribeShareGroupOffsetsResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setGroups(groupList);
}
public boolean hasGroupError(String groupId) {
return groupLevelErrors.containsKey(groupId);
}
public Throwable groupError(String groupId) {
return groupLevelErrors.get(groupId);
} }
@Override @Override
@ -42,11 +78,12 @@ public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
@Override @Override
public Map<Errors, Integer> errorCounts() { public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>(); Map<Errors, Integer> counts = new HashMap<>();
data.responses().forEach( groupLevelErrors.values().forEach(exception -> updateErrorCounts(counts, Errors.forException(exception)));
result -> result.partitions().forEach( for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode())) group.topics().forEach(topic ->
) topic.partitions().forEach(partition ->
); updateErrorCounts(counts, Errors.forCode(partition.errorCode()))));
}
return counts; return counts;
} }
@ -61,8 +98,6 @@ public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
} }
public static DescribeShareGroupOffsetsResponse parse(ByteBuffer buffer, short version) { public static DescribeShareGroupOffsetsResponse parse(ByteBuffer buffer, short version) {
return new DescribeShareGroupOffsetsResponse( return new DescribeShareGroupOffsetsResponse(new DescribeShareGroupOffsetsResponseData(new ByteBufferAccessor(buffer), version));
new DescribeShareGroupOffsetsResponseData(new ByteBufferAccessor(buffer), version)
);
} }
} }

View File

@ -22,6 +22,8 @@
"flexibleVersions": "0+", "flexibleVersions": "0+",
"latestVersionUnstable": true, "latestVersionUnstable": true,
"fields": [ "fields": [
{ "name": "Groups", "type": "[]DescribeShareGroupOffsetsRequestGroup", "versions": "0+",
"about": "The groups to describe offsets for.", "fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." }, "about": "The group identifier." },
{ "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+", { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
@ -31,5 +33,6 @@
{ "name": "Partitions", "type": "[]int32", "versions": "0+", { "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." } "about": "The partitions." }
]} ]}
]}
] ]
} }

View File

@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
{ {
"apiKey": 90, "apiKey": 90,
"type": "response", "type": "response",
@ -22,6 +21,7 @@
"flexibleVersions": "0+", "flexibleVersions": "0+",
// Supported errors: // Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+) // - GROUP_AUTHORIZATION_FAILED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+) // - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
@ -31,7 +31,11 @@
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+", { "name": "Groups", "type": "[]DescribeShareGroupOffsetsResponseGroup", "versions": "0+",
"about": "The results for each group.", "fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "Topics", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
"about": "The results for each topic.", "fields": [ "about": "The results for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." }, "about": "The topic name." },
@ -45,10 +49,15 @@
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of the partition." }, "about": "The leader epoch of the partition." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+", { "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }, "about": "The partition-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." } "about": "The partition-level error message, or null if there was no error." }
]} ]}
]},
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The group-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The group-level error message, or null if there was no error." }
]} ]}
] ]
} }

View File

@ -9203,9 +9203,10 @@ public class KafkaAdminClientTest {
ClientRequest clientRequest = mockClient.requests().peek(); ClientRequest clientRequest = mockClient.requests().peek();
assertNotNull(clientRequest); assertNotNull(clientRequest);
DescribeShareGroupOffsetsRequestData data = ((DescribeShareGroupOffsetsRequest.Builder) clientRequest.requestBuilder()).build().data(); DescribeShareGroupOffsetsRequestData data = ((DescribeShareGroupOffsetsRequest.Builder) clientRequest.requestBuilder()).build().data();
assertEquals(GROUP_ID, data.groupId()); assertEquals(1, data.groups().size());
assertEquals(GROUP_ID, data.groups().get(0).groupId());
assertEquals(Collections.singletonList("A"), assertEquals(Collections.singletonList("A"),
data.topics().stream().map(DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList())); data.groups().get(0).topics().stream().map(DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList()));
} }
} }
@ -9223,14 +9224,15 @@ public class KafkaAdminClientTest {
TopicPartition myTopicPartition4 = new TopicPartition("my_topic_1", 4); TopicPartition myTopicPartition4 = new TopicPartition("my_topic_1", 4);
TopicPartition myTopicPartition5 = new TopicPartition("my_topic_2", 6); TopicPartition myTopicPartition5 = new TopicPartition("my_topic_2", 6);
ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec().topicPartitions( ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec().topicPartitions(
List.of(myTopicPartition0, myTopicPartition1, myTopicPartition2, myTopicPartition3, myTopicPartition4, myTopicPartition5) List.of(myTopicPartition0, myTopicPartition1, myTopicPartition2, myTopicPartition3, myTopicPartition4, myTopicPartition5)
); );
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>(); Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
groupSpecs.put(GROUP_ID, groupSpec); groupSpecs.put(GROUP_ID, groupSpec);
DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setResponses( DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setGroups(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
List.of( List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))), new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))), new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
@ -9239,6 +9241,8 @@ public class KafkaAdminClientTest {
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))), new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500))) new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
) )
)
)
); );
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
@ -9255,6 +9259,92 @@ public class KafkaAdminClientTest {
} }
} }
@Test
public void testListShareGroupOffsetsMultipleGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), Set.of(GROUP_ID, "group-1")));
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
TopicPartition myTopicPartition4 = new TopicPartition("my_topic_1", 4);
TopicPartition myTopicPartition5 = new TopicPartition("my_topic_2", 6);
ListShareGroupOffsetsSpec group0Specs = new ListShareGroupOffsetsSpec().topicPartitions(
List.of(myTopicPartition0, myTopicPartition1, myTopicPartition2, myTopicPartition3)
);
ListShareGroupOffsetsSpec group1Specs = new ListShareGroupOffsetsSpec().topicPartitions(
List.of(myTopicPartition4, myTopicPartition5)
);
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
groupSpecs.put(GROUP_ID, group0Specs);
groupSpecs.put("group-1", group1Specs);
DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setGroups(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50)))
)
),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId("group-1").setTopics(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
)
)
)
);
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
assertEquals(2, result.all().get().size());
final Map<TopicPartition, Long> partitionToOffsetAndMetadataGroup0 = result.partitionsToOffset(GROUP_ID).get();
assertEquals(4, partitionToOffsetAndMetadataGroup0.size());
assertEquals(10, partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
assertEquals(11, partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
assertEquals(40, partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
assertEquals(50, partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
final Map<TopicPartition, Long> partitionToOffsetAndMetadataGroup1 = result.partitionsToOffset("group-1").get();
assertEquals(2, partitionToOffsetAndMetadataGroup1.size());
assertEquals(100, partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
assertEquals(500, partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
}
}
@Test
public void testListShareGroupOffsetsEmpty() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec();
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
groupSpecs.put(GROUP_ID, groupSpec);
DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setGroups(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID)
)
);
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
final Map<TopicPartition, Long> partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get();
assertEquals(0, partitionToOffsetAndMetadata.size());
}
}
@Test @Test
public void testListShareGroupOffsetsWithErrorInOnePartition() throws Exception { public void testListShareGroupOffsetsWithErrorInOnePartition() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
@ -9274,13 +9364,19 @@ public class KafkaAdminClientTest {
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>(); Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
groupSpecs.put(GROUP_ID, groupSpec); groupSpecs.put(GROUP_ID, groupSpec);
DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setResponses( DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setGroups(
List.of( List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))), new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))), List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11)
)),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator"))), new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator"))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500))) new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
) )
)
)
); );
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));

View File

@ -3701,23 +3701,26 @@ public class RequestResponseTest {
private DescribeShareGroupOffsetsRequest createDescribeShareGroupOffsetsRequest(short version) { private DescribeShareGroupOffsetsRequest createDescribeShareGroupOffsetsRequest(short version) {
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
.setGroups(Collections.singletonList(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("group") .setGroupId("group")
.setTopics(Collections.singletonList(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() .setTopics(Collections.singletonList(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName("topic-1") .setTopicName("topic-1")
.setPartitions(Collections.singletonList(0)))); .setPartitions(Collections.singletonList(0))))));
return new DescribeShareGroupOffsetsRequest.Builder(data).build(version); return new DescribeShareGroupOffsetsRequest.Builder(data).build(version);
} }
private DescribeShareGroupOffsetsResponse createDescribeShareGroupOffsetsResponse() { private DescribeShareGroupOffsetsResponse createDescribeShareGroupOffsetsResponse() {
DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData() DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData()
.setResponses(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() .setGroups(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setTopicName("group") .setGroupId("group")
.setTopics(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName("topic-1")
.setTopicId(Uuid.randomUuid()) .setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() .setPartitions(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0) .setPartitionIndex(0)
.setErrorCode(Errors.NONE.code()) .setErrorCode(Errors.NONE.code())
.setStartOffset(0) .setStartOffset(0)
.setLeaderEpoch(0))))); .setLeaderEpoch(0)))))));
return new DescribeShareGroupOffsetsResponse(data); return new DescribeShareGroupOffsetsResponse(data);
} }

View File

@ -88,8 +88,8 @@ private[group] class GroupCoordinatorAdapter(
override def describeShareGroupOffsets( override def describeShareGroupOffsets(
context: RequestContext, context: RequestContext,
request: DescribeShareGroupOffsetsRequestData request: DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
): CompletableFuture[DescribeShareGroupOffsetsResponseData] = { ): CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] = {
FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
s"The old group coordinator does not support ${ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.name} API." s"The old group coordinator does not support ${ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.name} API."
)) ))

View File

@ -3221,27 +3221,51 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest] val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest]
val groups = describeShareGroupOffsetsRequest.groups()
val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]](groups.size)
groups.forEach { groupDescribeOffsets =>
if (!isShareGroupProtocolEnabled) { if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
CompletableFuture.completedFuture[Unit](()) .setGroupId(groupDescribeOffsets.groupId)
} else if (!authHelper.authorize(request.context, READ, GROUP, describeShareGroupOffsetsRequest.data.groupId)) { .setErrorCode(Errors.UNSUPPORTED_VERSION.code))
requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) } else if (!authHelper.authorize(request.context, READ, GROUP, groupDescribeOffsets.groupId)) {
CompletableFuture.completedFuture[Unit](()) futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsets.groupId)
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
} else if (groupDescribeOffsets.topics.isEmpty) {
futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsets.groupId))
} else { } else {
groupCoordinator.describeShareGroupOffsets( futures += describeShareGroupOffsetsForGroup(
request.context, request.context,
describeShareGroupOffsetsRequest.data, groupDescribeOffsets
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(
request,
new DescribeShareGroupOffsetsResponse(response)
) )
} }
} }
CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
val groupResponses = new ArrayBuffer[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup](futures.size)
val responseData = new DescribeShareGroupOffsetsResponseData().setGroups(groupResponses.asJava)
futures.foreach(future => groupResponses += future.join)
requestHelper.sendMaybeThrottle(request, new DescribeShareGroupOffsetsResponse(responseData))
}
}
private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
groupDescribeOffsetsRequest: DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
): CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] = {
groupCoordinator.describeShareGroupOffsets(
requestContext,
groupDescribeOffsetsRequest
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] { (groupDescribeOffsetsResponse, exception) =>
if (exception != null) {
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsetsRequest.groupId)
.setErrorCode(Errors.forException(exception).code)
} else {
groupDescribeOffsetsResponse
}
} }
} }

View File

@ -101,7 +101,7 @@ class GroupCoordinatorAdapterTest {
val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val context = makeContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.latestVersion) val context = makeContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.latestVersion)
val request = new DescribeShareGroupOffsetsRequestData() val request = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("group") .setGroupId("group")
val future = adapter.describeShareGroupOffsets(context, request) val future = adapter.describeShareGroupOffsets(context, request)

View File

@ -42,8 +42,8 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup, DescribeShareGroupOffsetsRequestTopic}
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{DescribeShareGroupOffsetsResponsePartition, DescribeShareGroupOffsetsResponseTopic} import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{DescribeShareGroupOffsetsResponsePartition, DescribeShareGroupOffsetsResponseGroup, DescribeShareGroupOffsetsResponseTopic}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse} import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
@ -10445,8 +10445,10 @@ class KafkaApisTest extends Logging {
@Test @Test
def testDescribeShareGroupOffsetsReturnsUnsupportedVersion(): Unit = { def testDescribeShareGroupOffsetsReturnsUnsupportedVersion(): Unit = {
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics( val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroups(
util.List.of(new DescribeShareGroupOffsetsRequestGroup().setGroupId("group").setTopics(
util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))) util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
))
) )
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build()) val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build())
@ -10455,16 +10457,18 @@ class KafkaApisTest extends Logging {
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest) val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
response.data.responses.forEach(topic => topic.partitions().forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode()))) response.data.groups.forEach(group => group.topics.forEach(topic => topic.partitions.forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code, partition.errorCode))))
} }
@Test @Test
def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = { def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = {
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics( val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroups(
util.List.of(new DescribeShareGroupOffsetsRequestGroup().setGroupId("group").setTopics(
util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))) util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
))
) )
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build()) val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
val authorizer: Authorizer = mock(classOf[Authorizer]) val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
@ -10477,9 +10481,11 @@ class KafkaApisTest extends Logging {
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest) val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
response.data.responses.forEach( response.data.groups.forEach(
topic => topic.partitions().forEach( group => group.topics.forEach(
partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), partition.errorCode()) topic => topic.partitions.forEach(
partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, partition.errorCode)
)
) )
) )
} }
@ -10487,34 +10493,52 @@ class KafkaApisTest extends Logging {
@Test @Test
def testDescribeShareGroupOffsetsRequestSuccess(): Unit = { def testDescribeShareGroupOffsetsRequestSuccess(): Unit = {
val topicName1 = "topic-1" val topicName1 = "topic-1"
val topicId1 = Uuid.randomUuid() val topicId1 = Uuid.randomUuid
val topicName2 = "topic-2" val topicName2 = "topic-2"
val topicId2 = Uuid.randomUuid() val topicId2 = Uuid.randomUuid
val topicName3 = "topic-3"
val topicId3 = Uuid.randomUuid
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
addTopicToMetadataCache(topicName1, 1, topicId = topicId1) addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
addTopicToMetadataCache(topicName2, 1, topicId = topicId2) addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics( val describeShareGroupOffsetsRequestGroup1 = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(
util.List.of( util.List.of(
new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1, 2, 3)), new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1, 2, 3)),
new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName2).setPartitions(util.List.of(10, 20)), new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName2).setPartitions(util.List.of(10, 20)),
) )
) )
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build()) val describeShareGroupOffsetsRequestGroup2 = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(
util.List.of(
new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName3).setPartitions(util.List.of(0)),
)
)
val future = new CompletableFuture[DescribeShareGroupOffsetsResponseData]() val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData()
.setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1, describeShareGroupOffsetsRequestGroup2))
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
val futureGroup1 = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupOffsets( when(groupCoordinator.describeShareGroupOffsets(
requestChannelRequest.context, requestChannelRequest.context,
describeShareGroupOffsetsRequest describeShareGroupOffsetsRequestGroup1
)).thenReturn(future) )).thenReturn(futureGroup1)
val futureGroup2 = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupOffsets(
requestChannelRequest.context,
describeShareGroupOffsetsRequestGroup2
)).thenReturn(futureGroup2)
kafkaApis = createKafkaApis( kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
) )
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData() val describeShareGroupOffsetsResponseGroup1 = new DescribeShareGroupOffsetsResponseGroup()
.setResponses(util.List.of( .setGroupId("group1")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic() new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName1) .setTopicName(topicName1)
.setTopicId(topicId1) .setTopicId(topicId1)
@ -10557,7 +10581,81 @@ class KafkaApisTest extends Logging {
)) ))
)) ))
future.complete(describeShareGroupOffsetsResponse) val describeShareGroupOffsetsResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group2")
.setTopics(util.List.of(
new DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topicName3)
.setTopicId(topicId3)
.setPartitions(util.List.of(
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
))
))
val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
.setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, describeShareGroupOffsetsResponseGroup2))
futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(describeShareGroupOffsetsResponse, response.data)
}
@Test
def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
val future = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val describeShareGroupOffsetsResponseGroup = new DescribeShareGroupOffsetsResponseGroup()
val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
future.complete(describeShareGroupOffsetsResponseGroup)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(describeShareGroupOffsetsResponse, response.data)
}
@Test
def testDescribeShareGroupOffsetsRequestEmptyTopicsSuccess(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
val describeShareGroupOffsetsRequestGroup = new DescribeShareGroupOffsetsRequestGroup().setGroupId("group")
val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroups(util.List.of(describeShareGroupOffsetsRequestGroup))
val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
val future = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
when(groupCoordinator.describeShareGroupOffsets(
requestChannelRequest.context,
describeShareGroupOffsetsRequestGroup
)).thenReturn(future)
kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val describeShareGroupOffsetsResponseGroup = new DescribeShareGroupOffsetsResponseGroup()
.setGroupId("group")
.setTopics(util.List.of())
val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData().setGroups(util.List.of(describeShareGroupOffsetsResponseGroup))
future.complete(describeShareGroupOffsetsResponseGroup)
val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest) val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(describeShareGroupOffsetsResponse, response.data) assertEquals(describeShareGroupOffsetsResponse, response.data)
} }

View File

@ -259,15 +259,17 @@ public interface GroupCoordinator {
); );
/** /**
* Fetch the Share Group Offsets for a given group. * Describe the Share Group Offsets for a given group.
* *
* @param context The request context * @param context The request context
* @param request The DescribeShareGroupOffsets request. * @param request The DescribeShareGroupOffsetsRequestGroup request.
*
* @return A future yielding the results. * @return A future yielding the results.
* The error codes of the response are set to indicate the errors occurred during the execution.
*/ */
CompletableFuture<DescribeShareGroupOffsetsResponseData> describeShareGroupOffsets( CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupOffsets(
RequestContext context, RequestContext context,
DescribeShareGroupOffsetsRequestData request DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request
); );
/** /**

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group; package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.errors.NotCoordinatorException;
@ -956,47 +957,61 @@ public class GroupCoordinatorService implements GroupCoordinator {
} }
/** /**
* See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData)}. * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
*/ */
@Override @Override
public CompletableFuture<DescribeShareGroupOffsetsResponseData> describeShareGroupOffsets( public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupOffsets(
RequestContext context, RequestContext context,
DescribeShareGroupOffsetsRequestData requestData DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData
) { ) {
if (!isActive.get()) { if (!isActive.get()) {
return CompletableFuture.completedFuture( return CompletableFuture.completedFuture(
new DescribeShareGroupOffsetsResponseData() DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), Errors.COORDINATOR_NOT_AVAILABLE));
.setResponses(DescribeShareGroupOffsetsRequest.getErrorDescribeShareGroupOffsets(
requestData.topics(),
Errors.COORDINATOR_NOT_AVAILABLE
))
);
} }
if (metadataImage == null) { if (metadataImage == null) {
return CompletableFuture.completedFuture( return CompletableFuture.completedFuture(
new DescribeShareGroupOffsetsResponseData() DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), Errors.COORDINATOR_NOT_AVAILABLE));
.setResponses(DescribeShareGroupOffsetsRequest.getErrorDescribeShareGroupOffsets(
requestData.topics(),
Errors.UNKNOWN_TOPIC_OR_PARTITION
))
);
} }
List<ReadShareGroupStateSummaryRequestData.ReadStateSummaryData> readStateSummaryData = Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
requestData.topics().stream().map( List<ReadShareGroupStateSummaryRequestData.ReadStateSummaryData> readStateSummaryData = new ArrayList<>(requestData.topics().size());
topic -> new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size());
.setTopicId(metadataImage.topics().topicNameToIdView().get(topic.topicName())) requestData.topics().forEach(topic -> {
Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName());
if (topicId != null) {
requestTopicIdToNameMapping.put(topicId, topic.topicName());
readStateSummaryData.add(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId)
.setPartitions( .setPartitions(
topic.partitions().stream().map( topic.partitions().stream().map(
partitionIndex -> new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex) partitionIndex -> new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex)
).toList() ).toList()
) ));
).toList(); } else {
describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
.setPartitions(topic.partitions().stream().map(
partition -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
).toList()));
}
});
// If the request for the persister is empty, just complete the operation right away.
if (readStateSummaryData.isEmpty()) {
return CompletableFuture.completedFuture(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(requestData.groupId())
.setTopics(describeShareGroupOffsetsResponseTopicList));
}
ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData() ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
.setGroupId(requestData.groupId()) .setGroupId(requestData.groupId())
.setTopics(readStateSummaryData); .setTopics(readStateSummaryData);
CompletableFuture<DescribeShareGroupOffsetsResponseData> future = new CompletableFuture<>(); CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future = new CompletableFuture<>();
persister.readSummary(ReadShareGroupStateSummaryParameters.from(readSummaryRequestData)) persister.readSummary(ReadShareGroupStateSummaryParameters.from(readSummaryRequestData))
.whenComplete((result, error) -> { .whenComplete((result, error) -> {
if (error != null) { if (error != null) {
@ -1009,20 +1024,23 @@ public class GroupCoordinatorService implements GroupCoordinator {
future.completeExceptionally(new IllegalStateException("Result is null for the read state summary")); future.completeExceptionally(new IllegalStateException("Result is null for the read state summary"));
return; return;
} }
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList = result.topicsData().forEach(topicData ->
result.topicsData().stream().map( describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
topicData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicId(topicData.topicId()) .setTopicId(topicData.topicId())
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId())) .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
.setPartitions(topicData.partitions().stream().map( .setPartitions(topicData.partitions().stream().map(
partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition()) .setPartitionIndex(partitionData.partition())
.setStartOffset(partitionData.startOffset()) .setStartOffset(partitionData.startOffset())
.setErrorMessage(partitionData.errorMessage()) .setErrorMessage(Errors.forCode(partitionData.errorCode()).message())
.setErrorCode(partitionData.errorCode()) .setErrorCode(partitionData.errorCode())
).toList()) ).toList())
).toList(); ));
future.complete(new DescribeShareGroupOffsetsResponseData().setResponses(describeShareGroupOffsetsResponseTopicList));
future.complete(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(requestData.groupId())
.setTopics(describeShareGroupOffsetsResponseTopicList));
}); });
return future; return future;
} }

View File

@ -2202,15 +2202,16 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1); service.startup(() -> 1);
int partition = 1; int partition = 1;
DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition)) .setPartitions(List.of(partition))
)); ));
DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setResponses( .setGroupId("share-group-id")
.setTopics(
List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID) .setTopicId(TOPIC_ID)
@ -2218,11 +2219,10 @@ public class GroupCoordinatorServiceTest {
.setPartitionIndex(partition) .setPartitionIndex(partition)
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET) .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
.setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE) .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
.setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE))) .setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE))))
)
); );
CompletableFuture<DescribeShareGroupOffsetsResponseData> future = CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get()); assertEquals(responseData, future.get());
@ -2240,7 +2240,7 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1); service.startup(() -> 1);
int partition = 1; int partition = 1;
DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
@ -2254,8 +2254,9 @@ public class GroupCoordinatorServiceTest {
.setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition))))); .setPartition(partition)))));
DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setResponses( .setGroupId("share-group-id")
.setTopics(
List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID) .setTopicId(TOPIC_ID)
@ -2263,8 +2264,7 @@ public class GroupCoordinatorServiceTest {
.setPartitionIndex(partition) .setPartitionIndex(partition)
.setStartOffset(21) .setStartOffset(21)
.setErrorCode(Errors.NONE.code()) .setErrorCode(Errors.NONE.code())
.setErrorMessage(Errors.NONE.message()))) .setErrorMessage(Errors.NONE.message()))))
)
); );
ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
@ -2286,7 +2286,43 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq(readShareGroupStateSummaryParameters) ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
CompletableFuture<DescribeShareGroupOffsetsResponseData> future = CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDescribeShareGroupOffsetsNonexistentTopicWithDefaultPersister() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
int partition = 1;
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
.setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName("badtopic")
.setPartitions(List.of(partition))
));
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId("share-group-id")
.setTopics(
List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName("badtopic")
.setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
);
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get()); assertEquals(responseData, future.get());
@ -2304,7 +2340,7 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1); service.startup(() -> 1);
int partition = 1; int partition = 1;
DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
@ -2314,7 +2350,7 @@ public class GroupCoordinatorServiceTest {
when(persister.readSummary(ArgumentMatchers.any())) when(persister.readSummary(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate read state summary request"))); .thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate read state summary request")));
CompletableFuture<DescribeShareGroupOffsetsResponseData> future = CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertFutureThrows(future, Exception.class, "Unable to validate read state summary request"); assertFutureThrows(future, Exception.class, "Unable to validate read state summary request");
} }
@ -2331,7 +2367,7 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1); service.startup(() -> 1);
int partition = 1; int partition = 1;
DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
@ -2341,7 +2377,7 @@ public class GroupCoordinatorServiceTest {
when(persister.readSummary(ArgumentMatchers.any())) when(persister.readSummary(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.completedFuture(null)); .thenReturn(CompletableFuture.completedFuture(null));
CompletableFuture<DescribeShareGroupOffsetsResponseData> future = CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary"); assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary");
} }
@ -2358,7 +2394,7 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1); service.startup(() -> 1);
int partition = 1; int partition = 1;
DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
@ -2371,7 +2407,7 @@ public class GroupCoordinatorServiceTest {
when(persister.readSummary(ArgumentMatchers.any())) when(persister.readSummary(ArgumentMatchers.any()))
.thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); .thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
CompletableFuture<DescribeShareGroupOffsetsResponseData> future = CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary"); assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary");
} }
@ -2385,26 +2421,19 @@ public class GroupCoordinatorServiceTest {
.build(); .build();
int partition = 1; int partition = 1;
DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition)) .setPartitions(List.of(partition))
)); ));
DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setResponses( .setGroupId("share-group-id")
List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setStartOffset(0)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message()))) .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
)
);
CompletableFuture<DescribeShareGroupOffsetsResponseData> future = CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get()); assertEquals(responseData, future.get());
@ -2422,26 +2451,19 @@ public class GroupCoordinatorServiceTest {
service.onNewMetadataImage(null, null); service.onNewMetadataImage(null, null);
int partition = 1; int partition = 1;
DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME) .setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition)) .setPartitions(List.of(partition))
)); ));
DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setResponses( .setGroupId("share-group-id")
List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setTopicName(TOPIC_NAME) .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
.setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setStartOffset(0)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
)
);
CompletableFuture<DescribeShareGroupOffsetsResponseData> future = CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get()); assertEquals(responseData, future.get());