KAFKA-19218: Add missing leader epoch to share group state summary response (#19602)

When the persister responds to a read share-group state summary request,
it has no way of including the leader epoch in its response, even though
it has the information to hand. As a result, the leader epoch is never
initialised in the admin client operation to list share group offsets,
which in turn means that the information cannot be displayed by
kafka-share-groups.sh.
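
With the epoch plumbed through end to end, the admin client surfaces it as
the leader epoch of the OffsetAndMetadata values returned by
listShareGroupOffsets (an API marked Evolving). A minimal client-side
sketch, assuming a broker at localhost:9092 and a share group named
my-share-group (both placeholders), and assuming that an empty
ListShareGroupOffsetsSpec requests all topic-partitions of the group:

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ShareGroupEpochExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // One spec per group ID; the empty spec is assumed to cover all topic-partitions.
            Map<String, ListShareGroupOffsetsSpec> specs =
                Map.of("my-share-group", new ListShareGroupOffsetsSpec()); // placeholder group
            Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listShareGroupOffsets(specs).partitionsToOffsetAndMetadata("my-share-group").get();
            offsets.forEach((tp, oam) -> {
                if (oam == null) {
                    System.out.printf("%s: no start offset yet%n", tp); // uninitialised share-partition
                } else {
                    // leaderEpoch() is empty when the broker reported a negative (unknown) epoch.
                    System.out.printf("%s: startOffset=%d leaderEpoch=%s%n",
                        tp, oam.offset(), oam.leaderEpoch().map(String::valueOf).orElse("-"));
                }
            });
        }
    }
}

The null check matters because, as the handler change below shows,
partitions without a start offset are reported as null map entries.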

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan <smahajan@confluent.io>
Andrew Schofield 2025-05-06 14:53:12 +01:00 committed by GitHub
parent 0810650da1
commit 7d027a4d83
18 changed files with 155 additions and 88 deletions

KafkaAdminClient.java

@@ -3846,7 +3846,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
final ListShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new ListShareGroupOffsetsResult(future.all());

ListShareGroupOffsetsResult.java

@@ -18,6 +18,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
@@ -35,9 +36,9 @@ import java.util.stream.Collectors;
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
private final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;
ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) {
ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
this.futures = futures.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue));
}
@@ -47,10 +48,10 @@ public class ListShareGroupOffsetsResult {
*
* @return Future which yields all {@code Map<String, Map<TopicPartition, OffsetAndMetadata>>} objects, if requests for all the groups succeed.
*/
public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture<?>[0])).thenApply(
nil -> {
Map<String, Map<TopicPartition, Long>> offsets = new HashMap<>(futures.size());
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new HashMap<>(futures.size());
futures.forEach((groupId, future) -> {
try {
offsets.put(groupId, future.get());
@@ -70,7 +71,7 @@ public class ListShareGroupOffsetsResult {
* @param groupId The group ID.
* @return Future which yields a map of topic partitions to offsets for the specified group.
*/
public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
if (!futures.containsKey(groupId)) {
throw new IllegalArgumentException("Group ID not found: " + groupId);
}

ListShareGroupOffsetsHandler.java

@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
@@ -39,13 +40,14 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
*/
public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
private final Logger log;
@@ -58,7 +60,7 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
}
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) {
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
}
@@ -108,13 +110,13 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
}
@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(Node coordinator,
public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleResponse(Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse) {
validateKeys(groupIds);
final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse;
final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new HashMap<>();
final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final List<CoordinatorKey> unmapped = new ArrayList<>();
@@ -123,17 +125,19 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
if (response.hasGroupError(groupId)) {
handleGroupError(coordinatorKey, response.groupError(groupId), failed, unmapped);
} else {
Map<TopicPartition, Long> groupOffsetsListing = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> {
for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) {
for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) {
TopicPartition tp = new TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex());
if (partitionResponse.errorCode() == Errors.NONE.code()) {
final long startOffset = partitionResponse.startOffset();
final Optional<Integer> leaderEpoch = partitionResponse.leaderEpoch() < 0 ? Optional.empty() : Optional.of(partitionResponse.leaderEpoch());
// Negative offset indicates there is no start offset for this partition
if (partitionResponse.startOffset() < 0) {
groupOffsetsListing.put(tp, null);
} else {
groupOffsetsListing.put(tp, partitionResponse.startOffset());
groupOffsetsListing.put(tp, new OffsetAndMetadata(startOffset, leaderEpoch, ""));
}
} else {
log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage());

ReadShareGroupStateSummaryResponse.java

@@ -99,6 +99,7 @@ public class ReadShareGroupStateSummaryResponse extends AbstractResponse {
Uuid topicId,
int partition,
long startOffset,
int leaderEpoch,
int stateEpoch
) {
return new ReadShareGroupStateSummaryResponseData()
@@ -109,6 +110,7 @@ public class ReadShareGroupStateSummaryResponse extends AbstractResponse {
new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(startOffset)
.setLeaderEpoch(leaderEpoch)
.setStateEpoch(stateEpoch)
))
));

ReadShareGroupStateSummaryResponse.json

@@ -41,6 +41,8 @@
"about": "The error message, or null if there was no error." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch of the share-partition." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset." }
]}

AdminClientTestUtils.java

@@ -176,8 +176,8 @@ public class AdminClientTestUtils {
return new ListClientMetricsResourcesResult(future);
}
public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, Long>>> groupOffsets) {
Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> coordinatorFutures = groupOffsets.entrySet().stream()
public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> groupOffsets) {
Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> coordinatorFutures = groupOffsets.entrySet().stream()
.collect(Collectors.toMap(
entry -> CoordinatorKey.byGroupId(entry.getKey()),
Map.Entry::getValue

KafkaAdminClientTest.java

@@ -10577,12 +10577,24 @@ public class KafkaAdminClientTest {
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1)
)
),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2)
)
),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(3)
)
)
)
)
)
@@ -10590,15 +10602,15 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
final Map<TopicPartition, Long> partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get();
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
assertEquals(6, partitionToOffsetAndMetadata.size());
assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0));
assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1));
assertEquals(40, partitionToOffsetAndMetadata.get(myTopicPartition2));
assertEquals(50, partitionToOffsetAndMetadata.get(myTopicPartition3));
assertEquals(100, partitionToOffsetAndMetadata.get(myTopicPartition4));
assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition5));
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0));
assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition1));
assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition2));
assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition3));
assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition4));
assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), partitionToOffsetAndMetadata.get(myTopicPartition5));
}
}
@@ -10630,16 +10642,28 @@ public class KafkaAdminClientTest {
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50)))
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1)
)
)
)
),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId("group-1").setTopics(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2)
)
),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2)
)
)
)
)
)
@@ -10649,17 +10673,17 @@ public class KafkaAdminClientTest {
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
assertEquals(2, result.all().get().size());
final Map<TopicPartition, Long> partitionToOffsetAndMetadataGroup0 = result.partitionsToOffset(GROUP_ID).get();
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadataGroup0 = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
assertEquals(4, partitionToOffsetAndMetadataGroup0.size());
assertEquals(10, partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
assertEquals(11, partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
assertEquals(40, partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
assertEquals(50, partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
final Map<TopicPartition, Long> partitionToOffsetAndMetadataGroup1 = result.partitionsToOffset("group-1").get();
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadataGroup1 = result.partitionsToOffsetAndMetadata("group-1").get();
assertEquals(2, partitionToOffsetAndMetadataGroup1.size());
assertEquals(100, partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
assertEquals(500, partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
}
}
@@ -10682,7 +10706,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
final Map<TopicPartition, Long> partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get();
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
assertEquals(0, partitionToOffsetAndMetadata.size());
}
@@ -10711,12 +10735,22 @@ public class KafkaAdminClientTest {
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11)
)),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator"))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(1)
)
),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator")
)
),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2)
)
)
)
)
)
@@ -10724,13 +10758,13 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
final Map<TopicPartition, Long> partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get();
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
// For myTopicPartition2 we have set an error as the response. Thus, it should be skipped from the final result
assertEquals(3, partitionToOffsetAndMetadata.size());
assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0));
assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1));
assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition3));
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0));
assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition1));
assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition3));
}
}

GroupCoordinatorService.java

@@ -1702,6 +1702,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
.setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
).toList())
));

DefaultStatePersister.java

@@ -486,6 +486,7 @@ public class DefaultStatePersister implements Persister {
partitionResult.partition(),
partitionResult.stateEpoch(),
partitionResult.startOffset(),
partitionResult.leaderEpoch(),
partitionResult.errorCode(),
partitionResult.errorMessage()))
.toList();
@@ -495,6 +496,7 @@ public class DefaultStatePersister implements Persister {
partition,
-1,
-1,
-1,
Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException
"Error reading state from share coordinator: " + e.getMessage()));
}

NoOpStatePersister.java

@@ -92,7 +92,8 @@ public class NoOpStatePersister implements Persister {
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
map(partitionIdData -> PartitionFactory.newPartitionStateSummaryData(
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_LEADER_EPOCH, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());

PartitionFactory.java

@@ -47,12 +47,8 @@ public class PartitionFactory {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionStateErrorData newPartitionStateErrorData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, String errorMessage) {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, leaderEpoch, null);
}
public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int leaderEpoch, List<PersisterStateBatch> stateBatches) {

PartitionStateSummaryData.java

@@ -24,6 +24,8 @@ package org.apache.kafka.server.share.persister;
public interface PartitionStateSummaryData extends PartitionInfoData, PartitionIdData {
int stateEpoch();
int leaderEpoch();
long startOffset();
short errorCode();

ReadShareGroupStateSummaryResult.java

@@ -38,7 +38,8 @@ public class ReadShareGroupStateSummaryResult implements PersisterResult {
.map(readStateSummaryResult -> new TopicData<>(readStateSummaryResult.topicId(),
readStateSummaryResult.partitions().stream()
.map(partitionResult -> PartitionFactory.newPartitionStateSummaryData(
partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), partitionResult.errorCode(), partitionResult.errorMessage()))
partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(),
partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage()))
.collect(Collectors.toList())))
.collect(Collectors.toList()))
.build();

DefaultStatePersisterTest.java

@@ -868,7 +868,7 @@ class DefaultStatePersisterTest {
return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1;
},
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1)),
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1, 1)),
coordinatorNode1);
client.prepareResponseFrom(
@@ -880,7 +880,7 @@ class DefaultStatePersisterTest {
return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2;
},
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1)),
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1, 1)),
coordinatorNode2);
ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
@@ -930,12 +930,12 @@ class DefaultStatePersisterTest {
HashSet<PartitionData> expectedResultMap = new HashSet<>();
expectedResultMap.add(
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, Errors.NONE.code(),
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1, Errors.NONE.code(),
null
));
expectedResultMap.add(
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, Errors.NONE.code(),
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1, Errors.NONE.code(),
null
));
@@ -1437,6 +1437,7 @@ class DefaultStatePersisterTest {
tp1.topicId(),
tp1.partition(),
1L,
1,
2
)
)
@@ -1468,7 +1469,7 @@ class DefaultStatePersisterTest {
results.topicsData().contains(
new TopicData<>(
tp1.topicId(),
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, Errors.NONE.code(), null))
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null))
)
)
);
@@ -1476,7 +1477,7 @@ class DefaultStatePersisterTest {
results.topicsData().contains(
new TopicData<>(
tp2.topicId(),
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
)
)
);
@@ -1496,6 +1497,7 @@ class DefaultStatePersisterTest {
tp1.topicId(),
tp1.partition(),
1L,
1,
2
)
)
@@ -1517,7 +1519,7 @@ class DefaultStatePersisterTest {
results.topicsData().contains(
new TopicData<>(
tp1.topicId(),
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, Errors.NONE.code(), null))
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null))
)
)
);
@@ -1525,7 +1527,7 @@ class DefaultStatePersisterTest {
results.topicsData().contains(
new TopicData<>(
tp2.topicId(),
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff"))
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff"))
)
)
);

ShareCoordinatorShard.java

@@ -453,6 +453,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
topicId,
partitionId,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_LEADER_EPOCH,
PartitionFactory.DEFAULT_STATE_EPOCH
);
} else {
@@ -470,6 +471,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
topicId,
partitionId,
offsetValue.startOffset(),
offsetValue.leaderEpoch(),
offsetValue.stateEpoch()
);
}

ShareCoordinatorShardTest.java

@@ -605,6 +605,7 @@ class ShareCoordinatorShardTest {
TOPIC_ID,
PARTITION,
0,
0,
0
), result.response());

ShareGroupCommand.java

@@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
@@ -403,19 +404,30 @@ public class ShareGroupCommand {
groupSpecs.put(groupId, offsetsSpec);
try {
Map<TopicPartition, Long> earliestResult = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
Map<TopicPartition, OffsetAndMetadata> startOffsets = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
Set<SharePartitionOffsetInformation> partitionOffsets = new HashSet<>();
for (Entry<TopicPartition, Long> tp : earliestResult.entrySet()) {
SharePartitionOffsetInformation partitionOffsetInfo = new SharePartitionOffsetInformation(
startOffsets.forEach((tp, offsetAndMetadata) -> {
if (offsetAndMetadata != null) {
partitionOffsets.add(new SharePartitionOffsetInformation(
groupId,
tp.getKey().topic(),
tp.getKey().partition(),
Optional.ofNullable(earliestResult.get(tp.getKey()))
);
partitionOffsets.add(partitionOffsetInfo);
tp.topic(),
tp.partition(),
Optional.of(offsetAndMetadata.offset()),
offsetAndMetadata.leaderEpoch()
));
} else {
partitionOffsets.add(new SharePartitionOffsetInformation(
groupId,
tp.topic(),
tp.partition(),
Optional.empty(),
Optional.empty()
));
}
});
groupOffsets.put(groupId, new SimpleImmutableEntry<>(shareGroup, partitionOffsets));
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
@@ -447,7 +459,7 @@ public class ShareGroupCommand {
groupId,
info.topic,
info.partition,
MISSING_COLUMN_VALUE, // Temporary
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
);
} else {
@@ -569,17 +581,20 @@ public class ShareGroupCommand {
final String topic;
final int partition;
final Optional<Long> offset;
final Optional<Integer> leaderEpoch;
SharePartitionOffsetInformation(
String group,
String topic,
int partition,
Optional<Long> offset
Optional<Long> offset,
Optional<Integer> leaderEpoch
) {
this.group = group;
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.leaderEpoch = leaderEpoch;
}
}
}

ShareGroupCommandTest.java

@@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
@@ -191,7 +192,7 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
firstGroup,
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L))
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), "")))
)
);
@@ -208,7 +209,7 @@ public class ShareGroupCommandTest {
List<String> expectedValues;
if (describeType.contains("--verbose")) {
expectedValues = List.of(firstGroup, "topic1", "0", "-", "0");
expectedValues = List.of(firstGroup, "topic1", "0", "1", "0");
} else {
expectedValues = List.of(firstGroup, "topic1", "0", "0");
}
@@ -299,13 +300,13 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
firstGroup,
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L))
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
)
);
ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
secondGroup,
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L))
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
)
);
@@ -333,8 +334,8 @@ public class ShareGroupCommandTest {
List<String> expectedValues1, expectedValues2;
if (describeType.contains("--verbose")) {
expectedValues1 = List.of(firstGroup, "topic1", "0", "-", "0");
expectedValues2 = List.of(secondGroup, "topic1", "0", "-", "0");
expectedValues1 = List.of(firstGroup, "topic1", "0", "1", "0");
expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0");
} else {
expectedValues1 = List.of(firstGroup, "topic1", "0", "0");
expectedValues2 = List.of(secondGroup, "topic1", "0", "0");