KAFKA-18932: Removed usage of partition max bytes from share fetch requests (#19148)

This PR aims to remove the usage of partition max bytes from share fetch
requests. Partition Max Bytes is being defined by
`PartitionMaxBytesStrategy` which was added to the broker as part of PR
https://github.com/apache/kafka/pull/17870

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Abhinav Dixit 2025-03-12 18:49:19 +05:30 committed by GitHub
parent f3da8f500e
commit c07c59ad24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 581 additions and 962 deletions

View File

@ -179,7 +179,7 @@ public class ShareSessionHandler {
return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, fetchConfig.maxWaitMs,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize, fetchConfig.maxPollRecords,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.maxPollRecords,
added, removed, acknowledgementBatches);
}

View File

@ -28,10 +28,8 @@ import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class ShareFetchRequest extends AbstractRequest {
@ -49,19 +47,17 @@ public class ShareFetchRequest extends AbstractRequest {
}
public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
int maxWait, int minBytes, int maxBytes, int fetchSize, int batchSize,
int maxWait, int minBytes, int maxBytes, int batchSize,
List<TopicIdPartition> send, List<TopicIdPartition> forget,
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData();
data.setGroupId(groupId);
int ackOnlyPartitionMaxBytes = fetchSize;
boolean isClosingShareSession = false;
if (metadata != null) {
data.setMemberId(metadata.memberId().toString());
data.setShareSessionEpoch(metadata.epoch());
if (metadata.isFinalEpoch()) {
isClosingShareSession = true;
ackOnlyPartitionMaxBytes = 0;
}
}
data.setMaxWaitMs(maxWait);
@ -77,8 +73,7 @@ public class ShareFetchRequest extends AbstractRequest {
for (TopicIdPartition tip : send) {
Map<Integer, ShareFetchRequestData.FetchPartition> partMap = fetchMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareFetchRequestData.FetchPartition fetchPartition = new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tip.partition())
.setPartitionMaxBytes(fetchSize);
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), fetchPartition);
}
}
@ -91,8 +86,7 @@ public class ShareFetchRequest extends AbstractRequest {
ShareFetchRequestData.FetchPartition fetchPartition = partMap.get(tip.partition());
if (fetchPartition == null) {
fetchPartition = new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tip.partition())
.setPartitionMaxBytes(ackOnlyPartitionMaxBytes);
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), fetchPartition);
}
fetchPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
@ -151,7 +145,7 @@ public class ShareFetchRequest extends AbstractRequest {
}
private final ShareFetchRequestData data;
private volatile LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData = null;
private volatile List<TopicIdPartition> shareFetchData = null;
private volatile List<TopicIdPartition> toForget = null;
public ShareFetchRequest(ShareFetchRequestData data, short version) {
@ -179,41 +173,6 @@ public class ShareFetchRequest extends AbstractRequest {
);
}
public static final class SharePartitionData {
public final Uuid topicId;
public final int maxBytes;
public SharePartitionData(
Uuid topicId,
int maxBytes
) {
this.topicId = topicId;
this.maxBytes = maxBytes;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareFetchRequest.SharePartitionData that = (ShareFetchRequest.SharePartitionData) o;
return Objects.equals(topicId, that.topicId) &&
maxBytes == that.maxBytes;
}
@Override
public int hashCode() {
return Objects.hash(topicId, maxBytes);
}
@Override
public String toString() {
return "SharePartitionData(" +
"topicId=" + topicId +
", maxBytes=" + maxBytes +
')';
}
}
public int minBytes() {
return data.minBytes();
}
@ -226,23 +185,18 @@ public class ShareFetchRequest extends AbstractRequest {
return data.maxWaitMs();
}
public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData(Map<Uuid, String> topicNames) {
public List<TopicIdPartition> shareFetchData(Map<Uuid, String> topicNames) {
if (shareFetchData == null) {
synchronized (this) {
if (shareFetchData == null) {
// Assigning the lazy-initialized `shareFetchData` in the last step
// to avoid other threads accessing a half-initialized object.
final LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataTmp = new LinkedHashMap<>();
final List<TopicIdPartition> shareFetchDataTmp = new ArrayList<>();
data.topics().forEach(shareFetchTopic -> {
String name = topicNames.get(shareFetchTopic.topicId());
shareFetchTopic.partitions().forEach(shareFetchPartition -> {
// Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
shareFetchDataTmp.put(new TopicIdPartition(shareFetchTopic.topicId(), new TopicPartition(name, shareFetchPartition.partitionIndex())),
new ShareFetchRequest.SharePartitionData(
shareFetchTopic.topicId(),
shareFetchPartition.partitionMaxBytes()
)
);
shareFetchDataTmp.add(new TopicIdPartition(shareFetchTopic.topicId(), shareFetchPartition.partitionIndex(), name));
});
});
shareFetchData = shareFetchDataTmp;

View File

@ -46,8 +46,6 @@
"about": "The partitions to fetch.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
"about": "TO BE REMOVED. The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",

View File

@ -181,7 +181,7 @@ public class DelayedShareFetch extends DelayedOperation {
return;
} else {
// Update metric to record acquired to requested partitions.
double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.partitionMaxBytes().size();
double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size();
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",

View File

@ -30,7 +30,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
@ -248,7 +247,7 @@ public class SharePartitionManager implements AutoCloseable {
* @param memberId The member id, generated by the group-coordinator, this is used to identify the client.
* @param fetchParams The fetch parameters from the share fetch request.
* @param batchSize The number of records per acquired records batch.
* @param partitionMaxBytes The maximum number of bytes to fetch for each partition.
* @param topicIdPartitions The topic partitions to fetch for.
*
* @return A future that will be completed with the fetched messages.
*/
@ -258,17 +257,17 @@ public class SharePartitionManager implements AutoCloseable {
FetchParams fetchParams,
int sessionEpoch,
int batchSize,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes
List<TopicIdPartition> topicIdPartitions
) {
log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}",
partitionMaxBytes.keySet(), groupId, fetchParams);
topicIdPartitions, groupId, fetchParams);
LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions = PartitionRotateStrategy
List<TopicIdPartition> rotatedTopicIdPartitions = PartitionRotateStrategy
.type(PartitionRotateStrategy.StrategyType.ROUND_ROBIN)
.rotate(partitionMaxBytes, new PartitionRotateMetadata(sessionEpoch));
.rotate(topicIdPartitions, new PartitionRotateMetadata(sessionEpoch));
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, topicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, rotatedTopicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));
return future;
}
@ -427,29 +426,20 @@ public class SharePartitionManager implements AutoCloseable {
/**
* The newContext method is used to create a new share fetch context for every share fetch request.
* @param groupId The group id in the share fetch request.
* @param shareFetchData The topic-partitions and their corresponding maxBytes data in the share fetch request.
* @param shareFetchData The topic-partitions in the share fetch request.
* @param toForget The topic-partitions to forget present in the share fetch request.
* @param reqMetadata The metadata in the share fetch request.
* @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not
* @return The new share fetch context object
*/
public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData,
public ShareFetchContext newContext(String groupId, List<TopicIdPartition> shareFetchData,
List<TopicIdPartition> toForget, ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) {
ShareFetchContext context;
// TopicPartition with maxBytes as 0 should not be added in the cachedPartitions
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>();
shareFetchData.forEach((tp, sharePartitionData) -> {
if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
});
// If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a
// new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases.
if (reqMetadata.isFull()) {
ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId());
if (reqMetadata.epoch() == ShareRequestMetadata.FINAL_EPOCH) {
// If the epoch is FINAL_EPOCH, don't try to create a new session.
if (!shareFetchDataWithMaxBytes.isEmpty()) {
throw Errors.INVALID_REQUEST.exception();
}
if (cache.get(key) == null) {
log.error("Share session error for {}: no such share session found", key);
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
@ -464,9 +454,9 @@ public class SharePartitionManager implements AutoCloseable {
log.debug("Removed share session with key {}", key);
}
ImplicitLinkedHashCollection<CachedSharePartition> cachedSharePartitions = new
ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size());
shareFetchDataWithMaxBytes.forEach((topicIdPartition, reqData) ->
cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, reqData, false)));
ImplicitLinkedHashCollection<>(shareFetchData.size());
shareFetchData.forEach(topicIdPartition ->
cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, false)));
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(),
time.milliseconds(), cachedSharePartitions);
if (responseShareSessionKey == null) {
@ -474,10 +464,10 @@ public class SharePartitionManager implements AutoCloseable {
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
}
context = new ShareSessionContext(reqMetadata, shareFetchDataWithMaxBytes);
context = new ShareSessionContext(reqMetadata, shareFetchData);
log.debug("Created a new ShareSessionContext with key {} isSubsequent {} returning {}. A new share " +
"session will be started.", responseShareSessionKey, false,
partitionsToLogString(shareFetchDataWithMaxBytes.keySet()));
partitionsToLogString(shareFetchData));
}
} else {
// We update the already existing share session.
@ -494,7 +484,7 @@ public class SharePartitionManager implements AutoCloseable {
throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
}
Map<ShareSession.ModifiedTopicIdPartitionType, List<TopicIdPartition>> modifiedTopicIdPartitions = shareSession.update(
shareFetchDataWithMaxBytes, toForget);
shareFetchData, toForget);
cache.touch(shareSession, time.milliseconds());
shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
log.debug("Created a new ShareSessionContext for session key {}, epoch {}: " +
@ -586,7 +576,7 @@ public class SharePartitionManager implements AutoCloseable {
// Visible for testing.
void processShareFetch(ShareFetch shareFetch) {
if (shareFetch.partitionMaxBytes().isEmpty()) {
if (shareFetch.topicIdPartitions().isEmpty()) {
// If there are no partitions to fetch then complete the future with an empty map.
shareFetch.maybeComplete(Collections.emptyMap());
return;
@ -596,7 +586,7 @@ public class SharePartitionManager implements AutoCloseable {
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
// Track the topics for which we have received a share fetch request for metrics.
Set<String> topics = new HashSet<>();
for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) {
for (TopicIdPartition topicIdPartition : shareFetch.topicIdPartitions()) {
topics.add(topicIdPartition.topic());
SharePartitionKey sharePartitionKey = sharePartitionKey(
shareFetch.groupId(),

View File

@ -2996,12 +2996,8 @@ class KafkaApis(val requestChannel: RequestChannel,
erroneousAndValidPartitionData.erroneous.forEach {
case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
}
erroneousAndValidPartitionData.validTopicIdPartitions.forEach {
case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
}
shareFetchData.forEach {
case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
}
erroneousAndValidPartitionData.validTopicIdPartitions.forEach(tp => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp)
shareFetchData.forEach { tp => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp}
// Kafka share consumers need READ permission on each topic they are fetching.
val authorizedTopics = authHelper.filterByAuthorized(
@ -3138,15 +3134,15 @@ class KafkaApis(val requestChannel: RequestChannel,
val erroneous = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]
erroneousAndValidPartitionData.erroneous.forEach { (topicIdPartition, partitionData) => erroneous.put(topicIdPartition, partitionData) }
val interestedWithMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
val interestedTopicPartitions = new util.ArrayList[TopicIdPartition]
erroneousAndValidPartitionData.validTopicIdPartitions.forEach { case (topicIdPartition, sharePartitionData) =>
erroneousAndValidPartitionData.validTopicIdPartitions.forEach { case topicIdPartition =>
if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic))
erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicIdPartition.topicPartition))
erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interestedWithMaxBytes.put(topicIdPartition, sharePartitionData.maxBytes)
interestedTopicPartitions.add(topicIdPartition)
}
val shareFetchRequest = request.body[ShareFetchRequest]
@ -3154,7 +3150,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val clientId = request.header.clientId
val groupId = shareFetchRequest.data.groupId
if (interestedWithMaxBytes.isEmpty) {
if (interestedTopicPartitions.isEmpty) {
CompletableFuture.completedFuture(erroneous)
} else {
// for share fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being
@ -3191,7 +3187,7 @@ class KafkaApis(val requestChannel: RequestChannel,
params,
shareSessionEpoch,
shareFetchRequest.data.batchSize,
interestedWithMaxBytes
interestedTopicPartitions
).thenApply{ result =>
val combinedResult = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]
result.asScala.foreach { case (tp, data) =>

View File

@ -69,11 +69,9 @@ import scala.Tuple2;
import scala.jdk.javaapi.CollectionConverters;
import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -120,7 +118,6 @@ public class DelayedShareFetchTest {
Uuid topicId = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -133,7 +130,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(false);
@ -166,7 +163,6 @@ public class DelayedShareFetchTest {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -181,7 +177,7 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch(
new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true);
@ -198,7 +194,7 @@ public class DelayedShareFetchTest {
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@ -239,7 +235,6 @@ public class DelayedShareFetchTest {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -254,7 +249,7 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch(
new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true);
@ -294,7 +289,6 @@ public class DelayedShareFetchTest {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -307,14 +301,14 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
@ -354,7 +348,6 @@ public class DelayedShareFetchTest {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -368,7 +361,7 @@ public class DelayedShareFetchTest {
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
@ -409,7 +402,6 @@ public class DelayedShareFetchTest {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -422,14 +414,14 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@ -470,7 +462,6 @@ public class DelayedShareFetchTest {
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);
SharePartition sp0 = mock(SharePartition.class);
@ -479,7 +470,7 @@ public class DelayedShareFetchTest {
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false);
@ -525,7 +516,7 @@ public class DelayedShareFetchTest {
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes1 = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
List<TopicIdPartition> topicIdPartitions1 = List.of(tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -541,7 +532,7 @@ public class DelayedShareFetchTest {
sharePartitions1.put(tp2, sp2);
ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes1, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), topicIdPartitions1, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@ -550,7 +541,7 @@ public class DelayedShareFetchTest {
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
topicIdPartitions1.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch1)
@ -567,12 +558,11 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch1.lock().tryLock());
delayedShareFetch1.lock().unlock();
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes2 = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes2, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(List.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp1));
@ -615,7 +605,6 @@ public class DelayedShareFetchTest {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -628,7 +617,7 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch(
new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp1));
@ -652,7 +641,7 @@ public class DelayedShareFetchTest {
when(logReadResult.info()).thenReturn(fetchDataInfo);
logReadResponse.put(tp0, logReadResult);
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(List.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
LinkedHashMap<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
assertEquals(topicPartitionData.keySet(), combinedLogReadResponse.keySet());
assertEquals(combinedLogReadResponse.get(tp0), logReadResponse.get(tp0));
@ -673,7 +662,6 @@ public class DelayedShareFetchTest {
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);
SharePartition sp0 = mock(SharePartition.class);
@ -685,13 +673,13 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch(
new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
// Mocking partition object to throw an exception during min bytes calculation while calling fetchOffsetSnapshot
Partition partition = mock(Partition.class);
@ -752,7 +740,6 @@ public class DelayedShareFetchTest {
public void testTryCompleteLocksReleasedOnCompleteException() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);
SharePartition sp0 = mock(SharePartition.class);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
@ -763,10 +750,10 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@ -799,11 +786,11 @@ public class DelayedShareFetchTest {
sharePartitions1.put(tp0, sp0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), orderedMap(PARTITION_MAX_BYTES, tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@ -837,7 +824,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), orderedMap(PARTITION_MAX_BYTES, tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
@ -856,7 +843,6 @@ public class DelayedShareFetchTest {
String groupId = "grp";
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
SharePartition sp0 = mock(SharePartition.class);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(true);
@ -867,7 +853,7 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch(
new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
// partitionMaxBytesStrategy.maxBytes() function throws an exception
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class);
@ -912,8 +898,6 @@ public class DelayedShareFetchTest {
SharePartition sp3 = mock(SharePartition.class);
SharePartition sp4 = mock(SharePartition.class);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1, tp2, tp3, tp4);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp2.maybeAcquireFetchLock()).thenReturn(true);
@ -933,7 +917,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp4, sp4);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
@ -948,7 +932,7 @@ public class DelayedShareFetchTest {
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
// All 5 partitions are acquirable.
doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet().stream().toList())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
@ -1010,8 +994,6 @@ public class DelayedShareFetchTest {
SharePartition sp3 = mock(SharePartition.class);
SharePartition sp4 = mock(SharePartition.class);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1, tp2, tp3, tp4);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp2.maybeAcquireFetchLock()).thenReturn(false);
@ -1031,7 +1013,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp4, sp4);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
@ -1043,7 +1025,7 @@ public class DelayedShareFetchTest {
Set<TopicIdPartition> acquirableTopicPartitions = new LinkedHashSet<>();
acquirableTopicPartitions.add(tp0);
acquirableTopicPartitions.add(tp1);
doAnswer(invocation -> buildLogReadResult(acquirableTopicPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(acquirableTopicPartitions.stream().toList())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
@ -1090,7 +1072,6 @@ public class DelayedShareFetchTest {
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1, tp2);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -1104,7 +1085,7 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch(
new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
@ -1132,7 +1113,7 @@ public class DelayedShareFetchTest {
fetchableTopicPartitions.add(tp1);
fetchableTopicPartitions.add(tp2);
// We will be doing replica manager fetch only for tp1 and tp2.
doAnswer(invocation -> buildLogReadResult(fetchableTopicPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(fetchableTopicPartitions.stream().toList())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
LinkedHashMap<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
assertEquals(topicPartitionData.keySet(), combinedLogReadResponse.keySet());
@ -1162,7 +1143,7 @@ public class DelayedShareFetchTest {
public void testOnCompleteExecutionOnTimeout() {
ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
new CompletableFuture<>(), new LinkedHashMap<>(), BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), List.of(), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)

View File

@ -58,11 +58,9 @@ import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createFileRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -94,7 +92,6 @@ public class ShareFetchUtilsTest {
String memberId = Uuid.randomUuid().toString();
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -114,7 +111,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100, BROKER_TOPIC_STATS);
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, 100, BROKER_TOPIC_STATS);
MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()),
@ -160,7 +157,6 @@ public class ShareFetchUtilsTest {
String memberId = Uuid.randomUuid().toString();
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@ -176,7 +172,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100, BROKER_TOPIC_STATS);
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, 100, BROKER_TOPIC_STATS);
List<ShareFetchPartitionData> responseData = List.of(
new ShareFetchPartitionData(tp0, 0, new FetchPartitionData(Errors.NONE, 0L, 0L,
@ -207,8 +203,6 @@ public class ShareFetchUtilsTest {
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = Mockito.mock(SharePartition.class);
SharePartition sp1 = Mockito.mock(SharePartition.class);
@ -217,7 +211,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100, BROKER_TOPIC_STATS);
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, 100, BROKER_TOPIC_STATS);
ReplicaManager replicaManager = mock(ReplicaManager.class);
@ -303,14 +297,13 @@ public class ShareFetchUtilsTest {
String groupId = "grp";
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);
SharePartition sp0 = Mockito.mock(SharePartition.class);
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
sharePartitions.put(tp0, sp0);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100, BROKER_TOPIC_STATS);
new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, 100, BROKER_TOPIC_STATS);
ReplicaManager replicaManager = mock(ReplicaManager.class);
@ -365,8 +358,6 @@ public class ShareFetchUtilsTest {
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = Mockito.mock(SharePartition.class);
SharePartition sp1 = Mockito.mock(SharePartition.class);
@ -380,7 +371,7 @@ public class ShareFetchUtilsTest {
Uuid memberId = Uuid.randomUuid();
// Set max fetch records to 10
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId.toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 10, BROKER_TOPIC_STATS);
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, 10, BROKER_TOPIC_STATS);
LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
recordsPerOffset.put(0L, 1);

File diff suppressed because it is too large Load Diff

View File

@ -6082,8 +6082,7 @@ class ReplicaManagerTest {
try {
val groupId = "grp"
val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0))
val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
partitionMaxBytes.put(tp1, 1000)
val topicPartitions = util.List.of(tp1)
val sp1 = mock(classOf[SharePartition])
val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition]
@ -6095,7 +6094,7 @@ class ReplicaManagerTest {
groupId,
Uuid.randomUuid.toString,
future,
partitionMaxBytes,
topicPartitions,
500,
100,
brokerTopicStats)
@ -6109,7 +6108,7 @@ class ReplicaManagerTest {
time))
val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new util.ArrayList[DelayedShareFetchKey]
partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) => delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId, topicIdPartition.partition)))
topicPartitions.forEach((topicIdPartition: TopicIdPartition) => delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId, topicIdPartition.partition)))
// You cannot acquire records for sp1, so request will be stored in purgatory waiting for timeout.
when(sp1.maybeAcquireFetchLock).thenReturn(false)

View File

@ -37,8 +37,7 @@ import scala.jdk.CollectionConverters._
))
@Tag("integration")
class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster){
private final val MAX_PARTITION_BYTES = 10000
private final val MAX_WAIT_MS = 5000
@AfterEach
@ -59,7 +58,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1))
)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
assertEquals(Errors.UNSUPPORTED_VERSION.code(), shareFetchResponse.data().errorCode())
@ -127,7 +126,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to the non-replica and verify the error code
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest, nonReplicaId)
val partitionData = shareFetchResponse.responseData(topicNames).get(topicIdPartition)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
@ -181,7 +180,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the second share fetch request to fetch the records produced above
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
@ -251,7 +250,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the second share fetch request to fetch the records produced above
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap)
// For the multi partition fetch request, the response may not be available in the first attempt
// as the share partitions might not be initialized yet. So, we retry until we get the response.
@ -352,9 +351,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the first share fetch request to initialize the share partitions
// Create different share fetch requests for different partitions as they may have leaders on separate brokers
var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, send1, Seq.empty, acknowledgementsMap)
var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap)
var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap)
var shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
var shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
@ -368,9 +367,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the second share fetch request to fetch the records produced above
// Create different share fetch requests for different partitions as they may have leaders on separate brokers
shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
shareFetchRequest1 = createShareFetchRequest(groupId, metadata, send1, Seq.empty, acknowledgementsMap)
shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap)
shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap)
shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
@ -469,7 +468,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -517,7 +516,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a third share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -585,7 +584,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch: Int = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -613,7 +612,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setFirstOffset(0)
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -637,7 +636,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a fourth share fetch request to confirm if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -705,7 +704,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -750,7 +749,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a third share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -818,7 +817,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -862,7 +861,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records
releaseAcknowledgementSent = true
}
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -935,7 +934,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -983,7 +982,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a third share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -1051,7 +1050,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -1079,7 +1078,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setFirstOffset(0)
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -1103,7 +1102,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a fourth share fetch request to confirm if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -1173,7 +1172,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -1218,7 +1217,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a third share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -1265,7 +1264,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a fourth share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -1284,79 +1283,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
}
@ClusterTests(
Array(
new ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
)
),
new ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
)
),
)
)
def testShareFetchBrokerDoesNotRespectPartitionsSizeLimit(): Unit = {
val groupId: String = "group"
val memberId = Uuid.randomUuid()
val topic = "topic"
val partition = 0
createTopicAndReturnLeaders(topic, numPartitions = 3)
val topicIds = getTopicIds.asJava
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 3 large messages to the topic created above
produceData(topicIdPartition, 10)
produceData(topicIdPartition, "large message 1", new String(new Array[Byte](MAX_PARTITION_BYTES/3)))
produceData(topicIdPartition, "large message 2", new String(new Array[Byte](MAX_PARTITION_BYTES/3)))
produceData(topicIdPartition, "large message 3", new String(new Array[Byte](MAX_PARTITION_BYTES/3)))
// Send the second share fetch request to fetch the records produced above
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())
val expectedPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(12), Collections.singletonList(1)))
// The first 10 records will be consumed as it is. For the last 3 records, each of size MAX_PARTITION_BYTES/3,
// all 3 of then will be consumed (offsets 10, 11 and 12) because even though the inclusion of the third last record will exceed
// the max partition bytes limit. We should only consider the request level maxBytes as the hard limit.
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}
@ClusterTests(
Array(
new ClusterTest(
@ -1410,15 +1336,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// mocking the behaviour of multiple share consumers from the same share group
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1, minBytes = 100, maxBytes = 1500)
val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, send, Seq.empty, acknowledgementsMap1, minBytes = 100, maxBytes = 1500)
val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2, minBytes = 100, maxBytes = 1500)
val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, send, Seq.empty, acknowledgementsMap2, minBytes = 100, maxBytes = 1500)
val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)
val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1)
val shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2)
@ -1510,15 +1436,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// mocking the behaviour of 3 different share groups
val metadata1 = new ShareRequestMetadata(memberId1, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest1 = createShareFetchRequest(groupId1, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1)
val shareFetchRequest1 = createShareFetchRequest(groupId1, metadata1, send, Seq.empty, acknowledgementsMap1)
val metadata2 = new ShareRequestMetadata(memberId2, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest2 = createShareFetchRequest(groupId2, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2)
val shareFetchRequest2 = createShareFetchRequest(groupId2, metadata2, send, Seq.empty, acknowledgementsMap2)
val metadata3 = new ShareRequestMetadata(memberId3, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3)
val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, send, Seq.empty, acknowledgementsMap3)
val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1)
val shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2)
@ -1604,7 +1530,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -1632,7 +1558,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setFirstOffset(0)
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -1657,7 +1583,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setFirstOffset(10)
.setLastOffset(19)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, 0, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -1714,7 +1640,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -1742,7 +1668,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setFirstOffset(0)
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -1833,7 +1759,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setFirstOffset(0)
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Acknowledgements in the Initial Fetch Request
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
@ -1940,7 +1866,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -1961,7 +1887,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a thord Share Fetch request with invalid share session epoch
shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -2016,7 +1942,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
@ -2098,7 +2024,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
@ -2119,7 +2045,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Sending a third Share Fetch request with wrong member Id
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
@ -2175,7 +2101,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
@ -2260,7 +2186,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap)
// For the multi partition fetch request, the response may not be available in the first attempt
// as the share partitions might not be initialized yet. So, we retry until we get the response.
@ -2290,7 +2216,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap)
shareFetchRequest = createShareFetchRequest(groupId, metadata, Seq.empty, forget, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
@ -2315,7 +2241,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val partitions: util.Set[Integer] = new util.HashSet()
TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, topicIdPartitions, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
@ -2358,7 +2284,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
private def createShareFetchRequest(groupId: String,
metadata: ShareRequestMetadata,
maxPartitionBytes: Int,
send: Seq[TopicIdPartition],
forget: Seq[TopicIdPartition],
acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]],
@ -2366,7 +2291,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
minBytes: Int = 0,
maxBytes: Int = Int.MaxValue,
batchSize: Int = 500): ShareFetchRequest = {
ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, batchSize, send.asJava, forget.asJava, acknowledgementsMap.asJava)
ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, batchSize, send.asJava, forget.asJava, acknowledgementsMap.asJava)
.build()
}

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
@ -42,33 +41,30 @@ public class CachedSharePartition implements ImplicitLinkedHashCollection.Elemen
private final Uuid topicId;
private final int partition;
private final Optional<Integer> leaderEpoch;
private int maxBytes;
private boolean requiresUpdateInResponse;
private int cachedNext = ImplicitLinkedHashCollection.INVALID_INDEX;
private int cachedPrev = ImplicitLinkedHashCollection.INVALID_INDEX;
private CachedSharePartition(String topic, Uuid topicId, int partition, int maxBytes, Optional<Integer> leaderEpoch,
private CachedSharePartition(String topic, Uuid topicId, int partition, Optional<Integer> leaderEpoch,
boolean requiresUpdateInResponse) {
this.topic = topic;
this.topicId = topicId;
this.partition = partition;
this.maxBytes = maxBytes;
this.leaderEpoch = leaderEpoch;
this.requiresUpdateInResponse = requiresUpdateInResponse;
}
public CachedSharePartition(String topic, Uuid topicId, int partition, boolean requiresUpdateInResponse) {
this(topic, topicId, partition, -1, Optional.empty(), requiresUpdateInResponse);
this(topic, topicId, partition, Optional.empty(), requiresUpdateInResponse);
}
public CachedSharePartition(TopicIdPartition topicIdPartition) {
this(topicIdPartition.topic(), topicIdPartition.topicId(), topicIdPartition.partition(), false);
}
public CachedSharePartition(TopicIdPartition topicIdPartition, ShareFetchRequest.SharePartitionData reqData,
boolean requiresUpdateInResponse) {
this(topicIdPartition.topic(), topicIdPartition.topicId(), topicIdPartition.partition(), reqData.maxBytes,
public CachedSharePartition(TopicIdPartition topicIdPartition, boolean requiresUpdateInResponse) {
this(topicIdPartition.topic(), topicIdPartition.topicId(), topicIdPartition.partition(),
Optional.empty(), requiresUpdateInResponse);
}
@ -84,15 +80,6 @@ public class CachedSharePartition implements ImplicitLinkedHashCollection.Elemen
return partition;
}
public ShareFetchRequest.SharePartitionData reqData() {
return new ShareFetchRequest.SharePartitionData(topicId, maxBytes);
}
public void updateRequestParams(ShareFetchRequest.SharePartitionData reqData) {
// Update our cached request parameters.
maxBytes = reqData.maxBytes;
}
/**
* Determine whether the specified cached partition should be included in the ShareFetchResponse we send back to
* the fetcher and update it if requested.
@ -128,7 +115,6 @@ public class CachedSharePartition implements ImplicitLinkedHashCollection.Elemen
return "CachedSharePartition(topic=" + topic +
", topicId=" + topicId +
", partition=" + partition +
", maxBytes=" + maxBytes +
", leaderEpoch=" + leaderEpoch +
")";
}

View File

@ -20,10 +20,11 @@ package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -31,36 +32,36 @@ import java.util.Map;
*/
public class ErroneousAndValidPartitionData {
private final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous;
private final Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions;
private final List<TopicIdPartition> validTopicIdPartitions;
public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous,
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions) {
List<TopicIdPartition> validTopicIdPartitions) {
this.erroneous = erroneous;
this.validTopicIdPartitions = validTopicIdPartitions;
}
public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData) {
public ErroneousAndValidPartitionData(List<TopicIdPartition> shareFetchData) {
erroneous = new HashMap<>();
validTopicIdPartitions = new HashMap<>();
shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
validTopicIdPartitions = new ArrayList<>();
shareFetchData.forEach(topicIdPartition -> {
if (topicIdPartition.topic() == null) {
erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID));
} else {
validTopicIdPartitions.put(topicIdPartition, sharePartitionData);
validTopicIdPartitions.add(topicIdPartition);
}
});
}
public ErroneousAndValidPartitionData() {
this.erroneous = new HashMap<>();
this.validTopicIdPartitions = new HashMap<>();
this.erroneous = Map.of();
this.validTopicIdPartitions = List.of();
}
public Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous() {
return erroneous;
}
public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions() {
public List<TopicIdPartition> validTopicIdPartitions() {
return validTopicIdPartitions;
}
}

View File

@ -23,8 +23,6 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchRequest.SharePartitionData;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.server.share.CachedSharePartition;
@ -34,6 +32,7 @@ import org.apache.kafka.server.share.session.ShareSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@ -52,7 +51,7 @@ public class ShareSessionContext extends ShareFetchContext {
private final ShareRequestMetadata reqMetadata;
private final boolean isSubsequent;
private Map<TopicIdPartition, SharePartitionData> shareFetchData;
private List<TopicIdPartition> shareFetchData;
private ShareSession session;
/**
@ -62,7 +61,7 @@ public class ShareSessionContext extends ShareFetchContext {
* @param shareFetchData The share partition data from the share fetch request.
*/
public ShareSessionContext(ShareRequestMetadata reqMetadata,
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData) {
List<TopicIdPartition> shareFetchData) {
this.reqMetadata = reqMetadata;
this.shareFetchData = shareFetchData;
this.isSubsequent = false;
@ -81,7 +80,7 @@ public class ShareSessionContext extends ShareFetchContext {
}
// Visible for testing
public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData() {
public List<TopicIdPartition> shareFetchData() {
return shareFetchData;
}
@ -229,17 +228,16 @@ public class ShareSessionContext extends ShareFetchContext {
return new ErroneousAndValidPartitionData(shareFetchData);
}
Map<TopicIdPartition, PartitionData> erroneous = new HashMap<>();
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> valid = new HashMap<>();
List<TopicIdPartition> valid = new ArrayList<>();
// Take the session lock and iterate over all the cached partitions.
synchronized (session) {
session.partitionMap().forEach(cachedSharePartition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
ShareFetchRequest.SharePartitionData reqData = cachedSharePartition.reqData();
if (topicIdPartition.topic() == null) {
erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID));
} else {
valid.put(topicIdPartition, reqData);
valid.add(topicIdPartition);
}
});
return new ErroneousAndValidPartitionData(erroneous, valid);

View File

@ -18,9 +18,10 @@ package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.TopicIdPartition;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* The PartitionRotateStrategy is used to rotate the partitions based on the respective strategy.
@ -48,7 +49,7 @@ public interface PartitionRotateStrategy {
*
* @return the rotated topicIdPartitions
*/
LinkedHashMap<TopicIdPartition, Integer> rotate(LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions, PartitionRotateMetadata metadata);
List<TopicIdPartition> rotate(List<TopicIdPartition> topicIdPartitions, PartitionRotateMetadata metadata);
static PartitionRotateStrategy type(StrategyType type) {
return switch (type) {
@ -64,8 +65,8 @@ public interface PartitionRotateStrategy {
*
* @return the rotated topicIdPartitions
*/
static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin(
LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions,
static List<TopicIdPartition> rotateRoundRobin(
List<TopicIdPartition> topicIdPartitions,
PartitionRotateMetadata metadata
) {
if (topicIdPartitions.isEmpty() || topicIdPartitions.size() == 1 || metadata.sessionEpoch < 1) {
@ -80,20 +81,11 @@ public interface PartitionRotateStrategy {
return topicIdPartitions;
}
// TODO: Once the partition max bytes is removed then the partition will be a linked list and rotation
// will be a simple operation. Else consider using ImplicitLinkedHashCollection.
LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new LinkedHashMap<>(rotateAt);
LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new LinkedHashMap<>(topicIdPartitions.size());
int i = 0;
for (Map.Entry<TopicIdPartition, Integer> entry : topicIdPartitions.entrySet()) {
if (i < rotateAt) {
suffixPartitions.put(entry.getKey(), entry.getValue());
} else {
rotatedPartitions.put(entry.getKey(), entry.getValue());
}
i++;
}
rotatedPartitions.putAll(suffixPartitions);
// Avoid modifying the original list, create copy.
List<TopicIdPartition> rotatedPartitions = new ArrayList<>(topicIdPartitions);
// Elements from the list should move left by the distance provided i.e. if the original list is [1,2,3],
// and rotation is by 1, then output should be [2,3,1] and not [3,1,2]. Hence, negate the distance here.
Collections.rotate(rotatedPartitions, -1 * rotateAt);
return rotatedPartitions;
}

View File

@ -26,8 +26,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -55,9 +55,9 @@ public class ShareFetch {
*/
private final String memberId;
/**
* The maximum number of bytes that can be fetched for each partition.
* The topic partitions to be fetched.
*/
private final LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes;
private final List<TopicIdPartition> topicIdPartitions;
/**
* The batch size of the fetch request.
*/
@ -81,7 +81,7 @@ public class ShareFetch {
String groupId,
String memberId,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes,
List<TopicIdPartition> topicIdPartitions,
int batchSize,
int maxFetchRecords,
BrokerTopicStats brokerTopicStats
@ -90,7 +90,7 @@ public class ShareFetch {
this.groupId = groupId;
this.memberId = memberId;
this.future = future;
this.partitionMaxBytes = partitionMaxBytes;
this.topicIdPartitions = topicIdPartitions;
this.batchSize = batchSize;
this.maxFetchRecords = maxFetchRecords;
this.brokerTopicStats = brokerTopicStats;
@ -104,8 +104,8 @@ public class ShareFetch {
return memberId;
}
public LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes() {
return partitionMaxBytes;
public List<TopicIdPartition> topicIdPartitions() {
return topicIdPartitions;
}
public FetchParams fetchParams() {
@ -151,7 +151,7 @@ public class ShareFetch {
* @return true if all the partitions in the request have errored, false otherwise.
*/
public synchronized boolean errorInAllPartitions() {
return erroneous != null && erroneous.size() == partitionMaxBytes().size();
return erroneous != null && erroneous.size() == topicIdPartitions().size();
}
/**

View File

@ -18,7 +18,6 @@
package org.apache.kafka.server.share.session;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.share.CachedSharePartition;
@ -110,25 +109,21 @@ public class ShareSession {
return new LastUsedKey(key, lastUsedMs);
}
// Visible for testing
public synchronized long creationMs() {
return creationMs;
}
// Update the cached partition data based on the request.
public synchronized Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> update(Map<TopicIdPartition,
ShareFetchRequest.SharePartitionData> shareFetchData, List<TopicIdPartition> toForget) {
public synchronized Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> update(
List<TopicIdPartition> shareFetchData,
List<TopicIdPartition> toForget
) {
List<TopicIdPartition> added = new ArrayList<>();
List<TopicIdPartition> updated = new ArrayList<>();
List<TopicIdPartition> removed = new ArrayList<>();
shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
CachedSharePartition cachedSharePartitionKey = new CachedSharePartition(topicIdPartition, sharePartitionData, true);
shareFetchData.forEach(topicIdPartition -> {
CachedSharePartition cachedSharePartitionKey = new CachedSharePartition(topicIdPartition, true);
CachedSharePartition cachedPart = partitionMap.find(cachedSharePartitionKey);
if (cachedPart == null) {
partitionMap.mustAdd(cachedSharePartitionKey);
added.add(topicIdPartition);
} else {
cachedPart.updateRequestParams(sharePartitionData);
updated.add(topicIdPartition);
}
});

View File

@ -24,9 +24,10 @@ import org.apache.kafka.server.share.fetch.PartitionRotateStrategy.StrategyType;
import org.junit.jupiter.api.Test;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.validateRotatedMapEquals;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.validateRotatedListEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest {
@Test
public void testRoundRobinStrategy() {
PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3);
List<TopicIdPartition> partitions = createPartitions(3);
LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(partitions, new PartitionRotateMetadata(1));
List<TopicIdPartition> result = strategy.rotate(partitions, new PartitionRotateMetadata(1));
assertEquals(3, result.size());
validateRotatedMapEquals(partitions, result, 1);
validateRotatedListEquals(partitions, result, 1);
// Session epoch is greater than the number of partitions.
result = strategy.rotate(partitions, new PartitionRotateMetadata(5));
assertEquals(3, result.size());
validateRotatedMapEquals(partitions, result, 2);
validateRotatedListEquals(partitions, result, 2);
// Session epoch is at Integer.MAX_VALUE.
result = strategy.rotate(partitions, new PartitionRotateMetadata(Integer.MAX_VALUE));
assertEquals(3, result.size());
validateRotatedMapEquals(partitions, result, 1);
validateRotatedListEquals(partitions, result, 1);
// No rotation at same size as epoch.
result = strategy.rotate(partitions, new PartitionRotateMetadata(3));
assertEquals(3, result.size());
validateRotatedMapEquals(partitions, result, 0);
validateRotatedListEquals(partitions, result, 0);
}
@Test
public void testRoundRobinStrategyWithSpecialSessionEpochs() {
PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3);
LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(
List<TopicIdPartition> partitions = createPartitions(3);
List<TopicIdPartition> result = strategy.rotate(
partitions,
new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH));
assertEquals(3, result.size());
validateRotatedMapEquals(partitions, result, 0);
validateRotatedListEquals(partitions, result, 0);
result = strategy.rotate(
partitions,
new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH));
assertEquals(3, result.size());
validateRotatedMapEquals(partitions, result, 0);
validateRotatedListEquals(partitions, result, 0);
}
@Test
public void testRoundRobinStrategyWithEmptyPartitions() {
PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
// Empty partitions.
LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new LinkedHashMap<>(), new PartitionRotateMetadata(5));
List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new PartitionRotateMetadata(5));
// The result should be empty.
assertTrue(result.isEmpty());
}
/**
* Create an ordered map of TopicIdPartition to partition max bytes.
* Create a list of topic partitions.
* @param size The number of topic-partitions to create.
* @return The ordered map of TopicIdPartition to partition max bytes.
* @return The list of topic partitions.
*/
private LinkedHashMap<TopicIdPartition, Integer> createPartitions(int size) {
LinkedHashMap<TopicIdPartition, Integer> partitions = new LinkedHashMap<>();
private List<TopicIdPartition> createPartitions(int size) {
List<TopicIdPartition> partitions = new ArrayList<>();
for (int i = 0; i < size; i++) {
partitions.put(new TopicIdPartition(Uuid.randomUuid(), i, "foo" + i), 1 /* partition max bytes*/);
partitions.add(new TopicIdPartition(Uuid.randomUuid(), i, "foo" + i));
}
return partitions;
}

View File

@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -60,7 +59,7 @@ public class ShareFetchTest {
public void testErrorInAllPartitions() {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
orderedMap(10, topicIdPartition), BATCH_SIZE, 100, brokerTopicStats);
List.of(topicIdPartition), BATCH_SIZE, 100, brokerTopicStats);
assertFalse(shareFetch.errorInAllPartitions());
shareFetch.addErroneous(topicIdPartition, new RuntimeException());
@ -72,7 +71,7 @@ public class ShareFetchTest {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
orderedMap(10, topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
List.of(topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
assertFalse(shareFetch.errorInAllPartitions());
shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
@ -87,7 +86,7 @@ public class ShareFetchTest {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
orderedMap(10, topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
List.of(topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
Set<TopicIdPartition> result = shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0, topicIdPartition1));
// No erroneous partitions, hence all partitions should be returned.
assertEquals(2, result.size());
@ -113,7 +112,7 @@ public class ShareFetchTest {
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
orderedMap(10, topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
List.of(topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
// Add both erroneous partition and complete request.
shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
@ -134,7 +133,7 @@ public class ShareFetchTest {
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
orderedMap(10, topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
List.of(topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
// Add an erroneous partition and complete request.
shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
@ -154,7 +153,7 @@ public class ShareFetchTest {
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
orderedMap(10, topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
List.of(topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
shareFetch.maybeCompleteWithException(List.of(topicIdPartition0, topicIdPartition1), new RuntimeException());
assertEquals(2, future.join().size());
@ -173,7 +172,7 @@ public class ShareFetchTest {
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
orderedMap(10, topicIdPartition0, topicIdPartition1, topicIdPartition2), BATCH_SIZE, 100, brokerTopicStats);
List.of(topicIdPartition0, topicIdPartition1, topicIdPartition2), BATCH_SIZE, 100, brokerTopicStats);
shareFetch.maybeCompleteWithException(List.of(topicIdPartition0, topicIdPartition2), new RuntimeException());
assertEquals(2, future.join().size());
@ -191,7 +190,7 @@ public class ShareFetchTest {
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
orderedMap(10, topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
List.of(topicIdPartition0, topicIdPartition1), BATCH_SIZE, 100, brokerTopicStats);
shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
shareFetch.maybeCompleteWithException(List.of(topicIdPartition1), new RuntimeException());

View File

@ -30,15 +30,12 @@ import com.yammer.metrics.core.Gauge;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Helper functions for writing share fetch unit tests.
@ -46,49 +43,29 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class ShareFetchTestUtils {
/**
* Create an ordered map of TopicIdPartition to partition max bytes.
* Validate that the rotated list is equal to the original list rotated by the given position.
*
* @param partitionMaxBytes The maximum number of bytes that can be fetched for each partition.
* @param topicIdPartitions The topic partitions to create the map for.
* @return The ordered map of TopicIdPartition to partition max bytes.
* @param original The original list.
* @param result The rotated list.
* @param rotationAt The position to rotate the elements at.
*/
public static LinkedHashMap<TopicIdPartition, Integer> orderedMap(int partitionMaxBytes, TopicIdPartition... topicIdPartitions) {
LinkedHashMap<TopicIdPartition, Integer> map = new LinkedHashMap<>();
for (TopicIdPartition tp : topicIdPartitions) {
map.put(tp, partitionMaxBytes);
}
return map;
}
/**
* Validate that the rotated map is equal to the original map with the keys rotated by the given position.
*
* @param original The original map.
* @param result The rotated map.
* @param rotationAt The position to rotate the keys at.
*/
public static void validateRotatedMapEquals(
LinkedHashMap<TopicIdPartition, Integer> original,
LinkedHashMap<TopicIdPartition, Integer> result,
public static void validateRotatedListEquals(
List<TopicIdPartition> original,
List<TopicIdPartition> result,
int rotationAt
) {
Set<TopicIdPartition> originalKeys = original.keySet();
Set<TopicIdPartition> resultKeys = result.keySet();
TopicIdPartition[] originalKeysArray = new TopicIdPartition[originalKeys.size()];
TopicIdPartition[] originalKeysArray = new TopicIdPartition[original.size()];
int i = 0;
for (TopicIdPartition key : originalKeys) {
for (TopicIdPartition key : original) {
if (i < rotationAt) {
originalKeysArray[originalKeys.size() - rotationAt + i] = key;
originalKeysArray[original.size() - rotationAt + i] = key;
} else {
originalKeysArray[i - rotationAt] = key;
}
i++;
}
assertArrayEquals(originalKeysArray, resultKeys.toArray());
for (TopicIdPartition key : originalKeys) {
assertEquals(original.get(key), result.get(key));
}
assertArrayEquals(originalKeysArray, result.toArray());
}
/**