diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index b957cbbdcf1..da4f8ef6668 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -25,8 +25,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.OffsetNotAvailableException;
 import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -39,6 +42,7 @@ import org.slf4j.LoggerFactory;

 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -122,13 +126,7 @@ public class ShareFetchUtils {
                     .setAcquiredRecords(Collections.emptyList());
             } else {
                 partitionData
-                    // We set the records to the fetchPartitionData records. We do not alter the records
-                    // fetched from the replica manager as they follow zero copy buffer. The acquired records
-                    // might be a subset of the records fetched from the replica manager, depending
-                    // on the max fetch records or available records in the share partition. The client
-                    // sends the max bytes in request which should limit the bytes sent to the client
-                    // in the response.
-                    .setRecords(fetchPartitionData.records)
+                    .setRecords(maybeSliceFetchRecords(fetchPartitionData.records, shareAcquiredRecords))
                    .setAcquiredRecords(shareAcquiredRecords.acquiredRecords());
                 acquiredRecordsCount += shareAcquiredRecords.count();
             }
@@ -196,4 +194,68 @@ public class ShareFetchUtils {
         }
         return partition;
     }
+
+    /**
+     * Slice the fetch records based on the acquired records. The slicing is done based on the first
+     * and last offset of the acquired records from the list. The slicing doesn't consider individual
+     * acquired batches, but rather the boundaries of the acquired list. The method expects the acquired
+     * records list to be within the fetch records' bounds.
+     *
+     * @param records The records to be sliced.
+     * @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
+     * @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
+     *         of the fetched records. Otherwise, the original records are returned.
+     */
+    static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
+        if (!(records instanceof FileRecords fileRecords)) {
+            return records;
+        }
+        // The acquired records should be non-empty. This is not validated here, as the method is
+        // only called when the acquired records are non-empty.
+        List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
+        try {
+            final Iterator<FileChannelRecordBatch> iterator = fileRecords.batchIterator();
+            // Track the first batch that overlaps the first acquired offset.
+            FileChannelRecordBatch firstOverlapBatch = iterator.next();
+            // If there exists only a single fetch batch, then return the original records.
+            if (!iterator.hasNext()) {
+                return records;
+            }
+            // Find the first and last acquired offsets to slice the records.
+            final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
+            final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
+            int startPosition = 0;
+            int size = 0;
+            // Start iterating from the second batch.
+            while (iterator.hasNext()) {
+                FileChannelRecordBatch batch = iterator.next();
+                // Iterate until the first batch that overlaps the first acquired offset is found. All
+                // the batches before this first overlapping batch should be sliced away, hence
+                // increment the start position.
+                if (batch.baseOffset() <= firstAcquiredOffset) {
+                    startPosition += firstOverlapBatch.sizeInBytes();
+                    firstOverlapBatch = batch;
+                    continue;
+                }
+                // Break once all the batches up to the last acquired offset have been traversed.
+                if (batch.baseOffset() > lastAcquiredOffset) {
+                    break;
+                }
+                size += batch.sizeInBytes();
+            }
+            // Include the first overlapping batch, as it's the last traversed batch which overlapped
+            // the first acquired offset.
+            size += firstOverlapBatch.sizeInBytes();
+            // Check whether slicing is needed at all, i.e. whether either the start position or the
+            // size changed.
+            if (startPosition == 0 && size == fileRecords.sizeInBytes()) {
+                return records;
+            }
+            return fileRecords.slice(startPosition, size);
+        } catch (Exception e) {
+            log.error("Error while checking batches for acquired records: {}, skipping slicing.", acquiredRecords, e);
+            // If there is an exception while slicing, return the original records so that the fetch
+            // can continue.
+            return records;
+        }
+    }
 }
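The slice boundaries above reduce to simple arithmetic over per-batch byte sizes. The following self-contained sketch (not part of the patch; the Batch record and all offsets and sizes are hypothetical stand-ins for FileChannelRecordBatch) reproduces the start-position and size computation that precedes the FileRecords.slice call:

import java.util.List;

public class SliceArithmeticSketch {

    // Simplified stand-in for FileChannelRecordBatch: base offset plus on-disk size.
    record Batch(long baseOffset, int sizeInBytes) { }

    // Mirrors maybeSliceFetchRecords: returns {startPosition, size} for the slice covering
    // every batch that overlaps [firstAcquiredOffset, lastAcquiredOffset].
    static int[] sliceBounds(List<Batch> batches, long firstAcquiredOffset, long lastAcquiredOffset) {
        Batch firstOverlapBatch = batches.get(0);
        int startPosition = 0;
        int size = 0;
        for (Batch batch : batches.subList(1, batches.size())) {
            if (batch.baseOffset() <= firstAcquiredOffset) {
                // Batches wholly before the first acquired offset are skipped over.
                startPosition += firstOverlapBatch.sizeInBytes();
                firstOverlapBatch = batch;
                continue;
            }
            if (batch.baseOffset() > lastAcquiredOffset) {
                break;
            }
            size += batch.sizeInBytes();
        }
        // The first overlapping batch is always part of the slice.
        size += firstOverlapBatch.sizeInBytes();
        return new int[] {startPosition, size};
    }

    public static void main(String[] args) {
        // Three batches with base offsets 0, 3 and 7, of 100, 80 and 120 bytes.
        List<Batch> batches = List.of(new Batch(0, 100), new Batch(3, 80), new Batch(7, 120));
        // Acquiring offsets 4..8 skips the first batch and keeps the second and third.
        int[] bounds = sliceBounds(batches, 4, 8);
        System.out.println("startPosition=" + bounds[0] + ", size=" + bounds[1]);
    }
}

Running it prints startPosition=100, size=200: the 100-byte batch at base offset 0 is skipped, while the batches at offsets 3 and 7 are kept because they overlap the acquired range 4..8, which is the same outcome the multi-batch unit tests below assert.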
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index e037bd0081c..7eb6584bed7 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
-import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
 import org.apache.kafka.server.storage.log.FetchIsolation;
@@ -74,6 +73,7 @@ import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_P
 import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
 import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
 import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
 import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -186,7 +186,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
         when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

         // We are testing the case when the share partition is getting fetched for the first time, so for the first time
         // the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
@@ -259,7 +259,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
         when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

         // We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff
         // functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
@@ -312,7 +312,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
         when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

         when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
@@ -427,7 +427,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
         when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

         PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@@ -591,7 +591,7 @@ public class DelayedShareFetchTest {
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(true);
         when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

         // when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
         // requests, it should add a "check and complete" action for request key tp1 on the purgatory.
@@ -689,7 +689,7 @@ public class DelayedShareFetchTest {

         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

         // Mocking partition object to throw an exception during min bytes calculation while calling fetchOffsetSnapshot
@@ -897,15 +897,15 @@ public class DelayedShareFetchTest {
             BROKER_TOPIC_STATS);

         when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

         // All 5 partitions are acquirable.
         doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -995,9 +995,9 @@ public class DelayedShareFetchTest {
             BROKER_TOPIC_STATS);

         when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

         // Only 2 out of 5 partitions are acquirable.
         Set<TopicIdPartition> acquirableTopicPartitions = new LinkedHashSet<>();
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 82a9d410447..6baa3b05b53 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -24,10 +24,14 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.FencedLeaderEpochException;
 import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.server.share.SharePartitionKey;
@@ -39,10 +43,12 @@ import org.apache.kafka.server.storage.log.FetchParams;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.storage.internals.log.OffsetResultHolder;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;

 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;

+import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -54,6 +60,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;

 import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createFileRecords;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
 import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -95,10 +104,10 @@ public class ShareFetchUtilsTest {
         when(sp1.nextFetchOffset()).thenReturn((long) 3);
         when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));

         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
@@ -222,10 +231,10 @@ public class ShareFetchUtilsTest {

         when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
             ShareAcquiredRecords.empty(),
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)),
             ShareAcquiredRecords.empty());

@@ -352,7 +361,7 @@ public class ShareFetchUtilsTest {
     }

     @Test
-    public void testProcessFetchResponseWithMaxFetchRecords() {
+    public void testProcessFetchResponseWithMaxFetchRecords() throws IOException {
         String groupId = "grp";
         TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
         TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
@@ -374,24 +383,29 @@ public class ShareFetchUtilsTest {
         ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId.toString(), new CompletableFuture<>(),
             partitionMaxBytes, BATCH_SIZE, 10, BROKER_TOPIC_STATS);

-        MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE,
-            new SimpleRecord("0".getBytes(), "v".getBytes()),
-            new SimpleRecord("1".getBytes(), "v".getBytes()),
-            new SimpleRecord("2".getBytes(), "v".getBytes()),
-            new SimpleRecord(null, "value".getBytes()));
+        LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
+        recordsPerOffset.put(0L, 1);
+        recordsPerOffset.put(1L, 1);
+        recordsPerOffset.put(2L, 1);
+        recordsPerOffset.put(3L, 1);
+        Records records1 = createFileRecords(recordsPerOffset);
+
+        recordsPerOffset.clear();
+        recordsPerOffset.put(100L, 4);
+        Records records2 = createFileRecords(recordsPerOffset);

         FetchPartitionData fetchPartitionData1 = new FetchPartitionData(Errors.NONE, 0L, 0L,
             records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
             OptionalInt.empty(), false);
         FetchPartitionData fetchPartitionData2 = new FetchPartitionData(Errors.NONE, 0L, 0L,
-            records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
+            records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
             OptionalInt.empty(), false);

         when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, 0, fetchPartitionData1)).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1)));
         when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, 0, fetchPartitionData2)).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));

         // Send the topic partitions in order so can validate if correct mock is called, accounting
@@ -401,23 +415,27 @@ public class ShareFetchUtilsTest {
             new ShareFetchPartitionData(tp1, 0, fetchPartitionData2)
         );

-        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 =
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData =
             ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, mock(ReplicaManager.class), EXCEPTION_HANDLER);

-        assertEquals(2, resultData1.size());
-        assertTrue(resultData1.containsKey(tp0));
-        assertTrue(resultData1.containsKey(tp1));
-        assertEquals(0, resultData1.get(tp0).partitionIndex());
-        assertEquals(1, resultData1.get(tp1).partitionIndex());
-        assertEquals(Errors.NONE.code(), resultData1.get(tp0).errorCode());
-        assertEquals(Errors.NONE.code(), resultData1.get(tp1).errorCode());
-        assertEquals(1, resultData1.get(tp0).acquiredRecords().size());
-        assertEquals(0, resultData1.get(tp0).acquiredRecords().get(0).firstOffset());
-        assertEquals(1, resultData1.get(tp0).acquiredRecords().get(0).lastOffset());
-        assertEquals(1, resultData1.get(tp1).acquiredRecords().size());
-        assertEquals(100, resultData1.get(tp1).acquiredRecords().get(0).firstOffset());
-        assertEquals(103, resultData1.get(tp1).acquiredRecords().get(0).lastOffset());
+        assertEquals(2, resultData.size());
+        assertTrue(resultData.containsKey(tp0));
+        assertTrue(resultData.containsKey(tp1));
+        assertEquals(0, resultData.get(tp0).partitionIndex());
+        assertEquals(1, resultData.get(tp1).partitionIndex());
+        assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode());
+        assertEquals(Errors.NONE.code(), resultData.get(tp1).errorCode());
+        assertEquals(1, resultData.get(tp0).acquiredRecords().size());
+        assertEquals(0, resultData.get(tp0).acquiredRecords().get(0).firstOffset());
+        assertEquals(1, resultData.get(tp0).acquiredRecords().get(0).lastOffset());
+        assertEquals(1, resultData.get(tp1).acquiredRecords().size());
+        assertEquals(100, resultData.get(tp1).acquiredRecords().get(0).firstOffset());
+        assertEquals(103, resultData.get(tp1).acquiredRecords().get(0).lastOffset());
+
+        // Validate that slicing of the fetched data happened for the tp0 records, but not for the tp1 records.
+        assertTrue(records1.sizeInBytes() > resultData.get(tp0).records().sizeInBytes());
+        assertEquals(records2.sizeInBytes(), resultData.get(tp1).records().sizeInBytes());
     }

     @Test
@@ -455,4 +473,181 @@ public class ShareFetchUtilsTest {

         Mockito.verify(exceptionHandler, times(1)).accept(new SharePartitionKey("grp", tp0), exception);
         Mockito.verify(sp0, times(0)).updateCacheAndOffsets(any(Long.class));
     }
+
+    @Test
+    public void testMaybeSliceFetchRecordsSingleBatch() throws IOException {
+        // Create 1 batch with 10 records.
+        FileRecords records = createFileRecords(Map.of(5L, 10));
+
+        // Acquire all offsets, should return the same records.
+        List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(14).setDeliveryCount((short) 1));
+        Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 10));
+        assertEquals(records, slicedRecords);
+
+        // Acquiring offsets prior to the batch's first offset should return the records for the matching batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(2).setLastOffset(14).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 10));
+        assertEquals(records, slicedRecords);
+
+        // Acquiring offsets past the batch's last offset should return the records for the matching batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(20).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5));
+        assertEquals(records, slicedRecords);

+        // Acquire only a subset of the batch offsets, starting from the first offset.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertEquals(records, slicedRecords);
+
+        // Acquire only a subset of the batch offsets, ending at the last offset.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(14).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertEquals(records, slicedRecords);
+
+        // Acquire only a subset of the batch offsets, within the batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(10).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertEquals(records, slicedRecords);
+    }
+
+    @Test
+    public void testMaybeSliceFetchRecordsMultipleBatches() throws IOException {
+        // Create 3 batches of records with 3, 2 and 4 records respectively.
+        LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
+        recordsPerOffset.put(0L, 3);
+        recordsPerOffset.put(3L, 2);
+        recordsPerOffset.put(7L, 4); // Gap of 2 offsets between the second and third batch.
+        FileRecords records = createFileRecords(recordsPerOffset);
+
+        // Acquire all offsets, should return the same records.
+        List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(10).setDeliveryCount((short) 1));
+        Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 11));
+        assertEquals(records, slicedRecords);
+
+        // Acquire offsets from all batches, but only the first record from the last batch. Should
+        // return all batches.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5));
+        assertEquals(records, slicedRecords);
+
+        // Acquire only the first batch's offsets, should return only the first batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(2).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        List<RecordBatch> recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+
+        // Acquire only the second batch's offsets, should return only the second batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(3).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+
+        // Acquire only the last batch's offsets, should return only the last batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(7).setLastOffset(10).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire only a subset of the first batch's offsets, should return only the first batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+
+        // Acquire only a subset of the second batch's offsets, should return only the second batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+
+        // Acquire only a subset of the last batch's offsets, should return only the last batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(8).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire a range spanning the gap between batches, should return 2 batches.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(4).setLastOffset(8).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(2, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+        assertEquals(7, recordBatches.get(1).baseOffset());
+        assertEquals(10, recordBatches.get(1).lastOffset());
+
+        // Acquire with multiple acquired records, should return the matching batches.
+        acquiredRecords = List.of(
+            new AcquiredRecords().setFirstOffset(0).setLastOffset(2).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(3).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(2, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+        assertEquals(3, recordBatches.get(1).baseOffset());
+        assertEquals(4, recordBatches.get(1).lastOffset());
+
+        // Acquire with multiple acquired records of individual offsets from a single batch, should
+        // return the matching batch.
+        acquiredRecords = List.of(
+            new AcquiredRecords().setFirstOffset(8).setLastOffset(8).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(9).setLastOffset(9).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire with multiple acquired records of individual offsets from multiple batches. The
+        // acquired offsets span from the first to the last batch, hence no slicing happens.
+        acquiredRecords = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(9).setLastOffset(9).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+    }
+
+    @Test
+    public void testMaybeSliceFetchRecordsException() throws IOException {
+        // Create 1 batch with 3 records.
+        FileRecords records = createFileRecords(Map.of(0L, 3));
+        // Send empty acquired records, which should trigger an exception internally, hence the same
+        // file records should be returned. The method doesn't expect empty acquired records.
+        Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(
+            records, new ShareAcquiredRecords(List.of(), 3));
+        assertEquals(records, slicedRecords);
+    }
+
+    @Test
+    public void testMaybeSliceFetchRecordsNonFileRecords() {
+        // Send memory records, which should be returned as is.
+        try (MemoryRecordsBuilder records = memoryRecordsBuilder(2, 0)) {
+            List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1));
+            Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(
+                records.build(), new ShareAcquiredRecords(acquiredRecords, 2));
+            assertEquals(records.build(), slicedRecords);
+        }
+    }
 }
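The tests above assert slice boundaries via TestUtils.toList(slicedRecords.batches()). When debugging a failure, a tiny throwaway helper along these lines (hypothetical, not part of the patch) makes the batch layout of a slice visible; it relies only on the public Records.batches() iterator:

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;

public class BatchBoundsDebug {

    // Prints the offset range and byte size of every batch in a Records view. For a slice
    // produced by maybeSliceFetchRecords, each printed batch is complete and the union of
    // the ranges covers the acquired offsets.
    static void printBatchBounds(Records records) {
        for (RecordBatch batch : records.batches()) {
            System.out.println("batch " + batch.baseOffset() + ".." + batch.lastOffset()
                + " (" + batch.sizeInBytes() + " bytes)");
        }
    }

    public static void main(String[] args) {
        // Single in-memory batch with two records; FileRecords slices print the same way.
        printBatchBounds(MemoryRecords.withRecords(Compression.NONE,
            new SimpleRecord("k1".getBytes(), "v1".getBytes()),
            new SimpleRecord("k2".getBytes(), "v2".getBytes())));
    }
}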
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index d970aecf51c..6257e87f96e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -25,7 +25,6 @@ import kafka.server.share.SharePartitionManager.SharePartitionListener;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedStateEpochException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -40,7 +39,6 @@ import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -85,6 +83,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

 import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -6233,19 +6232,6 @@ public class SharePartitionTest {
         return memoryRecordsBuilder(numOfRecords, startOffset).build();
     }

-    private MemoryRecordsBuilder memoryRecordsBuilder(int numOfRecords, long startOffset) {
-        return memoryRecordsBuilder(ByteBuffer.allocate(1024), numOfRecords, startOffset);
-    }
-
-    private MemoryRecordsBuilder memoryRecordsBuilder(ByteBuffer buffer, int numOfRecords, long startOffset) {
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
-            TimestampType.CREATE_TIME, startOffset, 2);
-        for (int i = 0; i < numOfRecords; i++) {
-            builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
-        }
-        return builder;
-    }
-
     private List<AcquiredRecords> expectedAcquiredRecord(long baseOffset, long lastOffset, int deliveryCount) {
         return Collections.singletonList(new AcquiredRecords()
             .setFirstOffset(baseOffset)
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 3745dd8e4f8..75b5b41004d 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -554,6 +554,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+
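The file below removes the fromAcquiredRecords factory from the production class; every test call site in the hunks above migrates to the new test-only helper. The pattern, shown as a self-contained sketch (illustrative offsets, not a literal hunk from this patch):

import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;

import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;

public class FactoryMigrationSketch {
    public static void main(String[] args) {
        AcquiredRecords acquired = new AcquiredRecords()
            .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1);

        // Before this patch, tests called the factory on the production class:
        // ShareAcquiredRecords.fromAcquiredRecords(acquired);

        // After, tests use the test helper, which derives the count identically
        // as lastOffset - firstOffset + 1 (here, 4).
        ShareAcquiredRecords shareAcquired = createShareAcquiredRecords(acquired);
        System.out.println(shareAcquired.count()); // prints 4
    }
}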
diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java
index 3e97e5b11b3..4eaa46447b6 100644
--- a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java
@@ -19,8 +19,8 @@ package org.apache.kafka.server.share.fetch;

 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;

-import java.util.Collections;
 import java.util.List;
+import java.util.Objects;

 /**
  * The ShareAcquiredRecords class is used to send the acquired records and associated metadata.
@@ -43,12 +43,12 @@ public class ShareAcquiredRecords {
         List<AcquiredRecords> acquiredRecords,
         int count
     ) {
-        this.acquiredRecords = acquiredRecords;
+        this.acquiredRecords = Objects.requireNonNull(acquiredRecords);
         this.count = count;
     }

     private ShareAcquiredRecords() {
-        this.acquiredRecords = Collections.emptyList();
+        this.acquiredRecords = List.of();
         this.count = 0;
     }
@@ -63,10 +63,4 @@ public class ShareAcquiredRecords {
     public static ShareAcquiredRecords empty() {
         return EMPTY_SHARE_ACQUIRED_RECORDS;
     }
-
-    public static ShareAcquiredRecords fromAcquiredRecords(AcquiredRecords acquiredRecords) {
-        return new ShareAcquiredRecords(
-            List.of(acquiredRecords), (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1)
-        );
-    }
 }
diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
index 9a83bebf88b..ef3aae2eee7 100644
--- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
+++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
@@ -17,10 +17,23 @@ package org.apache.kafka.server.share.fetch;

 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.test.TestUtils;

+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;

+import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -74,4 +87,62 @@ public class ShareFetchTestUtils {
             assertEquals(original.get(key), result.get(key));
         }
     }
+
+    /**
+     * Create file records with the given offset values and the number of records from each given
+     * start offset.
+     *
+     * @param recordsPerOffset The offset values and the number of records to create from each given offset.
+     * @return The file records.
+     * @throws IOException If the file records cannot be created.
+     */
+    public static FileRecords createFileRecords(Map<Long, Integer> recordsPerOffset) throws IOException {
+        FileRecords fileRecords = FileRecords.open(tempFile());
+        for (Entry<Long, Integer> entry : recordsPerOffset.entrySet()) {
+            try (MemoryRecordsBuilder records = memoryRecordsBuilder(entry.getValue(), entry.getKey())) {
+                fileRecords.append(records.build());
+            }
+        }
+        return fileRecords;
+    }
+
+    /**
+     * Create a memory records builder with the given number of records and start offset.
+     *
+     * @param numOfRecords The number of records to create.
+     * @param startOffset The start offset of the records.
+     * @return The memory records builder.
+     */
+    public static MemoryRecordsBuilder memoryRecordsBuilder(int numOfRecords, long startOffset) {
+        return memoryRecordsBuilder(ByteBuffer.allocate(1024), numOfRecords, startOffset);
+    }
+
+    /**
+     * Create a memory records builder with the given number of records and start offset, in the
+     * given buffer.
+     *
+     * @param buffer The buffer to write the records to.
+     * @param numOfRecords The number of records to create.
+     * @param startOffset The start offset of the records.
+     * @return The memory records builder.
+     */
+    public static MemoryRecordsBuilder memoryRecordsBuilder(ByteBuffer buffer, int numOfRecords, long startOffset) {
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
+            TimestampType.CREATE_TIME, startOffset, 2);
+        for (int i = 0; i < numOfRecords; i++) {
+            builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
+        }
+        return builder;
+    }
+
+    /**
+     * Create share acquired records from the given acquired records.
+     *
+     * @param acquiredRecords The acquired records to create the share acquired records from.
+     * @return The share acquired records.
+     */
+    public static ShareAcquiredRecords createShareAcquiredRecords(AcquiredRecords acquiredRecords) {
+        return new ShareAcquiredRecords(
+            List.of(acquiredRecords), (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1)
+        );
+    }
 }
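For orientation, the new helpers combine as follows in a test body (a sketch with illustrative offsets; createFileRecords writes one batch per map entry into a temporary file, and createShareAcquiredRecords derives the count from the acquired offset range):

import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;

import java.io.IOException;
import java.util.LinkedHashMap;

import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createFileRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;

public class TestUtilsUsageSketch {
    public static void main(String[] args) throws IOException {
        // Two batches, offsets 0..2 and 3..4, written to a temporary FileRecords file.
        LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
        recordsPerOffset.put(0L, 3);
        recordsPerOffset.put(3L, 2);
        FileRecords fileRecords = createFileRecords(recordsPerOffset);

        // Acquired records covering offsets 0..4; the count is derived as 4 - 0 + 1 = 5.
        ShareAcquiredRecords acquired = createShareAcquiredRecords(
            new AcquiredRecords().setFirstOffset(0).setLastOffset(4).setDeliveryCount((short) 1));
        System.out.println(acquired.count());          // 5
        System.out.println(fileRecords.sizeInBytes()); // total bytes across both batches
    }
}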