MINOR: remove unused DelayedElectLeader (#19490)

The `DelayedElectLeader` is only used in `TestReplicaManager`, but there
is no reference in it, so we can safely remove it.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-04-16 23:56:31 +08:00 committed by GitHub
parent 4b2a3102da
commit 18e4608d1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 1 additions and 89 deletions

View File

@ -1,85 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.server.purgatory.DelayedOperation
import scala.collection.{Map, mutable}
/** A delayed elect leader operation that can be created by the replica manager and watched
* in the elect leader purgatory
*/
class DelayedElectLeader(
delayMs: Long,
expectedLeaders: Map[TopicPartition, Int],
results: Map[TopicPartition, ApiError],
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, ApiError] => Unit
) extends DelayedOperation(delayMs) with Logging {
private val waitingPartitions = mutable.Map() ++= expectedLeaders
private val fullResults = mutable.Map() ++= results
/**
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/
override def onExpiration(): Unit = {}
/**
* Process for completing an operation; This function needs to be defined
* in subclasses and will be called exactly once in forceComplete()
*/
override def onComplete(): Unit = {
// This could be called to force complete, so I need the full list of partitions, so I can time them all out.
updateWaiting()
val timedOut = waitingPartitions.map {
case (tp, _) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null)
}
responseCallback(timedOut ++ fullResults)
}
/**
* Try to complete the delayed operation by first checking if the operation
* can be completed by now. If yes execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
*
* This function needs to be defined in subclasses
*/
override def tryComplete(): Boolean = {
updateWaiting()
debug(s"tryComplete() waitingPartitions: $waitingPartitions")
waitingPartitions.isEmpty && forceComplete()
}
private def updateWaiting(): Unit = {
val metadataCache = replicaManager.metadataCache
val completedPartitions = waitingPartitions.collect {
case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).filter(_.leader == leader).isPresent() => tp
}
completedPartitions.foreach { tp =>
waitingPartitions -= tp
fullResults += tp -> ApiError.NONE
}
}
}

View File

@ -172,7 +172,6 @@ object AbstractCoordinatorConcurrencyTest {
val producePurgatory: DelayedOperationPurgatory[DelayedProduce],
val delayedFetchPurgatoryParam: DelayedOperationPurgatory[DelayedFetch],
val delayedDeleteRecordsPurgatoryParam: DelayedOperationPurgatory[DelayedDeleteRecords],
val delayedElectLeaderPurgatoryParam: DelayedOperationPurgatory[DelayedElectLeader],
val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch],
val delayedRemoteListOffsetsPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteListOffsets])
extends ReplicaManager(
@ -290,10 +289,8 @@ object AbstractCoordinatorConcurrencyTest {
"Fetch", timer, 0, 1000, false, true)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", timer, 0, 1000, false, true)
val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
"ElectLeader", timer, 0, 1000, false, true)
new TestReplicaManager(config, time, scheduler, logManager, quotaManagers, watchKeys, producePurgatory,
mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory,
mockFetchPurgatory, mockDeleteRecordsPurgatory, mockRemoteFetchPurgatory,
mockRemoteListOffsetsPurgatory)
}
}