KAFKA-14499: [5/N] Refactor GroupCoordinator.fetchOffsets and GroupCoordinator.fetchAllOffsets (#14310)

This patch refactors the GroupCoordinator.fetchOffsets and GroupCoordinator.fetchAllOffsets methods to take an OffsetFetchRequestGroup and to return an OffsetFetchResponseGroup. It prepares the ground for adding the member id and the member epoch to the OffsetFetchRequest. This change also makes those two methods more aligned with the others in the interface.

Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2023-09-01 03:45:24 -07:00 committed by GitHub
parent d0f3cf1f9f
commit dcff0878c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 188 additions and 166 deletions

View File

@ -275,11 +275,11 @@ private[group] class GroupCoordinatorAdapter(
override def fetchAllOffsets(
context: RequestContext,
groupId: String,
request: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
handleFetchOffset(
groupId,
request.groupId,
requireStable,
None
)
@ -287,19 +287,18 @@ private[group] class GroupCoordinatorAdapter(
override def fetchOffsets(
context: RequestContext,
groupId: String,
topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
request: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
val topicPartitions = new mutable.ArrayBuffer[TopicPartition]()
topics.forEach { topic =>
request.topics.forEach { topic =>
topic.partitionIndexes.forEach { partition =>
topicPartitions += new TopicPartition(topic.name, partition)
}
}
handleFetchOffset(
groupId,
request.groupId,
requireStable,
Some(topicPartitions.toSeq)
)
@ -309,14 +308,14 @@ private[group] class GroupCoordinatorAdapter(
groupId: String,
requireStable: Boolean,
partitions: Option[Seq[TopicPartition]]
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
val (error, results) = coordinator.handleFetchOffsets(
groupId,
requireStable,
partitions
)
val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
val future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
if (error != Errors.NONE) {
future.completeExceptionally(error.exception)
} else {
@ -343,7 +342,9 @@ private[group] class GroupCoordinatorAdapter(
.setErrorCode(offset.error.code))
}
future.complete(topicsList)
future.complete(new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(groupId)
.setTopics(topicsList))
}
future

View File

@ -1452,17 +1452,17 @@ class KafkaApis(val requestChannel: RequestChannel,
private def fetchAllOffsetsForGroup(
requestContext: RequestContext,
groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
groupCoordinator.fetchAllOffsets(
requestContext,
groupOffsetFetch.groupId,
offsetFetchRequest,
requireStable
).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsets, exception) =>
).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsetFetchResponse, exception) =>
if (exception != null) {
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(groupOffsetFetch.groupId)
.setGroupId(offsetFetchRequest.groupId)
.setErrorCode(Errors.forException(exception).code)
} else {
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
@ -1470,19 +1470,16 @@ class KafkaApis(val requestChannel: RequestChannel,
requestContext,
DESCRIBE,
TOPIC,
offsets.asScala
offsetFetchResponse.topics.asScala
)(_.name)
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(groupOffsetFetch.groupId)
.setTopics(authorizedOffsets.asJava)
offsetFetchResponse.setTopics(authorizedOffsets.asJava)
}
}
}
private def fetchOffsetsForGroup(
requestContext: RequestContext,
groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
@ -1490,25 +1487,25 @@ class KafkaApis(val requestChannel: RequestChannel,
requestContext,
DESCRIBE,
TOPIC,
groupOffsetFetch.topics.asScala
offsetFetchRequest.topics.asScala
)(_.name)
groupCoordinator.fetchOffsets(
requestContext,
groupOffsetFetch.groupId,
authorizedTopics.asJava,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(offsetFetchRequest.groupId)
.setTopics(authorizedTopics.asJava),
requireStable
).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (topicOffsets, exception) =>
).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsetFetchResponse, exception) =>
if (exception != null) {
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(groupOffsetFetch.groupId)
.setGroupId(offsetFetchRequest.groupId)
.setErrorCode(Errors.forException(exception).code)
} else {
val response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(groupOffsetFetch.groupId)
response.topics.addAll(topicOffsets)
val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics](
offsetFetchResponse.topics.size + unauthorizedTopics.size
)
topics.addAll(offsetFetchResponse.topics)
unauthorizedTopics.foreach { topic =>
val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name)
topic.partitionIndexes.forEach { partitionIndex =>
@ -1517,17 +1514,19 @@ class KafkaApis(val requestChannel: RequestChannel,
.setCommittedOffset(-1)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code))
}
response.topics.add(topicResponse)
topics.add(topicResponse)
}
response
offsetFetchResponse.setTopics(topics)
}
}
}
private def partitionByAuthorized(seq: Seq[TopicPartition], context: RequestContext):
(Seq[TopicPartition], Seq[TopicPartition]) =
private def partitionByAuthorized(
seq: Seq[TopicPartition],
context: RequestContext
): (Seq[TopicPartition], Seq[TopicPartition]) = {
authHelper.partitionSeqByAuthorized(context, DESCRIBE, TOPIC, seq)(_.topic)
}
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
val version = request.header.apiVersion

View File

@ -502,7 +502,7 @@ class GroupCoordinatorAdapterTest {
val ctx = makeContext(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.latestVersion)
val future = adapter.fetchAllOffsets(
ctx,
"group",
new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId("group"),
true
)
@ -537,9 +537,10 @@ class GroupCoordinatorAdapterTest {
).asJava)
)
assertEquals("group", future.get().groupId)
assertEquals(
expectedResponse.sortWith(_.name > _.name),
future.get().asScala.toList.sortWith(_.name > _.name)
future.get().topics.asScala.toList.sortWith(_.name > _.name)
)
}
@ -583,15 +584,15 @@ class GroupCoordinatorAdapterTest {
val ctx = makeContext(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.latestVersion)
val future = adapter.fetchOffsets(
ctx,
"group",
List(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName(foo0.topic)
.setPartitionIndexes(List[Integer](foo0.partition, foo1.partition).asJava),
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName(bar1.topic)
.setPartitionIndexes(List[Integer](bar1.partition).asJava),
).asJava,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group")
.setTopics(List(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName(foo0.topic)
.setPartitionIndexes(List[Integer](foo0.partition, foo1.partition).asJava),
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName(bar1.topic)
.setPartitionIndexes(List[Integer](bar1.partition).asJava)).asJava),
true
)
@ -626,9 +627,10 @@ class GroupCoordinatorAdapterTest {
).asJava)
)
assertEquals("group", future.get().groupId)
assertEquals(
expectedResponse.sortWith(_.name > _.name),
future.get().asScala.toList.sortWith(_.name > _.name)
future.get().topics.asScala.toList.sortWith(_.name > _.name)
)
}

View File

@ -4316,28 +4316,33 @@ class KafkaApisTest {
} else {
val requestChannelRequest = makeRequest(version)
val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(List[Integer](0, 1).asJava)
).asJava,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-1")
.setTopics(List(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(List[Integer](0, 1).asJava)).asJava),
false
)).thenReturn(group1Future)
val group2Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-2",
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-2")
.setTopics(null),
false
)).thenReturn(group2Future)
val group3Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
val group3Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-3",
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-3")
.setTopics(null),
false
)).thenReturn(group3Future)
@ -4388,8 +4393,8 @@ class KafkaApisTest {
val expectedOffsetFetchResponse = new OffsetFetchResponseData()
.setGroups(List(group1Response, group2Response, group3Response).asJava)
group1Future.complete(group1Response.topics)
group2Future.complete(group2Response.topics)
group1Future.complete(group1Response)
group2Future.complete(group2Response)
group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
@ -4418,14 +4423,14 @@ class KafkaApisTest {
val requestChannelRequest = makeRequest(version)
val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
val future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(List[Integer](0, 1).asJava)
).asJava,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-1")
.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(List[Integer](0, 1).asJava)).asJava),
false
)).thenReturn(future)
@ -4469,7 +4474,7 @@ class KafkaApisTest {
).asJava)
}
future.complete(group1Response.topics)
future.complete(group1Response)
val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
assertEquals(expectedOffsetFetchResponse, response.data)
@ -4493,10 +4498,12 @@ class KafkaApisTest {
val requestChannelRequest = makeRequest(version)
val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
val future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-1",
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-1")
.setTopics(null),
false
)).thenReturn(future)
@ -4540,7 +4547,7 @@ class KafkaApisTest {
).asJava)
}
future.complete(group1Response.topics)
future.complete(group1Response)
val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
assertEquals(expectedOffsetFetchResponse, response.data)
@ -4588,22 +4595,24 @@ class KafkaApisTest {
}
// group-1 is allowed and bar is allowed.
val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("bar")
.setPartitionIndexes(List[Integer](0).asJava)
).asJava,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-1")
.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("bar")
.setPartitionIndexes(List[Integer](0).asJava)).asJava),
false
)).thenReturn(group1Future)
// group-3 is allowed and bar is allowed.
val group3Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
val group3Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-3",
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-3")
.setTopics(null),
false
)).thenReturn(group3Future)
@ -4690,8 +4699,8 @@ class KafkaApisTest {
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code),
).asJava)
group1Future.complete(group1ResponseFromCoordinator.topics)
group3Future.complete(group3ResponseFromCoordinator.topics)
group1Future.complete(group1ResponseFromCoordinator)
group3Future.complete(group3ResponseFromCoordinator)
val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
assertEquals(expectedOffsetFetchResponse, response.data)

View File

@ -170,15 +170,13 @@ public interface GroupCoordinator {
* Fetch offsets for a given Group.
*
* @param context The request context.
* @param groupId The group id.
* @param topics The topics to fetch the offsets for.
* @param request The OffsetFetchRequestGroup request.
*
* @return A future yielding the results or an exception.
*/
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchOffsets(
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets(
RequestContext context,
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
boolean requireStable
);
@ -186,13 +184,13 @@ public interface GroupCoordinator {
* Fetch all offsets for a given Group.
*
* @param context The request context.
* @param groupId The group id.
* @param request The OffsetFetchRequestGroup request.
*
* @return A future yielding the results or an exception.
*/
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchAllOffsets(
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(
RequestContext context,
String groupId,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
boolean requireStable
);

View File

@ -469,13 +469,12 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#fetchOffsets(RequestContext, String, List, boolean)}.
* See {@link GroupCoordinator#fetchOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
*/
@Override
public CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchOffsets(
public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets(
RequestContext context,
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
boolean requireStable
) {
if (!isActive.get()) {
@ -483,7 +482,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
// For backwards compatibility, we support fetch commits for the empty group id.
if (groupId == null) {
if (request.groupId() == null) {
return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
}
@ -498,28 +497,28 @@ public class GroupCoordinatorService implements GroupCoordinator {
if (requireStable) {
return runtime.scheduleWriteOperation(
"fetch-offsets",
topicPartitionFor(groupId),
topicPartitionFor(request.groupId()),
coordinator -> new CoordinatorResult<>(
Collections.emptyList(),
coordinator.fetchOffsets(groupId, topics, Long.MAX_VALUE)
coordinator.fetchOffsets(request, Long.MAX_VALUE)
)
);
} else {
return runtime.scheduleReadOperation(
"fetch-offsets",
topicPartitionFor(groupId),
(coordinator, offset) -> coordinator.fetchOffsets(groupId, topics, offset)
topicPartitionFor(request.groupId()),
(coordinator, offset) -> coordinator.fetchOffsets(request, offset)
);
}
}
/**
* See {@link GroupCoordinator#fetchAllOffsets(RequestContext, String, boolean)}.
* See {@link GroupCoordinator#fetchAllOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
*/
@Override
public CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchAllOffsets(
public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(
RequestContext context,
String groupId,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
boolean requireStable
) {
if (!isActive.get()) {
@ -527,7 +526,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
// For backwards compatibility, we support fetch commits for the empty group id.
if (groupId == null) {
if (request.groupId() == null) {
return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
}
@ -542,17 +541,17 @@ public class GroupCoordinatorService implements GroupCoordinator {
if (requireStable) {
return runtime.scheduleWriteOperation(
"fetch-all-offsets",
topicPartitionFor(groupId),
topicPartitionFor(request.groupId()),
coordinator -> new CoordinatorResult<>(
Collections.emptyList(),
coordinator.fetchAllOffsets(groupId, Long.MAX_VALUE)
coordinator.fetchAllOffsets(request, Long.MAX_VALUE)
)
);
} else {
return runtime.scheduleReadOperation(
"fetch-all-offsets",
topicPartitionFor(groupId),
(coordinator, offset) -> coordinator.fetchAllOffsets(groupId, offset)
topicPartitionFor(request.groupId()),
(coordinator, offset) -> coordinator.fetchAllOffsets(request, offset)
);
}
}

View File

@ -58,7 +58,6 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@ -262,35 +261,33 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
/**
* Fetch offsets for a given set of partitions and a given group.
*
* @param groupId The group id.
* @param topics The topics to fetch the offsets for.
* @param request The OffsetFetchRequestGroup request.
* @param epoch The epoch (or offset) used to read from the
* timeline data structure.
*
* @return A List of OffsetFetchResponseTopics response.
*/
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
OffsetFetchRequestData.OffsetFetchRequestGroup request,
long epoch
) throws ApiException {
return offsetMetadataManager.fetchOffsets(groupId, topics, epoch);
return offsetMetadataManager.fetchOffsets(request, epoch);
}
/**
* Fetch all offsets for a given group.
*
* @param groupId The group id.
* @param request The OffsetFetchRequestGroup request.
* @param epoch The epoch (or offset) used to read from the
* timeline data structure.
*
* @return A List of OffsetFetchResponseTopics response.
*/
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
String groupId,
public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
OffsetFetchRequestData.OffsetFetchRequestGroup request,
long epoch
) throws ApiException {
return offsetMetadataManager.fetchAllOffsets(groupId, epoch);
return offsetMetadataManager.fetchAllOffsets(request, epoch);
}
/**

View File

@ -332,29 +332,27 @@ public class OffsetMetadataManager {
/**
* Fetch offsets for a given Group.
*
* @param groupId The group id.
* @param topics The topics to fetch the offsets for.
* @param request The OffsetFetchRequestGroup request.
* @param lastCommittedOffset The last committed offsets in the timeline.
*
* @return A List of OffsetFetchResponseTopics response.
*/
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
OffsetFetchRequestData.OffsetFetchRequestGroup request,
long lastCommittedOffset
) throws ApiException {
boolean failAllPartitions = false;
try {
validateOffsetFetch(groupId, lastCommittedOffset);
validateOffsetFetch(request.groupId(), lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
failAllPartitions = true;
}
final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>(topics.size());
final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>(request.topics().size());
final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
failAllPartitions ? null : offsetsByGroup.get(groupId, lastCommittedOffset);
failAllPartitions ? null : offsetsByGroup.get(request.groupId(), lastCommittedOffset);
topics.forEach(topic -> {
request.topics().forEach(topic -> {
final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name());
topicResponses.add(topicResponse);
@ -382,30 +380,34 @@ public class OffsetMetadataManager {
});
});
return topicResponses;
return new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(request.groupId())
.setTopics(topicResponses);
}
/**
* Fetch all offsets for a given Group.
*
* @param groupId The group id.
* @param request The OffsetFetchRequestGroup request.
* @param lastCommittedOffset The last committed offsets in the timeline.
*
* @return A List of OffsetFetchResponseTopics response.
*/
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
String groupId,
public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
OffsetFetchRequestData.OffsetFetchRequestGroup request,
long lastCommittedOffset
) throws ApiException {
try {
validateOffsetFetch(groupId, lastCommittedOffset);
validateOffsetFetch(request.groupId(), lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
return Collections.emptyList();
return new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(request.groupId())
.setTopics(Collections.emptyList());
}
final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>();
final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
offsetsByGroup.get(groupId, lastCommittedOffset);
offsetsByGroup.get(request.groupId(), lastCommittedOffset);
if (groupOffsets != null) {
groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
@ -429,7 +431,9 @@ public class OffsetMetadataManager {
});
}
return topicResponses;
return new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(request.groupId())
.setTopics(topicResponses);
}
/**

View File

@ -63,7 +63,6 @@ import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@ -614,40 +613,43 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1);
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topicsRequest =
Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0)));
OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group")
.setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))));
List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicsResponse =
Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(100L))));
OffsetFetchResponseData.OffsetFetchResponseGroup response =
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId("group")
.setTopics(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(100L)))));
if (requireStable) {
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("fetch-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(topicsResponse));
)).thenReturn(CompletableFuture.completedFuture(response));
} else {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("fetch-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(topicsResponse));
)).thenReturn(CompletableFuture.completedFuture(response));
}
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> future = service.fetchOffsets(
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
"group",
topicsRequest,
request,
requireStable
);
assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS));
assertEquals(response, future.get(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ -664,33 +666,39 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1);
List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicsResponse =
Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(100L))));
OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group");
OffsetFetchResponseData.OffsetFetchResponseGroup response =
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId("group")
.setTopics(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(100L)))));
if (requireStable) {
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("fetch-all-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(topicsResponse));
)).thenReturn(CompletableFuture.completedFuture(response));
} else {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("fetch-all-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(topicsResponse));
)).thenReturn(CompletableFuture.completedFuture(response));
}
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> future = service.fetchAllOffsets(
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchAllOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
"group",
request,
requireStable
);
assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS));
assertEquals(response, future.get(5, TimeUnit.SECONDS));
}
}

View File

@ -182,21 +182,26 @@ public class OffsetMetadataManagerTest {
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
long committedOffset
) {
return offsetMetadataManager.fetchOffsets(
groupId,
topics,
OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchOffsets(
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(groupId)
.setTopics(topics),
committedOffset
);
assertEquals(groupId, response.groupId());
return response.topics();
}
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
String groupId,
long committedOffset
) {
return offsetMetadataManager.fetchAllOffsets(
groupId,
OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchAllOffsets(
new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(groupId),
committedOffset
);
assertEquals(groupId, response.groupId());
return response.topics();
}
public List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> sleep(long ms) {