diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index d52dd2bf711..beb0a25ffed 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -33,6 +33,7 @@ import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;
+import kafka.server.share.DelayedShareFetch;
 import kafka.zk.KafkaZkClient;

 import org.apache.kafka.common.metrics.Metrics;
@@ -70,6 +71,7 @@ public class ReplicaManagerBuilder {
     private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty();
+    private Optional<DelayedOperationPurgatory<DelayedShareFetch>> delayedShareFetchPurgatory = Optional.empty();
     private Optional<String> threadNamePrefix = Optional.empty();
     private Long brokerEpoch = -1L;
     private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
@@ -215,6 +217,7 @@ public class ReplicaManagerBuilder {
                      OptionConverters.toScala(delayedElectLeaderPurgatory),
                      OptionConverters.toScala(delayedRemoteFetchPurgatory),
                      OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
+                     OptionConverters.toScala(delayedShareFetchPurgatory),
                      OptionConverters.toScala(threadNamePrefix),
                      () -> brokerEpoch,
                      OptionConverters.toScala(addPartitionsToTxnManager),
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index dd6d5453e6d..4a3d3154395 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -135,8 +135,12 @@ public class DelayedShareFetch extends DelayedOperation {
             // then we should check if there is a pending share fetch request for the topic-partition and complete it.
             // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
             // we directly call delayedShareFetchPurgatory.checkAndComplete
-            sharePartitionManager.addPurgatoryCheckAndCompleteDelayedActionToActionQueue(
-                topicPartitionData.keySet(), shareFetchData.groupId());
+            replicaManager.addToActionQueue(() -> {
+                topicPartitionData.keySet().forEach(topicIdPartition ->
+                    replicaManager.completeDelayedShareFetchRequest(
+                        new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())));
+                return BoxedUnit.UNIT;
+            });
         }
     }
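Note on the DelayedShareFetch hunk above: completing other pending share fetches directly from inside onComplete() could re-enter the purgatory (complete -> checkAndComplete -> complete ...), so the follow-up completions are parked on the broker's action queue and drained later from a request-handler thread. A minimal standalone sketch of that deferral pattern, assuming only the JDK (DeferredCompletionQueue is an illustrative name, not Kafka's ActionQueue):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Illustrative only: a stripped-down action queue mirroring the pattern used above.
    // Completing a delayed operation from inside another operation's completion path can
    // recurse (complete -> checkAndComplete -> complete -> ...); enqueueing the follow-up
    // work and draining it from a top-level thread keeps the call stack flat.
    public class DeferredCompletionQueue {
        private final Queue<Runnable> actions = new ConcurrentLinkedQueue<>();

        // Called from inside a completion callback: just record the follow-up work.
        public void add(Runnable action) {
            actions.add(action);
        }

        // Called later from a top-level thread (e.g. after the current request is handled).
        public void tryCompleteActions() {
            Runnable action;
            while ((action = actions.poll()) != null) {
                action.run();
            }
        }
    }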
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java
index 1aa6b29edb8..02f0439cea1 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java
@@ -29,7 +29,7 @@ public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey, Del
     private final Uuid topicId;
     private final int partition;

-    DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
+    public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
         this.topicId = topicId;
         this.partition = partition;
     }
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 5011d435dde..f9aa2fb528f 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -16,7 +16,7 @@
  */
 package kafka.server.share;

-import kafka.server.DelayedOperationPurgatory;
+import kafka.server.ReplicaManager;

 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
@@ -271,9 +271,10 @@ public class SharePartition {
     private SharePartitionState partitionState;

     /**
-     * The delayed share fetch purgatory is used to store the share fetch requests that could not be processed immediately.
+     * The replica manager is used to check to see if any delayed share fetch request can be completed because of data
+     * availability due to acquisition lock timeout.
      */
-    private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;
+    private final ReplicaManager replicaManager;

     SharePartition(
         String groupId,
@@ -284,7 +285,7 @@ public class SharePartition {
         Timer timer,
         Time time,
         Persister persister,
-        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory,
+        ReplicaManager replicaManager,
         GroupConfigManager groupConfigManager
     ) {
         this.groupId = groupId;
@@ -300,7 +301,7 @@ public class SharePartition {
         this.time = time;
         this.persister = persister;
         this.partitionState = SharePartitionState.EMPTY;
-        this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
+        this.replicaManager = replicaManager;
         this.groupConfigManager = groupConfigManager;
     }

@@ -1810,7 +1811,7 @@ public class SharePartition {
                 // If we have an acquisition lock timeout for a share-partition, then we should check if
                 // there is a pending share fetch request for the share-partition and complete it.
                 DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
-                delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey);
+                replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
             });
         }
     } finally {
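Two key shapes are used against the share-fetch purgatory in this patch: DelayedShareFetchGroupKey(groupId, topicId, partition) for events that know the share group (acknowledgements, released records, acquisition lock timeouts), and DelayedShareFetchPartitionKey(topicId, partition) for log-level events raised from Partition/ReplicaManager code that knows no group. Watch lists are looked up by equals/hashCode, which is all a key has to provide. A simplified, illustrative sketch of the two shapes (java.util.UUID stands in for Kafka's Uuid; the real classes live in kafka.server.share):

    import java.util.UUID;

    // Marker for anything a delayed share fetch can be watched under.
    interface ShareFetchKeySketch { }

    // Group-scoped key: used when the triggering event carries the share group id.
    record GroupKeySketch(String groupId, UUID topicId, int partition) implements ShareFetchKeySketch { }

    // Partition-scoped key: used from Partition/ReplicaManager paths that know no group.
    record PartitionKeySketch(UUID topicId, int partition) implements ShareFetchKeySketch { }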
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index c1a5976cbd4..33bd5c37606 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -16,8 +16,6 @@
  */
 package kafka.server.share;

-import kafka.server.ActionQueue;
-import kafka.server.DelayedOperationPurgatory;
 import kafka.server.ReplicaManager;

 import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -75,7 +73,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;

 import scala.jdk.javaapi.CollectionConverters;
-import scala.runtime.BoxedUnit;

 /**
  * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
@@ -151,16 +148,6 @@ public class SharePartitionManager implements AutoCloseable {
      */
     private final ShareGroupMetrics shareGroupMetrics;

-    /**
-     * The delayed share fetch purgatory is used to store the share fetch requests that could not be processed immediately.
-     */
-    private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;
-
-    /**
-     * The delayed actions queue is used to complete any pending delayed share fetch actions.
-     */
-    private final ActionQueue delayedActionsQueue;
-
     public SharePartitionManager(
         ReplicaManager replicaManager,
         Time time,
@@ -168,9 +155,7 @@ public class SharePartitionManager implements AutoCloseable {
         int defaultRecordLockDurationMs,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int shareFetchPurgatoryPurgeIntervalRequests,
         Persister persister,
-        ActionQueue delayedActionsQueue,
         GroupConfigManager groupConfigManager,
         Metrics metrics
     ) {
@@ -181,9 +166,7 @@ public class SharePartitionManager implements AutoCloseable {
             defaultRecordLockDurationMs,
             maxDeliveryCount,
             maxInFlightMessages,
-            shareFetchPurgatoryPurgeIntervalRequests,
             persister,
-            delayedActionsQueue,
             groupConfigManager,
             metrics
         );
@@ -197,9 +180,7 @@ public class SharePartitionManager implements AutoCloseable {
         int defaultRecordLockDurationMs,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int shareFetchPurgatoryPurgeIntervalRequests,
         Persister persister,
-        ActionQueue delayedActionsQueue,
         GroupConfigManager groupConfigManager,
         Metrics metrics
     ) {
@@ -215,8 +196,6 @@ public class SharePartitionManager implements AutoCloseable {
         this.maxDeliveryCount = maxDeliveryCount;
         this.maxInFlightMessages = maxInFlightMessages;
         this.persister = persister;
-        this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true);
-        this.delayedActionsQueue = delayedActionsQueue;
         this.groupConfigManager = groupConfigManager;
         this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
     }
@@ -234,8 +213,6 @@ public class SharePartitionManager implements AutoCloseable {
         int maxDeliveryCount,
         int maxInFlightMessages,
         Persister persister,
-        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory,
-        ActionQueue delayedActionsQueue,
         GroupConfigManager groupConfigManager,
         Metrics metrics
     ) {
@@ -250,8 +227,6 @@ public class SharePartitionManager implements AutoCloseable {
         this.maxDeliveryCount = maxDeliveryCount;
         this.maxInFlightMessages = maxInFlightMessages;
         this.persister = persister;
-        this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
-        this.delayedActionsQueue = delayedActionsQueue;
         this.groupConfigManager = groupConfigManager;
         this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
     }
@@ -319,7 +294,7 @@ public class SharePartitionManager implements AutoCloseable {
                 // If we have an acknowledgement completed for a topic-partition, then we should check if
                 // there is a pending share fetch request for the topic-partition and complete it.
                 DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
-                delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey);
+                replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);

                 futures.put(topicIdPartition, future);
             } else {
@@ -338,15 +313,6 @@ public class SharePartitionManager implements AutoCloseable {
         });
     }

-    void addPurgatoryCheckAndCompleteDelayedActionToActionQueue(Set<TopicIdPartition> topicIdPartitions, String groupId) {
-        delayedActionsQueue.add(() -> {
-            topicIdPartitions.forEach(topicIdPartition ->
-                delayedShareFetchPurgatory.checkAndComplete(
-                    new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
-            return BoxedUnit.UNIT;
-        });
-    }
-
     /**
      * The release session method is used to release the session for the memberId of respective group.
      * The method post removing session also releases acquired records for the respective member.
@@ -397,7 +363,7 @@ public class SharePartitionManager implements AutoCloseable {
                 // If we have a release acquired request completed for a topic-partition, then we should check if
                 // there is a pending share fetch request for the topic-partition and complete it.
                 DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
-                delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey);
+                replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);

                 futuresMap.put(topicIdPartition, future);
             }
@@ -557,13 +523,12 @@ public class SharePartitionManager implements AutoCloseable {
     // Add the share fetch request to the delayed share fetch purgatory to process the fetch request if it can be
     // completed else watch until it can be completed/timeout.
     private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<DelayedShareFetchKey> keys) {
-        delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch,
-            CollectionConverters.asScala(keys).toSeq().indices());
+        replicaManager.addDelayedShareFetchRequest(delayedShareFetch,
+            CollectionConverters.asScala(keys).toSeq());
     }

     @Override
     public void close() throws Exception {
-        this.delayedShareFetchPurgatory.shutdown();
         this.timer.close();
         this.persister.stop();
         if (!fetchQueue.isEmpty()) {
@@ -676,7 +641,7 @@ public class SharePartitionManager implements AutoCloseable {
                         timer,
                         time,
                         persister,
-                        delayedShareFetchPurgatory,
+                        replicaManager,
                         groupConfigManager
                     );
                     this.shareGroupMetrics.partitionLoadTime(start);
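With its own purgatory and action queue removed, everything SharePartitionManager still needs goes through ReplicaManager. As a reading aid only, the three calls it now relies on summarized as a plain Java interface with simplified parameter types (the real methods take DelayedShareFetchKey, a Scala Seq, and a Scala function):

    import java.util.List;

    // Sketch of the calls SharePartitionManager now makes on ReplicaManager instead of on its
    // own purgatory and action queue. Types are simplified stand-ins.
    interface ShareFetchCompletionSupport {

        // Try to complete any delayed share fetch watched under the given key right now.
        void completeDelayedShareFetchRequest(Object delayedShareFetchKey);

        // Watch a share fetch under the given keys unless it can be completed immediately.
        void addDelayedShareFetchRequest(Object delayedShareFetch, List<Object> delayedShareFetchKeys);

        // Defer follow-up completion work until the current request has been handled,
        // so completions never nest inside other completions.
        void addToActionQueue(Runnable action);
    }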
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6ea90540401..77a7e6d46cb 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -25,6 +25,7 @@ import kafka.log._
 import kafka.log.remote.RemoteLogManager
 import kafka.server._
 import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.share.{DelayedShareFetch, DelayedShareFetchPartitionKey}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zookeeper.ZooKeeperClientException
@@ -87,16 +88,20 @@ trait AlterPartitionListener {
   def markFailed(): Unit
 }

-class DelayedOperations(topicPartition: TopicPartition,
+class DelayedOperations(topicId: Option[Uuid],
+                        topicPartition: TopicPartition,
                         produce: DelayedOperationPurgatory[DelayedProduce],
                         fetch: DelayedOperationPurgatory[DelayedFetch],
-                        deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords]) {
+                        deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords],
+                        shareFetch: DelayedOperationPurgatory[DelayedShareFetch]) {

   def checkAndCompleteAll(): Unit = {
     val requestKey = TopicPartitionOperationKey(topicPartition)
     CoreUtils.swallow(() => fetch.checkAndComplete(requestKey), fetch, Level.ERROR)
     CoreUtils.swallow(() => produce.checkAndComplete(requestKey), produce, Level.ERROR)
     CoreUtils.swallow(() => deleteRecords.checkAndComplete(requestKey), deleteRecords, Level.ERROR)
+    if (topicId.isDefined) CoreUtils.swallow(() => shareFetch.checkAndComplete(new DelayedShareFetchPartitionKey(
+      topicId.get, topicPartition.partition())), shareFetch, Level.ERROR)
   }

   def numDelayedDelete: Int = deleteRecords.numDelayed
@@ -132,10 +137,12 @@ object Partition {
     }

     val delayedOperations = new DelayedOperations(
+      topicId,
       topicPartition,
       replicaManager.delayedProducePurgatory,
       replicaManager.delayedFetchPurgatory,
-      replicaManager.delayedDeleteRecordsPurgatory)
+      replicaManager.delayedDeleteRecordsPurgatory,
+      replicaManager.delayedShareFetchPurgatory)

     new Partition(topicPartition,
       _topicId = topicId,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 59c03c40315..c779482b50b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -433,9 +433,7 @@ class BrokerServer(
       config.shareGroupConfig.shareGroupRecordLockDurationMs,
       config.shareGroupConfig.shareGroupDeliveryCountLimit,
       config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
-      config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests,
       persister,
-      defaultActionQueue,
       groupConfigManager,
       new Metrics()
     )
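checkAndCompleteAll() above wraps each purgatory check in CoreUtils.swallow, so a failure while checking one purgatory — including the newly added share-fetch check — cannot stop the remaining checks; PartitionTest later in this patch leans on that by making the mocked purgatories throw. A self-contained sketch of the same isolation pattern using only JDK types in place of CoreUtils and the logger:

    import java.util.List;

    // Illustrative version of the checkAndCompleteAll pattern above: each purgatory check is
    // wrapped so that one failing check cannot prevent the remaining checks from running.
    // Logging is simplified to stderr.
    public class SwallowingRunner {
        public static void runAllSwallowing(List<Runnable> checks) {
            for (Runnable check : checks) {
                try {
                    check.run();
                } catch (Throwable t) {
                    System.err.println("purgatory check failed: " + t);
                }
            }
        }

        public static void main(String[] args) {
            runAllSwallowing(List.of(
                () -> System.out.println("fetch purgatory checked"),
                () -> { throw new RuntimeException("uh oh"); },   // mirrors the PartitionTest expectation
                () -> System.out.println("share-fetch purgatory still checked")));
        }
    }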
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b0a76a2d70e..447722c026d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,6 +25,7 @@ import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
 import kafka.server.metadata.ZkMetadataCache
+import kafka.server.share.{DelayedShareFetch, DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors._
@@ -55,7 +56,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common
-import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal}
+import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, TopicOptionalIdPartition}
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
@@ -286,6 +287,7 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
                      delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
                      delayedRemoteListOffsetsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
+                     delayedShareFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedShareFetch]] = None,
                      threadNamePrefix: Option[String] = None,
                      val brokerEpochSupplier: () => Long = () => -1,
                      addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
@@ -315,6 +317,10 @@ class ReplicaManager(val config: KafkaConfig,
   val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse(
     DelayedOperationPurgatory[DelayedRemoteListOffsets](
       purgatoryName = "RemoteListOffsets", brokerId = config.brokerId))
+  val delayedShareFetchPurgatory = delayedShareFetchPurgatoryParam.getOrElse(
+    DelayedOperationPurgatory[DelayedShareFetch](
+      purgatoryName = "ShareFetch", brokerId = config.brokerId,
+      purgeInterval = config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests))

   /* epoch of the controller that last changed the leader */
   @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
@@ -463,12 +469,14 @@ class ReplicaManager(val config: KafkaConfig,
     })
   }

-  private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition): Unit = {
+  private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition, topicId: Option[Uuid]): Unit = {
     val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
     delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
     delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
     delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
     delayedRemoteListOffsetsPurgatory.checkAndComplete(topicPartitionOperationKey)
+    if (topicId.isDefined) delayedShareFetchPurgatory.checkAndComplete(
+      new DelayedShareFetchPartitionKey(topicId.get, topicPartition.partition()))
   }

   /**
@@ -480,6 +488,27 @@ class ReplicaManager(val config: KafkaConfig,
     topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
   }

+  /**
+   * Complete any delayed share fetch requests that have been unblocked since new data is available from the leader
+   * for one of the partitions. This could happen due to acknowledgements, acquisition lock timeout of records, partition
+   * locks getting freed and release of acquired records due to share session close.
+   * @param delayedShareFetchKey The key corresponding to which the share fetch request has been stored in the purgatory
+   */
+  private[server] def completeDelayedShareFetchRequest(delayedShareFetchKey: DelayedShareFetchKey): Unit = {
+    delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey)
+  }
+
+  /**
+   * Add and watch a share fetch request in the delayed share fetch purgatory corresponding to a set of keys in case it cannot be
+   * completed instantaneously, otherwise complete it.
+   * @param delayedShareFetch Refers to the DelayedOperation over share fetch request
+   * @param delayedShareFetchKeys The keys corresponding to which the delayed share fetch request will be stored in the purgatory
+   */
+  private[server] def addDelayedShareFetchRequest(delayedShareFetch: DelayedShareFetch,
+                                                  delayedShareFetchKeys : Seq[DelayedShareFetchKey]): Unit = {
+    delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchKeys)
+  }
+
   /**
    * Registers the provided listener to the partition iff the partition is online.
    */
@@ -610,6 +639,7 @@ class ReplicaManager(val config: KafkaConfig,
     val partitionsToDelete = mutable.Set.empty[TopicPartition]
     partitionsToStop.foreach { stopPartition =>
       val topicPartition = stopPartition.topicPartition
+      var topicId: Option[Uuid] = None
       if (stopPartition.deleteLocalLog) {
         getPartition(topicPartition) match {
           case hostedPartition: HostedPartition.Online =>
@@ -618,6 +648,7 @@ class ReplicaManager(val config: KafkaConfig,
             // Logs are not deleted here. They are deleted in a single batch later on.
             // This is done to avoid having to checkpoint for every deletions.
             hostedPartition.partition.delete()
+            topicId = hostedPartition.partition.topicId
           }

         case _ =>
@@ -626,7 +657,7 @@ class ReplicaManager(val config: KafkaConfig,
       }

       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
-      completeDelayedOperationsWhenNotPartitionLeader(topicPartition)
+      completeDelayedOperationsWhenNotPartitionLeader(topicPartition, topicId)
     }

     // Third delete the logs and checkpoint.
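The two methods added above are the entire contract between share fetches and the purgatory: watch under a set of keys on the way in, poke a key whenever something that might unblock a fetch happens. A deliberately tiny, illustrative model of that handshake (not Kafka's DelayedOperationPurgatory — it skips re-running tryComplete on checkAndComplete, de-duplication, and expiration):

    import java.util.List;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.function.BooleanSupplier;

    // Toy model of tryCompleteElseWatch/checkAndComplete keyed by arbitrary objects. A single
    // watched operation can be registered under several keys (e.g. one group key per requested
    // topic-partition), so unrelated events can each complete it through their own key.
    public class MiniPurgatory<K> {
        private final Map<K, Queue<Runnable>> watchers = new ConcurrentHashMap<>();

        // Rough analogue of addDelayedShareFetchRequest: try once, otherwise watch under every key.
        public void tryCompleteElseWatch(BooleanSupplier tryComplete, Runnable onComplete, List<K> keys) {
            if (tryComplete.getAsBoolean()) {
                onComplete.run();
                return;
            }
            keys.forEach(key ->
                watchers.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(onComplete));
        }

        // Rough analogue of completeDelayedShareFetchRequest: an event names a key, its watchers run.
        // (The real purgatory re-evaluates tryComplete and completes each operation at most once.)
        public void checkAndComplete(K key) {
            Queue<Runnable> completions = watchers.remove(key);
            if (completions != null) {
                completions.forEach(Runnable::run);
            }
        }
    }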
@@ -744,6 +775,8 @@ class ReplicaManager(val config: KafkaConfig,

   def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()

+  def addToActionQueue(action: () => Unit): Unit = defaultActionQueue.add(action)
+
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
@@ -783,13 +816,15 @@ class ReplicaManager(val config: KafkaConfig,
     }

     val sTime = time.milliseconds
-    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
+    val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
       origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
     debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
+    val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
+      case(k, v) => (k.topicPartition, v)}

     val produceStatus = buildProducePartitionStatus(localProduceResults)

-    addCompletePurgatoryAction(actionQueue, localProduceResults)
+    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
     recordValidationStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordValidationStats })

@@ -940,17 +975,19 @@ class ReplicaManager(val config: KafkaConfig,

   private def addCompletePurgatoryAction(
     actionQueue: ActionQueue,
-    appendResults: Map[TopicPartition, LogAppendResult]
+    appendResults: Map[TopicOptionalIdPartition, LogAppendResult]
   ): Unit = {
     actionQueue.add {
-      () => appendResults.foreach { case (topicPartition, result) =>
-        val requestKey = TopicPartitionOperationKey(topicPartition)
+      () => appendResults.foreach { case (topicOptionalIdPartition, result) =>
+        val requestKey = TopicPartitionOperationKey(topicOptionalIdPartition.topicPartition)
         result.info.leaderHwChange match {
           case LeaderHwChange.INCREASED =>
             // some delayed operations may be unblocked after HW changed
             delayedProducePurgatory.checkAndComplete(requestKey)
             delayedFetchPurgatory.checkAndComplete(requestKey)
             delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+            if (topicOptionalIdPartition.topicId.isPresent) delayedShareFetchPurgatory.checkAndComplete(new DelayedShareFetchPartitionKey(
+              topicOptionalIdPartition.topicId.get, topicOptionalIdPartition.partition))
           case LeaderHwChange.SAME =>
             // probably unblock some follower fetch requests since log end offset has been updated
             delayedFetchPurgatory.checkAndComplete(requestKey)
@@ -1392,7 +1429,8 @@ class ReplicaManager(val config: KafkaConfig,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short,
                                requestLocal: RequestLocal,
-                               verificationGuards: Map[TopicPartition, VerificationGuard]): Map[TopicPartition, LogAppendResult] = {
+                               verificationGuards: Map[TopicPartition, VerificationGuard]):
+  Map[TopicOptionalIdPartition, LogAppendResult] = {
     val traceEnabled = isTraceEnabled
     def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
       val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
@@ -1417,7 +1455,7 @@ class ReplicaManager(val config: KafkaConfig,

         // reject appending to internal topics if it is not allowed
         if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
-          (topicPartition, LogAppendResult(
+          (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
             Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")),
             hasCustomErrorMessage = false))
@@ -1438,7 +1476,10 @@ class ReplicaManager(val config: KafkaConfig,
             trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
               s"${info.firstOffset} and ending at offset ${info.lastOffset}")

-            (topicPartition, LogAppendResult(info, exception = None, hasCustomErrorMessage = false))
+            var topicId: Optional[Uuid] = Optional.empty()
+            if (partition.topicId.isDefined) topicId = Optional.of(partition.topicId.get)
+
+            (new TopicOptionalIdPartition(topicId, topicPartition), LogAppendResult(info, exception = None, hasCustomErrorMessage = false))
           } catch {
             // NOTE: Failed produce requests metric is not incremented for known exceptions
             // it is supposed to indicate un-expected failures of a broker in handling a produce request
@@ -1448,15 +1489,15 @@ class ReplicaManager(val config: KafkaConfig,
                      _: RecordBatchTooLargeException |
                      _: CorruptRecordException |
                      _: KafkaStorageException) =>
-              (topicPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
+              (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
             case rve: RecordValidationException =>
               val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
               val recordErrors = rve.recordErrors
-              (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors),
+              (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors),
                 Some(rve.invalidException), hasCustomErrorMessage = true))
             case t: Throwable =>
               val logStartOffset = processFailedRecord(topicPartition, t)
-              (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
+              (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
                 Some(t), hasCustomErrorMessage = false))
           }
       }
@@ -2452,7 +2493,7 @@ class ReplicaManager(val config: KafkaConfig,
         s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")

       partitionsToMakeFollower.foreach { partition =>
-        completeDelayedOperationsWhenNotPartitionLeader(partition.topicPartition)
+        completeDelayedOperationsWhenNotPartitionLeader(partition.topicPartition, partition.topicId)
       }

       if (isShuttingDown.get()) {
@@ -2676,6 +2717,7 @@ class ReplicaManager(val config: KafkaConfig,
     delayedProducePurgatory.shutdown()
     delayedDeleteRecordsPurgatory.shutdown()
     delayedElectLeaderPurgatory.shutdown()
+    delayedShareFetchPurgatory.shutdown()
     if (checkpointHW)
       checkpointHighWatermarks()
     replicaSelectorOpt.foreach(_.close)
@@ -3029,7 +3071,8 @@ class ReplicaManager(val config: KafkaConfig,
       replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
       stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")

-      partitionsToStartFetching.keySet.foreach(completeDelayedOperationsWhenNotPartitionLeader)
+      partitionsToStartFetching.foreach{ case (topicPartition, partition) =>
+        completeDelayedOperationsWhenNotPartitionLeader(topicPartition, partition.topicId)}

       updateLeaderAndFollowerMetrics(followerTopicSet)
     }
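Every spot where the share-fetch purgatory is poked from generic replication paths (high-watermark increase, become-follower, stop-partition) is guarded on the topic id being present, because DelayedShareFetchPartitionKey is built from (topicId, partition) and failed appends or partitions without a known id carry none. A compact illustrative rendering of that guard (UUID and BiConsumer stand in for Kafka's Uuid and the purgatory call):

    import java.util.Optional;
    import java.util.UUID;
    import java.util.function.BiConsumer;

    // Guard used by the hunks above: only build a partition-scoped share fetch key when the
    // topic id is actually known; otherwise skip the share-fetch completion entirely.
    final class ShareFetchPartitionKeyGuard {
        private ShareFetchPartitionKeyGuard() { }

        static void maybeCompleteShareFetch(Optional<UUID> topicId,
                                            int partition,
                                            BiConsumer<UUID, Integer> checkAndComplete) {
            topicId.ifPresent(id -> checkAndComplete.accept(id, partition));
        }
    }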
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 31af83a5875..31f2975d349 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -16,7 +16,6 @@
  */
 package kafka.server.share;

-import kafka.server.DelayedActionQueue;
 import kafka.server.DelayedOperationPurgatory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
@@ -54,6 +53,7 @@ import scala.jdk.javaapi.CollectionConverters;
 import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
 import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
 import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
+import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -330,6 +330,7 @@ public class DelayedShareFetchTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
         partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
@@ -357,16 +358,12 @@ public class DelayedShareFetchTest {
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

-        DelayedActionQueue delayedActionQueue = spy(new DelayedActionQueue());
-
         Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
         partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
         partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
         partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
         SharePartitionManager sharePartitionManager2 = SharePartitionManagerTest.SharePartitionManagerBuilder
             .builder()
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
-            .withDelayedActionsQueue(delayedActionQueue)
             .withPartitionCacheMap(partitionCacheMap)
             .build();

@@ -390,8 +387,8 @@ public class DelayedShareFetchTest {
         Mockito.verify(replicaManager, times(1)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
         assertFalse(delayedShareFetch1.isCompleted());
-        Mockito.verify(delayedActionQueue, times(1)).add(any());
-        Mockito.verify(delayedActionQueue, times(0)).tryCompleteActions();
+        Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
+        Mockito.verify(replicaManager, times(0)).tryCompleteActions();
     }

     static class DelayedShareFetchBuilder {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index dedb61fa165..8a6e729a167 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -16,7 +16,6 @@
  */
 package kafka.server.share;

-import kafka.server.DelayedActionQueue;
 import kafka.server.DelayedOperationPurgatory;
 import kafka.server.LogReadResult;
 import kafka.server.ReplicaManager;
@@ -1034,13 +1033,13 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withReplicaManager(replicaManager)
             .withTime(time)
             .withMetrics(metrics)
             .withTimer(mockTimer)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
             .build();

         doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -1096,12 +1095,12 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withTime(time)
             .withReplicaManager(replicaManager)
             .withTimer(mockTimer)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
             .build();

         SharePartition sp0 = mock(SharePartition.class);
@@ -1192,12 +1191,13 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withPartitionCacheMap(partitionCacheMap)
             .withReplicaManager(replicaManager)
             .withTimer(mockTimer)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build();
+            .build();

         CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes);
@@ -1223,11 +1223,11 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withReplicaManager(replicaManager)
             .withTimer(mockTimer)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
             .build();

         doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -1667,12 +1667,12 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withReplicaManager(replicaManager)
             .withTime(time)
             .withTimer(mockTimer)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
             .withFetchQueue(fetchQueue).build();

         doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -1724,6 +1724,7 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         // Initially you cannot acquire records for both sp1 and sp2.
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@@ -1736,7 +1737,6 @@ public class SharePartitionManagerTest {

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withPartitionCacheMap(partitionCacheMap)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
             .withReplicaManager(replicaManager)
             .withTimer(mockTimer)
             .build();
@@ -1820,6 +1820,7 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         // Initially you cannot acquire records for both all 3 share partitions.
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@@ -1834,7 +1835,6 @@ public class SharePartitionManagerTest {

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withPartitionCacheMap(partitionCacheMap)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
             .withReplicaManager(replicaManager)
             .withTimer(mockTimer)
             .build();
@@ -1913,6 +1913,7 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         // Initially you cannot acquire records for both sp1 and sp2.
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@@ -1926,7 +1927,6 @@ public class SharePartitionManagerTest {

         SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
             .withPartitionCacheMap(partitionCacheMap)
             .withCache(cache)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
             .withReplicaManager(replicaManager)
             .withTimer(mockTimer)
             .build());
@@ -2013,6 +2013,7 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         // Initially you cannot acquire records for both all 3 share partitions.
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@@ -2028,7 +2029,6 @@ public class SharePartitionManagerTest {

         SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
             .withPartitionCacheMap(partitionCacheMap)
             .withCache(cache)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
             .withReplicaManager(replicaManager)
             .withTimer(mockTimer)
             .build());
@@ -2084,10 +2084,11 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build();
+            .build();

         CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes);
@@ -2122,10 +2123,11 @@ public class SharePartitionManagerTest {

         DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);

         SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer)
-            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build();
+            .build();

         // Return LeaderNotAvailableException to simulate initialization failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new LeaderNotAvailableException("Leader not available")));
@@ -2289,6 +2291,21 @@ public class SharePartitionManagerTest {
         return CollectionConverters.asScala(logReadResults).toSeq();
     }

+    static void mockReplicaManagerDelayedShareFetch(ReplicaManager replicaManager,
+                                                    DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
+        doAnswer(invocationOnMock -> {
+            Object[] args = invocationOnMock.getArguments();
+            delayedShareFetchPurgatory.checkAndComplete(args[0]);
+            return null;
+        }).when(replicaManager).completeDelayedShareFetchRequest(any(DelayedShareFetchKey.class));
+
+        doAnswer(invocationOnMock -> {
+            Object[] args = invocationOnMock.getArguments();
+            delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedShareFetch) args[0], (Seq) args[1]);
+            return null;
+        }).when(replicaManager).addDelayedShareFetchRequest(any(), any());
+    }
+
     static class SharePartitionManagerBuilder {
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
         private Time time = new MockTime();
@@ -2298,8 +2315,6 @@ public class SharePartitionManagerTest {
         private Timer timer = new MockTimer();
         private Metrics metrics = new Metrics();
         private ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new ConcurrentLinkedQueue<>();
-        private DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);
-        private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class);

         private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
             this.replicaManager = replicaManager;
@@ -2341,16 +2356,6 @@ public class SharePartitionManagerTest {
             return this;
         }

-        SharePartitionManagerBuilder withDelayedShareFetchPurgatory(DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
-            this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
-            return this;
-        }
-
-        SharePartitionManagerBuilder withDelayedActionsQueue(DelayedActionQueue delayedActionsQueue) {
-            this.delayedActionsQueue = delayedActionsQueue;
-            return this;
-        }
-
         public static SharePartitionManagerBuilder builder() {
             return new SharePartitionManagerBuilder();
         }
@@ -2366,8 +2371,6 @@ public class SharePartitionManagerTest {
                 MAX_DELIVERY_COUNT,
                 MAX_IN_FLIGHT_MESSAGES,
                 persister,
-                delayedShareFetchPurgatory,
-                delayedActionsQueue,
                 mock(GroupConfigManager.class),
                 metrics);
         }
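mockReplicaManagerDelayedShareFetch above lets the tests keep a real purgatory while production code talks only to ReplicaManager: the mocked ReplicaManager forwards the two new calls into the purgatory instance owned by the test. A self-contained illustration of that forward-to-a-real-object Mockito pattern (Facade and the queue are stand-ins, not Kafka types):

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Keep a real data structure on the side and forward calls made on a Mockito mock to it
    // with doAnswer, mirroring how the tests forward completeDelayedShareFetchRequest and
    // addDelayedShareFetchRequest into a real DelayedOperationPurgatory.
    interface Facade {
        void submit(String task);
    }

    public class ForwardingMockExample {
        public static void main(String[] args) {
            Queue<String> realQueue = new ArrayDeque<>();
            Facade facade = mock(Facade.class);

            doAnswer(invocation -> {
                realQueue.add(invocation.getArgument(0));   // forward to the real structure
                return null;                                // void method, so return null
            }).when(facade).submit(any(String.class));

            facade.submit("fetch-1");
            System.out.println(realQueue);                  // prints [fetch-1]
        }
    }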
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 3ebb6bdb4e8..0d20a828081 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -16,7 +16,7 @@
  */
 package kafka.server.share;

-import kafka.server.DelayedOperationPurgatory;
+import kafka.server.ReplicaManager;
 import kafka.server.share.SharePartition.InFlightState;
 import kafka.server.share.SharePartition.RecordState;
 import kafka.server.share.SharePartition.SharePartitionState;
@@ -4949,7 +4949,7 @@ public class SharePartitionTest {
         private int maxDeliveryCount = MAX_DELIVERY_COUNT;
         private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
         private Persister persister = NoOpShareStatePersister.getInstance();
-        private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = Mockito.mock(DelayedOperationPurgatory.class);
+        private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
         private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);

         private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) {
@@ -4982,8 +4982,8 @@ public class SharePartitionTest {
         }

         public SharePartition build() {
-            return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount, defaultAcquisitionLockTimeoutMs,
-                mockTimer, MOCK_TIME, persister, delayedShareFetchPurgatory, groupConfigManager);
+            return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount,
+                defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager);
         }
     }
 }
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 9b93e95ebe8..b3ad8a84400 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -21,7 +21,7 @@ import kafka.server.MetadataCache
 import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.MockAlterPartitionManager
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.utils.Utils
@@ -49,6 +49,7 @@ class AbstractPartitionTest {

   val brokerId = AbstractPartitionTest.brokerId
   val remoteReplicaId = brokerId + 1
+  val topicId : Option[Uuid] = Option(Uuid.randomUuid())
   val topicPartition = new TopicPartition("test-topic", 0)
   val time = new MockTime()
   var tmpDir: File = _
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index f09f4e0821d..1ab842e4888 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
 import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.share.{DelayedShareFetch, DelayedShareFetchPartitionKey}
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
@@ -4110,7 +4111,11 @@ class PartitionTest extends AbstractPartitionTest {
     val deleteRecords = mock(classOf[DelayedOperationPurgatory[DelayedDeleteRecords]])
     when(deleteRecords.checkAndComplete(requestKey)).thenThrow(new RuntimeException("uh oh"))

-    val delayedOperations = new DelayedOperations(topicPartition, produce, fetch, deleteRecords)
+    val shareFetch = mock(classOf[DelayedOperationPurgatory[DelayedShareFetch]])
+    when(shareFetch.checkAndComplete(new DelayedShareFetchPartitionKey(topicId.get, topicPartition.partition())))
+      .thenThrow(new RuntimeException("uh oh"))
+
+    val delayedOperations = new DelayedOperations(topicId, topicPartition, produce, fetch, deleteRecords, shareFetch)
     val spyLogManager = spy(logManager)
     val partition = new Partition(topicPartition,
       replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b7d5ca1c249..af82cffd6de 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_S
 import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.epoch.util.MockBlockingSender
+import kafka.server.share.DelayedShareFetch
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.{Pool, TestUtils}
 import kafka.zk.KafkaZkClient
@@ -3004,6 +3005,8 @@ class ReplicaManagerTest {
       purgatoryName = "RemoteFetch", timer, reaperEnabled = false)
     val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
       purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
+    val mockDelayedShareFetchPurgatory = new DelayedOperationPurgatory[DelayedShareFetch](
+      purgatoryName = "ShareFetch", timer, reaperEnabled = false)

     // Mock network client to show leader offset of 5
     val blockingSend = new MockBlockingSender(
@@ -3030,6 +3033,7 @@ class ReplicaManagerTest {
       delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
       delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
       delayedRemoteListOffsetsPurgatoryParam = Some(mockRemoteListOffsetsPurgatory),
+      delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
       threadNamePrefix = Option(this.getClass.getName)) {

       override protected def createReplicaFetcherManager(metrics: Metrics,
@@ -3428,6 +3432,8 @@ class ReplicaManagerTest {
       purgatoryName = "DelayedRemoteFetch", timer, reaperEnabled = false)
     val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
       purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
+    val mockDelayedShareFetchPurgatory = new DelayedOperationPurgatory[DelayedShareFetch](
+      purgatoryName = "ShareFetch", timer, reaperEnabled = false)

     when(metadataCache.contains(new TopicPartition(topic, 0))).thenReturn(true)

@@ -3461,6 +3467,7 @@ class ReplicaManagerTest {
       delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
       delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
       delayedRemoteListOffsetsPurgatoryParam = Some(mockDelayedRemoteListOffsetsPurgatory),
+      delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
       threadNamePrefix = Option(this.getClass.getName),
       addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
       directoryEventHandler = directoryEventHandler,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 242d65556eb..9b92b943e1d 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -181,7 +181,7 @@ public class ReplicaFetcherThreadBenchmark {
         Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Optional.of(0L));
         AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class);
         Partition partition = new Partition(tp, 100, MetadataVersion.latestTesting(),
-                0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp),
+                0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(topicId, tp),
                 Mockito.mock(MetadataCache.class), logManager, isrChannelManager, topicId);

         partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty());
@@ -277,8 +277,8 @@ public class ReplicaFetcherThreadBenchmark {

     // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
     private static class DelayedOperationsMock extends DelayedOperations {
-        DelayedOperationsMock(TopicPartition topicPartition) {
-            super(topicPartition, null, null, null);
+        DelayedOperationsMock(Option<Uuid> topicId, TopicPartition topicPartition) {
+            super(topicId, topicPartition, null, null, null, null);
         }

         @Override
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index a42e725db86..2a35bb97a6c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -139,7 +139,7 @@ public class UpdateFollowerFetchStateBenchmark {
     // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
     private class DelayedOperationsMock extends DelayedOperations {
         DelayedOperationsMock() {
-            super(topicPartition, null, null, null);
+            super(topicId, topicPartition, null, null, null, null);
         }

         @Override
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/TopicOptionalIdPartition.java b/server-common/src/main/java/org/apache/kafka/server/common/TopicOptionalIdPartition.java
new file mode 100644
index 00000000000..00d3976a623
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/TopicOptionalIdPartition.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This represents universally unique identifier with topic id for a topic partition. However, for this wrapper, we can
+ * have an optional topic id with a not null topic partition to account for the functionalities that don't have topic id incorporated yet.
+ */
+public class TopicOptionalIdPartition {
+
+    private final Optional<Uuid> topicId;
+    private final TopicPartition topicPartition;
+
+    /**
+     * Create an instance with the provided parameters.
+     *
+     * @param topicId the topic id
+     * @param topicPartition the topic partition
+     */
+    public TopicOptionalIdPartition(Optional<Uuid> topicId, TopicPartition topicPartition) {
+        this.topicId = topicId;
+        this.topicPartition = Objects.requireNonNull(topicPartition, "topicPartition can not be null");
+    }
+
+    /**
+     * @return Universally unique id representing this topic partition.
+     */
+    public Optional<Uuid> topicId() {
+        return topicId;
+    }
+
+    /**
+     * @return the topic name.
+     */
+    public String topic() {
+        return topicPartition.topic();
+    }
+
+    /**
+     * @return the partition id.
+     */
+    public int partition() {
+        return topicPartition.partition();
+    }
+
+    /**
+     * @return Topic partition representing this instance.
+     */
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TopicOptionalIdPartition that = (TopicOptionalIdPartition) o;
+        return topicId.equals(that.topicId) &&
+            topicPartition.equals(that.topicPartition);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 0;
+        if (topicId.isPresent()) {
+            result = prime + topicId.get().hashCode();
+        }
+        result = prime * result + topicPartition.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return topicId.map(uuid -> uuid + ":" + topic() + "-" + partition()).orElseGet(() -> "none" + ":" + topic() + "-" + partition());
+    }
+}
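A small usage sketch of the class added above (the example itself is not part of the patch): appendToLocalLog keys its per-partition results this way, so entries that know their topic id can sit next to failure entries recorded with an empty id, and the plain TopicPartition is always recoverable from the key.

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.server.common.TopicOptionalIdPartition;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class TopicOptionalIdPartitionExample {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("payments", 0);

            TopicOptionalIdPartition withId = new TopicOptionalIdPartition(Optional.of(Uuid.randomUuid()), tp);
            TopicOptionalIdPartition withoutId = new TopicOptionalIdPartition(Optional.empty(), tp);

            Map<TopicOptionalIdPartition, String> results = new HashMap<>();
            results.put(withId, "appended");
            results.put(withoutId, "failed before the partition was resolved");

            // Different topicId values (present vs. empty) are distinct keys for the same TopicPartition.
            System.out.println(results.size());              // 2
            System.out.println(withId.topicPartition());     // payments-0
            System.out.println(withoutId);                   // none:payments-0
        }
    }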