KAFKA-19227: Piggybacked share fetch acknowledgements performance issue (#19612)

This PR fixes a performance issue that occurs when ShareAcknowledgements are
piggybacked on ShareFetch requests. The default client configuration derives
both the batch size and the maximum number of fetched records from the
`max.poll.records` config (default 500), so all records returned by a single
poll are fetched and then acknowledged together. The default limit on in-flight
records per share partition is 200, which means previously fetched records must
be acknowledged before another batch can be fetched from the share partition.
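
As an illustration only (not part of this PR), a minimal sketch of the client-side
default; the broker-side in-flight limit mentioned in the comment,
`group.share.partition.max.record.locks`, is assumed here to be the relevant group
config with default 200:

    import java.util.Properties;

    public class ShareConsumerDefaultsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-share-group");
            // Client default: up to 500 records are returned per poll, and the share
            // consumer fetches and acknowledges them as one batch.
            props.put("max.poll.records", "500");
            // Group config (assumed name, default 200): at most 200 records can be
            // in flight (acquired but unacknowledged) per share partition, so a
            // 500-record poll cannot be fully served until earlier records are acknowledged.
            // group.share.partition.max.record.locks=200
        }
    }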

The piggybacked share fetch and acknowledgement calls from KafkaApis are
asynchronous, and their responses are combined afterwards. If a share fetch
starts waiting in purgatory because the in-flight record limit is exhausted,
then moving the startOffset as part of an acknowledgement must trigger an
attempt to complete any pending share fetch requests in purgatory. Otherwise
those share fetch requests sit in purgatory until they time out even though
records are available, which degrades share fetch performance.
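
A rough sketch of the trigger this PR introduces, with names simplified and the
`applyAcknowledgements` helper hypothetical; the actual change lives in
SharePartition (see the diff below):

    // Sketch only: when acknowledgements move the start offset, notify the
    // delayed share fetch purgatory so parked fetch requests can complete.
    void acknowledgeAndMaybeUnblock(List<ShareAcknowledgementBatch> batches) {
        boolean startOffsetMoved = applyAcknowledgements(batches); // hypothetical helper
        if (startOffsetMoved) {
            replicaManager.completeDelayedShareFetchRequest(
                new DelayedShareFetchGroupKey(groupId, topicIdPartition));
        }
    }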

A regular fetch has a single criterion for landing in purgatory, the min-bytes
criterion, so any produce to the respective topic partition triggers a check of
pending fetch requests. A share fetch, however, can wait in purgatory for
multiple reasons: 1) min bytes, 2) in-flight record exhaustion, 3) competition
for the share partition fetch lock. The trigger already exists for 1, and this
PR adds it for 2. We will investigate further whether any handling is required
for 3.
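
For intuition, a rough sketch of the wait conditions (illustrative only; the
method names here are hypothetical, not the actual DelayedShareFetch code):

    // A delayed share fetch may only complete when all of these hold:
    boolean tryCompleteShareFetch() {
        boolean enoughBytes = availableBytes() >= minBytes();       // 1) min bytes
        boolean recordsAcquirable = inFlightSlotsAvailable();       // 2) in-flight records released
        boolean lockHeld = tryAcquireSharePartitionFetchLock();     // 3) fetch lock competition
        return enoughBytes && recordsAcquirable && lockHeld;
    }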

Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield
<aschofield@confluent.io>
Apoorv Mittal 2025-05-06 09:58:25 +01:00 committed by GitHub
parent 9823d6781c
commit ac9520b922
3 changed files with 56 additions and 14 deletions

SharePartition.java

@@ -328,6 +328,12 @@ public class SharePartition {
*/
private final OffsetMetadata fetchOffsetMetadata;
/**
* The delayed share fetch key is used to track the delayed share fetch requests that are waiting
* for the respective share partition.
*/
private final DelayedShareFetchKey delayedShareFetchKey;
/**
* The state epoch is used to track the version of the state of the share partition.
*/
@@ -414,6 +420,7 @@ public class SharePartition {
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
this.fetchOffsetMetadata = new OffsetMetadata();
this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition);
this.listener = listener;
this.sharePartitionMetrics = sharePartitionMetrics;
this.registerGaugeMetrics();
@@ -551,6 +558,9 @@ public class SharePartition {
}
// Release the lock.
lock.writeLock().unlock();
// Avoid triggering the listener for waiting share fetch requests in purgatory as the
// share partition manager keeps track of same and will trigger the listener for the
// respective share partition.
// Complete the future.
if (isFailed) {
future.completeExceptionally(throwable);
@@ -2048,6 +2058,10 @@ public class SharePartition {
}
writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
// There can be pending delayed share fetch requests for the share partition which are waiting
// on the startOffset to move ahead, hence track if the state is updated in the cache. If
// yes, then notify the delayed share fetch purgatory to complete the pending requests.
boolean cacheStateUpdated = false;
lock.writeLock().lock();
try {
if (exception != null) {
@@ -2066,27 +2080,31 @@ public class SharePartition {
state.cancelAndClearAcquisitionLockTimeoutTask();
});
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
maybeUpdateCachedStateAndOffsets();
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
future.complete(null);
} finally {
lock.writeLock().unlock();
// Maybe complete the delayed share fetch request if the state has been changed in cache
// which might have moved start offset ahead. Hence, the pending delayed share fetch
// request can be completed. The call should be made outside the lock to avoid deadlock.
maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
}
});
}
private void maybeUpdateCachedStateAndOffsets() {
private boolean maybeUpdateCachedStateAndOffsets() {
lock.writeLock().lock();
try {
if (!canMoveStartOffset()) {
return;
return false;
}
// This will help to find the next position for the startOffset.
// The new position of startOffset will be lastOffsetAcknowledged + 1
long lastOffsetAcknowledged = findLastOffsetAcknowledged();
// If lastOffsetAcknowledged is -1, this means we cannot move out startOffset ahead
// If lastOffsetAcknowledged is -1, this means we cannot move startOffset ahead
if (lastOffsetAcknowledged == -1) {
return;
return false;
}
// This is true if all records in the cachedState have been acknowledged (either Accept or Reject).
@@ -2097,7 +2115,7 @@ public class SharePartition {
endOffset = lastCachedOffset + 1;
cachedState.clear();
// Nothing further to do.
return;
return true;
}
/*
@@ -2144,6 +2162,7 @@ public class SharePartition {
if (lastKeyToRemove != -1) {
cachedState.subMap(firstKeyToRemove, true, lastKeyToRemove, true).clear();
}
return true;
} finally {
lock.writeLock().unlock();
}
@@ -2405,13 +2424,10 @@ public class SharePartition {
lock.writeLock().unlock();
}
// If we have an acquisition lock timeout for a share-partition, then we should check if
// there is a pending share fetch request for the share-partition and complete it.
// Skip null check for stateBatches, it should always be initialized if reached here.
if (!stateBatches.isEmpty()) {
// If we have an acquisition lock timeout for a share-partition, then we should check if
// there is a pending share fetch request for the share-partition and complete it.
DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
}
maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty());
}
private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch,
@@ -2486,6 +2502,12 @@ public class SharePartition {
}
}
private void maybeCompleteDelayedShareFetchRequest(boolean shouldComplete) {
if (shouldComplete) {
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
}
}
private long startOffsetDuringInitialization(long partitionDataStartOffset) {
// Set the state epoch and end offset from the persisted state.
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {

SharePartitionTest.java

@@ -55,6 +55,7 @@ import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
import org.apache.kafka.server.share.persister.NoOpStatePersister;
@@ -1670,7 +1671,11 @@ public class SharePartitionTest {
@Test
public void testAcknowledgeSingleRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withReplicaManager(replicaManager)
.withState(SharePartitionState.ACTIVE)
.build();
MemoryRecords records1 = memoryRecords(1, 0);
MemoryRecords records2 = memoryRecords(1, 1);
@@ -1693,11 +1698,18 @@ public class SharePartitionTest {
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(1L).batchState());
assertEquals(1, sharePartition.cachedState().get(1L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(1L).offsetState());
// Should not invoke completeDelayedShareFetchRequest as the first offset is not acknowledged yet.
Mockito.verify(replicaManager, Mockito.times(0))
.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(GROUP_ID, TOPIC_ID_PARTITION));
}
@Test
public void testAcknowledgeMultipleRecordBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withReplicaManager(replicaManager)
.withState(SharePartitionState.ACTIVE)
.build();
MemoryRecords records = memoryRecords(10, 5);
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 10);
@@ -1711,6 +1723,9 @@ public class SharePartitionTest {
assertEquals(15, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.cachedState().size());
// Should invoke completeDelayedShareFetchRequest as the start offset is moved.
Mockito.verify(replicaManager, Mockito.times(1))
.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(GROUP_ID, TOPIC_ID_PARTITION));
}
@Test

DelayedShareFetchGroupKey.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
@@ -28,6 +29,10 @@ public class DelayedShareFetchGroupKey implements DelayedShareFetchKey {
private final Uuid topicId;
private final int partition;
public DelayedShareFetchGroupKey(String groupId, TopicIdPartition topicIdPartition) {
this(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
}
public DelayedShareFetchGroupKey(String groupId, Uuid topicId, int partition) {
this.groupId = groupId;
this.topicId = topicId;