mirror of https://github.com/apache/kafka.git
Estimate the fetch size for remote fetch to avoid exceeding the `fetch.max.bytes` config. We don't want to query the remoteLogMetadata during API handling, so we assume the remote fetch can return up to `max.partition.fetch.bytes` of data. Tests added. Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
commit eb378da99c (parent 7cb370b786)
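Before the diff, the idea in isolation: as ReplicaManager walks the per-partition read results, a partition that returned no local records but has a pending remote fetch is assumed to eventually consume that fetch's allowance (bounded by `max.partition.fetch.bytes` and by whatever request-level budget remains), and that estimate is deducted from the running `fetch.max.bytes` budget. The sketch below is a minimal standalone illustration of that accounting under those assumptions; `RemoteFetchBudgetSketch`, `PartitionRead`, and `budgetRemoteFetches` are made-up names, not the actual ReplicaManager API.

```scala
// Hypothetical, self-contained sketch of the byte-budgeting idea (not ReplicaManager code).
object RemoteFetchBudgetSketch {

  // One per-partition read result: bytes read locally, plus the allowance of a
  // pending remote fetch if the data only exists in remote storage.
  final case class PartitionRead(localBatchSize: Int, remoteFetchMaxBytes: Option[Int])

  // Walk the reads in order and charge each one against the remaining fetch.max.bytes
  // budget, returning the estimated size used for each partition.
  def budgetRemoteFetches(reads: Seq[PartitionRead], fetchMaxBytes: Int): Seq[Int] = {
    var limitBytes = fetchMaxBytes
    reads.map { read =>
      // No local data but a remote fetch pending: assume it uses the smaller of its own
      // allowance and whatever budget is left, instead of querying remote log metadata.
      val estimated = read match {
        case PartitionRead(0, Some(allowance)) => math.min(allowance, limitBytes)
        case _                                 => read.localBatchSize
      }
      limitBytes = math.max(0, limitBytes - estimated)
      estimated
    }
  }
}
```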
@@ -1852,7 +1852,11 @@ class ReplicaManager(val config: KafkaConfig,
       // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
       if (recordBatchSize > 0)
         minOneMessage = false
-      limitBytes = math.max(0, limitBytes - recordBatchSize)
+      // Because we don't know how much data will be retrieved in remote fetch yet, and we don't want to block the API call
+      // to query remoteLogMetadata, assume it will fetch the max bytes size of data to avoid exceeding the "fetch.max.bytes" setting.
+      val estimatedRecordBatchSize = if (recordBatchSize == 0 && readResult.info.delayedRemoteStorageFetch.isPresent)
+        readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else recordBatchSize
+      limitBytes = math.max(0, limitBytes - estimatedRecordBatchSize)
       result += (tp -> readResult)
     }
     result
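To make the effect concrete with the numbers the new test uses: with `fetch.max.bytes` = 150 and two partitions whose data is only available in remote storage, each allowed `max.partition.fetch.bytes` = 100, the first partition's delayed remote fetch is budgeted the full 100 bytes, leaving 150 - 100 = 50, so the second partition's delayed remote fetch is capped at the remaining 50 bytes rather than another 100. The total estimate therefore stays within `fetch.max.bytes`.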
@@ -112,7 +112,9 @@ object ReplicaManagerTest {
 class ReplicaManagerTest {
 
   private val topic = "test-topic"
+  private val topic2 = "test-topic2"
   private val topicId = Uuid.fromString("YK2ed2GaTH2JpgzUaJ8tgg")
+  private val topicId2 = Uuid.randomUuid()
   private val topicIds = scala.Predef.Map("test-topic" -> topicId)
   private val topicNames = topicIds.map(_.swap)
   private val topicPartition = new TopicPartition(topic, 0)
@@ -3294,38 +3296,53 @@ class ReplicaManagerTest {
   @ValueSource(booleans = Array(true, false))
   def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): Unit = {
     val replicaId = if (isFromFollower) 1 else -1
+    val fetchMaxBytes = 150
+    val partitionMaxBytes = 100
     val tp0 = new TopicPartition(topic, 0)
+    val tp02 = new TopicPartition(topic2, 0)
     val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp02 = new TopicIdPartition(topicId2, tp02)
     // create a replicaManager with remoteLog enabled
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
     try {
       val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(tp02).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
-      val topicIds = Map(tp0.topic -> topicId).asJava
+      val topicIds = Map(tp0.topic -> topicId, tp02.topic -> topicId2).asJava
       val leaderEpoch = 0
       val delta = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(0), partition0Replicas, partition0Replicas)
+      val delta2 = createLeaderDelta(topicIds.get(topic2), tp02, partition0Replicas.get(0), partition0Replicas, partition0Replicas)
       val leaderMetadataImage = imageFromTopics(delta.apply())
+      val leaderMetadataImage2 = imageFromTopics(delta2.apply())
       replicaManager.applyDelta(delta, leaderMetadataImage)
+      replicaManager.applyDelta(delta2, leaderMetadataImage2)
 
-      val params = new FetchParams(replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, Optional.empty)
-      // when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately
-      val result = replicaManager.readFromLog(params, Seq(tidp0 -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
+      val params = new FetchParams(replicaId, 1, 100, 0, fetchMaxBytes, FetchIsolation.LOG_END, Optional.empty)
+      // when reading logs from 2 partitions, they'll throw OffsetOutOfRangeException, which will be handled separately
+      val results = replicaManager.readFromLog(params, Seq(
+        tidp0 -> new PartitionData(topicId, 1, 0, partitionMaxBytes, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp02 -> new PartitionData(topicId2, 1, 0, partitionMaxBytes, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
 
-      if (isFromFollower) {
-        // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log
-        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, result.head._2.error)
-      } else {
-        assertEquals(Errors.NONE, result.head._2.error)
-      }
-      assertEquals(startOffset, result.head._2.leaderLogStartOffset)
-      assertEquals(endOffset, result.head._2.leaderLogEndOffset)
-      assertEquals(highHW, result.head._2.highWatermark)
-      if (isFromFollower) {
-        assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
-      } else {
-        // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
-        assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
-      }
+      results.foreach { case (tidp, partitionData) =>
+        assertEquals(startOffset, partitionData.leaderLogStartOffset)
+        assertEquals(endOffset, partitionData.leaderLogEndOffset)
+        assertEquals(highHW, partitionData.highWatermark)
+        if (isFromFollower) {
+          // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log
+          assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, partitionData.error)
+          assertFalse(partitionData.info.delayedRemoteStorageFetch.isPresent)
+        } else {
+          assertEquals(Errors.NONE, partitionData.error)
+          // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
+          assertTrue(partitionData.info.delayedRemoteStorageFetch.isPresent)
+          // verify the 1st partition will set the fetchMaxBytes to partitionMaxBytes,
+          // and the 2nd one will set to the remaining (fetchMaxBytes - partitionMaxBytes) to meet the "fetch.max.bytes" config.
+          if (tidp.topic == topic)
+            assertEquals(partitionMaxBytes, partitionData.info.delayedRemoteStorageFetch.get().fetchMaxBytes)
+          else
+            assertEquals(fetchMaxBytes - partitionMaxBytes, partitionData.info.delayedRemoteStorageFetch.get().fetchMaxBytes)
+        }
+      }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
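As a side note on the test's structure: `@ValueSource(booleans = Array(true, false))` is JUnit 5 parameterization, so the method runs once as a follower fetch and once as a consumer fetch. A tiny standalone sketch of the same pattern (class and method names here are invented for illustration):

```scala
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

class ValueSourceSketch {
  // Runs twice, once per value in the array, mirroring how
  // testOffsetOutOfRangeExceptionWhenReadFromLog is driven.
  @ParameterizedTest
  @ValueSource(booleans = Array(true, false))
  def followerUsesBrokerIdAndConsumerUsesMinusOne(isFromFollower: Boolean): Unit = {
    // Follower fetches carry a broker id (here 1); consumer fetches use -1.
    val replicaId = if (isFromFollower) 1 else -1
    assertTrue(if (isFromFollower) replicaId == 1 else replicaId == -1)
  }
}
```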
|
@@ -3723,8 +3740,8 @@ class ReplicaManagerTest {
     partitionDir.mkdir()
     when(mockLog.dir).thenReturn(partitionDir)
     when(mockLog.parentDir).thenReturn(path)
-    when(mockLog.topicId).thenReturn(Optional.of(topicId))
-    when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0))
+    when(mockLog.topicId).thenReturn(Optional.of(topicId)).thenReturn(Optional.of(topicId2))
+    when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0)).thenReturn(new TopicPartition(topic2, 0))
     when(mockLog.highWatermark).thenReturn(highHW)
     when(mockLog.updateHighWatermark(anyLong())).thenReturn(0L)
     when(mockLog.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata(10))
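The chained `thenReturn(...).thenReturn(...)` calls above rely on Mockito's consecutive stubbing: successive invocations of the stubbed method return the supplied values in order, and once the list is exhausted the last value keeps being returned, which is how one mocked log can stand in for the two topics. A standalone sketch of just that mechanism (the `Log` trait here is a made-up placeholder, not Kafka's log class):

```scala
import org.mockito.Mockito.{mock, when}

// Placeholder trait for the sketch only.
trait Log {
  def topicId: String
}

object ConsecutiveStubbingSketch extends App {
  val log: Log = mock(classOf[Log])
  // First call returns "topic-A"; the second (and every later) call returns "topic-B".
  when(log.topicId).thenReturn("topic-A").thenReturn("topic-B")

  println(log.topicId) // topic-A
  println(log.topicId) // topic-B
}
```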