Apoorv Mittal 997abe464f
KAFKA-19389: Fix memory consumption for completed share fetch requests (#19928)
For ShareFetch requests, the fetch happens through a DelayedShareFetch
operation. Operations that have already completed still hold references
to the data sent in the response. Each operation is watched over
multiple keys, i.e. DelayedShareFetchGroupKey and
DelayedShareFetchPartitionKey, so when the operation is completed via
one watched key, the other watched key still holds a reference to it.
That memory can only be freed once a purge operation is triggered by
DelayedOperationPurgatory, which removes the already-completed
operation from the remaining keys.
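
The lifecycle can be illustrated with a toy model (a sketch only; `MiniPurgatory`, its fields, and the key strings are illustrative stand-ins, not Kafka's actual classes): an operation watched under two keys and completed via one of them remains reachable from the other key's watch list.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy stand-in for the purgatory's watch lists; all names are illustrative.
class MiniPurgatory {
    // Each watch key maps to the operations watching it.
    final Map<String, List<Operation>> watchLists = new ConcurrentHashMap<>();

    static class Operation {
        final byte[] responseData = new byte[1024 * 1024]; // response held by the operation
        volatile boolean completed = false;
    }

    void watch(Operation op, String... keys) {
        for (String key : keys)
            watchLists.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(op);
    }

    // Completes operations and removes them from this one key's watch list only.
    void checkAndComplete(String key) {
        List<Operation> ops = watchLists.get(key);
        if (ops != null)
            ops.removeIf(op -> { op.completed = true; return true; });
    }

    public static void main(String[] args) {
        MiniPurgatory purgatory = new MiniPurgatory();
        Operation fetch = new Operation();
        purgatory.watch(fetch, "group-key", "partition-key");

        purgatory.checkAndComplete("group-key"); // response sent, operation completed

        // The completed operation (and its 1 MiB response) is still referenced
        // from the other key's watch list until a purge runs:
        System.out.println(purgatory.watchLists.get("partition-key").size()); // prints 1
    }
}
```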

The purge operation depends on the config
`ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG`:
if its value is not smaller than the number of share fetch requests
needed to consume the broker's entire memory, the broker can go out of
memory. This can also be mitigated by lowering the fetch max bytes per
request, but that value is client-dependent and cannot be relied upon
to protect the broker.
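
For context, the purge is driven by a counter threshold rather than by time. Below is a minimal sketch of that trigger under assumed names (`PurgeTrigger`, `maybePurge`, `pendingDelayed` are not Kafka's exact identifiers; the shape is loosely modeled on DelayedOperationPurgatory):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a counter-based purge trigger; names are assumptions.
class PurgeTrigger {
    // Threshold from ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG.
    final long purgeIntervalRequests;
    final AtomicLong estimatedTotalOperations = new AtomicLong();

    PurgeTrigger(long purgeIntervalRequests) {
        this.purgeIntervalRequests = purgeIntervalRequests;
    }

    long pendingDelayed() {
        return 0; // stand-in for the count of still-uncompleted operations
    }

    // Completed operations are only swept once enough of them accumulate;
    // with a large threshold, their responses pile up in memory first.
    void maybePurge() {
        if (estimatedTotalOperations.get() - pendingDelayed() > purgeIntervalRequests) {
            estimatedTotalOperations.set(pendingDelayed());
            purgeCompleted();
        }
    }

    void purgeCompleted() {
        // Walk all watch lists and drop operations that already completed.
    }
}
```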

This PR triggers completion on both watched keys, so the
DelayedShareFetch operation is removed from both keys and the broker
memory is freed as soon as the share fetch response is sent.
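
In terms of the toy model above, the fix amounts to running the completion check on both keys once the response is sent (Kafka's real DelayedOperationPurgatory exposes a similar per-key `checkAndComplete`):

```java
// After the response is sent, check both keys so neither watch list
// keeps the completed operation alive until the next purge.
purgatory.checkAndComplete("group-key");
purgatory.checkAndComplete("partition-key");
// Both watch lists are now empty and the response buffer is unreachable.
```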

#### Testing

Tested with LocalTieredStorage, using the default configurations
mentioned in the documentation
[here](https://kafka.apache.org/documentation/#tiered_storage_config_ex).
Before the fix, the broker goes OOM after reading some 8040 messages.
After the fix, consumption continues without any issue and the memory
is released instantaneously.

Reviewers: Jun Rao <junrao@gmail.com>, Andrew Schofield
<aschofield@confluent.io>