mirror of https://github.com/apache/kafka.git
KAFKA-19389: Fix memory consumption for completed share fetch requests (#19928)
For ShareFetch Requests, the fetch happens through DelayedShareFetch operation. The operations which are already completed has reference to data being sent as response. As the operation is watched over multiple keys i.e. DelayedShareFetchGroupKey and DelayedShareFetchPartitionKey, hence if the operation is already completed by either watched keys but then again the reference to the operation is still present in other watched key. Which means the memory can only be free once purge operation is triggered by DelayedOperationPurgatory which removes the watched key operation from remaining keys, as the operation is already completed. The purge operation is dependent on the config `ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG` hence if the value is not smaller than the number of share fetch requests which can consume complete memory of the broker then broker can go out of memory. This can also be avoided by having lower fetch max bytes for request but this value is client dependent hence can't rely to prevent the broker. This PR triggers the completion on both watched keys hence the DelayedShareFetch operation shall be removed from both keys which frees the broker memory as soon the share fetch response is sent. #### Testing Tested with LocalTieredStorage where broker goes OOM after reading some 8040 messages before the fix, with default configurations as mentioned in the doc [here](https://kafka.apache.org/documentation/#tiered_storage_config_ex). But after the fix the consumption continues without any issue. And the memory is released instantaneously. Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
2b589a451a
commit
6ddcfab578
|
@ -34,6 +34,7 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup;
|
||||||
import org.apache.kafka.server.purgatory.DelayedOperation;
|
import org.apache.kafka.server.purgatory.DelayedOperation;
|
||||||
import org.apache.kafka.server.share.SharePartitionKey;
|
import org.apache.kafka.server.share.SharePartitionKey;
|
||||||
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
|
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
|
||||||
|
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
|
||||||
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
|
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
|
||||||
import org.apache.kafka.server.share.fetch.ShareFetch;
|
import org.apache.kafka.server.share.fetch.ShareFetch;
|
||||||
import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
|
import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
|
||||||
|
@ -804,13 +805,22 @@ public class DelayedShareFetch extends DelayedOperation {
|
||||||
}
|
}
|
||||||
// Releasing the lock to move ahead with the next request in queue.
|
// Releasing the lock to move ahead with the next request in queue.
|
||||||
releasePartitionLocks(topicIdPartitions);
|
releasePartitionLocks(topicIdPartitions);
|
||||||
// If we have a fetch request completed for a topic-partition, we release the locks for that partition,
|
replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
|
||||||
// then we should check if there is a pending share fetch request for the topic-partition and complete it.
|
// If we have a fetch request completed for a share-partition, we release the locks for that partition,
|
||||||
|
// then we should check if there is a pending share fetch request for the share-partition and complete it.
|
||||||
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
|
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
|
||||||
// we directly call delayedShareFetchPurgatory.checkAndComplete
|
// we directly call delayedShareFetchPurgatory.checkAndComplete.
|
||||||
replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
|
|
||||||
replicaManager.completeDelayedShareFetchRequest(
|
replicaManager.completeDelayedShareFetchRequest(
|
||||||
new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
|
new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
|
||||||
|
// As DelayedShareFetch operation is watched over multiple keys, same operation might be
|
||||||
|
// completed and can contain references to data fetched. Hence, if the operation is not
|
||||||
|
// removed from other watched keys then there can be a memory leak. The removal of the
|
||||||
|
// operation is dependent on the purge task by DelayedOperationPurgatory. Hence, this can
|
||||||
|
// also be prevented by setting smaller value for configuration {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
|
||||||
|
// However, it's best to trigger the check on all the keys that are being watched which
|
||||||
|
// should free the memory for the completed operation.
|
||||||
|
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.server.share.fetch;
|
package org.apache.kafka.server.share.fetch;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -27,6 +28,10 @@ public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey {
|
||||||
private final Uuid topicId;
|
private final Uuid topicId;
|
||||||
private final int partition;
|
private final int partition;
|
||||||
|
|
||||||
|
public DelayedShareFetchPartitionKey(TopicIdPartition topicIdPartition) {
|
||||||
|
this(topicIdPartition.topicId(), topicIdPartition.partition());
|
||||||
|
}
|
||||||
|
|
||||||
public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
|
public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
|
||||||
this.topicId = topicId;
|
this.topicId = topicId;
|
||||||
this.partition = partition;
|
this.partition = partition;
|
||||||
|
|
Loading…
Reference in New Issue