diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 891e37f8af7..dbd7e5e1730 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -656,7 +656,7 @@ public class SharePartition {
      * @param isolationLevel The isolation level for the share fetch request.
      * @return The acquired records for the share partition.
      */
-    @SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression
+    @SuppressWarnings({"cyclomaticcomplexity", "methodlength"}) // Consider refactoring to avoid suppression
     public ShareAcquiredRecords acquire(
         String memberId,
         int batchSize,
@@ -677,6 +677,16 @@ public class SharePartition {
             return ShareAcquiredRecords.empty();
         }
 
+        LastOffsetAndMaxRecords lastOffsetAndMaxRecords = lastOffsetAndMaxRecordsToAcquire(fetchOffset,
+            maxFetchRecords, lastBatch.lastOffset());
+        if (lastOffsetAndMaxRecords.maxRecords() <= 0) {
+            return ShareAcquiredRecords.empty();
+        }
+        // The lastOffsetAndMaxRecords contains the last offset to acquire and the maximum number of records
+        // to acquire.
+        int maxRecordsToAcquire = lastOffsetAndMaxRecords.maxRecords();
+        long lastOffsetToAcquire = lastOffsetAndMaxRecords.lastOffset();
+
         // We require the first batch of records to get the base offset. Stop parsing further
         // batches.
         RecordBatch firstBatch = fetchPartitionData.records.batches().iterator().next();
@@ -708,8 +718,10 @@ public class SharePartition {
         if (subMap.isEmpty()) {
             log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
                 groupId, topicIdPartition);
+            // Do not send the lastOffsetToAcquire when the subMap is empty, as it means that
+            // there is no overlap with any in-flight batch.
             ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
-                firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxRecordsToAcquire);
             return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
         }
 
@@ -726,7 +738,7 @@ public class SharePartition {
         // be an exact match, subset or span over multiple already fetched batches.
         for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
             // If the acquired count is equal to the max fetch records then break the loop.
-            if (acquiredCount >= maxFetchRecords) {
+            if (acquiredCount >= maxRecordsToAcquire) {
                 break;
             }
 
@@ -739,14 +751,14 @@ public class SharePartition {
             // Thus, a new batch needs to be acquired for the gap.
             if (maybeGapStartOffset < entry.getKey()) {
                 ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
-                    maybeGapStartOffset, entry.getKey() - 1, batchSize, maxFetchRecords);
+                    maybeGapStartOffset, entry.getKey() - 1, batchSize, maxRecordsToAcquire);
                 result.addAll(shareAcquiredRecords.acquiredRecords());
                 acquiredCount += shareAcquiredRecords.count();
             }
             // Set nextBatchStartOffset as the last offset of the current in-flight batch + 1
             maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
             // If the acquired count is equal to the max fetch records then break the loop.
-            if (acquiredCount >= maxFetchRecords) {
+            if (acquiredCount >= maxRecordsToAcquire) {
                 break;
             }
         }
@@ -778,7 +790,7 @@ public class SharePartition {
                 // Do not send max fetch records to acquireSubsetBatchRecords as we want to acquire
                 // all the records from the batch as the batch will anyway be part of the file-records
                 // response batch.
-                int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result);
+                int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
                 acquiredCount += acquiredSubsetCount;
                 continue;
             }
@@ -810,11 +822,11 @@ public class SharePartition {
 
         // Some of the request offsets are not found in the fetched batches. Acquire the
         // missing records as well.
-        if (acquiredCount < maxFetchRecords && subMap.lastEntry().getValue().lastOffset() < lastBatch.lastOffset()) {
+        if (acquiredCount < maxRecordsToAcquire && subMap.lastEntry().getValue().lastOffset() < lastOffsetToAcquire) {
             log.trace("There exists another batch which needs to be acquired as well");
             ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                 subMap.lastEntry().getValue().lastOffset() + 1,
-                lastBatch.lastOffset(), batchSize, maxFetchRecords - acquiredCount);
+                lastOffsetToAcquire, batchSize, maxRecordsToAcquire - acquiredCount);
             result.addAll(shareAcquiredRecords.acquiredRecords());
             acquiredCount += shareAcquiredRecords.count();
         }
@@ -1385,14 +1397,14 @@ public class SharePartition {
         sharePartitionMetrics.registerInFlightBatchCount(this.cachedState::size);
     }
-    private long numInFlightRecords() {
+    private int numInFlightRecords() {
         lock.readLock().lock();
-        long numRecords;
+        int numRecords;
         try {
             if (cachedState.isEmpty()) {
                 numRecords = 0;
             } else {
-                numRecords = this.endOffset - this.startOffset + 1;
+                numRecords = (int) (this.endOffset - this.startOffset + 1);
             }
         } finally {
             lock.readLock().unlock();
         }
@@ -1449,6 +1461,46 @@
         }
     }
 
+    private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
+        // There can always be records fetched exceeding the max in-flight messages limit. Hence,
+        // we need to check if the share partition has reached the max in-flight messages limit
+        // and only acquire a limited number of records.
+        int maxRecordsToAcquire;
+        long lastOffsetToAcquire = lastOffset;
+        lock.readLock().lock();
+        try {
+            int inFlightRecordsCount = numInFlightRecords();
+            // Take the minimum of maxFetchRecords and the remaining capacity to fill the max in-flight messages limit.
+            maxRecordsToAcquire = Math.min(maxFetchRecords, maxInFlightMessages - inFlightRecordsCount);
+            // If maxRecordsToAcquire is less than or equal to 0, then ideally (the check exists so that
+            // records are not fetched for share partitions which are at capacity) the fetch must be happening
+            // in-between the in-flight batches i.e. some in-flight records have been released (marked
+            // re-available). In that case, the last offset to acquire should be adjusted to the endOffset
+            // of the share partition, otherwise records past the endOffset could be acquired.
+            // For example, if 30 records are already acquired i.e. [0-29] and the single offset 20 is released,
+            // then the next fetch request will be at 20. The difference from the endOffset will be 10, which
+            // means that offsets past the endOffset could be acquired (21-29 are already acquired).
+            // Hence, the lastOffsetToAcquire should be adjusted to the endOffset.
+            if (maxRecordsToAcquire <= 0) {
+                if (fetchOffset <= endOffset()) {
+                    // Adjust the max records to acquire to the capacity available to fill the max
+                    // in-flight messages limit. This can happen when the fetch is happening in-between
+                    // the in-flight batches and the share partition has reached the max in-flight messages limit.
+                    maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset() - fetchOffset + 1));
+                    // Adjust the last offset to acquire to the endOffset of the share partition.
+                    lastOffsetToAcquire = endOffset();
+                } else {
+                    // The share partition is already at max in-flight messages, hence cannot acquire more records.
+                    log.debug("Share partition {}-{} has reached max in-flight messages limit: {}. Cannot acquire more records, inflight records count: {}",
+                        groupId, topicIdPartition, maxInFlightMessages, inFlightRecordsCount);
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+        return new LastOffsetAndMaxRecords(lastOffsetToAcquire, maxRecordsToAcquire);
+    }
+
     private ShareAcquiredRecords acquireNewBatchRecords(
         String memberId,
         Iterable<? extends RecordBatch> batches,
@@ -2242,7 +2294,7 @@ public class SharePartition {
                 .setGroupId(this.groupId)
                 .setTopicsData(List.of(new TopicData<>(topicIdPartition.topicId(),
                     List.of(PartitionFactory.newPartitionStateBatchData(
-                        topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
+                        topicIdPartition.partition(), stateEpoch, startOffset(), leaderEpoch, stateBatches))))
                 ).build()).build())
             .whenComplete((result, exception) -> {
                 if (exception != null) {
@@ -2803,4 +2855,13 @@ public class SharePartition {
             this.offsetMetadata = offsetMetadata;
         }
     }
+
+    /**
+     * LastOffsetAndMaxRecords class is used to track the last offset to acquire and the maximum number
+     * of records that can be acquired in a fetch request.
+     */
+    private record LastOffsetAndMaxRecords(
+        long lastOffset,
+        int maxRecords
+    ) { }
 }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index a0bdfee7b5f..c4fc0a0454d 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -1510,6 +1510,150 @@ public class SharePartitionTest {
         assertTrue(sharePartition.cachedState().containsKey(12L));
     }
 
+    @Test
+    public void testAcquireWithMaxInFlightMessagesAndTryAcquireNewBatch() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .withMaxInflightMessages(20)
+            .build();
+
+        // Acquire records, all 10 records should be acquired as they are within the maxInflightMessages limit.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(memoryRecords(10, 0), 0),
+            FETCH_ISOLATION_HWM),
+            10);
+        // Validate all 10 records will be acquired as the maxInFlightMessages is 20.
+        assertArrayEquals(expectedAcquiredRecord(0, 9, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        // Create 4 batches of records.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 10, 15).close();
+        memoryRecordsBuilder(buffer, 5, 25).close();
+        memoryRecordsBuilder(buffer, 2, 30).close();
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Acquire records, should be acquired till the maxInFlightMessages limit i.e. 20 records. As the second batch
+        // ends at offset 24, an additional 15 records will be acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 0),
+            FETCH_ISOLATION_HWM),
+            15);
+
+        // Validate 2 batches are fetched, one with 5 records and the other till the end of the batch; the third batch
+        // should be skipped.
+        assertArrayEquals(expectedAcquiredRecord(10, 24, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(25, sharePartition.nextFetchOffset());
+
+        // Should not acquire any records as the share partition is at capacity and the fetch offset is beyond
+        // the end offset.
+        fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            25 /* Fetch Offset */,
+            fetchPartitionData(memoryRecords(10, 25), 10),
+            FETCH_ISOLATION_HWM),
+            0);
+
+        assertEquals(25, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireWithMaxInFlightMessagesAndReleaseLastOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .withMaxInflightMessages(20)
+            .build();
+
+        // Create 4 batches of records.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 10, 15).close();
+        memoryRecordsBuilder(buffer, 5, 25).close();
+        memoryRecordsBuilder(buffer, 3, 30).close();
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire records, should be acquired till the maxInFlightMessages limit i.e. 20 records, till offset 29.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            20);
+
+        // Validate 3 batches are fetched and the fourth batch should be skipped. The max in-flight messages
+        // limit is reached.
+        assertArrayEquals(expectedAcquiredRecord(10, 29, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(30, sharePartition.nextFetchOffset());
+
+        // Release the middle batch.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(15, 19, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        // Validate the nextFetchOffset is updated to 15.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // The complete released batch should be acquired but not the last batch, starting at offset 30,
+        // as the lastOffset is adjusted according to the endOffset.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            15 /* Fetch Offset */,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            5);
+
+        // Validate 1 batch is fetched, with 5 records till the end of the batch; the last available batch should
+        // not be acquired.
+        assertArrayEquals(expectedAcquiredRecords(15, 19, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(30, sharePartition.nextFetchOffset());
+
+        // Release the last offset of the acquired batch. Only 1 record should be released and later acquired.
+        ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(29, 29, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        // Validate the nextFetchOffset is updated to 29.
+        assertEquals(29, sharePartition.nextFetchOffset());
+
+        // Only the last record of the acquired batch should be acquired again.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            29 /* Fetch Offset */,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            1);
+
+        // Validate 1 record is acquired.
+        assertArrayEquals(expectedAcquiredRecord(29, 29, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(30, sharePartition.nextFetchOffset());
+    }
+
     @Test
     public void testNextFetchOffsetInitialState() {
         SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
diff --git a/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java b/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java
index 6ea94d46374..e83a3932402 100644
--- a/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java
+++ b/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java
@@ -96,7 +96,7 @@ public class SharePartitionMetrics implements AutoCloseable {
      *
      * @param messageCountSupplier The supplier for the in-flight message count.
      */
-    public void registerInFlightMessageCount(Supplier<Long> messageCountSupplier) {
+    public void registerInFlightMessageCount(Supplier<Integer> messageCountSupplier) {
         metricsGroup.newGauge(
             IN_FLIGHT_MESSAGE_COUNT,
             messageCountSupplier,