From 7d027a4d83af15428d4aa9bdb2144adcad73408c Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Tue, 6 May 2025 14:53:12 +0100 Subject: [PATCH] KAFKA-19218: Add missing leader epoch to share group state summary response (#19602) When the persister is responding to a read share-group state summary request, it has no way of including the leader epoch in its response, even though it has the information to hand. This means that the leader epoch information is not initialised in the admin client operation to list share group offsets, and this then means that the information cannot be displayed in kafka-share-groups.sh. Reviewers: Apoorv Mittal , Sushant Mahajan --- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../admin/ListShareGroupOffsetsResult.java | 11 +- .../ListShareGroupOffsetsHandler.java | 20 ++-- .../ReadShareGroupStateSummaryResponse.java | 2 + .../ReadShareGroupStateSummaryResponse.json | 2 + .../clients/admin/AdminClientTestUtils.java | 4 +- .../clients/admin/KafkaAdminClientTest.java | 110 ++++++++++++------ .../group/GroupCoordinatorService.java | 1 + .../persister/DefaultStatePersister.java | 2 + .../share/persister/NoOpStatePersister.java | 3 +- .../share/persister/PartitionFactory.java | 8 +- .../persister/PartitionStateSummaryData.java | 2 + .../ReadShareGroupStateSummaryResult.java | 3 +- .../persister/DefaultStatePersisterTest.java | 18 +-- .../share/ShareCoordinatorShard.java | 2 + .../share/ShareCoordinatorShardTest.java | 1 + .../consumer/group/ShareGroupCommand.java | 39 +++++-- .../consumer/group/ShareGroupCommandTest.java | 13 ++- 18 files changed, 155 insertions(+), 88 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 91698c813e7..0f37edb2dd8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3846,7 +3846,7 @@ public class KafkaAdminClient extends AdminClient { @Override public ListShareGroupOffsetsResult listShareGroupOffsets(final Map groupSpecs, final ListShareGroupOffsetsOptions options) { - SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); + SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext); invokeDriver(handler, future, options.timeoutMs); return new ListShareGroupOffsetsResult(future.all()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java index d39f3711f4c..e1dcd932309 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; @@ -35,9 +36,9 @@ import java.util.stream.Collectors; @InterfaceStability.Evolving public class ListShareGroupOffsetsResult { - private final Map>> futures; + private final Map>> futures; - ListShareGroupOffsetsResult(final Map>> futures) { + ListShareGroupOffsetsResult(final Map>> futures) { this.futures = futures.entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue)); } @@ -47,10 +48,10 @@ public class ListShareGroupOffsetsResult { * * @return Future which yields all {@code Map>} objects, if requests for all the groups succeed. */ - public KafkaFuture>> all() { + public KafkaFuture>> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( nil -> { - Map> offsets = new HashMap<>(futures.size()); + Map> offsets = new HashMap<>(futures.size()); futures.forEach((groupId, future) -> { try { offsets.put(groupId, future.get()); @@ -70,7 +71,7 @@ public class ListShareGroupOffsetsResult { * @param groupId The group ID. * @return Future which yields a map of topic partitions to offsets for the specified group. */ - public KafkaFuture> partitionsToOffset(String groupId) { + public KafkaFuture> partitionsToOffsetAndMetadata(String groupId) { if (!futures.containsKey(groupId)) { throw new IllegalArgumentException("Group ID not found: " + groupId); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java index fcaba5a67e6..f9b9e987930 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; @@ -39,13 +40,14 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; /** * This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call */ -public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> { +public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> { private final Map groupSpecs; private final Logger log; @@ -58,7 +60,7 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> newFuture(Collection groupIds) { + public static AdminApiFuture.SimpleAdminApiFuture> newFuture(Collection groupIds) { return AdminApiFuture.forKeys(coordinatorKeys(groupIds)); } @@ -108,13 +110,13 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> handleResponse(Node coordinator, - Set groupIds, - AbstractResponse abstractResponse) { + public ApiResult> handleResponse(Node coordinator, + Set groupIds, + AbstractResponse abstractResponse) { validateKeys(groupIds); final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse; - final Map> completed = new HashMap<>(); + final Map> completed = new HashMap<>(); final Map failed = new HashMap<>(); final List unmapped = new ArrayList<>(); @@ -123,17 +125,19 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched groupOffsetsListing = new HashMap<>(); + Map groupOffsetsListing = new HashMap<>(); response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> { for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) { for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) { TopicPartition tp = new TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex()); if (partitionResponse.errorCode() == Errors.NONE.code()) { + final long startOffset = partitionResponse.startOffset(); + final Optional leaderEpoch = partitionResponse.leaderEpoch() < 0 ? Optional.empty() : Optional.of(partitionResponse.leaderEpoch()); // Negative offset indicates there is no start offset for this partition if (partitionResponse.startOffset() < 0) { groupOffsetsListing.put(tp, null); } else { - groupOffsetsListing.put(tp, partitionResponse.startOffset()); + groupOffsetsListing.put(tp, new OffsetAndMetadata(startOffset, leaderEpoch, "")); } } else { log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage()); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java index 86363add1e1..a2787ff82c9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java @@ -99,6 +99,7 @@ public class ReadShareGroupStateSummaryResponse extends AbstractResponse { Uuid topicId, int partition, long startOffset, + int leaderEpoch, int stateEpoch ) { return new ReadShareGroupStateSummaryResponseData() @@ -109,6 +110,7 @@ public class ReadShareGroupStateSummaryResponse extends AbstractResponse { new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition) .setStartOffset(startOffset) + .setLeaderEpoch(leaderEpoch) .setStateEpoch(stateEpoch) )) )); diff --git a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json index ddf9d7044a6..81e3edc554e 100644 --- a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json +++ b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json @@ -41,6 +41,8 @@ "about": "The error message, or null if there was no error." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch of the share-partition." }, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", + "about": "The leader epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset." } ]} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 061982e34d3..c98ffb9483f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -176,8 +176,8 @@ public class AdminClientTestUtils { return new ListClientMetricsResourcesResult(future); } - public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { - Map>> coordinatorFutures = groupOffsets.entrySet().stream() + public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { + Map>> coordinatorFutures = groupOffsets.entrySet().stream() .collect(Collectors.toMap( entry -> CoordinatorKey.byGroupId(entry.getKey()), Map.Entry::getValue diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 339a431aa1c..74c381be17e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -10577,12 +10577,24 @@ public class KafkaAdminClientTest { List.of( new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics( List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500))) + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1) + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2) + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(3) + ) + ) ) ) ) @@ -10590,15 +10602,15 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get(); + final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); assertEquals(6, partitionToOffsetAndMetadata.size()); - assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0)); - assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1)); - assertEquals(40, partitionToOffsetAndMetadata.get(myTopicPartition2)); - assertEquals(50, partitionToOffsetAndMetadata.get(myTopicPartition3)); - assertEquals(100, partitionToOffsetAndMetadata.get(myTopicPartition4)); - assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition5)); + assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0)); + assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition1)); + assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition2)); + assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition3)); + assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition4)); + assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), partitionToOffsetAndMetadata.get(myTopicPartition5)); } } @@ -10630,16 +10642,28 @@ public class KafkaAdminClientTest { List.of( new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics( List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))) + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1) + ) + ) ) ), new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId("group-1").setTopics( List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500))) + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2) + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2) + ) + ) ) ) ) @@ -10649,17 +10673,17 @@ public class KafkaAdminClientTest { final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); assertEquals(2, result.all().get().size()); - final Map partitionToOffsetAndMetadataGroup0 = result.partitionsToOffset(GROUP_ID).get(); + final Map partitionToOffsetAndMetadataGroup0 = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); assertEquals(4, partitionToOffsetAndMetadataGroup0.size()); - assertEquals(10, partitionToOffsetAndMetadataGroup0.get(myTopicPartition0)); - assertEquals(11, partitionToOffsetAndMetadataGroup0.get(myTopicPartition1)); - assertEquals(40, partitionToOffsetAndMetadataGroup0.get(myTopicPartition2)); - assertEquals(50, partitionToOffsetAndMetadataGroup0.get(myTopicPartition3)); + assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition0)); + assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition1)); + assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition2)); + assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition3)); - final Map partitionToOffsetAndMetadataGroup1 = result.partitionsToOffset("group-1").get(); + final Map partitionToOffsetAndMetadataGroup1 = result.partitionsToOffsetAndMetadata("group-1").get(); assertEquals(2, partitionToOffsetAndMetadataGroup1.size()); - assertEquals(100, partitionToOffsetAndMetadataGroup1.get(myTopicPartition4)); - assertEquals(500, partitionToOffsetAndMetadataGroup1.get(myTopicPartition5)); + assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition4)); + assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition5)); } } @@ -10682,7 +10706,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get(); + final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); assertEquals(0, partitionToOffsetAndMetadata.size()); } @@ -10711,12 +10735,22 @@ public class KafkaAdminClientTest { List.of( new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics( List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11) - )), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator"))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500))) + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(1) + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator") + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2) + ) + ) ) ) ) @@ -10724,13 +10758,13 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get(); + final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); // For myTopicPartition2 we have set an error as the response. Thus, it should be skipped from the final result assertEquals(3, partitionToOffsetAndMetadata.size()); - assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0)); - assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1)); - assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition3)); + assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0)); + assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition1)); + assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition3)); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index ebccd66359b..cc2566b492e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -1702,6 +1702,7 @@ public class GroupCoordinatorService implements GroupCoordinator { partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(partitionData.partition()) .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET) + .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH) ).toList()) )); diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java index 1b4a0756515..ae8b8c317c3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java @@ -486,6 +486,7 @@ public class DefaultStatePersister implements Persister { partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), + partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage())) .toList(); @@ -495,6 +496,7 @@ public class DefaultStatePersister implements Persister { partition, -1, -1, + -1, Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException "Error reading state from share coordinator: " + e.getMessage())); } diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java index 2814fc3aa82..908891dd463 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java @@ -92,7 +92,8 @@ public class NoOpStatePersister implements Persister { for (TopicData topicData : reqData.topicsData()) { resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream(). map(partitionIdData -> PartitionFactory.newPartitionStateSummaryData( - partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) + partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_LEADER_EPOCH, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) .collect(Collectors.toList()))); } return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build()); diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java index 009eb9cccc1..78a6902a170 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java @@ -47,12 +47,8 @@ public class PartitionFactory { return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); } - public static PartitionStateErrorData newPartitionStateErrorData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) { - return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); - } - - public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) { - return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); + public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, String errorMessage) { + return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, leaderEpoch, null); } public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int leaderEpoch, List stateBatches) { diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java index dc4732a79ae..58a9dc10615 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java @@ -24,6 +24,8 @@ package org.apache.kafka.server.share.persister; public interface PartitionStateSummaryData extends PartitionInfoData, PartitionIdData { int stateEpoch(); + int leaderEpoch(); + long startOffset(); short errorCode(); diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java index 7e0bee13c38..249eb20ed94 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java @@ -38,7 +38,8 @@ public class ReadShareGroupStateSummaryResult implements PersisterResult { .map(readStateSummaryResult -> new TopicData<>(readStateSummaryResult.topicId(), readStateSummaryResult.partitions().stream() .map(partitionResult -> PartitionFactory.newPartitionStateSummaryData( - partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), partitionResult.errorCode(), partitionResult.errorMessage())) + partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), + partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage())) .collect(Collectors.toList()))) .collect(Collectors.toList())) .build(); diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java index 1a668b5dc32..697d958723a 100644 --- a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java @@ -868,7 +868,7 @@ class DefaultStatePersisterTest { return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1; }, - new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1)), + new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1, 1)), coordinatorNode1); client.prepareResponseFrom( @@ -880,7 +880,7 @@ class DefaultStatePersisterTest { return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2; }, - new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1)), + new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1, 1)), coordinatorNode2); ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); @@ -930,12 +930,12 @@ class DefaultStatePersisterTest { HashSet expectedResultMap = new HashSet<>(); expectedResultMap.add( - (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, Errors.NONE.code(), + (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1, Errors.NONE.code(), null )); expectedResultMap.add( - (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, Errors.NONE.code(), + (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1, Errors.NONE.code(), null )); @@ -1437,6 +1437,7 @@ class DefaultStatePersisterTest { tp1.topicId(), tp1.partition(), 1L, + 1, 2 ) ) @@ -1468,7 +1469,7 @@ class DefaultStatePersisterTest { results.topicsData().contains( new TopicData<>( tp1.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, Errors.NONE.code(), null)) + List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null)) ) ) ); @@ -1476,7 +1477,7 @@ class DefaultStatePersisterTest { results.topicsData().contains( new TopicData<>( tp2.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")) + List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")) ) ) ); @@ -1496,6 +1497,7 @@ class DefaultStatePersisterTest { tp1.topicId(), tp1.partition(), 1L, + 1, 2 ) ) @@ -1517,7 +1519,7 @@ class DefaultStatePersisterTest { results.topicsData().contains( new TopicData<>( tp1.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, Errors.NONE.code(), null)) + List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null)) ) ) ); @@ -1525,7 +1527,7 @@ class DefaultStatePersisterTest { results.topicsData().contains( new TopicData<>( tp2.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff")) + List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff")) ) ) ); diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index d38564fd6f8..7f03f9254b1 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -453,6 +453,7 @@ public class ShareCoordinatorShard implements CoordinatorShard earliestResult = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); + Map startOffsets = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); Set partitionOffsets = new HashSet<>(); - for (Entry tp : earliestResult.entrySet()) { - SharePartitionOffsetInformation partitionOffsetInfo = new SharePartitionOffsetInformation( - groupId, - tp.getKey().topic(), - tp.getKey().partition(), - Optional.ofNullable(earliestResult.get(tp.getKey())) - ); - partitionOffsets.add(partitionOffsetInfo); - } + startOffsets.forEach((tp, offsetAndMetadata) -> { + if (offsetAndMetadata != null) { + partitionOffsets.add(new SharePartitionOffsetInformation( + groupId, + tp.topic(), + tp.partition(), + Optional.of(offsetAndMetadata.offset()), + offsetAndMetadata.leaderEpoch() + )); + } else { + partitionOffsets.add(new SharePartitionOffsetInformation( + groupId, + tp.topic(), + tp.partition(), + Optional.empty(), + Optional.empty() + )); + } + }); + groupOffsets.put(groupId, new SimpleImmutableEntry<>(shareGroup, partitionOffsets)); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); @@ -447,7 +459,7 @@ public class ShareGroupCommand { groupId, info.topic, info.partition, - MISSING_COLUMN_VALUE, // Temporary + info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE), info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE) ); } else { @@ -569,17 +581,20 @@ public class ShareGroupCommand { final String topic; final int partition; final Optional offset; + final Optional leaderEpoch; SharePartitionOffsetInformation( String group, String topic, int partition, - Optional offset + Optional offset, + Optional leaderEpoch ) { this.group = group; this.topic = topic; this.partition = partition; this.offset = offset; + this.leaderEpoch = leaderEpoch; } } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index f1f91217511..546cab50e0d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.ShareGroupDescription; import org.apache.kafka.clients.admin.ShareMemberAssignment; import org.apache.kafka.clients.admin.ShareMemberDescription; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; @@ -191,7 +192,7 @@ public class ShareGroupCommandTest { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L)) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), ""))) ) ); @@ -208,7 +209,7 @@ public class ShareGroupCommandTest { List expectedValues; if (describeType.contains("--verbose")) { - expectedValues = List.of(firstGroup, "topic1", "0", "-", "0"); + expectedValues = List.of(firstGroup, "topic1", "0", "1", "0"); } else { expectedValues = List.of(firstGroup, "topic1", "0", "0"); } @@ -299,13 +300,13 @@ public class ShareGroupCommandTest { ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L)) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), ""))) ) ); ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( secondGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L)) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), ""))) ) ); @@ -333,8 +334,8 @@ public class ShareGroupCommandTest { List expectedValues1, expectedValues2; if (describeType.contains("--verbose")) { - expectedValues1 = List.of(firstGroup, "topic1", "0", "-", "0"); - expectedValues2 = List.of(secondGroup, "topic1", "0", "-", "0"); + expectedValues1 = List.of(firstGroup, "topic1", "0", "1", "0"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0"); } else { expectedValues1 = List.of(firstGroup, "topic1", "0", "0"); expectedValues2 = List.of(secondGroup, "topic1", "0", "0");