KAFKA-14915: Allow reading from remote storage for multiple partitions in one fetchRequest (#20045)

This PR enables reading from remote storage for multiple partitions in one
fetch request. The main changes are:
1. `DelayedRemoteFetch` now accepts multiple remoteFetchTasks and the
related per-partition metadata.
2. `DelayedRemoteFetch` waits until all remote fetches are done, whether
they succeeded or failed (a simplified sketch of this completion model
follows the list).
3. In `ReplicaManager#fetchMessages`, we create a single
`DelayedRemoteFetch`, pass the metadata for all remote fetches to it, and
watch all of them.
4. Added tests
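
For context, the completion model can be sketched in a few lines of self-contained Scala. `Partition` and `ReadResult` below are hypothetical stand-ins for `TopicIdPartition` and `RemoteLogReadResult`, not the real Kafka classes: a single delayed operation holds one future per partition and completes only once every future is done, successfully or not.

```scala
import java.util.concurrent.CompletableFuture

object CompletionModelSketch extends App {
  // Hypothetical stand-ins for TopicIdPartition and RemoteLogReadResult.
  final case class Partition(topic: String, id: Int)
  final case class ReadResult(error: Option[Throwable])

  // Sketch of the DelayedRemoteFetch idea: completion is gated on ALL futures.
  final class DelayedFetchSketch(results: Map[Partition, CompletableFuture[ReadResult]]) {
    // Mirrors tryComplete's Case c: every remote read is done (succeeded or failed).
    def tryComplete(): Boolean = results.values.forall(_.isDone)
  }

  val f0 = new CompletableFuture[ReadResult]()
  val f1 = new CompletableFuture[ReadResult]()
  val op = new DelayedFetchSketch(Map(Partition("t", 0) -> f0, Partition("t", 1) -> f1))

  f0.complete(ReadResult(None))
  println(op.tryComplete()) // false: the other remote read is still pending
  f1.complete(ReadResult(Some(new RuntimeException("remote read failed"))))
  println(op.tryComplete()) // true: a failed read still counts as "done"
}
```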

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, Satish Duggana <satishd@apache.org>
Author: Luke Chen, 2025-07-14 22:12:08 +08:00 (committed by GitHub)
Parent: 29cf97b9ad
Commit: e1ff387605
13 changed files with 494 additions and 128 deletions

View File

@@ -197,9 +197,7 @@ public class ConsumerConfig extends AbstractConfig {
"this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. " +
"The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
"<code>max.message.bytes</code> (topic config). A fetch request consists of many partitions, and there is another setting that controls how much " +
"data is returned for each partition in a fetch request - see <code>max.partition.fetch.bytes</code>. Note that there is a current limitation when " +
"performing remote reads from tiered storage (KIP-405) - only one partition out of the fetch request is fetched from the remote store (KAFKA-14915). " +
"Note also that the consumer performs multiple fetches in parallel.";
"data is returned for each partition in a fetch request - see <code>max.partition.fetch.bytes</code>. Note that the consumer performs multiple fetches in parallel.";
public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
/**
@@ -224,9 +222,7 @@ public class ConsumerConfig extends AbstractConfig {
"partition of the fetch is larger than this limit, the " +
"batch will still be returned to ensure that the consumer can make progress. The maximum record batch size " +
"accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
"<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size. " +
"Consider increasing <code>max.partition.fetch.bytes</code> especially in the cases of remote storage reads (KIP-405), because currently only " +
"one partition per fetch request is served from the remote store (KAFKA-14915).";
"<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size.";
public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
/** <code>send.buffer.bytes</code> */
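
With the KAFKA-14915 caveat gone from both descriptions, sizing fetches over tiered topics reduces to the usual two knobs. A quick illustration of setting them on a consumer; the broker address, group id, and values are placeholders (shown here at their defaults):

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object FetchSizingExample extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "tiered-readers")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  // Cap for the whole fetch response; the default is 50 MiB.
  props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (50 * 1024 * 1024).toString)
  // Per-partition cap within one fetch response; the default is 1 MiB.
  props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (1024 * 1024).toString)

  val consumer = new KafkaConsumer[String, String](props)
  consumer.close()
}
```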

View File

@@ -28,6 +28,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
import java.util
import java.util.concurrent.{CompletableFuture, Future, TimeUnit}
import java.util.{Optional, OptionalInt, OptionalLong}
import scala.collection._
@@ -36,9 +37,9 @@ import scala.collection._
* A remote fetch operation that can be created by the replica manager and watched
* in the remote fetch operation purgatory
*/
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
remoteFetchResult: CompletableFuture[RemoteLogReadResult],
remoteFetchInfo: RemoteStorageFetchInfo,
class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Void]],
remoteFetchResults: util.Map[TopicIdPartition, CompletableFuture[RemoteLogReadResult]],
remoteFetchInfos: util.Map[TopicIdPartition, RemoteStorageFetchInfo],
remoteFetchMaxWaitMs: Long,
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
fetchParams: FetchParams,
@@ -56,7 +57,7 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
*
* Case a: This broker is no longer the leader of the partition it tries to fetch
* Case b: This broker does not know the partition it tries to fetch
* Case c: The remote storage read request completed (succeeded or failed)
* Case c: All the remote storage read requests completed (succeeded or failed)
* Case d: The partition is in an offline log directory on this broker
*
* Upon completion, should return whatever data is available for each valid partition
@@ -81,7 +82,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
return forceComplete()
}
}
if (remoteFetchResult.isDone) // Case c
// Case c
if (remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone))
forceComplete()
else
false
@@ -90,8 +92,13 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
override def onExpiration(): Unit = {
// cancel the remote storage read task, if it has not been executed yet and
// avoid interrupting the task if it is already running, as it may force closing opened/cached resources such as the transaction index.
val cancelled = remoteFetchTask.cancel(false)
if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
remoteFetchTasks.forEach { (topicIdPartition, task) =>
if (task != null && !task.isDone) {
if (!task.cancel(false)) {
debug(s"Remote fetch task for remoteFetchInfo: ${remoteFetchInfos.get(topicIdPartition)} could not be cancelled.")
}
}
}
DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
}
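
The `cancel(false)` choice is deliberate: with `mayInterruptIfRunning = false`, a task that is still queued is removed before it ever starts, while a task already reading from remote storage is never interrupted (interruption could force-close cached resources such as the transaction index). A small standalone sketch of that behavior, with placeholder task bodies:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object CancelWithoutInterrupt extends App {
  val pool = Executors.newSingleThreadExecutor()

  val inFlight = pool.submit(new Runnable {
    def run(): Unit = Thread.sleep(200) // stands in for an in-flight remote read
  })
  val queued = pool.submit(new Runnable {
    def run(): Unit = println("never runs")
  })

  Thread.sleep(50)              // let the first task start on the only worker
  println(queued.cancel(false)) // true: still queued, removed before starting
  // cancel(false) never interrupts a running task, so even if it were called
  // on inFlight, the in-progress read would be left to finish on its own.
  pool.shutdown()
  pool.awaitTermination(1, TimeUnit.SECONDS)
  println(inFlight.isDone)      // true: ran to completion untouched
}
```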
@@ -101,7 +108,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
*/
override def onComplete(): Unit = {
val fetchPartitionData = localReadResults.map { case (tp, result) =>
if (tp.topicPartition().equals(remoteFetchInfo.topicPartition)
val remoteFetchResult = remoteFetchResults.get(tp)
if (remoteFetchInfos.containsKey(tp)
&& remoteFetchResult.isDone
&& result.error == Errors.NONE
&& result.info.delayedRemoteStorageFetch.isPresent) {

View File

@@ -1579,15 +1579,18 @@ class ReplicaManager(val config: KafkaConfig,
}
/**
* Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
* else returns [[None]].
* Initiates an asynchronous remote storage fetch operation for the given remote fetch information.
*
* This method schedules a remote fetch task with the remote log manager and sets up the necessary
* completion handling for the operation. The remote fetch result will be used to populate the
* delayed remote fetch purgatory when completed.
*
* @param remoteFetchInfo The remote storage fetch information
*
* @return A tuple containing the remote fetch task and the remote fetch result
*/
private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
params: FetchParams,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo): (Future[Void], CompletableFuture[RemoteLogReadResult]) = {
val key = new TopicPartitionOperationKey(remoteFetchInfo.topicIdPartition)
val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
var remoteFetchTask: Future[Void] = null
try {
@@ -1597,31 +1600,39 @@
})
} catch {
case e: RejectedExecutionException =>
// Return the error if any in scheduling the remote fetch task
warn("Unable to fetch data from remote storage", e)
return Some(createLogReadResult(e))
warn(s"Unable to fetch data from remote storage for remoteFetchInfo: $remoteFetchInfo", e)
// Store any error raised while scheduling the remote fetch task in the RemoteLogReadResult.
// It will be sent back to the client in DelayedRemoteFetch along with the other remote fetch results.
remoteFetchResult.complete(new RemoteLogReadResult(Optional.empty, Optional.of(e)))
}
(remoteFetchTask, remoteFetchResult)
}
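
Folding the `RejectedExecutionException` into the future, rather than returning an error `LogReadResult` and aborting as before, is what lets one partition's scheduling failure coexist with the other partitions' successes in a single response. A sketch of the pattern, assuming a hypothetical `ReadResult` type in place of `RemoteLogReadResult`:

```scala
import java.util.concurrent.{CompletableFuture, RejectedExecutionException}

object ErrorAsValueSketch extends App {
  // Hypothetical stand-in for RemoteLogReadResult: either data or an error.
  final case class ReadResult(data: Option[String], error: Option[Throwable])

  def schedule(partition: Int, poolBusy: Boolean): CompletableFuture[ReadResult] = {
    val result = new CompletableFuture[ReadResult]()
    try {
      if (poolBusy) throw new RejectedExecutionException(s"no capacity for partition $partition")
      result.complete(ReadResult(Some(s"records for partition $partition"), None))
    } catch {
      case e: RejectedExecutionException =>
        // Complete normally, with the error carried as a value: the delayed
        // operation then treats "done with error" like any other done result.
        result.complete(ReadResult(None, Some(e)))
    }
    result
  }

  println(schedule(0, poolBusy = false).join()) // holds records
  println(schedule(1, poolBusy = true).join())  // holds the rejection, not an exception
}
```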
/**
* Process all remote fetches by creating async read tasks and handling them in DelayedRemoteFetch collectively.
*/
private def processRemoteFetches(remoteFetchInfos: util.HashMap[TopicIdPartition, RemoteStorageFetchInfo],
params: FetchParams,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
val (task, result) = processRemoteFetch(remoteFetchInfo)
remoteFetchTasks.put(topicIdPartition, task)
remoteFetchResults.put(topicIdPartition, result)
}
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, util.Collections.singletonList(key))
None
}
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)
private def buildPartitionToFetchPartitionData(logReadResults: Seq[(TopicIdPartition, LogReadResult)],
remoteFetchTopicPartition: TopicPartition,
error: LogReadResult): Seq[(TopicIdPartition, FetchPartitionData)] = {
logReadResults.map { case (tp, result) =>
val fetchPartitionData = {
if (tp.topicPartition().equals(remoteFetchTopicPartition))
error
else
result
}.toFetchPartitionData(false)
tp -> fetchPartitionData
}
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, delayedFetchKeys.asJava)
}
/**
@@ -1639,8 +1650,8 @@
var bytesReadable: Long = 0
var errorReadingData = false
// The 1st topic-partition that has to be read from remote storage
var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
// topic-partitions that have to be read from remote storage
val remoteFetchInfos = new util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
var hasDivergingEpoch = false
var hasPreferredReadReplica = false
@@ -1651,8 +1662,8 @@
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
if (logReadResult.error != Errors.NONE)
errorReadingData = true
if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
if (logReadResult.info.delayedRemoteStorageFetch.isPresent) {
remoteFetchInfos.put(topicIdPartition, logReadResult.info.delayedRemoteStorageFetch.get())
}
if (logReadResult.divergingEpoch.isPresent)
hasDivergingEpoch = true
@@ -1669,7 +1680,7 @@
// 4) some error happens while reading data
// 5) we found a diverging epoch
// 6) has a preferred read replica
if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
if (remoteFetchInfos.isEmpty && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
hasDivergingEpoch || hasPreferredReadReplica)) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
@@ -1686,15 +1697,8 @@
})
}
if (remoteFetchInfo.isPresent) {
val maybeLogReadResultWithError = processRemoteFetch(remoteFetchInfo.get(), params, responseCallback, logReadResults, fetchPartitionStatus)
if (maybeLogReadResultWithError.isDefined) {
// If there is an error in scheduling the remote fetch task, return what we currently have
// (the data read from local log segment for the other topic-partitions) and an error for the topic-partition
// that we couldn't read from remote storage
val partitionToFetchPartitionData = buildPartitionToFetchPartitionData(logReadResults, remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
responseCallback(partitionToFetchPartitionData)
}
if (!remoteFetchInfos.isEmpty) {
processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq)
} else {
// If there is not enough data to respond and there is no remote data, we will let the fetch request
// wait for new data.
@@ -1902,9 +1906,9 @@
)
} else {
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
// For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
// For the topic-partitions that need remote data, we will use this information to read the data in another thread.
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp,
fetchInfo, params.isolation)))
}

View File

@@ -16,6 +16,7 @@
*/
package kafka.server
import com.yammer.metrics.core.Meter
import kafka.cluster.Partition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.protocol.Errors
@@ -28,9 +29,10 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa
import org.apache.kafka.storage.internals.log._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito.{mock, verify, when}
import org.mockito.ArgumentMatchers.anyBoolean
import org.mockito.Mockito.{mock, never, verify, when}
import java.util.{Optional, OptionalLong}
import java.util.{Collections, Optional, OptionalLong}
import java.util.concurrent.{CompletableFuture, Future}
import scala.collection._
import scala.jdk.CollectionConverters._
@@ -39,6 +41,7 @@ class DelayedRemoteFetchTest {
private val maxBytes = 1024
private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
private val topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic2")
private val fetchOffset = 500L
private val logStartOffset = 0L
private val currentLeaderEpoch = Optional.of[Integer](10)
@@ -61,14 +64,22 @@ }
}
val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
future.complete(null)
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
future.complete(buildRemoteReadResult(Errors.NONE))
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
val highWatermark = 100
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(
java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
java.util.Collections.singletonMap(topicIdPartition, future),
java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus),
fetchParams,
Seq(topicIdPartition -> logReadInfo),
replicaManager,
callback)
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
@@ -97,14 +108,23 @@ }
}
val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
future.complete(null)
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
future.complete(buildRemoteReadResult(Errors.NONE))
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
val highWatermark = 100
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(
java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
java.util.Collections.singletonMap(topicIdPartition, future),
java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus),
fetchParams,
Seq(topicIdPartition -> logReadInfo),
replicaManager,
callback))
}
@Test
@@ -123,12 +143,20 @@
.thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available"))
val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
val logReadInfo = buildReadResult(Errors.NONE)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(
java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
java.util.Collections.singletonMap(topicIdPartition, future),
java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus),
fetchParams,
Seq(topicIdPartition -> logReadInfo),
replicaManager,
callback)
// delayed remote fetch should still be able to complete
assertTrue(delayedRemoteFetch.tryComplete())
@@ -152,14 +180,22 @@
.thenReturn(mock(classOf[Partition]))
val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
future.complete(null)
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
future.complete(buildRemoteReadResult(Errors.NONE))
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
// build a read result with error
val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(
java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
java.util.Collections.singletonMap(topicIdPartition, future),
java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus),
fetchParams,
Seq(topicIdPartition -> logReadInfo),
replicaManager,
callback)
assertTrue(delayedRemoteFetch.tryComplete())
assertTrue(delayedRemoteFetch.isCompleted)
@@ -170,52 +206,262 @@
@Test
def testRequestExpiry(): Unit = {
var actualTopicPartition: Option[TopicIdPartition] = None
var fetchResultOpt: Option[FetchPartitionData] = None
val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
assertEquals(1, responses.size)
actualTopicPartition = Some(responses.head._1)
fetchResultOpt = Some(responses.head._2)
def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
responseSeq.foreach { case (tp, data) =>
responses.put(tp, data)
}
}
def expiresPerSecValue(): Double = {
val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
val metric = allMetrics.find { case (n, _) => n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec") }
if (metric.isEmpty)
0
else
metric.get._2.asInstanceOf[Meter].count
}
val remoteFetchTaskExpired = mock(classOf[Future[Void]])
val remoteFetchTask2 = mock(classOf[Future[Void]])
// mark the 2nd task as done, and leave the 1st one incomplete so that it expires
when(remoteFetchTask2.isDone).thenReturn(true)
// Create futures - one completed, one not
val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
// Only complete one remote fetch
future2.complete(buildRemoteReadResult(Errors.NONE))
val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
val highWatermark = 100
val leaderLogStartOffset = 10
val remoteFetchTask = mock(classOf[Future[Void]])
val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
val logReadInfo2 = buildReadResult(Errors.NONE)
val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val fetchStatus1 = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchStatus2 = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
// Set up maps for multiple partitions
val remoteFetchTasks = new java.util.HashMap[TopicIdPartition, Future[Void]]()
val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired)
remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2)
remoteFetchResults.put(topicIdPartition, future1)
remoteFetchResults.put(topicIdPartition2, future2)
remoteFetchInfos.put(topicIdPartition, fetchInfo1)
remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
val delayedRemoteFetch = new DelayedRemoteFetch(
remoteFetchTasks,
remoteFetchResults,
remoteFetchInfos,
remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
fetchParams,
Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
replicaManager,
callback)
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
.thenReturn(mock(classOf[Partition]))
// Verify that the ExpiresPerSec metric is zero before fetching
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
assertEquals(0, metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
val existingMetricVal = expiresPerSecValue()
// Verify the delayedRemoteFetch is not completed yet
assertFalse(delayedRemoteFetch.isCompleted)
// Force the delayed remote fetch to expire
delayedRemoteFetch.run()
// Check that the task was cancelled and force-completed
verify(remoteFetchTask).cancel(false)
// Check that the expired task was cancelled and force-completed
verify(remoteFetchTaskExpired).cancel(anyBoolean())
verify(remoteFetchTask2, never()).cancel(anyBoolean())
assertTrue(delayedRemoteFetch.isCompleted)
// Check that the ExpiresPerSec metric was incremented
assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
assertTrue(expiresPerSecValue() > existingMetricVal)
// Fetch results should still include local read results
assertTrue(actualTopicPartition.isDefined)
assertEquals(topicIdPartition, actualTopicPartition.get)
assertTrue(fetchResultOpt.isDefined)
// Fetch results should include 2 results and the expired one should return local read results
assertEquals(2, responses.size)
assertTrue(responses.contains(topicIdPartition))
assertTrue(responses.contains(topicIdPartition2))
val fetchResult = fetchResultOpt.get
assertEquals(Errors.NONE, fetchResult.error)
assertEquals(highWatermark, fetchResult.highWatermark)
assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
assertEquals(Errors.NONE, responses(topicIdPartition).error)
assertEquals(highWatermark, responses(topicIdPartition).highWatermark)
assertEquals(leaderLogStartOffset, responses(topicIdPartition).logStartOffset)
assertEquals(Errors.NONE, responses(topicIdPartition2).error)
}
@Test
def testMultiplePartitions(): Unit = {
val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
responseSeq.foreach { case (tp, data) =>
responses.put(tp, data)
}
}
// Create futures - one completed, one not
val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
// Only complete one remote fetch
future1.complete(buildRemoteReadResult(Errors.NONE))
val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
val highWatermark1 = 100
val leaderLogStartOffset1 = 10
val highWatermark2 = 200
val leaderLogStartOffset2 = 20
val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
val fetchStatus1 = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchStatus2 = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
// Set up maps for multiple partitions
val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
remoteFetchResults.put(topicIdPartition, future1)
remoteFetchResults.put(topicIdPartition2, future2)
remoteFetchInfos.put(topicIdPartition, fetchInfo1)
remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
val delayedRemoteFetch = new DelayedRemoteFetch(
Collections.emptyMap[TopicIdPartition, Future[Void]](),
remoteFetchResults,
remoteFetchInfos,
remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
fetchParams,
Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
replicaManager,
callback)
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
.thenReturn(mock(classOf[Partition]))
// Should not complete since future2 is not done
assertFalse(delayedRemoteFetch.tryComplete())
assertFalse(delayedRemoteFetch.isCompleted)
// Complete future2
future2.complete(buildRemoteReadResult(Errors.NONE))
// Now it should complete
assertTrue(delayedRemoteFetch.tryComplete())
assertTrue(delayedRemoteFetch.isCompleted)
// Verify both partitions were processed without error
assertEquals(2, responses.size)
assertTrue(responses.contains(topicIdPartition))
assertTrue(responses.contains(topicIdPartition2))
assertEquals(Errors.NONE, responses(topicIdPartition).error)
assertEquals(highWatermark1, responses(topicIdPartition).highWatermark)
assertEquals(leaderLogStartOffset1, responses(topicIdPartition).logStartOffset)
assertEquals(Errors.NONE, responses(topicIdPartition2).error)
assertEquals(highWatermark2, responses(topicIdPartition2).highWatermark)
assertEquals(leaderLogStartOffset2, responses(topicIdPartition2).logStartOffset)
}
@Test
def testMultiplePartitionsWithFailedResults(): Unit = {
val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
responseSeq.foreach { case (tp, data) =>
responses.put(tp, data)
}
}
// Create futures - one successful, one with error
val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
// Create 1 successful result and 1 failed result
future1.complete(buildRemoteReadResult(Errors.NONE))
future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR))
val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
val fetchStatus1 = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchStatus2 = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
// Set up maps for multiple partitions
val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
remoteFetchResults.put(topicIdPartition, future1)
remoteFetchResults.put(topicIdPartition2, future2)
remoteFetchInfos.put(topicIdPartition, fetchInfo1)
remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
val delayedRemoteFetch = new DelayedRemoteFetch(
Collections.emptyMap[TopicIdPartition, Future[Void]](),
remoteFetchResults,
remoteFetchInfos,
remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
fetchParams,
Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
replicaManager,
callback)
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
.thenReturn(mock(classOf[Partition]))
assertTrue(delayedRemoteFetch.tryComplete())
assertTrue(delayedRemoteFetch.isCompleted)
// Verify both partitions were processed
assertEquals(2, responses.size)
assertTrue(responses.contains(topicIdPartition))
assertTrue(responses.contains(topicIdPartition2))
// First partition should be successful
val fetchResult1 = responses(topicIdPartition)
assertEquals(Errors.NONE, fetchResult1.error)
// Second partition should have an error due to remote fetch failure
val fetchResult2 = responses(topicIdPartition2)
assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error)
}
private def buildFetchParams(replicaId: Int,
@@ -235,7 +481,8 @@ class DelayedRemoteFetchTest {
highWatermark: Int = 0,
leaderLogStartOffset: Int = 0): LogReadResult = {
new LogReadResult(
new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY, false, Optional.empty(),
Optional.of(mock(classOf[RemoteStorageFetchInfo]))),
Optional.empty(),
highWatermark,
leaderLogStartOffset,
@@ -246,4 +493,9 @@
if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
}
private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
new RemoteLogReadResult(
Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
}
}

View File

@@ -77,7 +77,7 @@ import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -94,8 +94,8 @@ import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, TimeUnit}
import java.util.function.BiConsumer
import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, Future, TimeUnit}
import java.util.function.{BiConsumer, Consumer}
import java.util.stream.IntStream
import java.util.{Collections, Optional, OptionalLong, Properties}
import scala.collection.{Map, Seq, mutable}
@@ -3390,7 +3390,7 @@ class ReplicaManagerTest {
} else {
verify(mockRemoteLogManager).asyncRead(remoteStorageFetchInfoArg.capture(), any())
val remoteStorageFetchInfo = remoteStorageFetchInfoArg.getValue
assertEquals(tp0, remoteStorageFetchInfo.topicPartition)
assertEquals(tp0, remoteStorageFetchInfo.topicIdPartition.topicPartition)
assertEquals(fetchOffset, remoteStorageFetchInfo.fetchInfo.fetchOffset)
assertEquals(topicId, remoteStorageFetchInfo.fetchInfo.topicId)
assertEquals(startOffset, remoteStorageFetchInfo.fetchInfo.logStartOffset)
@@ -3594,6 +3594,109 @@ class ReplicaManagerTest {
}
}
@Test
def testMultipleRemoteFetchesInOneFetchRequest(): Unit = {
val replicaId = -1
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
val tidp0 = new TopicIdPartition(topicId, tp0)
val tidp1 = new TopicIdPartition(topicId, tp1)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val leaderEpoch = 0
val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 0, leaderEpoch = leaderEpoch)
val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
val fetchOffsetTp0 = 1
val fetchOffsetTp1 = 2
val responseSeq = new AtomicReference[Seq[(TopicIdPartition, FetchPartitionData)]]()
val responseLatch = new CountDownLatch(1)
def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
responseSeq.set(responseStatus)
responseLatch.countDown()
}
val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new util.HashSet[Consumer[RemoteLogReadResult]]()
when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
callbacks.add(ans.getArgument(1, classOf[Consumer[RemoteLogReadResult]]))
mock(classOf[Future[Void]])
})
// Start the fetch request for both partitions - this should trigger remote fetches since
// the default mocked log behavior throws OffsetOutOfRangeException
replicaManager.fetchMessages(params, Seq(
tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
), UNBOUNDED_QUOTA, fetchCallback)
// Verify that exactly two asyncRead calls were made (one for each partition)
val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
verify(mockRemoteLogManager, times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
// Verify that remote fetch operations were properly set up for both partitions
assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2, "DelayedRemoteFetch purgatory should watch 2 operations")
// Verify both partitions were captured in the remote fetch requests
val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " + tp0)
assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " + tp1)
// Verify the fetch info details are correct for both partitions
capturedFetchInfos.foreach { fetchInfo =>
assertEquals(topicId, fetchInfo.fetchInfo.topicId)
assertEquals(startOffset, fetchInfo.fetchInfo.logStartOffset)
assertEquals(leaderEpoch, fetchInfo.fetchInfo.currentLeaderEpoch.get())
if (fetchInfo.topicIdPartition.topicPartition == tp0) {
assertEquals(fetchOffsetTp0, fetchInfo.fetchInfo.fetchOffset)
} else {
assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
}
}
// Complete the 2 asyncRead tasks
callbacks.forEach(callback => callback.accept(buildRemoteReadResult(Errors.NONE)))
// Wait for the fetch callback to complete and verify responseSeq content
assertTrue(responseLatch.await(5, TimeUnit.SECONDS), "Fetch callback should complete")
val responseData = responseSeq.get()
assertNotNull(responseData, "Response sequence should not be null")
assertEquals(2, responseData.size, "Response should contain data for both partitions")
// Verify that response contains both tidp0 and tidp1 and have no errors
val responseTopicIdPartitions = responseData.map(_._1).toSet
assertTrue(responseTopicIdPartitions.contains(tidp0), "Response should contain " + tidp0)
assertTrue(responseTopicIdPartitions.contains(tidp1), "Response should contain " + tidp1)
responseData.foreach { case (_, fetchPartitionData) =>
assertEquals(Errors.NONE, fetchPartitionData.error)
}
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
new RemoteLogReadResult(
Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
}
private def yammerMetricValue(name: String): Any = {
val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }

View File

@ -4358,7 +4358,6 @@ $ bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost:
<li>Disabling tiered storage on all topics where it is enabled is required before disabling tiered storage at the broker level</li>
<li>Admin actions related to tiered storage feature are only supported on clients from version 3.0 onwards</li>
<li>No support for log segments missing producer snapshot file. It can happen when topic is created before v2.8.0.</li>
<li>Only one partition per fetch request is served from the remote store. This limitation can become a bottleneck for consumer client throughput - consider configuring <code>max.partition.fetch.bytes</code> appropriately.</li>
</ul>
<p>For more information, please check <a href="https://cwiki.apache.org/confluence/x/9xDOEg">Kafka Tiered Storage GA Release Notes</a>.

View File

@@ -199,7 +199,7 @@ public class DelayedOperationPurgatory<T extends DelayedOperation> {
}
/**
* Return the total size of watch lists the purgatory. Since an operation may be watched
* Return the total size of watch lists in the purgatory. Since an operation may be watched
* on multiple lists, and some of its watched entries may still be in the watch lists
* even when it has been completed, this number may be larger than the number of real operations watched
*/

View File

@@ -1659,7 +1659,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
TopicPartition tp = remoteStorageFetchInfo.topicPartition;
TopicPartition tp = remoteStorageFetchInfo.topicIdPartition.topicPartition();
FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
@@ -1715,6 +1715,8 @@
// - there is no minimum-one-message constraint and
// - the first batch size is more than maximum bytes that can be sent and
if (!remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes) {
LOGGER.debug("Returning empty record for offset {} in partition {} because the first batch size {} " +
"is greater than max fetch bytes {}", offset, tp, firstBatchSize, maxBytes);
return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
}

View File

@@ -192,8 +192,8 @@
public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;
public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms";
public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request. " +
"Note that the broker currently only fetches one partition per fetch request from the remote store. (KAFKA-14915)";
public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the fetch request containing remote fetch partitions. " +
"It's important to be aware that the request will only be responded after all remote partitions have been successfully fetched, have failed, or this timeout is exceeded.";
public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
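
The contract the reworded doc describes — answer once every remote partition is done, or once the timeout fires — can be sketched with plain `CompletableFuture` combinators (requires Java 9+ for `orTimeout`; this illustrates the semantics, not the purgatory's actual mechanism):

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit}

object AllDoneOrTimeout extends App {
  val partitionReads = Seq(new CompletableFuture[String](), new CompletableFuture[String]())

  // Trip when every remote read completes, or after the default
  // remote.fetch.max.wait.ms of 500 ms, whichever happens first.
  val gate = CompletableFuture
    .allOf(partitionReads: _*)
    .orTimeout(500, TimeUnit.MILLISECONDS)

  partitionReads.head.complete("records for partition 0")
  // The second read never completes, so the gate trips on the timeout
  // and the broker would respond with whatever results are done.
  gate.handle((_, err) => println(if (err != null) "expired" else "all done"))
  Thread.sleep(600)
}
```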
public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP = "remote.list.offsets.request.timeout.ms";

View File

@@ -51,7 +51,7 @@ public class RemoteLogReader implements Callable<Void> {
this.rlm = rlm;
this.brokerTopicStats = brokerTopicStats;
this.callback = callback;
this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
this.quotaManager = quotaManager;
this.remoteReadTimer = remoteReadTimer;
@@ -61,20 +61,20 @@
public Void call() {
RemoteLogReadResult result;
try {
LOGGER.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
LOGGER.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition);
FetchDataInfo fetchDataInfo = remoteReadTimer.time(() -> rlm.read(fetchInfo));
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
} catch (OffsetOutOfRangeException e) {
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
} catch (Exception e) {
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteFetchRequestRate().mark();
brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).failedRemoteFetchRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark();
LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicIdPartition, e);
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
}
LOGGER.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
LOGGER.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition);
quotaManager.record(result.fetchDataInfo.map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0));
callback.accept(result);
return null;

View File

@@ -16,7 +16,7 @@
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.storage.log.FetchIsolation;
@@ -24,15 +24,15 @@ public class RemoteStorageFetchInfo {
public final int fetchMaxBytes;
public final boolean minOneMessage;
public final TopicPartition topicPartition;
public final TopicIdPartition topicIdPartition;
public final FetchRequest.PartitionData fetchInfo;
public final FetchIsolation fetchIsolation;
public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicPartition topicPartition,
public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicIdPartition topicIdPartition,
FetchRequest.PartitionData fetchInfo, FetchIsolation fetchIsolation) {
this.fetchMaxBytes = fetchMaxBytes;
this.minOneMessage = minOneMessage;
this.topicPartition = topicPartition;
this.topicIdPartition = topicIdPartition;
this.fetchInfo = fetchInfo;
this.fetchIsolation = fetchIsolation;
}
@@ -42,7 +42,7 @@
return "RemoteStorageFetchInfo{" +
"fetchMaxBytes=" + fetchMaxBytes +
", minOneMessage=" + minOneMessage +
", topicPartition=" + topicPartition +
", topicIdPartition=" + topicIdPartition +
", fetchInfo=" + fetchInfo +
", fetchIsolation=" + fetchIsolation +
'}';

View File

@@ -205,6 +205,7 @@ public class RemoteLogManagerTest {
private final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
private final Map<String, Uuid> topicIds = new HashMap<>();
private final TopicPartition tp = new TopicPartition("TestTopic", 5);
private final TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
private final EpochEntry epochEntry0 = new EpochEntry(0, 0);
private final EpochEntry epochEntry1 = new EpochEntry(1, 100);
private final EpochEntry epochEntry2 = new EpochEntry(2, 200);
@@ -3100,7 +3101,7 @@ );
);
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED
0, false, tpId, partitionData, FetchIsolation.TXN_COMMITTED
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3180,7 +3181,7 @@ );
);
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
0, minOneMessage, tp, partitionData, FetchIsolation.HIGH_WATERMARK
0, minOneMessage, tpId, partitionData, FetchIsolation.HIGH_WATERMARK
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3266,7 +3267,7 @@
when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
doNothing().when(firstBatch).writeTo(capture.capture());
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
0, true, tp, partitionData, FetchIsolation.HIGH_WATERMARK
0, true, tpId, partitionData, FetchIsolation.HIGH_WATERMARK
);
@@ -3651,7 +3652,7 @@
FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
1048576, true, leaderTopicIdPartition.topicPartition(),
1048576, true, leaderTopicIdPartition,
partitionData, FetchIsolation.HIGH_WATERMARK);
FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
// firstBatch baseOffset may not be equal to the fetchOffset

View File

@@ -18,7 +18,8 @@ package org.apache.kafka.server.log.remote.storage;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
@@ -69,7 +70,7 @@ public class RemoteLogReaderTest {
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC), null, null);
RemoteLogReader remoteLogReader =
new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager, timer);
remoteLogReader.call();
@@ -102,7 +103,7 @@
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new RuntimeException("error"));
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC), null, null);
RemoteLogReader remoteLogReader =
new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager, timer);
remoteLogReader.call();