KAFKA-18433: Add BatchSize to ShareFetch request (1/N) (#18439)

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Andrew Schofield 2025-01-08 15:29:43 +00:00 committed by GitHub
parent b51b31ed9c
commit 3f9d2c2db0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 9 additions and 5 deletions

View File

@ -171,7 +171,7 @@ public class ShareSessionHandler {
return ShareFetchRequest.Builder.forConsumer( return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, fetchConfig.maxWaitMs, groupId, nextMetadata, fetchConfig.maxWaitMs,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize, fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize, fetchConfig.maxPollRecords,
added, removed, acknowledgementBatches); added, removed, acknowledgementBatches);
} }

View File

@ -49,7 +49,7 @@ public class ShareFetchRequest extends AbstractRequest {
} }
public static Builder forConsumer(String groupId, ShareRequestMetadata metadata, public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
int maxWait, int minBytes, int maxBytes, int fetchSize, int maxWait, int minBytes, int maxBytes, int fetchSize, int batchSize,
List<TopicIdPartition> send, List<TopicIdPartition> forget, List<TopicIdPartition> send, List<TopicIdPartition> forget,
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) { Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData(); ShareFetchRequestData data = new ShareFetchRequestData();
@ -67,6 +67,7 @@ public class ShareFetchRequest extends AbstractRequest {
data.setMaxWaitMs(maxWait); data.setMaxWaitMs(maxWait);
data.setMinBytes(minBytes); data.setMinBytes(minBytes);
data.setMaxBytes(maxBytes); data.setMaxBytes(maxBytes);
data.setBatchSize(batchSize);
// Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index // Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>(); Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>();

View File

@ -37,6 +37,8 @@
"about": "The minimum bytes to accumulate in the response." }, "about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
{ "name": "BatchSize", "type": "int32", "versions": "0+",
"about": "The optimal number of records for batches of acquired records and acknowledgements." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+", { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
"about": "The topics to fetch.", "fields": [ "about": "The topics to fetch.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."}, { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
@ -45,7 +47,7 @@
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", { "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." }, "about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+", { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." }, "about": "TO BE REMOVED. The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+", { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [ "about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+", { "name": "FirstOffset", "type": "int64", "versions": "0+",

View File

@ -2366,8 +2366,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]], acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]],
maxWaitMs: Int = MAX_WAIT_MS, maxWaitMs: Int = MAX_WAIT_MS,
minBytes: Int = 0, minBytes: Int = 0,
maxBytes: Int = Int.MaxValue): ShareFetchRequest = { maxBytes: Int = Int.MaxValue,
ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, send.asJava, forget.asJava, acknowledgementsMap.asJava) batchSize: Int = 500): ShareFetchRequest = {
ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, batchSize, send.asJava, forget.asJava, acknowledgementsMap.asJava)
.build() .build()
} }