From e1ff38760545b472267aaa60a3422bd8cbf74b6a Mon Sep 17 00:00:00 2001
From: Luke Chen
Date: Mon, 14 Jul 2025 22:12:08 +0800
Subject: [PATCH] KAFKA-14915: Allow reading from remote storage for multiple partitions in one fetchRequest (#20045)

This PR enables reading remote storage for multiple partitions in one fetchRequest. The main changes are:
1. In `DelayedRemoteFetch`, we accept multiple remoteFetchTasks and other metadata now.
2. In `DelayedRemoteFetch`, we'll wait until all remoteFetches are done, either succeeded or failed.
3. In `ReplicaManager#fetchMessage`, we'll create one `DelayedRemoteFetch`, pass multiple remoteFetch metadata to it, and watch all of them.
4. Added tests

Reviewers: Kamal Chandraprakash, Federico Valeri, Satish Duggana
---
 .../clients/consumer/ConsumerConfig.java | 8 +-
 .../kafka/server/DelayedRemoteFetch.scala | 24 +-
 .../scala/kafka/server/ReplicaManager.scala | 92 ++---
 .../kafka/server/DelayedRemoteFetchTest.scala | 338 +++++++++++++++---
 .../kafka/server/ReplicaManagerTest.scala | 111 +++++-
 docs/ops.html | 1 -
 .../purgatory/DelayedOperationPurgatory.java | 2 +-
 .../log/remote/storage/RemoteLogManager.java | 4 +-
 .../storage/RemoteLogManagerConfig.java | 4 +-
 .../log/remote/storage/RemoteLogReader.java | 12 +-
 .../internals/log/RemoteStorageFetchInfo.java | 10 +-
 .../remote/storage/RemoteLogManagerTest.java | 9 +-
 .../remote/storage/RemoteLogReaderTest.java | 7 +-
 13 files changed, 494 insertions(+), 128 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7700090ccef..3fcdf20953c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -197,9 +197,7 @@ public class ConsumerConfig extends AbstractConfig {
 "this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. " +
 "The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or " +
 "max.message.bytes (topic config). A fetch request consists of many partitions, and there is another setting that controls how much " +
- "data is returned for each partition in a fetch request - see max.partition.fetch.bytes. Note that there is a current limitation when " +
- "performing remote reads from tiered storage (KIP-405) - only one partition out of the fetch request is fetched from the remote store (KAFKA-14915). " +
- "Note also that the consumer performs multiple fetches in parallel.";
+ "data is returned for each partition in a fetch request - see max.partition.fetch.bytes. Note that the consumer performs multiple fetches in parallel.";
 public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
 /**
@@ -224,9 +222,7 @@ public class ConsumerConfig extends AbstractConfig {
 "partition of the fetch is larger than this limit, the " +
 "batch will still be returned to ensure that the consumer can make progress. The maximum record batch size " +
 "accepted by the broker is defined via message.max.bytes (broker config) or " +
- "max.message.bytes (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size. " +
- "Consider increasing max.partition.fetch.bytes especially in the cases of remote storage reads (KIP-405), because currently only " +
- "one partition per fetch request is served from the remote store (KAFKA-14915).";
+ "max.message.bytes (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size.";
 public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
 /** send.buffer.bytes */

diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 317c8dd4ac9..cb14a14b3e9 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
+import java.util
 import java.util.concurrent.{CompletableFuture, Future, TimeUnit}
 import java.util.{Optional, OptionalInt, OptionalLong}
 import scala.collection._
@@ -36,9 +37,9 @@ import scala.collection._
 * A remote fetch operation that can be created by the replica manager and watched
 * in the remote fetch operation purgatory
 */
-class DelayedRemoteFetch(remoteFetchTask: Future[Void],
-                         remoteFetchResult: CompletableFuture[RemoteLogReadResult],
-                         remoteFetchInfo: RemoteStorageFetchInfo,
+class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Void]],
+                         remoteFetchResults: util.Map[TopicIdPartition, CompletableFuture[RemoteLogReadResult]],
+                         remoteFetchInfos: util.Map[TopicIdPartition, RemoteStorageFetchInfo],
                          remoteFetchMaxWaitMs: Long,
                          fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
                          fetchParams: FetchParams,
@@ -56,7 +57,7 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
 *
 * Case a: This broker is no longer the leader of the partition it tries to fetch
 * Case b: This broker does not know the partition it tries to fetch
- * Case c: The remote storage read request completed (succeeded or failed)
+ * Case c: All the remote storage read requests completed (succeeded or failed)
 * Case d: The partition is in an offline log directory on this broker
 *
 * Upon completion, should return whatever data is available for each valid partition
@@ -81,7 +82,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
 return forceComplete()
 }
 }
- if (remoteFetchResult.isDone) // Case c
+ // Case c
+ if (remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone))
 forceComplete()
 else
 false
@@ -90,8 +92,13 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
 override def onExpiration(): Unit = {
 // cancel the remote storage read task, if it has not been executed yet and
 // avoid interrupting the task if it is already running as it may force closing opened/cached resources such as the transaction index.
- val cancelled = remoteFetchTask.cancel(false)
- if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+ remoteFetchTasks.forEach { (topicIdPartition, task) =>
+   if (task != null && !task.isDone) {
+     if (!task.cancel(false)) {
+       debug(s"Remote fetch task for remoteFetchInfo: ${remoteFetchInfos.get(topicIdPartition)} could not be cancelled.")
+     }
+   }
+ }
 DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
 }
@@ -101,7 +108,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
 */
 override def onComplete(): Unit = {
 val fetchPartitionData = localReadResults.map { case (tp, result) =>
- if (tp.topicPartition().equals(remoteFetchInfo.topicPartition)
+ val remoteFetchResult = remoteFetchResults.get(tp)
+ if (remoteFetchInfos.containsKey(tp)
+   && remoteFetchResult.isDone
   && result.error == Errors.NONE
   && result.info.delayedRemoteStorageFetch.isPresent) {

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e70a4726216..448ec1cf264 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1579,15 +1579,18 @@ class ReplicaManager(val config: KafkaConfig,
 }
 /**
- * Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
- * else returns [[None]].
+ * Initiates an asynchronous remote storage fetch operation for the given remote fetch information.
+ *
+ * This method schedules a remote fetch task with the remote log manager and sets up the necessary
+ * completion handling for the operation. The remote fetch result will be used to populate the
+ * delayed remote fetch purgatory when completed.
+ *
+ * @param remoteFetchInfo The remote storage fetch information
+ *
+ * @return A tuple containing the remote fetch task and the remote fetch result
 */
- private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
-                                params: FetchParams,
-                                responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
-                                logReadResults: Seq[(TopicIdPartition, LogReadResult)],
-                                fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
- val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
+ private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo): (Future[Void], CompletableFuture[RemoteLogReadResult]) = {
+ val key = new TopicPartitionOperationKey(remoteFetchInfo.topicIdPartition)
 val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
 var remoteFetchTask: Future[Void] = null
 try {
@@ -1597,31 +1600,39 @@ class ReplicaManager(val config: KafkaConfig,
 })
 } catch {
 case e: RejectedExecutionException =>
- // Return the error if any in scheduling the remote fetch task
- warn("Unable to fetch data from remote storage", e)
- return Some(createLogReadResult(e))
+ warn(s"Unable to fetch data from remote storage for remoteFetchInfo: $remoteFetchInfo", e)
+ // Store any error raised while scheduling the remote fetch task in the RemoteLogReadResult.
+ // It will be sent back to the client in DelayedRemoteFetch along with other successful remote fetch results.
+ remoteFetchResult.complete(new RemoteLogReadResult(Optional.empty, Optional.of(e)))
 }
- val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
- val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
-   fetchPartitionStatus, params, logReadResults, this, responseCallback)
- delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, util.Collections.singletonList(key))
- None
+ (remoteFetchTask, remoteFetchResult)
 }
- private def buildPartitionToFetchPartitionData(logReadResults: Seq[(TopicIdPartition, LogReadResult)],
-                                                remoteFetchTopicPartition: TopicPartition,
-                                                error: LogReadResult): Seq[(TopicIdPartition, FetchPartitionData)] = {
- logReadResults.map { case (tp, result) =>
- val fetchPartitionData = {
-   if (tp.topicPartition().equals(remoteFetchTopicPartition))
-     error
-   else
-     result
- }.toFetchPartitionData(false)
-
- tp -> fetchPartitionData
+ /**
+ * Process all remote fetches by creating async read tasks and handling them in DelayedRemoteFetch collectively.
+ */
+ private def processRemoteFetches(remoteFetchInfos: util.HashMap[TopicIdPartition, RemoteStorageFetchInfo],
+                                  params: FetchParams,
+                                  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
+                                  logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                  remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
+ val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
+ val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
+
+ remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
+   val (task, result) = processRemoteFetch(remoteFetchInfo)
+   remoteFetchTasks.put(topicIdPartition, task)
+   remoteFetchResults.put(topicIdPartition, result)
 }
+
+ val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
+ val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
+   remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)
+
+ // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
+ val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
+ delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, delayedFetchKeys.asJava)
 }
 /**
@@ -1639,8 +1650,8 @@ class ReplicaManager(val config: KafkaConfig,
 var bytesReadable: Long = 0
 var errorReadingData = false
- // The 1st topic-partition that has to be read from remote storage
- var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
+ // topic-partitions that have to be read from remote storage
+ val remoteFetchInfos = new util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
 var hasDivergingEpoch = false
 var hasPreferredReadReplica = false
@@ -1651,8 +1662,8 @@
 brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
 if (logReadResult.error != Errors.NONE)
   errorReadingData = true
- if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
-   remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
+ if (logReadResult.info.delayedRemoteStorageFetch.isPresent) {
+   remoteFetchInfos.put(topicIdPartition, logReadResult.info.delayedRemoteStorageFetch.get())
 }
 if (logReadResult.divergingEpoch.isPresent)
   hasDivergingEpoch = true
@@ -1669,7 +1680,7 @@ class ReplicaManager(val config: KafkaConfig,
 // 4) some error happens while reading data
 // 5) we found a diverging epoch
 // 6) has a preferred read replica
- if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
+ if (remoteFetchInfos.isEmpty && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
   hasDivergingEpoch || hasPreferredReadReplica)) {
 val fetchPartitionData = logReadResults.map { case (tp, result) =>
 val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
@@ -1686,15 +1697,8 @@ class ReplicaManager(val config: KafkaConfig,
 })
 }
- if (remoteFetchInfo.isPresent) {
-   val maybeLogReadResultWithError = processRemoteFetch(remoteFetchInfo.get(), params, responseCallback, logReadResults, fetchPartitionStatus)
-   if (maybeLogReadResultWithError.isDefined) {
-     // If there is an error in scheduling the remote fetch task, return what we currently have
-     // (the data read from local log segment for the other topic-partitions) and an error for the topic-partition
-     // that we couldn't read from remote storage
-     val partitionToFetchPartitionData = buildPartitionToFetchPartitionData(logReadResults, remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
-     responseCallback(partitionToFetchPartitionData)
-   }
+ if (!remoteFetchInfos.isEmpty) {
+   processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq)
 } else {
 // If there is not enough data to respond and there is no remote data, we will let the fetch request
 // wait for new data.
@@ -1902,9 +1906,9 @@ class ReplicaManager(val config: KafkaConfig,
 )
 } else {
 // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
- // For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
+ // For the topic-partitions that need remote data, we will use this information to read the data in another thread.
 new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
-   Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
+   Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp,
     fetchInfo, params.isolation)))
 }

diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index b65de12182e..23b4b32b0d7 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -16,6 +16,7 @@
 */
 package kafka.server
+import com.yammer.metrics.core.Meter
 import kafka.cluster.Partition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.protocol.Errors
@@ -28,9 +29,10 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa
 import org.apache.kafka.storage.internals.log._
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.ArgumentMatchers.anyBoolean
+import org.mockito.Mockito.{mock, never, verify, when}
-import java.util.{Optional, OptionalLong}
+import java.util.{Collections, Optional, OptionalLong}
 import java.util.concurrent.{CompletableFuture, Future}
 import scala.collection._
 import scala.jdk.CollectionConverters._
@@ -39,6 +41,7 @@ class DelayedRemoteFetchTest {
 private val maxBytes = 1024
 private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
 private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+ private val topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic2")
 private val fetchOffset = 500L
 private val logStartOffset = 0L
 private val currentLeaderEpoch = Optional.of[Integer](10)
@@ -61,14 +64,22 @@
 }
 val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
+ future.complete(buildRemoteReadResult(Errors.NONE))
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
 val highWatermark = 100
 val leaderLogStartOffset = 10
 val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
-   Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+   java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+   java.util.Collections.singletonMap(topicIdPartition, future),
+   java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+   remoteFetchMaxWaitMs,
+   Seq(topicIdPartition -> fetchStatus),
+   fetchParams,
+   Seq(topicIdPartition -> logReadInfo),
+   replicaManager,
+   callback)
 when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
   .thenReturn(mock(classOf[Partition]))
@@ -97,14 +108,23 @@
 }
 val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
+ future.complete(buildRemoteReadResult(Errors.NONE))
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
 val highWatermark = 100
 val leaderLogStartOffset = 10
 val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
 val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
- assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
-   Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
+
+ assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(
+   java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+   java.util.Collections.singletonMap(topicIdPartition, future),
+   java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+   remoteFetchMaxWaitMs,
+   Seq(topicIdPartition -> fetchStatus),
+   fetchParams,
+   Seq(topicIdPartition -> logReadInfo),
+   replicaManager,
+   callback))
 }
 @Test
@@ -123,12 +143,20 @@
 .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available"))
 val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
 val logReadInfo = buildReadResult(Errors.NONE)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
-   Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+   java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+   java.util.Collections.singletonMap(topicIdPartition, future),
+   java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+   remoteFetchMaxWaitMs,
+   Seq(topicIdPartition -> fetchStatus),
+   fetchParams,
+   Seq(topicIdPartition -> logReadInfo),
+   replicaManager,
+   callback)
 // delayed remote fetch should still be able to complete
 assertTrue(delayedRemoteFetch.tryComplete())
@@ -152,14 +180,22 @@
 .thenReturn(mock(classOf[Partition]))
 val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
+ future.complete(buildRemoteReadResult(Errors.NONE))
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
 // build a read result with error
 val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
-   Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+   java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+   java.util.Collections.singletonMap(topicIdPartition, future),
+   java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+   remoteFetchMaxWaitMs,
+   Seq(topicIdPartition -> fetchStatus),
+   fetchParams,
+   Seq(topicIdPartition -> logReadInfo),
+   replicaManager,
+   callback)
 assertTrue(delayedRemoteFetch.tryComplete())
 assertTrue(delayedRemoteFetch.isCompleted)
@@ -170,52 +206,262 @@
 @Test
 def testRequestExpiry(): Unit = {
- var actualTopicPartition: Option[TopicIdPartition] = None
- var fetchResultOpt: Option[FetchPartitionData] = None
+ val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
- def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-   assertEquals(1, responses.size)
-   actualTopicPartition = Some(responses.head._1)
-   fetchResultOpt = Some(responses.head._2)
+ def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+   responseSeq.foreach { case (tp, data) =>
+     responses.put(tp, data)
+   }
 }
+ def expiresPerSecValue(): Double = {
+   val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+   val metric = allMetrics.find { case (n, _) => n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec") }
+
+   if (metric.isEmpty)
+     0
+   else
+     metric.get._2.asInstanceOf[Meter].count
+ }
+
+ val remoteFetchTaskExpired = mock(classOf[Future[Void]])
+ val remoteFetchTask2 = mock(classOf[Future[Void]])
+ // complete the 2nd task, and keep the 1st one expired
+ when(remoteFetchTask2.isDone).thenReturn(true)
+
+ // Create futures - one completed, one not
+ val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+ val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+ // Only complete one remote fetch
+ future2.complete(buildRemoteReadResult(Errors.NONE))
+
+ val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
+ val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
+
 val highWatermark = 100
 val leaderLogStartOffset = 10
- val remoteFetchTask = mock(classOf[Future[Void]])
- val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
- val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
+ val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
+ val logReadInfo2 = buildReadResult(Errors.NONE)
- val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs,
-   Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+ val fetchStatus1 = FetchPartitionStatus(
+   startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+   fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus2 = FetchPartitionStatus(
+   startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+   fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+ // Set up maps for multiple partitions
+ val remoteFetchTasks = new java.util.HashMap[TopicIdPartition, Future[Void]]()
+ val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
+ val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+
+ remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired)
+ remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2)
+ remoteFetchResults.put(topicIdPartition, future1)
+ remoteFetchResults.put(topicIdPartition2, future2)
+ remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+ remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+   remoteFetchTasks,
+   remoteFetchResults,
+   remoteFetchInfos,
+   remoteFetchMaxWaitMs,
+   Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+   fetchParams,
+   Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+   replicaManager,
+   callback)
 when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
   .thenReturn(mock(classOf[Partition]))
+ when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+   .thenReturn(mock(classOf[Partition]))
 // Verify that the ExpiresPerSec metric is zero before fetching
- val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
- assertEquals(0, metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+ val existingMetricVal = expiresPerSecValue()
+ // Verify the delayedRemoteFetch is not completed yet
+ assertFalse(delayedRemoteFetch.isCompleted)
 // Force the delayed remote fetch to expire
 delayedRemoteFetch.run()
- // Check that the task was cancelled and force-completed
- verify(remoteFetchTask).cancel(false)
+ // Check that the expired task was cancelled and force-completed
+ verify(remoteFetchTaskExpired).cancel(anyBoolean())
+ verify(remoteFetchTask2, never()).cancel(anyBoolean())
 assertTrue(delayedRemoteFetch.isCompleted)
 // Check that the ExpiresPerSec metric was incremented
- assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+ assertTrue(expiresPerSecValue() > existingMetricVal)
- // Fetch results should still include local read results
- assertTrue(actualTopicPartition.isDefined)
- assertEquals(topicIdPartition, actualTopicPartition.get)
- assertTrue(fetchResultOpt.isDefined)
+ // Fetch results should include 2 results and the expired one should return local read results
+ assertEquals(2, responses.size)
+ assertTrue(responses.contains(topicIdPartition))
+ assertTrue(responses.contains(topicIdPartition2))
- val fetchResult = fetchResultOpt.get
- assertEquals(Errors.NONE, fetchResult.error)
- assertEquals(highWatermark, fetchResult.highWatermark)
- assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+ assertEquals(Errors.NONE, responses(topicIdPartition).error)
+ assertEquals(highWatermark, responses(topicIdPartition).highWatermark)
+ assertEquals(leaderLogStartOffset, responses(topicIdPartition).logStartOffset)
+
+ assertEquals(Errors.NONE, responses(topicIdPartition2).error)
+ }
+
+ @Test
+ def testMultiplePartitions(): Unit = {
+ val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
+
+ def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+   responseSeq.foreach { case (tp, data) =>
+     responses.put(tp, data)
+   }
+ }
+
+ // Create futures - one completed, one not
+ val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+ val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+ // Only complete one remote fetch
+ future1.complete(buildRemoteReadResult(Errors.NONE))
+
+ val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
+ val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
+
+ val highWatermark1 = 100
+ val leaderLogStartOffset1 = 10
+ val highWatermark2 = 200
+ val leaderLogStartOffset2 = 20
+
+ val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
+ val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
+
+ val fetchStatus1 = FetchPartitionStatus(
+   startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+   fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus2 = FetchPartitionStatus(
+   startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+   fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+ // Set up maps for multiple partitions
+ val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
+ val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+
+ remoteFetchResults.put(topicIdPartition, future1)
+ remoteFetchResults.put(topicIdPartition2, future2)
+ remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+ remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+   Collections.emptyMap[TopicIdPartition, Future[Void]](),
+   remoteFetchResults,
+   remoteFetchInfos,
+   remoteFetchMaxWaitMs,
+   Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+   fetchParams,
+   Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+   replicaManager,
+   callback)
+
+ when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+   .thenReturn(mock(classOf[Partition]))
+ when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+   .thenReturn(mock(classOf[Partition]))
+
+ // Should not complete since future2 is not done
+ assertFalse(delayedRemoteFetch.tryComplete())
+ assertFalse(delayedRemoteFetch.isCompleted)
+
+ // Complete future2
+ future2.complete(buildRemoteReadResult(Errors.NONE))
+
+ // Now it should complete
+ assertTrue(delayedRemoteFetch.tryComplete())
+ assertTrue(delayedRemoteFetch.isCompleted)
+
+ // Verify both partitions were processed without error
+ assertEquals(2, responses.size)
+ assertTrue(responses.contains(topicIdPartition))
+ assertTrue(responses.contains(topicIdPartition2))
+
+ assertEquals(Errors.NONE, responses(topicIdPartition).error)
+ assertEquals(highWatermark1, responses(topicIdPartition).highWatermark)
+ assertEquals(leaderLogStartOffset1, responses(topicIdPartition).logStartOffset)
+
+ assertEquals(Errors.NONE, responses(topicIdPartition2).error)
+ assertEquals(highWatermark2, responses(topicIdPartition2).highWatermark)
+ assertEquals(leaderLogStartOffset2, responses(topicIdPartition2).logStartOffset)
+ }
+
+ @Test
+ def testMultiplePartitionsWithFailedResults(): Unit = {
+ val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
+
+ def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+   responseSeq.foreach { case (tp, data) =>
+     responses.put(tp, data)
+   }
+ }
+
+ // Create futures - one successful, one with error
+ val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+ val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+
+ // Created 1 successful result and 1 failed result
+ future1.complete(buildRemoteReadResult(Errors.NONE))
+ future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR))
+
+ val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
+ val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
+
+ val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
+ val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
+
+ val fetchStatus1 = FetchPartitionStatus(
+   startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+   fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus2 = FetchPartitionStatus(
+   startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+   fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+ // Set up maps for multiple partitions
+ val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
+ val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+
+ remoteFetchResults.put(topicIdPartition, future1)
+ remoteFetchResults.put(topicIdPartition2, future2)
+ remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+ remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+   Collections.emptyMap[TopicIdPartition, Future[Void]](),
+   remoteFetchResults,
+   remoteFetchInfos,
+   remoteFetchMaxWaitMs,
+   Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+   fetchParams,
+   Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+   replicaManager,
+   callback)
+
+ when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+   .thenReturn(mock(classOf[Partition]))
+ when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+   .thenReturn(mock(classOf[Partition]))
+
+ assertTrue(delayedRemoteFetch.tryComplete())
+ assertTrue(delayedRemoteFetch.isCompleted)
+
+ // Verify both partitions were processed
+ assertEquals(2, responses.size)
+ assertTrue(responses.contains(topicIdPartition))
+ assertTrue(responses.contains(topicIdPartition2))
+
+ // First partition should be successful
+ val fetchResult1 = responses(topicIdPartition)
+ assertEquals(Errors.NONE, fetchResult1.error)
+
+ // Second partition should have an error due to remote fetch failure
+ val fetchResult2 = responses(topicIdPartition2)
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error)
 }
 private def buildFetchParams(replicaId: Int,
@@ -235,7 +481,8 @@
 highWatermark: Int = 0,
 leaderLogStartOffset: Int = 0): LogReadResult = {
 new LogReadResult(
- new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+ new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY, false, Optional.empty(),
+   Optional.of(mock(classOf[RemoteStorageFetchInfo]))),
 Optional.empty(),
 highWatermark,
 leaderLogStartOffset,
@@ -246,4 +493,9 @@
 if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
 }
+ private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
+   new RemoteLogReadResult(
+     Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
+     if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
+ }
 }

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 761fc49b2ea..843a0d60342 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -94,8 +94,8 @@
 import java.net.InetAddress
 import java.nio.file.{Files, Paths}
 import java.util
 import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
-import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, TimeUnit}
-import java.util.function.BiConsumer
+import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, Future, TimeUnit}
+import java.util.function.{BiConsumer, Consumer}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import scala.collection.{Map, Seq, mutable}
@@ -3390,7 +3390,7 @@ class ReplicaManagerTest {
 } else {
 verify(mockRemoteLogManager).asyncRead(remoteStorageFetchInfoArg.capture(), any())
 val remoteStorageFetchInfo = remoteStorageFetchInfoArg.getValue
- assertEquals(tp0, remoteStorageFetchInfo.topicPartition)
+ assertEquals(tp0, remoteStorageFetchInfo.topicIdPartition.topicPartition)
 assertEquals(fetchOffset, remoteStorageFetchInfo.fetchInfo.fetchOffset)
 assertEquals(topicId, remoteStorageFetchInfo.fetchInfo.topicId)
 assertEquals(startOffset, remoteStorageFetchInfo.fetchInfo.logStartOffset)
@@ -3594,6 +3594,109 @@ class ReplicaManagerTest {
 }
 }
+ @Test
+ def testMultipleRemoteFetchesInOneFetchRequest(): Unit = {
+ val replicaId = -1
+ val tp0 = new TopicPartition(topic, 0)
+ val tp1 = new TopicPartition(topic, 1)
+ val tidp0 = new TopicIdPartition(topicId, tp0)
+ val tidp1 = new TopicIdPartition(topicId, tp1)
+
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
+
+ try {
+   val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+   replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+   replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+   val leaderEpoch = 0
+   val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
+   val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 0, leaderEpoch = leaderEpoch)
+   val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+   val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+   replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+   replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
+
+   val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
+   val fetchOffsetTp0 = 1
+   val fetchOffsetTp1 = 2
+
+   val responseSeq = new AtomicReference[Seq[(TopicIdPartition, FetchPartitionData)]]()
+   val responseLatch = new CountDownLatch(1)
+
+   def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+     responseSeq.set(responseStatus)
+     responseLatch.countDown()
+   }
+
+   val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new util.HashSet[Consumer[RemoteLogReadResult]]()
+   when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
+     callbacks.add(ans.getArgument(1, classOf[Consumer[RemoteLogReadResult]]))
+     mock(classOf[Future[Void]])
+   })
+
+   // Start the fetch request for both partitions - this should trigger remote fetches since
+   // the default mocked log behavior throws OffsetOutOfRangeException
+   replicaManager.fetchMessages(params, Seq(
+     tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+     tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
+   ), UNBOUNDED_QUOTA, fetchCallback)
+
+   // Verify that exactly two asyncRead calls were made (one for each partition)
+   val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+   verify(mockRemoteLogManager, times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+
+   // Verify that remote fetch operations were properly set up for both partitions
+   assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2, "DelayedRemoteFetch purgatory should have operations")
+
+   // Verify both partitions were captured in the remote fetch requests
+   val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
+   assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
+
+   val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
+   assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " + tp0)
+   assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " + tp1)
+
+   // Verify the fetch info details are correct for both partitions
+   capturedFetchInfos.foreach { fetchInfo =>
+     assertEquals(topicId, fetchInfo.fetchInfo.topicId)
+     assertEquals(startOffset, fetchInfo.fetchInfo.logStartOffset)
+     assertEquals(leaderEpoch, fetchInfo.fetchInfo.currentLeaderEpoch.get())
+     if (fetchInfo.topicIdPartition.topicPartition == tp0) {
+       assertEquals(fetchOffsetTp0, fetchInfo.fetchInfo.fetchOffset)
+     } else {
+       assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+     }
+   }
+
+   // Complete the 2 asyncRead tasks
+   callbacks.forEach(callback => callback.accept(buildRemoteReadResult(Errors.NONE)))
+
+   // Wait for the fetch callback to complete and verify responseSeq content
+   assertTrue(responseLatch.await(5, TimeUnit.SECONDS), "Fetch callback should complete")
+
+   val responseData = responseSeq.get()
+   assertNotNull(responseData, "Response sequence should not be null")
+   assertEquals(2, responseData.size, "Response should contain data for both partitions")
+
+   // Verify that response contains both tidp0 and tidp1 and have no errors
+   val responseTopicIdPartitions = responseData.map(_._1).toSet
+   assertTrue(responseTopicIdPartitions.contains(tidp0), "Response should contain " + tidp0)
+   assertTrue(responseTopicIdPartitions.contains(tidp1), "Response should contain " + tidp1)
+   responseData.foreach { case (_, fetchPartitionData) =>
+     assertEquals(Errors.NONE, fetchPartitionData.error)
+   }
+ } finally {
+   replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
+ private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
+   new RemoteLogReadResult(
+     Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
+     if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
+ }
 private def yammerMetricValue(name: String): Any = {
 val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
 val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }

diff --git a/docs/ops.html b/docs/ops.html
index 09748a9686d..0b1e8fa6880 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -4358,7 +4358,6 @@ $ bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost:
  • Disabling tiered storage on all topics where it is enabled is required before disabling tiered storage at the broker level
  • Admin actions related to tiered storage feature are only supported on clients from version 3.0 onwards
  • No support for log segments missing producer snapshot file. It can happen when topic is created before v2.8.0.
- • Only one partition per fetch request is served from the remote store. This limitation can become a bottleneck for consumer client throughput - consider configuring max.partition.fetch.bytes appropriately.
  • For more information, please check Kafka Tiered Storage GA Release Notes.

diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
index bfb6d97f6d9..157fdcb885d 100644
--- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
+++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
@@ -199,7 +199,7 @@ public class DelayedOperationPurgatory {
 }
 /**
- * Return the total size of watch lists the purgatory. Since an operation may be watched
+ * Return the total size of watch lists in the purgatory. Since an operation may be watched
 * on multiple lists, and some of its watched entries may still be in the watch lists
 * even when it has been completed, this number may be larger than the number of real operations watched
 */

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 0970f6f0a34..852e1d4d1fb 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -1659,7 +1659,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
 public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
 int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
- TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+ TopicPartition tp = remoteStorageFetchInfo.topicIdPartition.topicPartition();
 FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
 boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
@@ -1715,6 +1715,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
 // - there is no minimum-one-message constraint and
 // - the first batch size is more than maximum bytes that can be sent and
 if (!remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes) {
+ LOGGER.debug("Returning empty record for offset {} in partition {} because the first batch size {} " +
+   "is greater than max fetch bytes {}", offset, tp, firstBatchSize, maxBytes);
 return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
 }

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 586816dae29..0b1b049d766 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -192,8 +192,8 @@ public final class RemoteLogManagerConfig {
 public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;
 public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms";
- public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request. " +
-   "Note that the broker currently only fetches one partition per fetch request from the remote store. (KAFKA-14915)";
+ public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the fetch request containing remote fetch partitions. " +
+   "It's important to be aware that the request will only be answered after all remote partitions have been successfully fetched, have failed, or this timeout is exceeded.";
 public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
 public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP = "remote.list.offsets.request.timeout.ms";

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
index a23ee7207ae..898ff52760c 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
@@ -51,7 +51,7 @@ public class RemoteLogReader implements Callable {
 this.rlm = rlm;
 this.brokerTopicStats = brokerTopicStats;
 this.callback = callback;
- this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
+ this.brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchRequestRate().mark();
 this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
 this.quotaManager = quotaManager;
 this.remoteReadTimer = remoteReadTimer;
@@ -61,20 +61,20 @@
 public Void call() {
 RemoteLogReadResult result;
 try {
- LOGGER.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
+ LOGGER.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition);
 FetchDataInfo fetchDataInfo = remoteReadTimer.time(() -> rlm.read(fetchInfo));
- brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
+ brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
 brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
 result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
 } catch (OffsetOutOfRangeException e) {
 result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
 } catch (Exception e) {
- brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteFetchRequestRate().mark();
+ brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).failedRemoteFetchRequestRate().mark();
 brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark();
- LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
+ LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicIdPartition, e);
 result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
 }
- LOGGER.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
+ LOGGER.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition);
 quotaManager.record(result.fetchDataInfo.map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0));
 callback.accept(result);
 return null;

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
index c110e750d7c..e02fea8c0ef 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
@@ -16,7 +16,7 @@
 */
 package org.apache.kafka.storage.internals.log;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.server.storage.log.FetchIsolation;
@@ -24,15 +24,15 @@ public class RemoteStorageFetchInfo {
 public final int fetchMaxBytes;
 public final boolean minOneMessage;
- public final TopicPartition topicPartition;
+ public final TopicIdPartition topicIdPartition;
 public final FetchRequest.PartitionData fetchInfo;
 public final FetchIsolation fetchIsolation;
- public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicPartition topicPartition,
+ public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicIdPartition topicIdPartition,
   FetchRequest.PartitionData fetchInfo, FetchIsolation fetchIsolation) {
 this.fetchMaxBytes = fetchMaxBytes;
 this.minOneMessage = minOneMessage;
- this.topicPartition = topicPartition;
+ this.topicIdPartition = topicIdPartition;
 this.fetchInfo = fetchInfo;
 this.fetchIsolation = fetchIsolation;
 }
@@ -42,7 +42,7 @@ public class RemoteStorageFetchInfo {
 return "RemoteStorageFetchInfo{" +
 "fetchMaxBytes=" + fetchMaxBytes +
 ", minOneMessage=" + minOneMessage +
- ", topicPartition=" + topicPartition +
+ ", topicIdPartition=" + topicIdPartition +
 ", fetchInfo=" + fetchInfo +
 ", fetchIsolation=" + fetchIsolation +
 '}';

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index e82536d7233..182fda9abb9 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -205,6 +205,7 @@ public class RemoteLogManagerTest {
 private final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
 private final Map topicIds = new HashMap<>();
 private final TopicPartition tp = new TopicPartition("TestTopic", 5);
+ private final TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
 private final EpochEntry epochEntry0 = new EpochEntry(0, 0);
 private final EpochEntry epochEntry1 = new EpochEntry(1, 100);
 private final EpochEntry epochEntry2 = new EpochEntry(2, 200);
@@ -3100,7 +3101,7 @@ public class RemoteLogManagerTest {
 );
 RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED
+ 0, false, tpId, partitionData, FetchIsolation.TXN_COMMITTED
 );
 try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3180,7 +3181,7 @@ public class RemoteLogManagerTest {
 );
 RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, minOneMessage, tp, partitionData, FetchIsolation.HIGH_WATERMARK
+ 0, minOneMessage, tpId, partitionData, FetchIsolation.HIGH_WATERMARK
 );
 try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3266,7 +3267,7 @@ public class RemoteLogManagerTest {
 when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
 doNothing().when(firstBatch).writeTo(capture.capture());
 RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, true, tp, partitionData, FetchIsolation.HIGH_WATERMARK
+ 0, true, tpId, partitionData, FetchIsolation.HIGH_WATERMARK
 );
@@ -3651,7 +3652,7 @@ public class RemoteLogManagerTest {
 FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
 Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
 RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
- 1048576, true, leaderTopicIdPartition.topicPartition(),
+ 1048576, true, leaderTopicIdPartition,
 partitionData, FetchIsolation.HIGH_WATERMARK);
 FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
 // firstBatch baseOffset may not be equal to the fetchOffset

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
index 6c7026a52d5..efb6bb76e05 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
@@ -18,7 +18,8 @@
 package org.apache.kafka.server.log.remote.storage;
 import kafka.utils.TestUtils;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
@@ -69,7 +70,7 @@ public class RemoteLogReaderTest {
 when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
 Consumer callback = mock(Consumer.class);
- RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
+ RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC), null, null);
 RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager, timer);
 remoteLogReader.call();
@@ -102,7 +103,7 @@
 when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new RuntimeException("error"));
 Consumer callback = mock(Consumer.class);
- RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
+ RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC), null, null);
 RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager, timer);
 remoteLogReader.call();
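
To make the new completion semantics concrete, here is a minimal standalone Scala sketch (illustrative only, not code from this patch; the object name AllDoneGate and the string keys are invented) of the gate that DelayedRemoteFetch.tryComplete now applies in Case c: the delayed operation may force-complete only once every partition's remote read future is done, and a failed read counts as done.

import java.util.concurrent.CompletableFuture

// Toy model of "Case c": the operation becomes completable only when every
// partition's remote read future is done, whether it finished with data or
// with an error.
object AllDoneGate {
  def readyToComplete(results: Map[String, CompletableFuture[String]]): Boolean =
    results.values.forall(_.isDone)

  def main(args: Array[String]): Unit = {
    val f1 = new CompletableFuture[String]()
    val f2 = new CompletableFuture[String]()
    val results = Map("topic-0" -> f1, "topic-1" -> f2)

    f1.complete("records for topic-0")  // one partition finished
    println(readyToComplete(results))   // false: topic-1 is still in flight

    f2.completeExceptionally(new RuntimeException("remote read failed"))
    println(readyToComplete(results))   // true: a failure still counts as done
  }
}

This mirrors why remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone) is the right check in the patch: a per-partition error is delivered in onComplete alongside the other partitions' results instead of failing or blocking the whole fetch request.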