mirror of https://github.com/apache/kafka.git
KAFKA-18522: Slice records for share fetch (#18804)
The PR handles slicing of fetched records based on acquire response for share fetch. There could be additional bytes fetched from log but acquired offsets can be a subset, typically with `max fetch records` configuration. Rather sending additional bytes of fetched data to client we should slice the file and wire only needed batches. Note: If the acquired offsets are within a batch then we need to send the entire batch within the file record. Hence rather checking for individual batches, PR finds the first and last acquired offset, and trims the file for all batches between (inclusive) these two offsets. Reviewers: Christo Lolov <lolovc@amazon.com>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
This commit is contained in:
parent
38c984307c
commit
48a506b7b8
|
@ -25,8 +25,11 @@ import org.apache.kafka.common.TopicPartition;
|
|||
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
|
||||
import org.apache.kafka.common.errors.OffsetNotAvailableException;
|
||||
import org.apache.kafka.common.message.ShareFetchResponseData;
|
||||
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
|
||||
import org.apache.kafka.common.record.FileRecords;
|
||||
import org.apache.kafka.common.record.Records;
|
||||
import org.apache.kafka.common.requests.ListOffsetsRequest;
|
||||
import org.apache.kafka.server.share.SharePartitionKey;
|
||||
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
|
||||
|
@ -39,6 +42,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -122,13 +126,7 @@ public class ShareFetchUtils {
|
|||
.setAcquiredRecords(Collections.emptyList());
|
||||
} else {
|
||||
partitionData
|
||||
// We set the records to the fetchPartitionData records. We do not alter the records
|
||||
// fetched from the replica manager as they follow zero copy buffer. The acquired records
|
||||
// might be a subset of the records fetched from the replica manager, depending
|
||||
// on the max fetch records or available records in the share partition. The client
|
||||
// sends the max bytes in request which should limit the bytes sent to the client
|
||||
// in the response.
|
||||
.setRecords(fetchPartitionData.records)
|
||||
.setRecords(maybeSliceFetchRecords(fetchPartitionData.records, shareAcquiredRecords))
|
||||
.setAcquiredRecords(shareAcquiredRecords.acquiredRecords());
|
||||
acquiredRecordsCount += shareAcquiredRecords.count();
|
||||
}
|
||||
|
@ -196,4 +194,68 @@ public class ShareFetchUtils {
|
|||
}
|
||||
return partition;
|
||||
}
|
||||
|
||||
/**
|
||||
* Slice the fetch records based on the acquired records. The slicing is done based on the first
|
||||
* and last offset of the acquired records from the list. The slicing doesn't consider individual
|
||||
* acquired batches rather the boundaries of the acquired list. The method expects the acquired
|
||||
* records list to be within the fetch records bounds.
|
||||
*
|
||||
* @param records The records to be sliced.
|
||||
* @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
|
||||
* @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
|
||||
* of the fetched records. Otherwise, the original records are returned.
|
||||
*/
|
||||
static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
|
||||
if (!(records instanceof FileRecords fileRecords)) {
|
||||
return records;
|
||||
}
|
||||
// The acquired records should be non-empty, do not check as the method is called only when the
|
||||
// acquired records are non-empty.
|
||||
List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
|
||||
try {
|
||||
final Iterator<FileChannelRecordBatch> iterator = fileRecords.batchIterator();
|
||||
// Track the first overlapping batch with the first acquired offset.
|
||||
FileChannelRecordBatch firstOverlapBatch = iterator.next();
|
||||
// If there exists single fetch batch, then return the original records.
|
||||
if (!iterator.hasNext()) {
|
||||
return records;
|
||||
}
|
||||
// Find the first and last acquired offset to slice the records.
|
||||
final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
|
||||
final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
|
||||
int startPosition = 0;
|
||||
int size = 0;
|
||||
// Start iterating from the second batch.
|
||||
while (iterator.hasNext()) {
|
||||
FileChannelRecordBatch batch = iterator.next();
|
||||
// Iterate until finds the first overlap batch with the first acquired offset. All the
|
||||
// batches before this first overlap batch should be sliced hence increment the start
|
||||
// position.
|
||||
if (batch.baseOffset() <= firstAcquiredOffset) {
|
||||
startPosition += firstOverlapBatch.sizeInBytes();
|
||||
firstOverlapBatch = batch;
|
||||
continue;
|
||||
}
|
||||
// Break if traversed all the batches till the last acquired offset.
|
||||
if (batch.baseOffset() > lastAcquiredOffset) {
|
||||
break;
|
||||
}
|
||||
size += batch.sizeInBytes();
|
||||
}
|
||||
// Include the first overlap batch as it's the last batch traversed which overlapped the first
|
||||
// acquired offset.
|
||||
size += firstOverlapBatch.sizeInBytes();
|
||||
// Check if we do not need slicing i.e. neither start position nor size changed.
|
||||
if (startPosition == 0 && size == fileRecords.sizeInBytes()) {
|
||||
return records;
|
||||
}
|
||||
return fileRecords.slice(startPosition, size);
|
||||
} catch (Exception e) {
|
||||
log.error("Error while checking batches for acquired records: {}, skipping slicing.", acquiredRecords, e);
|
||||
// If there is an exception while slicing, return the original records so that the fetch
|
||||
// can continue with the original records.
|
||||
return records;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
|
|||
import org.apache.kafka.server.share.SharePartitionKey;
|
||||
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
|
||||
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
|
||||
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
|
||||
import org.apache.kafka.server.share.fetch.ShareFetch;
|
||||
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
|
||||
import org.apache.kafka.server.storage.log.FetchIsolation;
|
||||
|
@ -74,6 +73,7 @@ import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_P
|
|||
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
|
||||
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
|
||||
import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
|
||||
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
|
||||
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
@ -186,7 +186,7 @@ public class DelayedShareFetchTest {
|
|||
when(sp0.canAcquireRecords()).thenReturn(true);
|
||||
when(sp1.canAcquireRecords()).thenReturn(false);
|
||||
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
|
||||
// We are testing the case when the share partition is getting fetched for the first time, so for the first time
|
||||
// the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
|
||||
|
@ -259,7 +259,7 @@ public class DelayedShareFetchTest {
|
|||
when(sp0.canAcquireRecords()).thenReturn(true);
|
||||
when(sp1.canAcquireRecords()).thenReturn(false);
|
||||
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
|
||||
// We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff
|
||||
// functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
|
||||
|
@ -312,7 +312,7 @@ public class DelayedShareFetchTest {
|
|||
when(sp0.canAcquireRecords()).thenReturn(true);
|
||||
when(sp1.canAcquireRecords()).thenReturn(false);
|
||||
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
|
||||
|
||||
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
|
||||
|
@ -427,7 +427,7 @@ public class DelayedShareFetchTest {
|
|||
when(sp0.canAcquireRecords()).thenReturn(true);
|
||||
when(sp1.canAcquireRecords()).thenReturn(false);
|
||||
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
|
||||
|
||||
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
|
||||
|
@ -591,7 +591,7 @@ public class DelayedShareFetchTest {
|
|||
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
|
||||
when(sp1.canAcquireRecords()).thenReturn(true);
|
||||
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
|
||||
// when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
|
||||
// requests, it should add a "check and complete" action for request key tp1 on the purgatory.
|
||||
|
@ -689,7 +689,7 @@ public class DelayedShareFetchTest {
|
|||
|
||||
when(sp0.canAcquireRecords()).thenReturn(true);
|
||||
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
|
||||
|
||||
// Mocking partition object to throw an exception during min bytes calculation while calling fetchOffsetSnapshot
|
||||
|
@ -897,15 +897,15 @@ public class DelayedShareFetchTest {
|
|||
BROKER_TOPIC_STATS);
|
||||
|
||||
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
|
||||
// All 5 partitions are acquirable.
|
||||
doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
|
||||
|
@ -995,9 +995,9 @@ public class DelayedShareFetchTest {
|
|||
BROKER_TOPIC_STATS);
|
||||
|
||||
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
|
||||
// Only 2 out of 5 partitions are acquirable.
|
||||
Set<TopicIdPartition> acquirableTopicPartitions = new LinkedHashSet<>();
|
||||
|
|
|
@ -24,10 +24,14 @@ import org.apache.kafka.common.Uuid;
|
|||
import org.apache.kafka.common.compress.Compression;
|
||||
import org.apache.kafka.common.errors.FencedLeaderEpochException;
|
||||
import org.apache.kafka.common.message.ShareFetchResponseData;
|
||||
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.record.FileRecords;
|
||||
import org.apache.kafka.common.record.MemoryRecords;
|
||||
import org.apache.kafka.common.record.MemoryRecordsBuilder;
|
||||
import org.apache.kafka.common.record.RecordBatch;
|
||||
import org.apache.kafka.common.record.Records;
|
||||
import org.apache.kafka.common.record.SimpleRecord;
|
||||
import org.apache.kafka.common.requests.FetchRequest;
|
||||
import org.apache.kafka.server.share.SharePartitionKey;
|
||||
|
@ -39,10 +43,12 @@ import org.apache.kafka.server.storage.log.FetchParams;
|
|||
import org.apache.kafka.server.storage.log.FetchPartitionData;
|
||||
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
|
||||
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
@ -54,6 +60,9 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.function.BiConsumer;
|
||||
|
||||
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
|
||||
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createFileRecords;
|
||||
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
|
||||
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
|
||||
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
|
@ -95,10 +104,10 @@ public class ShareFetchUtilsTest {
|
|||
when(sp1.nextFetchOffset()).thenReturn((long) 3);
|
||||
|
||||
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
.setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
|
||||
|
||||
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
|
||||
|
@ -222,10 +231,10 @@ public class ShareFetchUtilsTest {
|
|||
|
||||
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.empty(),
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
.setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
|
||||
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)),
|
||||
ShareAcquiredRecords.empty());
|
||||
|
||||
|
@ -352,7 +361,7 @@ public class ShareFetchUtilsTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testProcessFetchResponseWithMaxFetchRecords() {
|
||||
public void testProcessFetchResponseWithMaxFetchRecords() throws IOException {
|
||||
String groupId = "grp";
|
||||
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
|
||||
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
|
||||
|
@ -374,24 +383,29 @@ public class ShareFetchUtilsTest {
|
|||
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId.toString(),
|
||||
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 10, BROKER_TOPIC_STATS);
|
||||
|
||||
MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE,
|
||||
new SimpleRecord("0".getBytes(), "v".getBytes()),
|
||||
new SimpleRecord("1".getBytes(), "v".getBytes()),
|
||||
new SimpleRecord("2".getBytes(), "v".getBytes()),
|
||||
new SimpleRecord(null, "value".getBytes()));
|
||||
LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
|
||||
recordsPerOffset.put(0L, 1);
|
||||
recordsPerOffset.put(1L, 1);
|
||||
recordsPerOffset.put(2L, 1);
|
||||
recordsPerOffset.put(3L, 1);
|
||||
Records records1 = createFileRecords(recordsPerOffset);
|
||||
|
||||
recordsPerOffset.clear();
|
||||
recordsPerOffset.put(100L, 4);
|
||||
Records records2 = createFileRecords(recordsPerOffset);
|
||||
|
||||
FetchPartitionData fetchPartitionData1 = new FetchPartitionData(Errors.NONE, 0L, 0L,
|
||||
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
|
||||
OptionalInt.empty(), false);
|
||||
FetchPartitionData fetchPartitionData2 = new FetchPartitionData(Errors.NONE, 0L, 0L,
|
||||
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
|
||||
records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
|
||||
OptionalInt.empty(), false);
|
||||
|
||||
when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, 0, fetchPartitionData1)).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
.setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1)));
|
||||
when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, 0, fetchPartitionData2)).thenReturn(
|
||||
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
|
||||
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
|
||||
|
||||
// Send the topic partitions in order so can validate if correct mock is called, accounting
|
||||
|
@ -401,23 +415,27 @@ public class ShareFetchUtilsTest {
|
|||
new ShareFetchPartitionData(tp1, 0, fetchPartitionData2)
|
||||
);
|
||||
|
||||
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 =
|
||||
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData =
|
||||
ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions,
|
||||
mock(ReplicaManager.class), EXCEPTION_HANDLER);
|
||||
|
||||
assertEquals(2, resultData1.size());
|
||||
assertTrue(resultData1.containsKey(tp0));
|
||||
assertTrue(resultData1.containsKey(tp1));
|
||||
assertEquals(0, resultData1.get(tp0).partitionIndex());
|
||||
assertEquals(1, resultData1.get(tp1).partitionIndex());
|
||||
assertEquals(Errors.NONE.code(), resultData1.get(tp0).errorCode());
|
||||
assertEquals(Errors.NONE.code(), resultData1.get(tp1).errorCode());
|
||||
assertEquals(1, resultData1.get(tp0).acquiredRecords().size());
|
||||
assertEquals(0, resultData1.get(tp0).acquiredRecords().get(0).firstOffset());
|
||||
assertEquals(1, resultData1.get(tp0).acquiredRecords().get(0).lastOffset());
|
||||
assertEquals(1, resultData1.get(tp1).acquiredRecords().size());
|
||||
assertEquals(100, resultData1.get(tp1).acquiredRecords().get(0).firstOffset());
|
||||
assertEquals(103, resultData1.get(tp1).acquiredRecords().get(0).lastOffset());
|
||||
assertEquals(2, resultData.size());
|
||||
assertTrue(resultData.containsKey(tp0));
|
||||
assertTrue(resultData.containsKey(tp1));
|
||||
assertEquals(0, resultData.get(tp0).partitionIndex());
|
||||
assertEquals(1, resultData.get(tp1).partitionIndex());
|
||||
assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode());
|
||||
assertEquals(Errors.NONE.code(), resultData.get(tp1).errorCode());
|
||||
assertEquals(1, resultData.get(tp0).acquiredRecords().size());
|
||||
assertEquals(0, resultData.get(tp0).acquiredRecords().get(0).firstOffset());
|
||||
assertEquals(1, resultData.get(tp0).acquiredRecords().get(0).lastOffset());
|
||||
assertEquals(1, resultData.get(tp1).acquiredRecords().size());
|
||||
assertEquals(100, resultData.get(tp1).acquiredRecords().get(0).firstOffset());
|
||||
assertEquals(103, resultData.get(tp1).acquiredRecords().get(0).lastOffset());
|
||||
|
||||
// Validate the slicing for fetched data happened for tp0 records, not for tp1 records.
|
||||
assertTrue(records1.sizeInBytes() > resultData.get(tp0).records().sizeInBytes());
|
||||
assertEquals(records2.sizeInBytes(), resultData.get(tp1).records().sizeInBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -455,4 +473,181 @@ public class ShareFetchUtilsTest {
|
|||
Mockito.verify(exceptionHandler, times(1)).accept(new SharePartitionKey("grp", tp0), exception);
|
||||
Mockito.verify(sp0, times(0)).updateCacheAndOffsets(any(Long.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeSliceFetchRecordsSingleBatch() throws IOException {
|
||||
// Create 1 batch of records with 10 records.
|
||||
FileRecords records = createFileRecords(Map.of(5L, 10));
|
||||
|
||||
// Acquire all offsets, should return same records.
|
||||
List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(14).setDeliveryCount((short) 1));
|
||||
Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 10));
|
||||
assertEquals(records, slicedRecords);
|
||||
|
||||
// Acquire offsets out of first offset bound should return the records for the matching batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(2).setLastOffset(14).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 10));
|
||||
assertEquals(records, slicedRecords);
|
||||
|
||||
// Acquire offsets out of last offset bound should return the records for the matching batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(20).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5));
|
||||
assertEquals(records, slicedRecords);
|
||||
|
||||
// Acquire only subset of batch offsets, starting from the first offset.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertEquals(records, slicedRecords);
|
||||
|
||||
// Acquire only subset of batch offsets, ending at the last offset.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(14).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertEquals(records, slicedRecords);
|
||||
|
||||
// Acquire only subset of batch offsets, within the batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(10).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertEquals(records, slicedRecords);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeSliceFetchRecordsMultipleBatches() throws IOException {
|
||||
// Create 3 batches of records with 3, 2 and 4 records respectively.
|
||||
LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
|
||||
recordsPerOffset.put(0L, 3);
|
||||
recordsPerOffset.put(3L, 2);
|
||||
recordsPerOffset.put(7L, 4); // Gap of 2 offsets between batches.
|
||||
FileRecords records = createFileRecords(recordsPerOffset);
|
||||
|
||||
// Acquire all offsets, should return same records.
|
||||
List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(10).setDeliveryCount((short) 1));
|
||||
Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 11));
|
||||
assertEquals(records, slicedRecords);
|
||||
|
||||
// Acquire offsets from all batches, but only first record from last batch. Should return
|
||||
// all batches.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5));
|
||||
assertEquals(records, slicedRecords);
|
||||
|
||||
// Acquire only first batch offsets, should return only first batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(2).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
List<RecordBatch> recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(1, recordBatches.size());
|
||||
assertEquals(0, recordBatches.get(0).baseOffset());
|
||||
assertEquals(2, recordBatches.get(0).lastOffset());
|
||||
|
||||
// Acquire only second batch offsets, should return only second batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(3).setLastOffset(4).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(1, recordBatches.size());
|
||||
assertEquals(3, recordBatches.get(0).baseOffset());
|
||||
assertEquals(4, recordBatches.get(0).lastOffset());
|
||||
|
||||
// Acquire only last batch offsets, should return only last batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(7).setLastOffset(10).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(1, recordBatches.size());
|
||||
assertEquals(7, recordBatches.get(0).baseOffset());
|
||||
assertEquals(10, recordBatches.get(0).lastOffset());
|
||||
|
||||
// Acquire only subset of first batch offsets, should return only first batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(1, recordBatches.size());
|
||||
assertEquals(0, recordBatches.get(0).baseOffset());
|
||||
assertEquals(2, recordBatches.get(0).lastOffset());
|
||||
|
||||
// Acquire only subset of second batch offsets, should return only second batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(1, recordBatches.size());
|
||||
assertEquals(3, recordBatches.get(0).baseOffset());
|
||||
assertEquals(4, recordBatches.get(0).lastOffset());
|
||||
|
||||
// Acquire only subset of last batch offsets, should return only last batch.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(8).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(1, recordBatches.size());
|
||||
assertEquals(7, recordBatches.get(0).baseOffset());
|
||||
assertEquals(10, recordBatches.get(0).lastOffset());
|
||||
|
||||
// Acquire including gaps between batches, should return 2 batches.
|
||||
acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(4).setLastOffset(8).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(2, recordBatches.size());
|
||||
assertEquals(3, recordBatches.get(0).baseOffset());
|
||||
assertEquals(4, recordBatches.get(0).lastOffset());
|
||||
assertEquals(7, recordBatches.get(1).baseOffset());
|
||||
assertEquals(10, recordBatches.get(1).lastOffset());
|
||||
|
||||
// Acquire with multiple acquired records, should return matching batches.
|
||||
acquiredRecords = List.of(
|
||||
new AcquiredRecords().setFirstOffset(0).setLastOffset(2).setDeliveryCount((short) 1),
|
||||
new AcquiredRecords().setFirstOffset(3).setLastOffset(4).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(2, recordBatches.size());
|
||||
assertEquals(0, recordBatches.get(0).baseOffset());
|
||||
assertEquals(2, recordBatches.get(0).lastOffset());
|
||||
assertEquals(3, recordBatches.get(1).baseOffset());
|
||||
assertEquals(4, recordBatches.get(1).lastOffset());
|
||||
|
||||
// Acquire with multiple acquired records of individual offsets from single batch, should return
|
||||
// matching batch.
|
||||
acquiredRecords = List.of(
|
||||
new AcquiredRecords().setFirstOffset(8).setLastOffset(8).setDeliveryCount((short) 1),
|
||||
new AcquiredRecords().setFirstOffset(9).setLastOffset(9).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
|
||||
recordBatches = TestUtils.toList(slicedRecords.batches());
|
||||
assertEquals(1, recordBatches.size());
|
||||
assertEquals(7, recordBatches.get(0).baseOffset());
|
||||
assertEquals(10, recordBatches.get(0).lastOffset());
|
||||
|
||||
// Acquire with multiple acquired records of individual offsets from multiple batch, should return
|
||||
// multiple matching batches.
|
||||
acquiredRecords = List.of(
|
||||
new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
|
||||
new AcquiredRecords().setFirstOffset(9).setLastOffset(9).setDeliveryCount((short) 1));
|
||||
slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
|
||||
assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeSliceFetchRecordsException() throws IOException {
|
||||
// Create 1 batch of records with 3 records.
|
||||
FileRecords records = createFileRecords(Map.of(0L, 3));
|
||||
// Send empty acquired records which should trigger an exception and same file records should
|
||||
// be returned. The method doesn't expect empty acquired records.
|
||||
Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(
|
||||
records, new ShareAcquiredRecords(List.of(), 3));
|
||||
assertEquals(records, slicedRecords);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeSliceFetchRecordsNonFileRecords() {
|
||||
// Send memory records which should be returned as is.
|
||||
try (MemoryRecordsBuilder records = memoryRecordsBuilder(2, 0)) {
|
||||
List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1));
|
||||
Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(
|
||||
records.build(), new ShareAcquiredRecords(acquiredRecords, 2));
|
||||
assertEquals(records.build(), slicedRecords);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import kafka.server.share.SharePartitionManager.SharePartitionListener;
|
|||
import org.apache.kafka.common.TopicIdPartition;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.compress.Compression;
|
||||
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
|
||||
import org.apache.kafka.common.errors.FencedStateEpochException;
|
||||
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
||||
|
@ -40,7 +39,6 @@ import org.apache.kafka.common.record.FileRecords;
|
|||
import org.apache.kafka.common.record.MemoryRecords;
|
||||
import org.apache.kafka.common.record.MemoryRecordsBuilder;
|
||||
import org.apache.kafka.common.record.Records;
|
||||
import org.apache.kafka.common.record.TimestampType;
|
||||
import org.apache.kafka.common.requests.ListOffsetsRequest;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
|
@ -85,6 +83,7 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
|
||||
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
|
||||
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
@ -6233,19 +6232,6 @@ public class SharePartitionTest {
|
|||
return memoryRecordsBuilder(numOfRecords, startOffset).build();
|
||||
}
|
||||
|
||||
private MemoryRecordsBuilder memoryRecordsBuilder(int numOfRecords, long startOffset) {
|
||||
return memoryRecordsBuilder(ByteBuffer.allocate(1024), numOfRecords, startOffset);
|
||||
}
|
||||
|
||||
private MemoryRecordsBuilder memoryRecordsBuilder(ByteBuffer buffer, int numOfRecords, long startOffset) {
|
||||
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
|
||||
TimestampType.CREATE_TIME, startOffset, 2);
|
||||
for (int i = 0; i < numOfRecords; i++) {
|
||||
builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
private List<AcquiredRecords> expectedAcquiredRecord(long baseOffset, long lastOffset, int deliveryCount) {
|
||||
return Collections.singletonList(new AcquiredRecords()
|
||||
.setFirstOffset(baseOffset)
|
||||
|
|
|
@ -554,6 +554,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
|
|||
<Class name="org.apache.kafka.common.Node"/>
|
||||
<Class name="org.apache.kafka.common.record.UnalignedMemoryRecords"/>
|
||||
<Class name="org.apache.kafka.clients.Metadata$LeaderAndEpoch"/>
|
||||
<Class name="org.apache.kafka.server.share.fetch.ShareAcquiredRecords"/>
|
||||
</Or>
|
||||
<Bug pattern="SING_SINGLETON_HAS_NONPRIVATE_CONSTRUCTOR"/>
|
||||
</Match>
|
||||
|
|
|
@ -19,8 +19,8 @@ package org.apache.kafka.server.share.fetch;
|
|||
|
||||
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* The ShareAcquiredRecords class is used to send the acquired records and associated metadata.
|
||||
|
@ -43,12 +43,12 @@ public class ShareAcquiredRecords {
|
|||
List<AcquiredRecords> acquiredRecords,
|
||||
int count
|
||||
) {
|
||||
this.acquiredRecords = acquiredRecords;
|
||||
this.acquiredRecords = Objects.requireNonNull(acquiredRecords);
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
private ShareAcquiredRecords() {
|
||||
this.acquiredRecords = Collections.emptyList();
|
||||
this.acquiredRecords = List.of();
|
||||
this.count = 0;
|
||||
}
|
||||
|
||||
|
@ -63,10 +63,4 @@ public class ShareAcquiredRecords {
|
|||
public static ShareAcquiredRecords empty() {
|
||||
return EMPTY_SHARE_ACQUIRED_RECORDS;
|
||||
}
|
||||
|
||||
public static ShareAcquiredRecords fromAcquiredRecords(AcquiredRecords acquiredRecords) {
|
||||
return new ShareAcquiredRecords(
|
||||
List.of(acquiredRecords), (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,10 +17,23 @@
|
|||
package org.apache.kafka.server.share.fetch;
|
||||
|
||||
import org.apache.kafka.common.TopicIdPartition;
|
||||
import org.apache.kafka.common.compress.Compression;
|
||||
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
|
||||
import org.apache.kafka.common.record.FileRecords;
|
||||
import org.apache.kafka.common.record.MemoryRecords;
|
||||
import org.apache.kafka.common.record.MemoryRecordsBuilder;
|
||||
import org.apache.kafka.common.record.TimestampType;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.kafka.test.TestUtils.tempFile;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
|
@ -74,4 +87,62 @@ public class ShareFetchTestUtils {
|
|||
assertEquals(original.get(key), result.get(key));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a file records with the given offset values, the number of records from each given start
|
||||
* offset.
|
||||
*
|
||||
* @param recordsPerOffset The offset values and the number of records to create from given offset.
|
||||
* @return The file records.
|
||||
* @throws IOException If the file records cannot be created.
|
||||
*/
|
||||
public static FileRecords createFileRecords(Map<Long, Integer> recordsPerOffset) throws IOException {
|
||||
FileRecords fileRecords = FileRecords.open(tempFile());
|
||||
for (Entry<Long, Integer> entry : recordsPerOffset.entrySet()) {
|
||||
try (MemoryRecordsBuilder records = memoryRecordsBuilder(entry.getValue(), entry.getKey())) {
|
||||
fileRecords.append(records.build());
|
||||
}
|
||||
}
|
||||
return fileRecords;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a memory records builder with the given number of records and start offset.
|
||||
*
|
||||
* @param numOfRecords The number of records to create.
|
||||
* @param startOffset The start offset of the records.
|
||||
* @return The memory records builder.
|
||||
*/
|
||||
public static MemoryRecordsBuilder memoryRecordsBuilder(int numOfRecords, long startOffset) {
|
||||
return memoryRecordsBuilder(ByteBuffer.allocate(1024), numOfRecords, startOffset);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a memory records builder with the number of records and start offset, in the given buffer.
|
||||
*
|
||||
* @param buffer The buffer to write the records to.
|
||||
* @param numOfRecords The number of records to create.
|
||||
* @param startOffset The start offset of the records.
|
||||
* @return The memory records builder.
|
||||
*/
|
||||
public static MemoryRecordsBuilder memoryRecordsBuilder(ByteBuffer buffer, int numOfRecords, long startOffset) {
|
||||
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
|
||||
TimestampType.CREATE_TIME, startOffset, 2);
|
||||
for (int i = 0; i < numOfRecords; i++) {
|
||||
builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a share acquired records from the given acquired records.
|
||||
*
|
||||
* @param acquiredRecords The acquired records to create the share acquired records from.
|
||||
* @return The share acquired records.
|
||||
*/
|
||||
public static ShareAcquiredRecords createShareAcquiredRecords(AcquiredRecords acquiredRecords) {
|
||||
return new ShareAcquiredRecords(
|
||||
List.of(acquiredRecords), (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue