KAFKA-19389: Fix memory consumption for completed share fetch requests (#19928)

For share fetch requests, the fetch happens through a DelayedShareFetch
operation. An operation that has already completed still holds a reference to
the data sent in the response. Since the operation is watched over multiple
keys, i.e. DelayedShareFetchGroupKey and DelayedShareFetchPartitionKey,
completing it via one watched key leaves a reference to it in the other
watched key. This means the memory can only be freed once the purge operation
is triggered by DelayedOperationPurgatory, which removes the already-completed
operation from the remaining watched keys.
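
To make the leak concrete, here is a minimal sketch, assuming the purgatory's `tryCompleteElseWatch` API; the method and parameter names are illustrative, not the actual broker code:

```java
import java.util.List;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;

// Sketch only: DelayedShareFetch itself lives in the broker (kafka.server.share).
void watchUnderBothKeys(DelayedOperationPurgatory<DelayedShareFetch> purgatory,
                        DelayedShareFetch operation,
                        String groupId, Uuid topicId, int partition) {
    // The same operation instance is registered in two watcher lists.
    List<DelayedOperationKey> watchKeys = List.of(
        new DelayedShareFetchGroupKey(groupId, topicId, partition),
        new DelayedShareFetchPartitionKey(topicId, partition));
    purgatory.tryCompleteElseWatch(operation, watchKeys);
    // If the operation completes while checked under the group key, the
    // partition key's watcher list still holds it (and the fetched data)
    // until the purgatory's purge task runs.
}
```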

The purge operation depends on the config
`ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG`;
if its value is not smaller than the number of share fetch requests whose
retained responses can consume the broker's entire memory, the broker can go
out of memory. This could also be avoided by using a lower fetch max bytes for
the request, but that value is client-controlled, so it cannot be relied on to
protect the broker.
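
A hedged sketch of where that interval lands, assuming the share fetch purgatory is constructed with the configured value (`brokerId` and `shareFetchPurgatoryPurgeIntervalRequests` are illustrative, and the exact constructor overload may differ):

```java
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;

// Smaller purgeInterval => completed-but-still-watched operations are scrubbed
// from the remaining watcher lists more frequently, trading CPU for memory.
DelayedOperationPurgatory<DelayedShareFetch> shareFetchPurgatory =
    new DelayedOperationPurgatory<>(
        "ShareFetch",                              // purgatory name
        brokerId,                                  // local broker id
        shareFetchPurgatoryPurgeIntervalRequests); // SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG
```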

This PR triggers completion on both watched keys, so the DelayedShareFetch
operation is removed from both keys and the broker memory is freed as soon as
the share fetch response is sent.
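
In effect, after the partition locks are released the completion check now runs for both key types (this mirrors the diff below):

```java
// For each completed topic-partition, check-and-complete on BOTH watched keys
// so neither watcher list keeps pinning the finished operation and its data.
replicaManager.completeDelayedShareFetchRequest(
    new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
replicaManager.completeDelayedShareFetchRequest(
    new DelayedShareFetchPartitionKey(topicIdPartition));
```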

#### Testing

Tested with LocalTieredStorage, where the broker went OOM after reading some
8040 messages before the fix, with the default configurations as mentioned in
the doc
[here](https://kafka.apache.org/documentation/#tiered_storage_config_ex).
After the fix, consumption continues without any issue and the memory is
released immediately.

Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield <aschofield@confluent.io>
Apoorv Mittal 2025-06-10 17:36:27 +01:00 committed by GitHub
parent f69379cf6b
commit 997abe464f
2 changed files with 21 additions and 6 deletions

DelayedShareFetch.java:

```diff
@@ -34,6 +34,7 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperation;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
 import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
 import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
@@ -804,13 +805,22 @@ public class DelayedShareFetch extends DelayedOperation {
         }
         // Releasing the lock to move ahead with the next request in queue.
         releasePartitionLocks(topicIdPartitions);
-        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
-        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
-        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
-        // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
-            replicaManager.completeDelayedShareFetchRequest(
-                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
+            // If we have a fetch request completed for a share-partition, we release the locks for that partition,
+            // then we should check if there is a pending share fetch request for the share-partition and complete it.
+            // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+            // we directly call delayedShareFetchPurgatory.checkAndComplete.
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            // As DelayedShareFetch operation is watched over multiple keys, same operation might be
+            // completed and can contain references to data fetched. Hence, if the operation is not
+            // removed from other watched keys then there can be a memory leak. The removal of the
+            // operation is dependent on the purge task by DelayedOperationPurgatory. Hence, this can
+            // also be prevented by setting smaller value for configuration {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+            // However, it's best to trigger the check on all the keys that are being watched which
+            // should free the memory for the completed operation.
+            replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));
+        }));
     }
 
     /**
```

DelayedShareFetchPartitionKey.java:

```diff
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.share.fetch;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 
 import java.util.Objects;
@@ -27,6 +28,10 @@ public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey {
     private final Uuid topicId;
     private final int partition;
 
+    public DelayedShareFetchPartitionKey(TopicIdPartition topicIdPartition) {
+        this(topicIdPartition.topicId(), topicIdPartition.partition());
+    }
+
     public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
         this.topicId = topicId;
         this.partition = partition;
```