diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index 626b53c12c4..b580485139b 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -22,7 +22,6 @@ import kafka.log.remote.RemoteLogManager; import kafka.server.AddPartitionsToTxnManager; import kafka.server.AlterPartitionManager; import kafka.server.DelayedDeleteRecords; -import kafka.server.DelayedElectLeader; import kafka.server.DelayedFetch; import kafka.server.DelayedProduce; import kafka.server.DelayedRemoteFetch; @@ -66,7 +65,6 @@ public class ReplicaManagerBuilder { private Optional> delayedProducePurgatory = Optional.empty(); private Optional> delayedFetchPurgatory = Optional.empty(); private Optional> delayedDeleteRecordsPurgatory = Optional.empty(); - private Optional> delayedElectLeaderPurgatory = Optional.empty(); private Optional> delayedRemoteFetchPurgatory = Optional.empty(); private Optional> delayedRemoteListOffsetsPurgatory = Optional.empty(); private Optional> delayedShareFetchPurgatory = Optional.empty(); @@ -130,36 +128,11 @@ public class ReplicaManagerBuilder { return this; } - public ReplicaManagerBuilder setIsShuttingDown(AtomicBoolean isShuttingDown) { - this.isShuttingDown = isShuttingDown; - return this; - } - - public ReplicaManagerBuilder setDelayedProducePurgatory(DelayedOperationPurgatory delayedProducePurgatory) { - this.delayedProducePurgatory = Optional.of(delayedProducePurgatory); - return this; - } - public ReplicaManagerBuilder setDelayedFetchPurgatory(DelayedOperationPurgatory delayedFetchPurgatory) { this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory); return this; } - public ReplicaManagerBuilder setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory delayedRemoteFetchPurgatory) { - this.delayedRemoteFetchPurgatory = Optional.of(delayedRemoteFetchPurgatory); - return this; - } - - public ReplicaManagerBuilder setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory delayedDeleteRecordsPurgatory) { - this.delayedDeleteRecordsPurgatory = Optional.of(delayedDeleteRecordsPurgatory); - return this; - } - - public ReplicaManagerBuilder setDelayedElectLeaderPurgatoryParam(DelayedOperationPurgatory delayedElectLeaderPurgatory) { - this.delayedElectLeaderPurgatory = Optional.of(delayedElectLeaderPurgatory); - return this; - } - public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = Optional.of(threadNamePrefix); return this; @@ -170,11 +143,6 @@ public class ReplicaManagerBuilder { return this; } - public ReplicaManagerBuilder setAddPartitionsToTransactionManager(AddPartitionsToTxnManager addPartitionsToTxnManager) { - this.addPartitionsToTxnManager = Optional.of(addPartitionsToTxnManager); - return this; - } - public ReplicaManagerBuilder setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) { this.directoryEventHandler = directoryEventHandler; return this; @@ -206,7 +174,6 @@ public class ReplicaManagerBuilder { OptionConverters.toScala(delayedProducePurgatory), OptionConverters.toScala(delayedFetchPurgatory), OptionConverters.toScala(delayedDeleteRecordsPurgatory), - OptionConverters.toScala(delayedElectLeaderPurgatory), OptionConverters.toScala(delayedRemoteFetchPurgatory), OptionConverters.toScala(delayedRemoteListOffsetsPurgatory), OptionConverters.toScala(delayedShareFetchPurgatory), diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 4764bba5cb7..5035c86aa06 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1635,24 +1635,6 @@ class Partition(val topicPartition: TopicPartition, localLog.fetchOffsetSnapshot } - def legacyFetchOffsetsForTimestamp(timestamp: Long, - maxNumOffsets: Int, - isFromConsumer: Boolean, - fetchOnlyFromLeader: Boolean): Seq[Long] = inReadLock(leaderIsrUpdateLock) { - val localLog = localLogWithEpochOrThrow(Optional.empty(), fetchOnlyFromLeader) - val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, maxNumOffsets) - - if (!isFromConsumer) { - allOffsets - } else { - val hw = localLog.highWatermark - if (allOffsets.exists(_ > hw)) - hw +: allOffsets.dropWhile(_ > hw) - else - allOffsets - } - } - def logStartOffset: Long = { inReadLock(leaderIsrUpdateLock) { leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b76e151c2d3..d8e415028ea 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -53,7 +53,7 @@ import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common} import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.network.BrokerEndPoint -import org.apache.kafka.server.purgatory.{DelayedOperationKey, DelayedOperationPurgatory, TopicPartitionOperationKey} +import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey} import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData} import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} @@ -274,7 +274,6 @@ class ReplicaManager(val config: KafkaConfig, delayedProducePurgatoryParam: Option[DelayedOperationPurgatory[DelayedProduce]] = None, delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None, delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None, - delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None, delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None, delayedRemoteListOffsetsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None, delayedShareFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedShareFetch]] = None, @@ -298,9 +297,6 @@ class ReplicaManager(val config: KafkaConfig, new DelayedOperationPurgatory[DelayedDeleteRecords]( "DeleteRecords", config.brokerId, config.deleteRecordsPurgatoryPurgeIntervalRequests)) - val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse( - new DelayedOperationPurgatory[DelayedElectLeader]( - "ElectLeader", config.brokerId)) val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse( new DelayedOperationPurgatory[DelayedRemoteFetch]( "RemoteFetch", config.brokerId)) @@ -387,13 +383,6 @@ class ReplicaManager(val config: KafkaConfig, def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = logManager.getLog(topicPartition) - def hasDelayedElectionOperations: Boolean = delayedElectLeaderPurgatory.numDelayed != 0 - - def tryCompleteElection(key: DelayedOperationKey): Unit = { - val completed = delayedElectLeaderPurgatory.checkAndComplete(key) - debug("Request key %s unblocked %d ElectLeader.".format(key.keyLabel, completed)) - } - def startup(): Unit = { // start ISR expiration thread // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR @@ -628,10 +617,6 @@ class ReplicaManager(val config: KafkaConfig, onlinePartition(topicPartition).flatMap(_.log) } - def getLogDir(topicPartition: TopicPartition): Option[String] = { - localLog(topicPartition).map(_.parentDir) - } - def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions() def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action) @@ -1490,15 +1475,6 @@ class ReplicaManager(val config: KafkaConfig, partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager) } - def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition, - timestamp: Long, - maxNumOffsets: Int, - isFromConsumer: Boolean, - fetchOnlyFromLeader: Boolean): Seq[Long] = { - val partition = getPartitionOrException(topicPartition) - partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader) - } - /** * Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully * else returns [[None]]. @@ -2525,7 +2501,6 @@ class ReplicaManager(val config: KafkaConfig, delayedRemoteListOffsetsPurgatory.shutdown() delayedProducePurgatory.shutdown() delayedDeleteRecordsPurgatory.shutdown() - delayedElectLeaderPurgatory.shutdown() delayedShareFetchPurgatory.shutdown() if (checkpointHW) checkpointHighWatermarks() diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index df8b6b6f6d4..bdc12c3051e 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -190,7 +190,6 @@ object AbstractCoordinatorConcurrencyTest { delayedProducePurgatoryParam = Some(producePurgatory), delayedFetchPurgatoryParam = Some(delayedFetchPurgatoryParam), delayedDeleteRecordsPurgatoryParam = Some(delayedDeleteRecordsPurgatoryParam), - delayedElectLeaderPurgatoryParam = Some(delayedElectLeaderPurgatoryParam), delayedRemoteFetchPurgatoryParam = Some(delayedRemoteFetchPurgatoryParam), delayedRemoteListOffsetsPurgatoryParam = Some(delayedRemoteListOffsetsPurgatoryParam), threadNamePrefix = Option(this.getClass.getName)) { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 6f6bc663082..a6aa6f4b5ee 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2723,8 +2723,6 @@ class ReplicaManagerTest { "Fetch", timer, 0, false) val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( "DeleteRecords", timer, 0, false) - val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader]( - "ElectLeader", timer, 0, false) val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( "RemoteFetch", timer, 0, false) val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]( @@ -2754,7 +2752,6 @@ class ReplicaManagerTest { delayedProducePurgatoryParam = Some(mockProducePurgatory), delayedFetchPurgatoryParam = Some(mockFetchPurgatory), delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), - delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory), delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory), delayedRemoteListOffsetsPurgatoryParam = Some(mockRemoteListOffsetsPurgatory), delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory), @@ -3150,8 +3147,6 @@ class ReplicaManagerTest { "Fetch", timer, 0, false) val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( "DeleteRecords", timer, 0, false) - val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader]( - "DelayedElectLeader", timer, 0, false) val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( "DelayedRemoteFetch", timer, 0, false) val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]( @@ -3188,7 +3183,6 @@ class ReplicaManagerTest { delayedProducePurgatoryParam = Some(mockProducePurgatory), delayedFetchPurgatoryParam = Some(mockFetchPurgatory), delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), - delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory), delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory), delayedRemoteListOffsetsPurgatoryParam = Some(mockDelayedRemoteListOffsetsPurgatory), delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory), diff --git a/docs/zk2kraft.html b/docs/zk2kraft.html index 123aaca4e18..2d3e8148c80 100644 --- a/docs/zk2kraft.html +++ b/docs/zk2kraft.html @@ -188,5 +188,14 @@ In Kraft mode, Zookeeper is not used, so the metrics is removed.

+
  • +

    + Remove the metrics for leader election purgatory. +

    +
      +
    • kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=PurgatorySize
    • +
    • kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=NumDelayedOperations
    • +
    +
  • \ No newline at end of file