diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 91698c813e7..0f37edb2dd8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3846,7 +3846,7 @@ public class KafkaAdminClient extends AdminClient { @Override public ListShareGroupOffsetsResult listShareGroupOffsets(final Map groupSpecs, final ListShareGroupOffsetsOptions options) { - SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); + SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext); invokeDriver(handler, future, options.timeoutMs); return new ListShareGroupOffsetsResult(future.all()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java index d39f3711f4c..e1dcd932309 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; @@ -35,9 +36,9 @@ import java.util.stream.Collectors; @InterfaceStability.Evolving public class ListShareGroupOffsetsResult { - private final Map>> futures; + private final Map>> futures; - ListShareGroupOffsetsResult(final Map>> futures) { + ListShareGroupOffsetsResult(final Map>> futures) { this.futures = futures.entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue)); } @@ -47,10 +48,10 @@ public class ListShareGroupOffsetsResult { * * @return Future which yields all {@code Map>} objects, if requests for all the groups succeed. */ - public KafkaFuture>> all() { + public KafkaFuture>> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( nil -> { - Map> offsets = new HashMap<>(futures.size()); + Map> offsets = new HashMap<>(futures.size()); futures.forEach((groupId, future) -> { try { offsets.put(groupId, future.get()); @@ -70,7 +71,7 @@ public class ListShareGroupOffsetsResult { * @param groupId The group ID. * @return Future which yields a map of topic partitions to offsets for the specified group. */ - public KafkaFuture> partitionsToOffset(String groupId) { + public KafkaFuture> partitionsToOffsetAndMetadata(String groupId) { if (!futures.containsKey(groupId)) { throw new IllegalArgumentException("Group ID not found: " + groupId); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java index fcaba5a67e6..f9b9e987930 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; @@ -39,13 +40,14 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; /** * This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call */ -public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> { +public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> { private final Map groupSpecs; private final Logger log; @@ -58,7 +60,7 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> newFuture(Collection groupIds) { + public static AdminApiFuture.SimpleAdminApiFuture> newFuture(Collection groupIds) { return AdminApiFuture.forKeys(coordinatorKeys(groupIds)); } @@ -108,13 +110,13 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> handleResponse(Node coordinator, - Set groupIds, - AbstractResponse abstractResponse) { + public ApiResult> handleResponse(Node coordinator, + Set groupIds, + AbstractResponse abstractResponse) { validateKeys(groupIds); final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse; - final Map> completed = new HashMap<>(); + final Map> completed = new HashMap<>(); final Map failed = new HashMap<>(); final List unmapped = new ArrayList<>(); @@ -123,17 +125,19 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched groupOffsetsListing = new HashMap<>(); + Map groupOffsetsListing = new HashMap<>(); response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> { for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) { for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) { TopicPartition tp = new TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex()); if (partitionResponse.errorCode() == Errors.NONE.code()) { + final long startOffset = partitionResponse.startOffset(); + final Optional leaderEpoch = partitionResponse.leaderEpoch() < 0 ? Optional.empty() : Optional.of(partitionResponse.leaderEpoch()); // Negative offset indicates there is no start offset for this partition if (partitionResponse.startOffset() < 0) { groupOffsetsListing.put(tp, null); } else { - groupOffsetsListing.put(tp, partitionResponse.startOffset()); + groupOffsetsListing.put(tp, new OffsetAndMetadata(startOffset, leaderEpoch, "")); } } else { log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage()); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java index 86363add1e1..a2787ff82c9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java @@ -99,6 +99,7 @@ public class ReadShareGroupStateSummaryResponse extends AbstractResponse { Uuid topicId, int partition, long startOffset, + int leaderEpoch, int stateEpoch ) { return new ReadShareGroupStateSummaryResponseData() @@ -109,6 +110,7 @@ public class ReadShareGroupStateSummaryResponse extends AbstractResponse { new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition) .setStartOffset(startOffset) + .setLeaderEpoch(leaderEpoch) .setStateEpoch(stateEpoch) )) )); diff --git a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json index ddf9d7044a6..81e3edc554e 100644 --- a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json +++ b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json @@ -41,6 +41,8 @@ "about": "The error message, or null if there was no error." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch of the share-partition." }, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", + "about": "The leader epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset." } ]} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 061982e34d3..c98ffb9483f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -176,8 +176,8 @@ public class AdminClientTestUtils { return new ListClientMetricsResourcesResult(future); } - public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { - Map>> coordinatorFutures = groupOffsets.entrySet().stream() + public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { + Map>> coordinatorFutures = groupOffsets.entrySet().stream() .collect(Collectors.toMap( entry -> CoordinatorKey.byGroupId(entry.getKey()), Map.Entry::getValue diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 339a431aa1c..74c381be17e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -10577,12 +10577,24 @@ public class KafkaAdminClientTest { List.of( new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics( List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500))) + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1) + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2) + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(3) + ) + ) ) ) ) @@ -10590,15 +10602,15 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get(); + final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); assertEquals(6, partitionToOffsetAndMetadata.size()); - assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0)); - assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1)); - assertEquals(40, partitionToOffsetAndMetadata.get(myTopicPartition2)); - assertEquals(50, partitionToOffsetAndMetadata.get(myTopicPartition3)); - assertEquals(100, partitionToOffsetAndMetadata.get(myTopicPartition4)); - assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition5)); + assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0)); + assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition1)); + assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition2)); + assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition3)); + assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition4)); + assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), partitionToOffsetAndMetadata.get(myTopicPartition5)); } } @@ -10630,16 +10642,28 @@ public class KafkaAdminClientTest { List.of( new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics( List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))) + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1) + ) + ) ) ), new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId("group-1").setTopics( List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500))) + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2) + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2) + ) + ) ) ) ) @@ -10649,17 +10673,17 @@ public class KafkaAdminClientTest { final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); assertEquals(2, result.all().get().size()); - final Map partitionToOffsetAndMetadataGroup0 = result.partitionsToOffset(GROUP_ID).get(); + final Map partitionToOffsetAndMetadataGroup0 = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); assertEquals(4, partitionToOffsetAndMetadataGroup0.size()); - assertEquals(10, partitionToOffsetAndMetadataGroup0.get(myTopicPartition0)); - assertEquals(11, partitionToOffsetAndMetadataGroup0.get(myTopicPartition1)); - assertEquals(40, partitionToOffsetAndMetadataGroup0.get(myTopicPartition2)); - assertEquals(50, partitionToOffsetAndMetadataGroup0.get(myTopicPartition3)); + assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition0)); + assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition1)); + assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition2)); + assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition3)); - final Map partitionToOffsetAndMetadataGroup1 = result.partitionsToOffset("group-1").get(); + final Map partitionToOffsetAndMetadataGroup1 = result.partitionsToOffsetAndMetadata("group-1").get(); assertEquals(2, partitionToOffsetAndMetadataGroup1.size()); - assertEquals(100, partitionToOffsetAndMetadataGroup1.get(myTopicPartition4)); - assertEquals(500, partitionToOffsetAndMetadataGroup1.get(myTopicPartition5)); + assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition4)); + assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition5)); } } @@ -10682,7 +10706,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get(); + final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); assertEquals(0, partitionToOffsetAndMetadata.size()); } @@ -10711,12 +10735,22 @@ public class KafkaAdminClientTest { List.of( new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics( List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11) - )), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator"))), - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500))) + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(1) + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator") + ) + ), + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions( + List.of( + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2) + ) + ) ) ) ) @@ -10724,13 +10758,13 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get(); + final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); // For myTopicPartition2 we have set an error as the response. Thus, it should be skipped from the final result assertEquals(3, partitionToOffsetAndMetadata.size()); - assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0)); - assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1)); - assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition3)); + assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0)); + assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition1)); + assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition3)); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index ebccd66359b..cc2566b492e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -1702,6 +1702,7 @@ public class GroupCoordinatorService implements GroupCoordinator { partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(partitionData.partition()) .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET) + .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH) ).toList()) )); diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java index 1b4a0756515..ae8b8c317c3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java @@ -486,6 +486,7 @@ public class DefaultStatePersister implements Persister { partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), + partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage())) .toList(); @@ -495,6 +496,7 @@ public class DefaultStatePersister implements Persister { partition, -1, -1, + -1, Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException "Error reading state from share coordinator: " + e.getMessage())); } diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java index 2814fc3aa82..908891dd463 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java @@ -92,7 +92,8 @@ public class NoOpStatePersister implements Persister { for (TopicData topicData : reqData.topicsData()) { resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream(). map(partitionIdData -> PartitionFactory.newPartitionStateSummaryData( - partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) + partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_LEADER_EPOCH, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) .collect(Collectors.toList()))); } return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build()); diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java index 009eb9cccc1..78a6902a170 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java @@ -47,12 +47,8 @@ public class PartitionFactory { return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); } - public static PartitionStateErrorData newPartitionStateErrorData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) { - return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); - } - - public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) { - return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); + public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, String errorMessage) { + return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, leaderEpoch, null); } public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int leaderEpoch, List stateBatches) { diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java index dc4732a79ae..58a9dc10615 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java @@ -24,6 +24,8 @@ package org.apache.kafka.server.share.persister; public interface PartitionStateSummaryData extends PartitionInfoData, PartitionIdData { int stateEpoch(); + int leaderEpoch(); + long startOffset(); short errorCode(); diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java index 7e0bee13c38..249eb20ed94 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java @@ -38,7 +38,8 @@ public class ReadShareGroupStateSummaryResult implements PersisterResult { .map(readStateSummaryResult -> new TopicData<>(readStateSummaryResult.topicId(), readStateSummaryResult.partitions().stream() .map(partitionResult -> PartitionFactory.newPartitionStateSummaryData( - partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), partitionResult.errorCode(), partitionResult.errorMessage())) + partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), + partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage())) .collect(Collectors.toList()))) .collect(Collectors.toList())) .build(); diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java index 1a668b5dc32..697d958723a 100644 --- a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java @@ -868,7 +868,7 @@ class DefaultStatePersisterTest { return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1; }, - new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1)), + new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1, 1)), coordinatorNode1); client.prepareResponseFrom( @@ -880,7 +880,7 @@ class DefaultStatePersisterTest { return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2; }, - new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1)), + new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1, 1)), coordinatorNode2); ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); @@ -930,12 +930,12 @@ class DefaultStatePersisterTest { HashSet expectedResultMap = new HashSet<>(); expectedResultMap.add( - (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, Errors.NONE.code(), + (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1, Errors.NONE.code(), null )); expectedResultMap.add( - (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, Errors.NONE.code(), + (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1, Errors.NONE.code(), null )); @@ -1437,6 +1437,7 @@ class DefaultStatePersisterTest { tp1.topicId(), tp1.partition(), 1L, + 1, 2 ) ) @@ -1468,7 +1469,7 @@ class DefaultStatePersisterTest { results.topicsData().contains( new TopicData<>( tp1.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, Errors.NONE.code(), null)) + List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null)) ) ) ); @@ -1476,7 +1477,7 @@ class DefaultStatePersisterTest { results.topicsData().contains( new TopicData<>( tp2.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")) + List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")) ) ) ); @@ -1496,6 +1497,7 @@ class DefaultStatePersisterTest { tp1.topicId(), tp1.partition(), 1L, + 1, 2 ) ) @@ -1517,7 +1519,7 @@ class DefaultStatePersisterTest { results.topicsData().contains( new TopicData<>( tp1.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, Errors.NONE.code(), null)) + List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null)) ) ) ); @@ -1525,7 +1527,7 @@ class DefaultStatePersisterTest { results.topicsData().contains( new TopicData<>( tp2.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff")) + List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff")) ) ) ); diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index d38564fd6f8..7f03f9254b1 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -453,6 +453,7 @@ public class ShareCoordinatorShard implements CoordinatorShard earliestResult = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); + Map startOffsets = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); Set partitionOffsets = new HashSet<>(); - for (Entry tp : earliestResult.entrySet()) { - SharePartitionOffsetInformation partitionOffsetInfo = new SharePartitionOffsetInformation( - groupId, - tp.getKey().topic(), - tp.getKey().partition(), - Optional.ofNullable(earliestResult.get(tp.getKey())) - ); - partitionOffsets.add(partitionOffsetInfo); - } + startOffsets.forEach((tp, offsetAndMetadata) -> { + if (offsetAndMetadata != null) { + partitionOffsets.add(new SharePartitionOffsetInformation( + groupId, + tp.topic(), + tp.partition(), + Optional.of(offsetAndMetadata.offset()), + offsetAndMetadata.leaderEpoch() + )); + } else { + partitionOffsets.add(new SharePartitionOffsetInformation( + groupId, + tp.topic(), + tp.partition(), + Optional.empty(), + Optional.empty() + )); + } + }); + groupOffsets.put(groupId, new SimpleImmutableEntry<>(shareGroup, partitionOffsets)); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); @@ -447,7 +459,7 @@ public class ShareGroupCommand { groupId, info.topic, info.partition, - MISSING_COLUMN_VALUE, // Temporary + info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE), info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE) ); } else { @@ -569,17 +581,20 @@ public class ShareGroupCommand { final String topic; final int partition; final Optional offset; + final Optional leaderEpoch; SharePartitionOffsetInformation( String group, String topic, int partition, - Optional offset + Optional offset, + Optional leaderEpoch ) { this.group = group; this.topic = topic; this.partition = partition; this.offset = offset; + this.leaderEpoch = leaderEpoch; } } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index f1f91217511..546cab50e0d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.ShareGroupDescription; import org.apache.kafka.clients.admin.ShareMemberAssignment; import org.apache.kafka.clients.admin.ShareMemberDescription; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; @@ -191,7 +192,7 @@ public class ShareGroupCommandTest { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L)) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), ""))) ) ); @@ -208,7 +209,7 @@ public class ShareGroupCommandTest { List expectedValues; if (describeType.contains("--verbose")) { - expectedValues = List.of(firstGroup, "topic1", "0", "-", "0"); + expectedValues = List.of(firstGroup, "topic1", "0", "1", "0"); } else { expectedValues = List.of(firstGroup, "topic1", "0", "0"); } @@ -299,13 +300,13 @@ public class ShareGroupCommandTest { ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L)) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), ""))) ) ); ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( secondGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L)) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), ""))) ) ); @@ -333,8 +334,8 @@ public class ShareGroupCommandTest { List expectedValues1, expectedValues2; if (describeType.contains("--verbose")) { - expectedValues1 = List.of(firstGroup, "topic1", "0", "-", "0"); - expectedValues2 = List.of(secondGroup, "topic1", "0", "-", "0"); + expectedValues1 = List.of(firstGroup, "topic1", "0", "1", "0"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0"); } else { expectedValues1 = List.of(firstGroup, "topic1", "0", "0"); expectedValues2 = List.of(secondGroup, "topic1", "0", "0");