MINOR: Add documentation about KIP-405 remote reads serving just one partition per FetchRequest (#19336)

[As discussed in the mailing
list](https://lists.apache.org/thread/m03mpkm93737kk6d1nd6fbv9wdgsrhv9),
the broker only fetches remote data for ONE partition in a given
FetchRequest. In other words, if a consumer sends a FetchRequest
requesting 50 topic-partitions, and each partition's requested offset is
not stored locally - the broker will fetch and respond with just one
partition's worth of data from the remote store, and the rest will be
empty.
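To see why this matters for throughput, here is a back-of-the-envelope sketch (the 200 ms round-trip figure is an illustrative assumption, not a measured value): with only one remotely-served partition per request, the remote-read ceiling is roughly the per-partition byte cap divided by the remote fetch round-trip time, regardless of how many partitions the consumer is assigned.

```java
// Back-of-the-envelope sketch of the remote-read throughput ceiling.
// The 0.2 s round-trip time is an assumed, illustrative number.
public class RemoteFetchCeiling {
    public static double ceilingBytesPerSec(int maxPartitionFetchBytes,
                                            double remoteRoundTripSec) {
        // Only one partition per FetchRequest is served from the remote
        // store (KAFKA-14915), so partition count does not appear here.
        return maxPartitionFetchBytes / remoteRoundTripSec;
    }

    public static void main(String[] args) {
        int defaultCap = 1 * 1024 * 1024; // max.partition.fetch.bytes default
        double mibPerSec = ceilingBytesPerSec(defaultCap, 0.2) / (1024 * 1024);
        System.out.println("~" + mibPerSec + " MiB/s upper bound");
    }
}
```

With the 1 MiB default cap and an assumed 200 ms remote round trip, the ceiling works out to roughly 5 MiB/s per consumer, which is why raising the per-partition cap matters for tiered-storage reads.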

Given that the default for the total fetch response size is 50 MiB and the per-partition default is 1 MiB, this can limit throughput. This patch documents the behavior in three configs: `fetch.max.bytes`, `max.partition.fetch.bytes` and `remote.fetch.max.wait.ms`.
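For consumers that read mostly from tiered storage, the practical knob is the per-partition cap. A minimal sketch of the relevant consumer settings, using plain string keys so it stands alone (the 16 MiB value is an illustrative assumption, not a recommendation):

```java
import java.util.Properties;

// Sketch of consumer settings relevant to remote (KIP-405) reads.
public class TieredReadConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Cap on the whole fetch response, across all partitions (default 50 MiB).
        props.put("fetch.max.bytes", 50 * 1024 * 1024);
        // Per-partition cap (default 1 MiB). Because only one partition per
        // FetchRequest is served from the remote store (KAFKA-14915), raising
        // this directly raises remote-read throughput per round trip.
        props.put("max.partition.fetch.bytes", 16 * 1024 * 1024); // illustrative
        return props;
    }
}
```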

Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash
 <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>
Stanislav Kozlovski 2025-05-10 13:18:55 +02:00 committed by GitHub
parent 042be5b9ac
commit 0bc8d0c962
3 changed files with 14 additions and 7 deletions


@@ -196,7 +196,10 @@ public class ConsumerConfig extends AbstractConfig {
 "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than " +
 "this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. " +
 "The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
-"<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";
+"<code>max.message.bytes</code> (topic config). A fetch request consists of many partitions, and there is another setting that controls how much " +
+"data is returned for each partition in a fetch request - see <code>max.partition.fetch.bytes</code>. Note that there is a current limitation when " +
+"performing remote reads from tiered storage (KIP-405) - only one partition out of the fetch request is fetched from the remote store (KAFKA-14915). " +
+"Note also that the consumer performs multiple fetches in parallel.";
 public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
 /**
@@ -221,7 +224,9 @@ public class ConsumerConfig extends AbstractConfig {
 "partition of the fetch is larger than this limit, the " +
 "batch will still be returned to ensure that the consumer can make progress. The maximum record batch size " +
 "accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
-"<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size.";
+"<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size. " +
+"Consider increasing <code>max.partition.fetch.bytes</code> especially in the cases of remote storage reads (KIP-405), because currently only " +
+"one partition per fetch request is served from the remote store (KAFKA-14915).";
 public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
 /** <code>send.buffer.bytes</code> */


@@ -4104,7 +4104,7 @@ foo
 <p>In order to migrate from ZooKeeper to KRaft you need to use a bridge release. The last bridge release is Kafka 3.9.
 See the <a href="/39/documentation/#kraft_zk_migration">ZooKeeper to KRaft Migration steps</a> in the 3.9 documentation.</p>
-<h3 class="anchor-heading"><a id="tiered_storage" class="anchor-link"></a><a href="#kraft">6.9 Tiered Storage</a></h3>
+<h3 class="anchor-heading"><a id="tiered_storage" class="anchor-link"></a><a href="#tiered_storage">6.9 Tiered Storage</a></h3>
 <h4 class="anchor-heading"><a id="tiered_storage_overview" class="anchor-link"></a><a href="#tiered_storage_overview">Tiered Storage Overview</a></h4>
@@ -4121,12 +4121,12 @@ foo
 <h5 class="anchor-heading"><a id="tiered_storage_config_broker" class="anchor-link"></a><a href="#tiered_storage_config_broker">Broker Configurations</a></h5>
-<p>By default, Kafka server will not enable tiered storage feature. <code>remote.log.storage.system.enable</code>
+<p>By default, the Kafka server will not enable the tiered storage feature. <code>remote.log.storage.system.enable</code>
 is the property to control whether to enable tiered storage functionality in a broker or not. Setting it to "true" enables this feature.
 </p>
 <p><code>RemoteStorageManager</code> is an interface to provide the lifecycle of remote log segments and indexes. Kafka server
-doesn't provide out-of-the-box implementation of RemoteStorageManager. Configuring <code>remote.log.storage.manager.class.name</code>
+doesn't provide out-of-the-box implementation of RemoteStorageManager. Users must configure <code>remote.log.storage.manager.class.name</code>
 and <code>remote.log.storage.manager.class.path</code> to specify the implementation of RemoteStorageManager.
 </p>
@@ -4261,9 +4261,10 @@ $ bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost:
 <p>While the Tiered Storage works for most use cases, it is still important to be aware of the following limitations:
 <ul>
 <li>No support for compacted topics</li>
-<li>Deleting tiered storage enabled topics is required before disabling tiered storage at the broker level</li>
+<li>Disabling tiered storage on all topics where it is enabled is required before disabling tiered storage at the broker level</li>
 <li>Admin actions related to tiered storage feature are only supported on clients from version 3.0 onwards</li>
 <li>No support for log segments missing producer snapshot file. It can happen when topic is created before v2.8.0.</li>
+<li>Only one partition per fetch request is served from the remote store. This limitation can become a bottleneck for consumer client throughput - consider configuring <code>max.partition.fetch.bytes</code> appropriately.</li>
 </ul>
 <p>For more information, please check <a href="https://cwiki.apache.org/confluence/x/9xDOEg">Kafka Tiered Storage GA Release Notes</a>.


@@ -183,7 +183,8 @@ public final class RemoteLogManagerConfig {
 public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;
 public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms";
-public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
+public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request. " +
+    "Note that the broker currently only fetches one partition per fetch request from the remote store. (KAFKA-14915)";
 public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
 public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP = "remote.list.offsets.request.timeout.ms";