KAFKA-19290: Exploit mapKey optimisation in protocol requests and responses (wip) (#19815)

The mapKey optimisation can be used in some KIP-932 RPC schemas to
improve efficiency of some key-based accesses.

* AlterShareGroupOffsetsResponse
* ShareFetchRequest
* ShareFetchResponse
* ShareAcknowledgeRequest
* ShareAcknowledgeResponse

Reviewers: Andrew Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-06-06 21:19:08 +08:00 committed by GitHub
parent 4d6cf3efef
commit e0adec5549
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 557 additions and 541 deletions

View File

@ -20,13 +20,12 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
public class AlterShareGroupOffsetsResponse extends AbstractResponse {
@ -71,15 +70,15 @@ public class AlterShareGroupOffsetsResponse extends AbstractResponse {
public static class Builder {
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData();
HashMap<String, AlterShareGroupOffsetsResponseTopic> topics = new HashMap<>();
AlterShareGroupOffsetsResponseTopicCollection topics = new AlterShareGroupOffsetsResponseTopicCollection();
private AlterShareGroupOffsetsResponseTopic getOrCreateTopic(String topic, Uuid topicId) {
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = topics.get(topic);
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = topics.find(topic);
if (topicData == null) {
topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setTopicName(topic)
.setTopicId(topicId == null ? Uuid.ZERO_UUID : topicId);
topics.put(topic, topicData);
topics.add(topicData);
}
return topicData;
}
@ -94,7 +93,7 @@ public class AlterShareGroupOffsetsResponse extends AbstractResponse {
}
public AlterShareGroupOffsetsResponse build() {
data.setResponses(new ArrayList<>(topics.values()));
data.setResponses(topics);
return new AlterShareGroupOffsetsResponse(data);
}

View File

@ -17,15 +17,12 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -49,32 +46,26 @@ public class ShareAcknowledgeRequest extends AbstractRequest {
data.setShareSessionEpoch(metadata.epoch());
}
// Build a collection of topics to acknowledge keyed by topic ID, and within each a collection of partitions keyed by index
Map<Uuid, Map<Integer, ShareAcknowledgeRequestData.AcknowledgePartition>> ackMap = new HashMap<>();
ShareAcknowledgeRequestData.AcknowledgeTopicCollection ackTopics = new ShareAcknowledgeRequestData.AcknowledgeTopicCollection();
for (Map.Entry<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgeEntry : acknowledgementsMap.entrySet()) {
TopicIdPartition tip = acknowledgeEntry.getKey();
Map<Integer, ShareAcknowledgeRequestData.AcknowledgePartition> partMap = ackMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareAcknowledgeRequestData.AcknowledgePartition ackPartition = partMap.get(tip.partition());
ShareAcknowledgeRequestData.AcknowledgeTopic ackTopic = ackTopics.find(tip.topicId());
if (ackTopic == null) {
ackTopic = new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopicId(tip.topicId())
.setPartitions(new ShareAcknowledgeRequestData.AcknowledgePartitionCollection());
ackTopics.add(ackTopic);
}
ShareAcknowledgeRequestData.AcknowledgePartition ackPartition = ackTopic.partitions().find(tip.partition());
if (ackPartition == null) {
ackPartition = new ShareAcknowledgeRequestData.AcknowledgePartition()
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), ackPartition);
ackTopic.partitions().add(ackPartition);
}
ackPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
}
// Finally, build up the data to acknowledge
data.setTopics(new ArrayList<>());
ackMap.forEach((topicId, partMap) -> {
ShareAcknowledgeRequestData.AcknowledgeTopic ackTopic = new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopicId(topicId)
.setPartitions(new ArrayList<>());
data.topics().add(ackTopic);
partMap.forEach((index, ackPartition) -> ackTopic.partitions().add(ackPartition));
});
data.setTopics(ackTopics);
return new ShareAcknowledgeRequest.Builder(data);
}

View File

@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@ -115,22 +114,21 @@ public class ShareAcknowledgeResponse extends AbstractResponse {
public static ShareAcknowledgeResponseData toMessage(Errors error, int throttleTimeMs,
Iterator<Map.Entry<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> partIterator,
List<Node> nodeEndpoints) {
Map<Uuid, ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse> topicResponseList = new LinkedHashMap<>();
ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection topicResponses = new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection();
while (partIterator.hasNext()) {
Map.Entry<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> entry = partIterator.next();
ShareAcknowledgeResponseData.PartitionData partitionData = entry.getValue();
// Since PartitionData alone doesn't know the partition ID, we set it here
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
// Checking if the topic is already present in the response collection
if (topicResponseList.containsKey(entry.getKey().topicId())) {
topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
} else {
List<ShareAcknowledgeResponseData.PartitionData> partitionResponses = new ArrayList<>();
partitionResponses.add(partitionData);
topicResponseList.put(entry.getKey().topicId(), new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse()
ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse topicResponse = topicResponses.find(entry.getKey().topicId());
if (topicResponse == null) {
topicResponse = new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse()
.setTopicId(entry.getKey().topicId())
.setPartitions(partitionResponses));
.setPartitions(new ArrayList<>());
topicResponses.add(topicResponse);
}
topicResponse.partitions().add(partitionData);
}
ShareAcknowledgeResponseData data = new ShareAcknowledgeResponseData();
// KafkaApis should only pass in node endpoints on error, otherwise this should be an empty list
@ -142,6 +140,6 @@ public class ShareAcknowledgeResponse extends AbstractResponse {
.setRack(endpoint.rack())));
return data.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code())
.setResponses(new ArrayList<>(topicResponseList.values()));
.setResponses(topicResponses);
}
}

View File

@ -62,15 +62,24 @@ public class ShareFetchRequest extends AbstractRequest {
data.setBatchSize(batchSize);
// Build a collection of topics to fetch keyed by topic ID, and within each a collection of partitions keyed by index
Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>();
ShareFetchRequestData.FetchTopicCollection fetchTopics = new ShareFetchRequestData.FetchTopicCollection();
// First, start by adding the list of topic-partitions we are fetching
if (!isClosingShareSession) {
for (TopicIdPartition tip : send) {
Map<Integer, ShareFetchRequestData.FetchPartition> partMap = fetchMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareFetchRequestData.FetchPartition fetchPartition = new ShareFetchRequestData.FetchPartition()
ShareFetchRequestData.FetchTopic fetchTopic = fetchTopics.find(tip.topicId());
if (fetchTopic == null) {
fetchTopic = new ShareFetchRequestData.FetchTopic()
.setTopicId(tip.topicId())
.setPartitions(new ShareFetchRequestData.FetchPartitionCollection());
fetchTopics.add(fetchTopic);
}
ShareFetchRequestData.FetchPartition fetchPartition = fetchTopic.partitions().find(tip.partition());
if (fetchPartition == null) {
fetchPartition = new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), fetchPartition);
fetchTopic.partitions().add(fetchPartition);
}
}
}
@ -78,27 +87,24 @@ public class ShareFetchRequest extends AbstractRequest {
// topic-partitions will be a subset, but if the assignment changes, there might be new entries to add
for (Map.Entry<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgeEntry : acknowledgementsMap.entrySet()) {
TopicIdPartition tip = acknowledgeEntry.getKey();
Map<Integer, ShareFetchRequestData.FetchPartition> partMap = fetchMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareFetchRequestData.FetchPartition fetchPartition = partMap.get(tip.partition());
ShareFetchRequestData.FetchTopic fetchTopic = fetchTopics.find(tip.topicId());
if (fetchTopic == null) {
fetchTopic = new ShareFetchRequestData.FetchTopic()
.setTopicId(tip.topicId())
.setPartitions(new ShareFetchRequestData.FetchPartitionCollection());
fetchTopics.add(fetchTopic);
}
ShareFetchRequestData.FetchPartition fetchPartition = fetchTopic.partitions().find(tip.partition());
if (fetchPartition == null) {
fetchPartition = new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), fetchPartition);
fetchTopic.partitions().add(fetchPartition);
}
fetchPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
}
// Build up the data to fetch
if (!fetchMap.isEmpty()) {
data.setTopics(new ArrayList<>());
fetchMap.forEach((topicId, partMap) -> {
ShareFetchRequestData.FetchTopic fetchTopic = new ShareFetchRequestData.FetchTopic()
.setTopicId(topicId)
.setPartitions(new ArrayList<>());
partMap.forEach((index, fetchPartition) -> fetchTopic.partitions().add(fetchPartition));
data.topics().add(fetchTopic);
});
}
data.setTopics(fetchTopics);
Builder builder = new Builder(data);
// And finally, forget the topic-partitions that are no longer in the session

View File

@ -168,7 +168,7 @@ public class ShareFetchResponse extends AbstractResponse {
private static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator,
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> topicResponseList = new LinkedHashMap<>();
ShareFetchResponseData.ShareFetchableTopicResponseCollection topicResponses = new ShareFetchResponseData.ShareFetchableTopicResponseCollection();
while (partIterator.hasNext()) {
Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> entry = partIterator.next();
ShareFetchResponseData.PartitionData partitionData = entry.getValue();
@ -180,15 +180,14 @@ public class ShareFetchResponse extends AbstractResponse {
if (partitionData.records() == null)
partitionData.setRecords(MemoryRecords.EMPTY);
// Checking if the topic is already present in the response collection
if (topicResponseList.containsKey(entry.getKey().topicId())) {
topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
} else {
List<ShareFetchResponseData.PartitionData> partitionResponses = new ArrayList<>();
partitionResponses.add(partitionData);
topicResponseList.put(entry.getKey().topicId(), new ShareFetchResponseData.ShareFetchableTopicResponse()
ShareFetchResponseData.ShareFetchableTopicResponse topicResponse = topicResponses.find(entry.getKey().topicId());
if (topicResponse == null) {
topicResponse = new ShareFetchResponseData.ShareFetchableTopicResponse()
.setTopicId(entry.getKey().topicId())
.setPartitions(partitionResponses));
.setPartitions(new ArrayList<>());
topicResponses.add(topicResponse);
}
topicResponse.partitions().add(partitionData);
}
ShareFetchResponseData data = new ShareFetchResponseData();
// KafkaApis should only pass in node endpoints on error, otherwise this should be an empty list
@ -201,7 +200,7 @@ public class ShareFetchResponse extends AbstractResponse {
return data.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code())
.setAcquisitionLockTimeoutMs(acquisitionLockTimeout)
.setResponses(new ArrayList<>(topicResponseList.values()));
.setResponses(topicResponses);
}
public static ShareFetchResponseData.PartitionData partitionResponse(TopicIdPartition topicIdPartition, Errors error) {

View File

@ -25,7 +25,7 @@
"about": "The top-level error message, or null if there was no error." },
{ "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
"about": "The results for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique topic ID." },

View File

@ -32,10 +32,10 @@
"about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
{ "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
"about": "The topics containing records to acknowledge.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID." },
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID.", "mapKey": true },
{ "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
"about": "The partitions containing records to acknowledge.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
"about": "The partition index." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [

View File

@ -43,7 +43,7 @@
"about": "The top-level error message, or null if there was no error." },
{ "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
"about": "The response topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
{ "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
"about": "The unique topic ID." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The topic partitions.", "fields": [

View File

@ -42,10 +42,10 @@
"about": "The optimal number of records for batches of acquired records and acknowledgements." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
"about": "The topics to fetch.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID.", "mapKey": true },
{ "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
"about": "The partitions to fetch.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0",
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },

View File

@ -46,7 +46,7 @@
"about": "The time in milliseconds for which the acquired records are locked." },
{ "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
"about": "The response topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
{ "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
"about": "The unique topic ID." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The topic partitions.", "fields": [

View File

@ -11329,10 +11329,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setResponses(
List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0), new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
)
).iterator())
);
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
@ -11358,10 +11358,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setResponses(
List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0), new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.NON_EMPTY_GROUP.code()).setErrorMessage("The group is not empty"))),
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
)
).iterator())
);
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);

View File

@ -166,9 +166,9 @@ public class KafkaShareConsumerTest {
return request.data().groupId().equals(groupId) &&
request.data().shareSessionEpoch() == 0 &&
request.data().batchSize() == batchSize &&
request.data().topics().get(0).topicId().equals(topicId1) &&
request.data().topics().get(0).partitions().size() == 1 &&
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().isEmpty();
request.data().topics().stream().findFirst().get().topicId().equals(topicId1) &&
request.data().topics().stream().findFirst().get().partitions().size() == 1 &&
request.data().topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().isEmpty();
} else {
return false;
}
@ -180,10 +180,10 @@ public class KafkaShareConsumerTest {
ShareAcknowledgeRequest request = (ShareAcknowledgeRequest) body;
return request.data().groupId().equals(groupId) &&
request.data().shareSessionEpoch() == 1 &&
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).firstOffset() == 0 &&
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).lastOffset() == 1 &&
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).acknowledgeTypes().size() == 1 &&
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).acknowledgeTypes().get(0) == (byte) 1;
request.data().topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().get(0).firstOffset() == 0 &&
request.data().topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().get(0).lastOffset() == 1 &&
request.data().topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().get(0).acknowledgeTypes().size() == 1 &&
request.data().topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().get(0).acknowledgeTypes().get(0) == (byte) 1;
} else {
return false;
}
@ -243,9 +243,9 @@ public class KafkaShareConsumerTest {
return request.data().groupId().equals(groupId) &&
request.data().shareSessionEpoch() == 0 &&
request.data().batchSize() == batchSize &&
request.data().topics().get(0).topicId().equals(topicId1) &&
request.data().topics().get(0).partitions().size() == 1 &&
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().isEmpty();
request.data().topics().stream().findFirst().get().topicId().equals(topicId1) &&
request.data().topics().stream().findFirst().get().partitions().size() == 1 &&
request.data().topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().isEmpty();
} else {
return false;
}
@ -411,7 +411,7 @@ public class KafkaShareConsumerTest {
.setPartitions(List.of(partData));
return new ShareAcknowledgeResponse(
new ShareAcknowledgeResponseData()
.setResponses(List.of(topicResponse))
.setResponses(new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection(List.of(topicResponse).iterator()))
);
}
}

View File

@ -47,6 +47,7 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -994,12 +995,14 @@ public class ShareConsumeRequestManagerTest {
// Verify the builder data for node0.
assertEquals(1, builder1.data().topics().size());
assertEquals(tip0.topicId(), builder1.data().topics().get(0).topicId());
assertEquals(1, builder1.data().topics().get(0).partitions().size());
assertEquals(0, builder1.data().topics().get(0).partitions().get(0).partitionIndex());
assertEquals(1, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().size());
assertEquals(0L, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).firstOffset());
assertEquals(2L, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).lastOffset());
ShareFetchRequestData.FetchTopic fetchTopic = builder1.data().topics().stream().findFirst().get();
assertEquals(tip0.topicId(), fetchTopic.topicId());
assertEquals(1, fetchTopic.partitions().size());
ShareFetchRequestData.FetchPartition fetchPartition = fetchTopic.partitions().stream().findFirst().get();
assertEquals(0, fetchPartition.partitionIndex());
assertEquals(1, fetchPartition.acknowledgementBatches().size());
assertEquals(0L, fetchPartition.acknowledgementBatches().get(0).firstOffset());
assertEquals(2L, fetchPartition.acknowledgementBatches().get(0).lastOffset());
assertEquals(1, builder1.data().forgottenTopicsData().size());
assertEquals(tip0.topicId(), builder1.data().forgottenTopicsData().get(0).topicId());
@ -1008,9 +1011,10 @@ public class ShareConsumeRequestManagerTest {
// Verify the builder data for node1.
assertEquals(1, builder2.data().topics().size());
assertEquals(tip1.topicId(), builder2.data().topics().get(0).topicId());
assertEquals(1, builder2.data().topics().get(0).partitions().size());
assertEquals(1, builder2.data().topics().get(0).partitions().get(0).partitionIndex());
fetchTopic = builder2.data().topics().stream().findFirst().get();
assertEquals(tip1.topicId(), fetchTopic.topicId());
assertEquals(1, fetchTopic.partitions().size());
assertEquals(1, fetchTopic.partitions().stream().findFirst().get().partitionIndex());
}
@Test
@ -1049,9 +1053,10 @@ public class ShareConsumeRequestManagerTest {
ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
assertEquals(1, builder.data().topics().size());
assertEquals(tip1.topicId(), builder.data().topics().get(0).topicId());
assertEquals(1, builder.data().topics().get(0).partitions().size());
assertEquals(1, builder.data().topics().get(0).partitions().get(0).partitionIndex());
ShareFetchRequestData.FetchTopic fetchTopic = builder.data().topics().stream().findFirst().get();
assertEquals(tip1.topicId(), fetchTopic.topicId());
assertEquals(1, fetchTopic.partitions().size());
assertEquals(1, fetchTopic.partitions().stream().findFirst().get().partitionIndex());
assertEquals(0, builder.data().forgottenTopicsData().size());
}

View File

@ -458,7 +458,7 @@ public class ShareSessionHandlerTest {
ShareFetchRequestData requestData = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
// We should have cleared the unsent acknowledgements before this ShareFetch.
assertEquals(0, requestData.topics().get(0).partitions().get(0).acknowledgementBatches().size());
assertEquals(0, requestData.topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().size());
ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
expectedToSend1.add(new TopicIdPartition(fooId, 1, "foo"));

View File

@ -1447,10 +1447,10 @@ public class RequestResponseTest {
ShareFetchRequestData data = new ShareFetchRequestData()
.setGroupId("group")
.setMemberId(Uuid.randomUuid().toString())
.setTopics(singletonList(new ShareFetchRequestData.FetchTopic()
.setTopics(new ShareFetchRequestData.FetchTopicCollection(List.of(new ShareFetchRequestData.FetchTopic()
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(0)))));
.setPartitions(new ShareFetchRequestData.FetchPartitionCollection(List.of(new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(0)).iterator()))).iterator()));
return new ShareFetchRequest.Builder(data).build(version);
}
@ -1473,24 +1473,24 @@ public class RequestResponseTest {
private ShareAcknowledgeRequest createShareAcknowledgeRequest(short version) {
ShareAcknowledgeRequestData data = new ShareAcknowledgeRequestData()
.setMemberId(Uuid.randomUuid().toString())
.setTopics(singletonList(new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopics(new ShareAcknowledgeRequestData.AcknowledgeTopicCollection(List.of(new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(new ShareAcknowledgeRequestData.AcknowledgePartition()
.setPartitions(new ShareAcknowledgeRequestData.AcknowledgePartitionCollection(List.of(new ShareAcknowledgeRequestData.AcknowledgePartition()
.setPartitionIndex(0)
.setAcknowledgementBatches(singletonList(new ShareAcknowledgeRequestData.AcknowledgementBatch()
.setFirstOffset(0)
.setLastOffset(0)
.setAcknowledgeTypes(Collections.singletonList((byte) 0))))))));
.setAcknowledgeTypes(Collections.singletonList((byte) 0))))).iterator()))).iterator()));
return new ShareAcknowledgeRequest.Builder(data).build(version);
}
private ShareAcknowledgeResponse createShareAcknowledgeResponse() {
ShareAcknowledgeResponseData data = new ShareAcknowledgeResponseData();
data.setResponses(singletonList(new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse()
data.setResponses(new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection(List.of(new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse()
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())))));
.setErrorCode(Errors.NONE.code())))).iterator()));
data.setThrottleTimeMs(345);
data.setErrorCode(Errors.NONE.code());
return new ShareAcknowledgeResponse(data);
@ -3817,12 +3817,13 @@ public class RequestResponseTest {
private AlterShareGroupOffsetsResponse createAlterShareGroupOffsetsResponse() {
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData()
.setResponses(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())))
.setTopicName("topic")
.setTopicId(Uuid.randomUuid())));
.setResponses(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())))
.setTopicName("topic")
.setTopicId(Uuid.randomUuid())).iterator()));
return new AlterShareGroupOffsetsResponse(data);
}

View File

@ -617,9 +617,9 @@ public class SharePartitionManagerTest {
ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
assertEquals(Errors.NONE, resp2.error());
assertEquals(1, resp2.data().responses().size());
assertEquals(barId, resp2.data().responses().get(0).topicId());
assertEquals(1, resp2.data().responses().get(0).partitions().size());
assertEquals(0, resp2.data().responses().get(0).partitions().get(0).partitionIndex());
assertEquals(barId, resp2.data().responses().stream().findFirst().get().topicId());
assertEquals(1, resp2.data().responses().stream().findFirst().get().partitions().size());
assertEquals(0, resp2.data().responses().stream().findFirst().get().partitions().get(0).partitionIndex());
assertEquals(1, resp2.responseData(topicNames).size());
}

View File

@ -725,9 +725,9 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
.setGroupId(shareGroup)
.setMemberId(Uuid.randomUuid().toString)
.setShareSessionEpoch(1)
.setTopics(List(new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopics(new ShareAcknowledgeRequestData.AcknowledgeTopicCollection(util.List.of(new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
.setPartitions(List(
.setPartitions(new ShareAcknowledgeRequestData.AcknowledgePartitionCollection(util.List.of(
new ShareAcknowledgeRequestData.AcknowledgePartition()
.setPartitionIndex(part)
.setAcknowledgementBatches(List(
@ -736,8 +736,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
.setLastOffset(1)
.setAcknowledgeTypes(util.List.of(1.toByte))
).asJava)
).asJava)
).asJava)
).iterator))
).iterator))
new ShareAcknowledgeRequest.Builder(shareAcknowledgeRequestData).build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion)
}
@ -3261,7 +3261,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val request = createShareFetchRequest
val response = connectAndReceive[ShareFetchResponse](request, listenerName = listenerName)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.data.responses.get(0).partitions.get(0).errorCode))
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.data.responses.stream().findFirst().get().partitions.get(0).errorCode))
}
@Test
@ -3586,7 +3586,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val request = alterShareGroupOffsetsRequest
val response = connectAndReceive[AlterShareGroupOffsetsResponse](request, listenerName = listenerName)
assertEquals(1, response.data.responses.size)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.responses.get(0).partitions.get(0).errorCode, s"Unexpected response $response")
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.responses.stream().findFirst().get().partitions.get(0).errorCode, s"Unexpected response $response")
}
private def sendAndReceiveFirstRegexHeartbeat(memberId: String,

File diff suppressed because it is too large Load Diff

View File

@ -197,8 +197,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -206,7 +206,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
val partitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}
@ -272,10 +272,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
val partitionsCount = shareFetchResponseData.responses().get(0).partitions().size()
val partitionsCount = shareFetchResponseData.responses().stream().findFirst().get().partitions().size()
if (partitionsCount > 0) {
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
shareFetchResponseData.responses().get(0).partitions().asScala.foreach(partitionData => {
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
shareFetchResponseData.responses().stream().findFirst().get().partitions().asScala.foreach(partitionData => {
if (!partitionData.acquiredRecords().isEmpty) {
responses = responses :+ partitionData
}
@ -393,24 +393,24 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
assertEquals(30000, shareFetchResponseData1.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData1.responses().size())
assertEquals(topicId, shareFetchResponseData1.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData1.responses().get(0).partitions().size())
val partitionData1 = shareFetchResponseData1.responses().get(0).partitions().get(0)
assertEquals(topicId, shareFetchResponseData1.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData1.responses().stream().findFirst().get().partitions().size())
val partitionData1 = shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
val shareFetchResponseData2 = shareFetchResponse2.data()
assertEquals(Errors.NONE.code, shareFetchResponseData2.errorCode)
assertEquals(30000, shareFetchResponseData2.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData2.responses().size())
assertEquals(topicId, shareFetchResponseData2.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData2.responses().get(0).partitions().size())
val partitionData2 = shareFetchResponseData2.responses().get(0).partitions().get(0)
assertEquals(topicId, shareFetchResponseData2.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData2.responses().stream().findFirst().get().partitions().size())
val partitionData2 = shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
val shareFetchResponseData3 = shareFetchResponse3.data()
assertEquals(Errors.NONE.code, shareFetchResponseData3.errorCode)
assertEquals(1, shareFetchResponseData3.responses().size())
assertEquals(topicId, shareFetchResponseData3.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData3.responses().get(0).partitions().size())
val partitionData3 = shareFetchResponseData3.responses().get(0).partitions().get(0)
assertEquals(topicId, shareFetchResponseData3.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData3.responses().stream().findFirst().get().partitions().size())
val partitionData3 = shareFetchResponseData3.responses().stream().findFirst().get().partitions().get(0)
val expectedPartitionData1 = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(0)
@ -491,8 +491,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -500,7 +500,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
var fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Send a Share Acknowledge request to acknowledge the fetched records
@ -517,14 +517,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().get(0).topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
val expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
// Producing 10 more records to the topic
@ -540,8 +540,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -549,7 +549,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(10), Collections.singletonList(19), Collections.singletonList(1))) // Only the records from offset 10 onwards should be fetched because records at offsets 0-9 have been acknowledged
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
}
@ -611,8 +611,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -620,7 +620,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
var fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Producing 10 more records to the topic created above
@ -640,8 +640,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -649,7 +649,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(10), Collections.singletonList(19), Collections.singletonList(1))) // The records at offsets 0 to 9 will not be re fetched because they have been acknowledged
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Producing 10 more records to the topic
@ -665,8 +665,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -674,7 +674,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(20), Collections.singletonList(29), Collections.singletonList(1))) // Only the records from offset 20 onwards should be fetched because records at offsets 0-9 have been acknowledged before and 10 to 19 are currently acquired
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
}
@ -734,8 +734,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -743,7 +743,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
var fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Send a Share Acknowledge request to acknowledge the fetched records
@ -760,14 +760,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().get(0).topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
val expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
// Sending a third share fetch request to check if acknowledgements were done successfully
@ -780,8 +780,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -789,7 +789,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(2))) // Records at offsets 0 to 9 should be fetched again because they were released with delivery count as 2
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
}
@ -849,8 +849,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -858,7 +858,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
val fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
val fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Producing 10 more records to the topic created above
@ -894,10 +894,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
val responseSize = shareFetchResponseData.responses().get(0).partitions().size()
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
val responseSize = shareFetchResponseData.responses().stream().findFirst().get().partitions().size()
if (responseSize > 0) {
acquiredRecords.addAll(shareFetchResponseData.responses().get(0).partitions().get(0).acquiredRecords())
acquiredRecords.addAll(shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0).acquiredRecords())
}
// There should be 2 acquired record batches finally -
// 1. batch containing 0-9 offsets which were initially acknowledged as RELEASED.
@ -968,8 +968,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -977,7 +977,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
var fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Send a Share Acknowledge request to acknowledge the fetched records
@ -994,14 +994,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().get(0).topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
val expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
// Producing 10 more records to the topic
@ -1017,8 +1017,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1026,7 +1026,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(10), Collections.singletonList(19), Collections.singletonList(1))) // Only the records from offset 10 onwards should be fetched because records at offsets 0-9 have been rejected
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
}
@ -1086,8 +1086,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1095,7 +1095,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
var fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Producing 10 more records to the topic created above
@ -1115,8 +1115,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1124,7 +1124,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(10), Collections.singletonList(19), Collections.singletonList(1))) // The records at offsets 0 to 9 will not be re fetched because they have been rejected
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Producing 10 more records to the topic
@ -1140,8 +1140,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1149,7 +1149,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(20), Collections.singletonList(29), Collections.singletonList(1))) // Only the records from offset 20 onwards should be fetched because records at offsets 0-9 have been rejected before and 10 to 19 are currently acquired
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
}
@ -1211,8 +1211,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1220,7 +1220,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
var fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Send a Share Acknowledge request to acknowledge the fetched records
@ -1237,14 +1237,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().get(0).topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
var expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
var acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
var acknowledgePartitionData = shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
// Sending a third share fetch request to check if acknowledgements were done successfully
@ -1257,8 +1257,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1266,7 +1266,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(2))) // Records at offsets 0 to 9 should be fetched again because they were released with delivery count as 2
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Send a Share Acknowledge request to acknowledge the fetched records
@ -1282,14 +1282,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().get(0).topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
acknowledgePartitionData = shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
// Producing 10 new records to the topic
@ -1305,8 +1305,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1314,7 +1314,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(10), Collections.singletonList(19), Collections.singletonList(1))) // Only new records from offset 10 to 19 will be fetched, records at offsets 0 to 9 have been archived because delivery count limit has been exceeded
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
}
@ -1392,26 +1392,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
assertEquals(1, shareFetchResponseData1.responses().size())
assertEquals(topicId, shareFetchResponseData1.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData1.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData1.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData1.responses().stream().findFirst().get().partitions().size())
val partitionData1 = shareFetchResponseData1.responses().get(0).partitions().get(0)
val partitionData1 = shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
val shareFetchResponseData2 = shareFetchResponse2.data()
assertEquals(Errors.NONE.code, shareFetchResponseData2.errorCode)
assertEquals(1, shareFetchResponseData2.responses().size())
assertEquals(topicId, shareFetchResponseData2.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData2.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData2.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData2.responses().stream().findFirst().get().partitions().size())
val partitionData2 = shareFetchResponseData2.responses().get(0).partitions().get(0)
val partitionData2 = shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
val shareFetchResponseData3 = shareFetchResponse3.data()
assertEquals(Errors.NONE.code, shareFetchResponseData3.errorCode)
assertEquals(1, shareFetchResponseData3.responses().size())
assertEquals(topicId, shareFetchResponseData3.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData3.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData3.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData3.responses().stream().findFirst().get().partitions().size())
val partitionData3 = shareFetchResponseData3.responses().get(0).partitions().get(0)
val partitionData3 = shareFetchResponseData3.responses().stream().findFirst().get().partitions().get(0)
// There should be no common records between the 3 consumers as they are part of the same group
assertTrue(partitionData1.acquiredRecords().get(0).lastOffset() < partitionData2.acquiredRecords().get(0).firstOffset())
@ -1496,26 +1496,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
assertEquals(1, shareFetchResponseData1.responses().size())
assertEquals(topicId, shareFetchResponseData1.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData1.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData1.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData1.responses().stream().findFirst().get().partitions().size())
val partitionData1 = shareFetchResponseData1.responses().get(0).partitions().get(0)
val partitionData1 = shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
val shareFetchResponseData2 = shareFetchResponse2.data()
assertEquals(Errors.NONE.code, shareFetchResponseData2.errorCode)
assertEquals(1, shareFetchResponseData2.responses().size())
assertEquals(topicId, shareFetchResponseData2.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData2.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData2.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData2.responses().stream().findFirst().get().partitions().size())
val partitionData2 = shareFetchResponseData2.responses().get(0).partitions().get(0)
val partitionData2 = shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
val shareFetchResponseData3 = shareFetchResponse3.data()
assertEquals(Errors.NONE.code, shareFetchResponseData3.errorCode)
assertEquals(1, shareFetchResponseData3.responses().size())
assertEquals(topicId, shareFetchResponseData3.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData3.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData3.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData3.responses().stream().findFirst().get().partitions().size())
val partitionData3 = shareFetchResponseData3.responses().get(0).partitions().get(0)
val partitionData3 = shareFetchResponseData3.responses().stream().findFirst().get().partitions().get(0)
// All the consumers should consume all the records since they are part of different groups
assertEquals(partitionData1.acquiredRecords(), expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
@ -1579,8 +1579,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1588,7 +1588,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
var fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Producing 10 more records to the topic created above
@ -1608,8 +1608,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1617,7 +1617,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(10), Collections.singletonList(19), Collections.singletonList(1))) // The records at offsets 0 to 9 will not be re fetched because they have been acknowledged
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Sending a final fetch request to close the session
@ -1692,8 +1692,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
var expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1701,7 +1701,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
var fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Producing 10 more records to the topic created above
@ -1721,8 +1721,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1730,7 +1730,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(10), Collections.singletonList(19), Collections.singletonList(1))) // The records at offsets 0 to 9 will not be re fetched because they have been acknowledged
fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
fetchPartitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
// Sending a Share Acknowledge request to close the session
@ -1747,14 +1747,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().get(0).topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
val expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
}
@ -1915,8 +1915,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -1924,7 +1924,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
val partitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending a thord Share Fetch request with invalid share session epoch
@ -1992,8 +1992,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -2001,7 +2001,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
val partitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending Share Acknowledge request with invalid share session epoch
@ -2075,16 +2075,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
val partitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending a third Share Fetch request with wrong member Id
@ -2241,8 +2240,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -2250,7 +2249,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
val partitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending a Share Acknowledge request with wrong member Id
@ -2331,10 +2330,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
val partitionsCount = shareFetchResponseData.responses().get(0).partitions().size()
val partitionsCount = shareFetchResponseData.responses().stream().findFirst().get().partitions().size()
if (partitionsCount > 0) {
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
shareFetchResponseData.responses().get(0).partitions().asScala.foreach(partitionData => {
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
shareFetchResponseData.responses().stream().findFirst().get().partitions().asScala.foreach(partitionData => {
if (!partitionData.acquiredRecords().isEmpty) {
responses = responses :+ partitionData
}
@ -2358,8 +2357,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
assertEquals(topicId, shareFetchResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData.responses().stream().findFirst().get().partitions().size())
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition2)
@ -2367,7 +2366,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(10), Collections.singletonList(19), Collections.singletonList(1)))
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
val partitionData = shareFetchResponseData.responses().stream().findFirst().get().partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}
@ -2425,8 +2424,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses.size)
assertEquals(topicId, shareFetchResponseData.responses.get(0).topicId)
assertEquals(1, shareFetchResponseData.responses.get(0).partitions.size)
assertEquals(topicId, shareFetchResponseData.responses.stream().findFirst().get().topicId)
assertEquals(1, shareFetchResponseData.responses.stream().findFirst().get().partitions.size)
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -2434,7 +2433,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code)
.setAcquiredRecords(expectedAcquiredRecords(util.List.of(0), util.List.of(0), util.List.of(1)))
val partitionData = shareFetchResponseData.responses.get(0).partitions.get(0)
val partitionData = shareFetchResponseData.responses.stream().findFirst().get().partitions.get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}
@ -2492,8 +2491,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses.size)
assertEquals(topicId, shareFetchResponseData.responses.get(0).topicId)
assertEquals(1, shareFetchResponseData.responses.get(0).partitions.size)
assertEquals(topicId, shareFetchResponseData.responses.stream().findFirst().get().topicId)
assertEquals(1, shareFetchResponseData.responses.stream().findFirst().get().partitions.size)
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
@ -2501,7 +2500,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code)
.setAcquiredRecords(expectedAcquiredRecords(util.List.of(0, 1, 2, 3, 4), util.List.of(0, 1, 2, 3, 4), util.List.of(1, 1, 1, 1, 1)))
val partitionData = shareFetchResponseData.responses.get(0).partitions.get(0)
val partitionData = shareFetchResponseData.responses.stream().findFirst().get().partitions.get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}

View File

@ -702,7 +702,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
.setErrorMessage(error.message())
.setResponses(response.responses());
data.setResponses(
response.responses().stream()
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream()
.map(topic -> {
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName());
@ -715,7 +715,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
});
return topicData;
})
.collect(Collectors.toList()));
.iterator()));
// don't uninitialized share group state here, as we regard this alter share group offsets request failed.
return data;
}

View File

@ -8107,7 +8107,7 @@ public class GroupMetadataManager {
) {
final long currentTimeMs = time.milliseconds();
Group group = groups.get(groupId);
List<AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic> alterShareGroupOffsetsResponseTopics = new ArrayList<>();
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection alterShareGroupOffsetsResponseTopics = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection();
Map<Uuid, InitMapValue> initializingTopics = new HashMap<>();
Map<Uuid, Map<Integer, Long>> offsetByTopicPartitions = new HashMap<>();