From eb378da99cb3ee39188a1dc24e2d9a30e439a161 Mon Sep 17 00:00:00 2001
From: Luke Chen
Date: Thu, 3 Jul 2025 10:45:59 +0800
Subject: [PATCH] KAFKA-19462: Count fetch size when remote fetch (#20088)

Estimate the fetch size for remote fetch to avoid exceeding the
`fetch.max.bytes` config. We don't want to query the remoteLogMetadata
during API handling, so we assume the remote fetch can return up to
`max.partition.fetch.bytes` bytes.

Tests added.

Reviewers: Kamal Chandraprakash
---
 .../scala/kafka/server/ReplicaManager.scala    |  6 +-
 .../kafka/server/ReplicaManagerTest.scala      | 57 ++++++++++++-------
 2 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 00efb5f7a07..e70a4726216 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1852,7 +1852,11 @@ class ReplicaManager(val config: KafkaConfig,
       // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
       if (recordBatchSize > 0)
         minOneMessage = false
-      limitBytes = math.max(0, limitBytes - recordBatchSize)
+      // Because we don't know yet how much data the remote fetch will retrieve, and we don't want to block the API call
+      // to query remoteLogMetadata, assume it will fetch the max bytes of data, to avoid exceeding the "fetch.max.bytes" setting.
+      val estimatedRecordBatchSize = if (recordBatchSize == 0 && readResult.info.delayedRemoteStorageFetch.isPresent)
+        readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else recordBatchSize
+      limitBytes = math.max(0, limitBytes - estimatedRecordBatchSize)
       result += (tp -> readResult)
     }
     result
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 3f2753d3ab7..761fc49b2ea 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -112,7 +112,9 @@ object ReplicaManagerTest {
 class ReplicaManagerTest {
 
   private val topic = "test-topic"
+  private val topic2 = "test-topic2"
   private val topicId = Uuid.fromString("YK2ed2GaTH2JpgzUaJ8tgg")
+  private val topicId2 = Uuid.randomUuid()
   private val topicIds = scala.Predef.Map("test-topic" -> topicId)
   private val topicNames = topicIds.map(_.swap)
   private val topicPartition = new TopicPartition(topic, 0)
@@ -3294,38 +3296,53 @@ class ReplicaManagerTest {
   @ValueSource(booleans = Array(true, false))
   def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): Unit = {
     val replicaId = if (isFromFollower) 1 else -1
+    val fetchMaxBytes = 150
+    val partitionMaxBytes = 100
     val tp0 = new TopicPartition(topic, 0)
+    val tp02 = new TopicPartition(topic2, 0)
     val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp02 = new TopicIdPartition(topicId2, tp02)
     // create a replicaManager with remoteLog enabled
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
     try {
       val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(tp02).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
-      val topicIds = Map(tp0.topic -> topicId).asJava
+      val topicIds = Map(tp0.topic -> topicId, tp02.topic -> topicId2).asJava
       val leaderEpoch = 0
       val delta = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(0), partition0Replicas, partition0Replicas)
+      val delta2 = createLeaderDelta(topicIds.get(topic2), tp02, partition0Replicas.get(0), partition0Replicas, partition0Replicas)
       val leaderMetadataImage = imageFromTopics(delta.apply())
+      val leaderMetadataImage2 = imageFromTopics(delta2.apply())
       replicaManager.applyDelta(delta, leaderMetadataImage)
+      replicaManager.applyDelta(delta2, leaderMetadataImage2)
 
-      val params = new FetchParams(replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, Optional.empty)
-      // when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately
-      val result = replicaManager.readFromLog(params, Seq(tidp0 -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
+      val params = new FetchParams(replicaId, 1, 100, 0, fetchMaxBytes, FetchIsolation.LOG_END, Optional.empty)
+      // when reading logs from 2 partitions, they'll throw OffsetOutOfRangeException, which will be handled separately
+      val results = replicaManager.readFromLog(params, Seq(
+        tidp0 -> new PartitionData(topicId, 1, 0, partitionMaxBytes, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp02 -> new PartitionData(topicId2, 1, 0, partitionMaxBytes, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
 
-      if (isFromFollower) {
-        // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log
-        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, result.head._2.error)
-      } else {
-        assertEquals(Errors.NONE, result.head._2.error)
-      }
-      assertEquals(startOffset, result.head._2.leaderLogStartOffset)
-      assertEquals(endOffset, result.head._2.leaderLogEndOffset)
-      assertEquals(highHW, result.head._2.highWatermark)
-      if (isFromFollower) {
-        assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
-      } else {
-        // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
-        assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+      results.foreach { case (tidp, partitionData) =>
+        assertEquals(startOffset, partitionData.leaderLogStartOffset)
+        assertEquals(endOffset, partitionData.leaderLogEndOffset)
+        assertEquals(highHW, partitionData.highWatermark)
+        if (isFromFollower) {
+          // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from a follower, since the data is already available in the remote log
+          assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, partitionData.error)
+          assertFalse(partitionData.info.delayedRemoteStorageFetch.isPresent)
+        } else {
+          assertEquals(Errors.NONE, partitionData.error)
+          // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
+          assertTrue(partitionData.info.delayedRemoteStorageFetch.isPresent)
+          // verify that the 1st partition sets fetchMaxBytes to partitionMaxBytes,
+          // and the 2nd one is set to the remaining (fetchMaxBytes - partitionMaxBytes) to stay within the "fetch.max.bytes" config
+          if (tidp.topic == topic)
+            assertEquals(partitionMaxBytes, partitionData.info.delayedRemoteStorageFetch.get().fetchMaxBytes)
+          else
+            assertEquals(fetchMaxBytes - partitionMaxBytes, partitionData.info.delayedRemoteStorageFetch.get().fetchMaxBytes)
+        }
       }
     } finally {
      replicaManager.shutdown(checkpointHW = false)
@@ -3723,8 +3740,8 @@ class ReplicaManagerTest {
     partitionDir.mkdir()
     when(mockLog.dir).thenReturn(partitionDir)
     when(mockLog.parentDir).thenReturn(path)
-    when(mockLog.topicId).thenReturn(Optional.of(topicId))
-    when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0))
+    when(mockLog.topicId).thenReturn(Optional.of(topicId)).thenReturn(Optional.of(topicId2))
+    when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0)).thenReturn(new TopicPartition(topic2, 0))
     when(mockLog.highWatermark).thenReturn(highHW)
     when(mockLog.updateHighWatermark(anyLong())).thenReturn(0L)
    when(mockLog.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata(10))
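
Note for readers skimming the diff: the ReplicaManager hunk charges a partition that returned no local data but carries a pending remote fetch with its full fetchMaxBytes against the request-level limitBytes, so the sum of the estimates never exceeds `fetch.max.bytes`. Below is a minimal, self-contained sketch of that accounting rule; RemoteFetch, ReadResult, and remainingLimit are simplified stand-ins for illustration, not Kafka's actual types or API.

    // Sketch only: simplified stand-in types, not Kafka's internals.
    object FetchSizeEstimation {
      final case class RemoteFetch(fetchMaxBytes: Int)
      final case class ReadResult(recordBatchSize: Int,
                                  delayedRemoteStorageFetch: Option[RemoteFetch])

      // Charge each partition either its actual local batch size or, when the
      // data can only come from tiered storage, its worst-case fetchMaxBytes.
      def remainingLimit(limitBytes: Int, results: Seq[ReadResult]): Int =
        results.foldLeft(limitBytes) { (limit, r) =>
          val estimated =
            if (r.recordBatchSize == 0 && r.delayedRemoteStorageFetch.isDefined)
              r.delayedRemoteStorageFetch.get.fetchMaxBytes
            else r.recordBatchSize
          math.max(0, limit - estimated)
        }

      def main(args: Array[String]): Unit = {
        // Mirrors the test above: fetch.max.bytes = 150, and the 1st remote-only
        // partition is charged its partitionMaxBytes of 100, so only 50 bytes
        // remain for the 2nd partition's delayed remote fetch.
        println(remainingLimit(150, Seq(ReadResult(0, Some(RemoteFetch(100)))))) // 50
      }
    }

Because readFromLog passes the decremented limitBytes into each subsequent partition read, the second remote-only partition's delayedRemoteStorageFetch is capped at the remaining 50 bytes, which is exactly what the new assertions in testOffsetOutOfRangeExceptionWhenReadFromLog check.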