diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 86ef919c93e..4d81fe0dddb 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -275,11 +275,11 @@ private[group] class GroupCoordinatorAdapter( override def fetchAllOffsets( context: RequestContext, - groupId: String, + request: OffsetFetchRequestData.OffsetFetchRequestGroup, requireStable: Boolean - ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = { + ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { handleFetchOffset( - groupId, + request.groupId, requireStable, None ) @@ -287,19 +287,18 @@ private[group] class GroupCoordinatorAdapter( override def fetchOffsets( context: RequestContext, - groupId: String, - topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics], + request: OffsetFetchRequestData.OffsetFetchRequestGroup, requireStable: Boolean - ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = { + ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { val topicPartitions = new mutable.ArrayBuffer[TopicPartition]() - topics.forEach { topic => + request.topics.forEach { topic => topic.partitionIndexes.forEach { partition => topicPartitions += new TopicPartition(topic.name, partition) } } handleFetchOffset( - groupId, + request.groupId, requireStable, Some(topicPartitions.toSeq) ) @@ -309,14 +308,14 @@ private[group] class GroupCoordinatorAdapter( groupId: String, requireStable: Boolean, partitions: Option[Seq[TopicPartition]] - ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = { + ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { val (error, results) = coordinator.handleFetchOffsets( groupId, requireStable, partitions ) - val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]() + val future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() if (error != Errors.NONE) { future.completeExceptionally(error.exception) } else { @@ -343,7 +342,9 @@ private[group] class GroupCoordinatorAdapter( .setErrorCode(offset.error.code)) } - future.complete(topicsList) + future.complete(new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(groupId) + .setTopics(topicsList)) } future diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9af56815817..ab4bbe96237 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1452,17 +1452,17 @@ class KafkaApis(val requestChannel: RequestChannel, private def fetchAllOffsetsForGroup( requestContext: RequestContext, - groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup, + offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, requireStable: Boolean ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { groupCoordinator.fetchAllOffsets( requestContext, - groupOffsetFetch.groupId, + offsetFetchRequest, requireStable - ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsets, exception) => + ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsetFetchResponse, exception) => if (exception != null) { new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(groupOffsetFetch.groupId) + .setGroupId(offsetFetchRequest.groupId) .setErrorCode(Errors.forException(exception).code) } else { // Clients are not allowed to see offsets for topics that are not authorized for Describe. @@ -1470,19 +1470,16 @@ class KafkaApis(val requestChannel: RequestChannel, requestContext, DESCRIBE, TOPIC, - offsets.asScala + offsetFetchResponse.topics.asScala )(_.name) - - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(groupOffsetFetch.groupId) - .setTopics(authorizedOffsets.asJava) + offsetFetchResponse.setTopics(authorizedOffsets.asJava) } } } private def fetchOffsetsForGroup( requestContext: RequestContext, - groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup, + offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, requireStable: Boolean ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { // Clients are not allowed to see offsets for topics that are not authorized for Describe. @@ -1490,25 +1487,25 @@ class KafkaApis(val requestChannel: RequestChannel, requestContext, DESCRIBE, TOPIC, - groupOffsetFetch.topics.asScala + offsetFetchRequest.topics.asScala )(_.name) groupCoordinator.fetchOffsets( requestContext, - groupOffsetFetch.groupId, - authorizedTopics.asJava, + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(offsetFetchRequest.groupId) + .setTopics(authorizedTopics.asJava), requireStable - ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (topicOffsets, exception) => + ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsetFetchResponse, exception) => if (exception != null) { new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(groupOffsetFetch.groupId) + .setGroupId(offsetFetchRequest.groupId) .setErrorCode(Errors.forException(exception).code) } else { - val response = new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId(groupOffsetFetch.groupId) - - response.topics.addAll(topicOffsets) - + val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]( + offsetFetchResponse.topics.size + unauthorizedTopics.size + ) + topics.addAll(offsetFetchResponse.topics) unauthorizedTopics.foreach { topic => val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name) topic.partitionIndexes.forEach { partitionIndex => @@ -1517,17 +1514,19 @@ class KafkaApis(val requestChannel: RequestChannel, .setCommittedOffset(-1) .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)) } - response.topics.add(topicResponse) + topics.add(topicResponse) } - - response + offsetFetchResponse.setTopics(topics) } } } - private def partitionByAuthorized(seq: Seq[TopicPartition], context: RequestContext): - (Seq[TopicPartition], Seq[TopicPartition]) = + private def partitionByAuthorized( + seq: Seq[TopicPartition], + context: RequestContext + ): (Seq[TopicPartition], Seq[TopicPartition]) = { authHelper.partitionSeqByAuthorized(context, DESCRIBE, TOPIC, seq)(_.topic) + } def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 7b7e10b1c94..42ffe856d67 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -502,7 +502,7 @@ class GroupCoordinatorAdapterTest { val ctx = makeContext(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.latestVersion) val future = adapter.fetchAllOffsets( ctx, - "group", + new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId("group"), true ) @@ -537,9 +537,10 @@ class GroupCoordinatorAdapterTest { ).asJava) ) + assertEquals("group", future.get().groupId) assertEquals( expectedResponse.sortWith(_.name > _.name), - future.get().asScala.toList.sortWith(_.name > _.name) + future.get().topics.asScala.toList.sortWith(_.name > _.name) ) } @@ -583,15 +584,15 @@ class GroupCoordinatorAdapterTest { val ctx = makeContext(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.latestVersion) val future = adapter.fetchOffsets( ctx, - "group", - List( - new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName(foo0.topic) - .setPartitionIndexes(List[Integer](foo0.partition, foo1.partition).asJava), - new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName(bar1.topic) - .setPartitionIndexes(List[Integer](bar1.partition).asJava), - ).asJava, + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(foo0.topic) + .setPartitionIndexes(List[Integer](foo0.partition, foo1.partition).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(bar1.topic) + .setPartitionIndexes(List[Integer](bar1.partition).asJava)).asJava), true ) @@ -626,9 +627,10 @@ class GroupCoordinatorAdapterTest { ).asJava) ) + assertEquals("group", future.get().groupId) assertEquals( expectedResponse.sortWith(_.name > _.name), - future.get().asScala.toList.sortWith(_.name > _.name) + future.get().topics.asScala.toList.sortWith(_.name > _.name) ) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index cbdf3005908..50ba898b25c 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -4316,28 +4316,33 @@ class KafkaApisTest { } else { val requestChannelRequest = makeRequest(version) - val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]() + val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() when(groupCoordinator.fetchOffsets( requestChannelRequest.context, - "group-1", - List(new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName("foo") - .setPartitionIndexes(List[Integer](0, 1).asJava) - ).asJava, + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List[Integer](0, 1).asJava)).asJava), false )).thenReturn(group1Future) - val group2Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]() + val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() when(groupCoordinator.fetchAllOffsets( requestChannelRequest.context, - "group-2", + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(null), false )).thenReturn(group2Future) - val group3Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]() + val group3Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() when(groupCoordinator.fetchAllOffsets( requestChannelRequest.context, - "group-3", + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-3") + .setTopics(null), false )).thenReturn(group3Future) @@ -4388,8 +4393,8 @@ class KafkaApisTest { val expectedOffsetFetchResponse = new OffsetFetchResponseData() .setGroups(List(group1Response, group2Response, group3Response).asJava) - group1Future.complete(group1Response.topics) - group2Future.complete(group2Response.topics) + group1Future.complete(group1Response) + group2Future.complete(group2Response) group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception) val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest) @@ -4418,14 +4423,14 @@ class KafkaApisTest { val requestChannelRequest = makeRequest(version) - val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]() + val future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() when(groupCoordinator.fetchOffsets( requestChannelRequest.context, - "group-1", - List(new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName("foo") - .setPartitionIndexes(List[Integer](0, 1).asJava) - ).asJava, + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List[Integer](0, 1).asJava)).asJava), false )).thenReturn(future) @@ -4469,7 +4474,7 @@ class KafkaApisTest { ).asJava) } - future.complete(group1Response.topics) + future.complete(group1Response) val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest) assertEquals(expectedOffsetFetchResponse, response.data) @@ -4493,10 +4498,12 @@ class KafkaApisTest { val requestChannelRequest = makeRequest(version) - val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]() + val future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() when(groupCoordinator.fetchAllOffsets( requestChannelRequest.context, - "group-1", + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(null), false )).thenReturn(future) @@ -4540,7 +4547,7 @@ class KafkaApisTest { ).asJava) } - future.complete(group1Response.topics) + future.complete(group1Response) val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest) assertEquals(expectedOffsetFetchResponse, response.data) @@ -4588,22 +4595,24 @@ class KafkaApisTest { } // group-1 is allowed and bar is allowed. - val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]() + val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() when(groupCoordinator.fetchOffsets( requestChannelRequest.context, - "group-1", - List(new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName("bar") - .setPartitionIndexes(List[Integer](0).asJava) - ).asJava, + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava)).asJava), false )).thenReturn(group1Future) // group-3 is allowed and bar is allowed. - val group3Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]() + val group3Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() when(groupCoordinator.fetchAllOffsets( requestChannelRequest.context, - "group-3", + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-3") + .setTopics(null), false )).thenReturn(group3Future) @@ -4690,8 +4699,8 @@ class KafkaApisTest { .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code), ).asJava) - group1Future.complete(group1ResponseFromCoordinator.topics) - group3Future.complete(group3ResponseFromCoordinator.topics) + group1Future.complete(group1ResponseFromCoordinator) + group3Future.complete(group3ResponseFromCoordinator) val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest) assertEquals(expectedOffsetFetchResponse, response.data) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 643bbdadb4d..0476413c9ee 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -170,15 +170,13 @@ public interface GroupCoordinator { * Fetch offsets for a given Group. * * @param context The request context. - * @param groupId The group id. - * @param topics The topics to fetch the offsets for. + * @param request The OffsetFetchRequestGroup request. * * @return A future yielding the results or an exception. */ - CompletableFuture> fetchOffsets( + CompletableFuture fetchOffsets( RequestContext context, - String groupId, - List topics, + OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable ); @@ -186,13 +184,13 @@ public interface GroupCoordinator { * Fetch all offsets for a given Group. * * @param context The request context. - * @param groupId The group id. + * @param request The OffsetFetchRequestGroup request. * * @return A future yielding the results or an exception. */ - CompletableFuture> fetchAllOffsets( + CompletableFuture fetchAllOffsets( RequestContext context, - String groupId, + OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index d56d827ec6b..f018fa0ae6e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -469,13 +469,12 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#fetchOffsets(RequestContext, String, List, boolean)}. + * See {@link GroupCoordinator#fetchOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}. */ @Override - public CompletableFuture> fetchOffsets( + public CompletableFuture fetchOffsets( RequestContext context, - String groupId, - List topics, + OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable ) { if (!isActive.get()) { @@ -483,7 +482,7 @@ public class GroupCoordinatorService implements GroupCoordinator { } // For backwards compatibility, we support fetch commits for the empty group id. - if (groupId == null) { + if (request.groupId() == null) { return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception()); } @@ -498,28 +497,28 @@ public class GroupCoordinatorService implements GroupCoordinator { if (requireStable) { return runtime.scheduleWriteOperation( "fetch-offsets", - topicPartitionFor(groupId), + topicPartitionFor(request.groupId()), coordinator -> new CoordinatorResult<>( Collections.emptyList(), - coordinator.fetchOffsets(groupId, topics, Long.MAX_VALUE) + coordinator.fetchOffsets(request, Long.MAX_VALUE) ) ); } else { return runtime.scheduleReadOperation( "fetch-offsets", - topicPartitionFor(groupId), - (coordinator, offset) -> coordinator.fetchOffsets(groupId, topics, offset) + topicPartitionFor(request.groupId()), + (coordinator, offset) -> coordinator.fetchOffsets(request, offset) ); } } /** - * See {@link GroupCoordinator#fetchAllOffsets(RequestContext, String, boolean)}. + * See {@link GroupCoordinator#fetchAllOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}. */ @Override - public CompletableFuture> fetchAllOffsets( + public CompletableFuture fetchAllOffsets( RequestContext context, - String groupId, + OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable ) { if (!isActive.get()) { @@ -527,7 +526,7 @@ public class GroupCoordinatorService implements GroupCoordinator { } // For backwards compatibility, we support fetch commits for the empty group id. - if (groupId == null) { + if (request.groupId() == null) { return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception()); } @@ -542,17 +541,17 @@ public class GroupCoordinatorService implements GroupCoordinator { if (requireStable) { return runtime.scheduleWriteOperation( "fetch-all-offsets", - topicPartitionFor(groupId), + topicPartitionFor(request.groupId()), coordinator -> new CoordinatorResult<>( Collections.emptyList(), - coordinator.fetchAllOffsets(groupId, Long.MAX_VALUE) + coordinator.fetchAllOffsets(request, Long.MAX_VALUE) ) ); } else { return runtime.scheduleReadOperation( "fetch-all-offsets", - topicPartitionFor(groupId), - (coordinator, offset) -> coordinator.fetchAllOffsets(groupId, offset) + topicPartitionFor(request.groupId()), + (coordinator, offset) -> coordinator.fetchAllOffsets(request, offset) ); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 6b4f7e08a5a..94a4329f54c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -58,7 +58,6 @@ import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; -import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -262,35 +261,33 @@ public class GroupCoordinatorShard implements CoordinatorShard { /** * Fetch offsets for a given set of partitions and a given group. * - * @param groupId The group id. - * @param topics The topics to fetch the offsets for. + * @param request The OffsetFetchRequestGroup request. * @param epoch The epoch (or offset) used to read from the * timeline data structure. * * @return A List of OffsetFetchResponseTopics response. */ - public List fetchOffsets( - String groupId, - List topics, + public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets( + OffsetFetchRequestData.OffsetFetchRequestGroup request, long epoch ) throws ApiException { - return offsetMetadataManager.fetchOffsets(groupId, topics, epoch); + return offsetMetadataManager.fetchOffsets(request, epoch); } /** * Fetch all offsets for a given group. * - * @param groupId The group id. + * @param request The OffsetFetchRequestGroup request. * @param epoch The epoch (or offset) used to read from the * timeline data structure. * * @return A List of OffsetFetchResponseTopics response. */ - public List fetchAllOffsets( - String groupId, + public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( + OffsetFetchRequestData.OffsetFetchRequestGroup request, long epoch ) throws ApiException { - return offsetMetadataManager.fetchAllOffsets(groupId, epoch); + return offsetMetadataManager.fetchAllOffsets(request, epoch); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 6743317f344..e551a61e849 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -332,29 +332,27 @@ public class OffsetMetadataManager { /** * Fetch offsets for a given Group. * - * @param groupId The group id. - * @param topics The topics to fetch the offsets for. + * @param request The OffsetFetchRequestGroup request. * @param lastCommittedOffset The last committed offsets in the timeline. * * @return A List of OffsetFetchResponseTopics response. */ - public List fetchOffsets( - String groupId, - List topics, + public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets( + OffsetFetchRequestData.OffsetFetchRequestGroup request, long lastCommittedOffset ) throws ApiException { boolean failAllPartitions = false; try { - validateOffsetFetch(groupId, lastCommittedOffset); + validateOffsetFetch(request.groupId(), lastCommittedOffset); } catch (GroupIdNotFoundException ex) { failAllPartitions = true; } - final List topicResponses = new ArrayList<>(topics.size()); + final List topicResponses = new ArrayList<>(request.topics().size()); final TimelineHashMap> groupOffsets = - failAllPartitions ? null : offsetsByGroup.get(groupId, lastCommittedOffset); + failAllPartitions ? null : offsetsByGroup.get(request.groupId(), lastCommittedOffset); - topics.forEach(topic -> { + request.topics().forEach(topic -> { final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name()); topicResponses.add(topicResponse); @@ -382,30 +380,34 @@ public class OffsetMetadataManager { }); }); - return topicResponses; + return new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(request.groupId()) + .setTopics(topicResponses); } /** * Fetch all offsets for a given Group. * - * @param groupId The group id. + * @param request The OffsetFetchRequestGroup request. * @param lastCommittedOffset The last committed offsets in the timeline. * * @return A List of OffsetFetchResponseTopics response. */ - public List fetchAllOffsets( - String groupId, + public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( + OffsetFetchRequestData.OffsetFetchRequestGroup request, long lastCommittedOffset ) throws ApiException { try { - validateOffsetFetch(groupId, lastCommittedOffset); + validateOffsetFetch(request.groupId(), lastCommittedOffset); } catch (GroupIdNotFoundException ex) { - return Collections.emptyList(); + return new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(request.groupId()) + .setTopics(Collections.emptyList()); } final List topicResponses = new ArrayList<>(); final TimelineHashMap> groupOffsets = - offsetsByGroup.get(groupId, lastCommittedOffset); + offsetsByGroup.get(request.groupId(), lastCommittedOffset); if (groupOffsets != null) { groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> { @@ -429,7 +431,9 @@ public class OffsetMetadataManager { }); } - return topicResponses; + return new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId(request.groupId()) + .setTopics(topicResponses); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 666880c0431..c5e0c97555b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -63,7 +63,6 @@ import org.mockito.ArgumentMatchers; import java.net.InetAddress; import java.util.Collections; -import java.util.List; import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -614,40 +613,43 @@ public class GroupCoordinatorServiceTest { service.startup(() -> 1); - List topicsRequest = - Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName("foo") - .setPartitionIndexes(Collections.singletonList(0))); + OffsetFetchRequestData.OffsetFetchRequestGroup request = + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group") + .setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(Collections.singletonList(0)))); - List topicsResponse = - Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") - .setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions() - .setPartitionIndex(0) - .setCommittedOffset(100L)))); + OffsetFetchResponseData.OffsetFetchResponseGroup response = + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group") + .setTopics(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100L))))); if (requireStable) { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(topicsResponse)); + )).thenReturn(CompletableFuture.completedFuture(response)); } else { when(runtime.scheduleReadOperation( ArgumentMatchers.eq("fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(topicsResponse)); + )).thenReturn(CompletableFuture.completedFuture(response)); } - CompletableFuture> future = service.fetchOffsets( + CompletableFuture future = service.fetchOffsets( requestContext(ApiKeys.OFFSET_FETCH), - "group", - topicsRequest, + request, requireStable ); - assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS)); + assertEquals(response, future.get(5, TimeUnit.SECONDS)); } @ParameterizedTest @@ -664,33 +666,39 @@ public class GroupCoordinatorServiceTest { service.startup(() -> 1); - List topicsResponse = - Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") - .setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions() - .setPartitionIndex(0) - .setCommittedOffset(100L)))); + OffsetFetchRequestData.OffsetFetchRequestGroup request = + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group"); + + OffsetFetchResponseData.OffsetFetchResponseGroup response = + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group") + .setTopics(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100L))))); if (requireStable) { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("fetch-all-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(topicsResponse)); + )).thenReturn(CompletableFuture.completedFuture(response)); } else { when(runtime.scheduleReadOperation( ArgumentMatchers.eq("fetch-all-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(topicsResponse)); + )).thenReturn(CompletableFuture.completedFuture(response)); } - CompletableFuture> future = service.fetchAllOffsets( + CompletableFuture future = service.fetchAllOffsets( requestContext(ApiKeys.OFFSET_FETCH), - "group", + request, requireStable ); - assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS)); + assertEquals(response, future.get(5, TimeUnit.SECONDS)); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index df472cfe30a..e60ec1bc4e9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -182,21 +182,26 @@ public class OffsetMetadataManagerTest { List topics, long committedOffset ) { - return offsetMetadataManager.fetchOffsets( - groupId, - topics, + OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchOffsets( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId) + .setTopics(topics), committedOffset ); + assertEquals(groupId, response.groupId()); + return response.topics(); } public List fetchAllOffsets( String groupId, long committedOffset ) { - return offsetMetadataManager.fetchAllOffsets( - groupId, + OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchAllOffsets( + new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(groupId), committedOffset ); + assertEquals(groupId, response.groupId()); + return response.topics(); } public List> sleep(long ms) {