KAFKA-18592 Cleanup ReplicaManager (#18621)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Christo Lolov <lolovc@amazon.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-01-24 01:34:36 +08:00 committed by GitHub
parent ce4eeaa379
commit 40890faa1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 10 additions and 84 deletions

View File

@ -22,7 +22,6 @@ import kafka.log.remote.RemoteLogManager;
import kafka.server.AddPartitionsToTxnManager; import kafka.server.AddPartitionsToTxnManager;
import kafka.server.AlterPartitionManager; import kafka.server.AlterPartitionManager;
import kafka.server.DelayedDeleteRecords; import kafka.server.DelayedDeleteRecords;
import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch; import kafka.server.DelayedFetch;
import kafka.server.DelayedProduce; import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch; import kafka.server.DelayedRemoteFetch;
@ -66,7 +65,6 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedShareFetch>> delayedShareFetchPurgatory = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedShareFetch>> delayedShareFetchPurgatory = Optional.empty();
@ -130,36 +128,11 @@ public class ReplicaManagerBuilder {
return this; return this;
} }
public ReplicaManagerBuilder setIsShuttingDown(AtomicBoolean isShuttingDown) {
this.isShuttingDown = isShuttingDown;
return this;
}
public ReplicaManagerBuilder setDelayedProducePurgatory(DelayedOperationPurgatory<DelayedProduce> delayedProducePurgatory) {
this.delayedProducePurgatory = Optional.of(delayedProducePurgatory);
return this;
}
public ReplicaManagerBuilder setDelayedFetchPurgatory(DelayedOperationPurgatory<DelayedFetch> delayedFetchPurgatory) { public ReplicaManagerBuilder setDelayedFetchPurgatory(DelayedOperationPurgatory<DelayedFetch> delayedFetchPurgatory) {
this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory); this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory);
return this; return this;
} }
public ReplicaManagerBuilder setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory<DelayedRemoteFetch> delayedRemoteFetchPurgatory) {
this.delayedRemoteFetchPurgatory = Optional.of(delayedRemoteFetchPurgatory);
return this;
}
public ReplicaManagerBuilder setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords> delayedDeleteRecordsPurgatory) {
this.delayedDeleteRecordsPurgatory = Optional.of(delayedDeleteRecordsPurgatory);
return this;
}
public ReplicaManagerBuilder setDelayedElectLeaderPurgatoryParam(DelayedOperationPurgatory<DelayedElectLeader> delayedElectLeaderPurgatory) {
this.delayedElectLeaderPurgatory = Optional.of(delayedElectLeaderPurgatory);
return this;
}
public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) { public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = Optional.of(threadNamePrefix); this.threadNamePrefix = Optional.of(threadNamePrefix);
return this; return this;
@ -170,11 +143,6 @@ public class ReplicaManagerBuilder {
return this; return this;
} }
public ReplicaManagerBuilder setAddPartitionsToTransactionManager(AddPartitionsToTxnManager addPartitionsToTxnManager) {
this.addPartitionsToTxnManager = Optional.of(addPartitionsToTxnManager);
return this;
}
public ReplicaManagerBuilder setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) { public ReplicaManagerBuilder setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) {
this.directoryEventHandler = directoryEventHandler; this.directoryEventHandler = directoryEventHandler;
return this; return this;
@ -206,7 +174,6 @@ public class ReplicaManagerBuilder {
OptionConverters.toScala(delayedProducePurgatory), OptionConverters.toScala(delayedProducePurgatory),
OptionConverters.toScala(delayedFetchPurgatory), OptionConverters.toScala(delayedFetchPurgatory),
OptionConverters.toScala(delayedDeleteRecordsPurgatory), OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
OptionConverters.toScala(delayedRemoteFetchPurgatory), OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(delayedRemoteListOffsetsPurgatory), OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
OptionConverters.toScala(delayedShareFetchPurgatory), OptionConverters.toScala(delayedShareFetchPurgatory),

View File

@ -1635,24 +1635,6 @@ class Partition(val topicPartition: TopicPartition,
localLog.fetchOffsetSnapshot localLog.fetchOffsetSnapshot
} }
def legacyFetchOffsetsForTimestamp(timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = inReadLock(leaderIsrUpdateLock) {
val localLog = localLogWithEpochOrThrow(Optional.empty(), fetchOnlyFromLeader)
val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, maxNumOffsets)
if (!isFromConsumer) {
allOffsets
} else {
val hw = localLog.highWatermark
if (allOffsets.exists(_ > hw))
hw +: allOffsets.dropWhile(_ > hw)
else
allOffsets
}
}
def logStartOffset: Long = { def logStartOffset: Long = {
inReadLock(leaderIsrUpdateLock) { inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1) leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1)

View File

@ -53,7 +53,7 @@ import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition} import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedOperationKey, DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey} import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData} import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
@ -274,7 +274,6 @@ class ReplicaManager(val config: KafkaConfig,
delayedProducePurgatoryParam: Option[DelayedOperationPurgatory[DelayedProduce]] = None, delayedProducePurgatoryParam: Option[DelayedOperationPurgatory[DelayedProduce]] = None,
delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None, delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None,
delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None, delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None, delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
delayedRemoteListOffsetsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None, delayedRemoteListOffsetsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
delayedShareFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedShareFetch]] = None, delayedShareFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedShareFetch]] = None,
@ -298,9 +297,6 @@ class ReplicaManager(val config: KafkaConfig,
new DelayedOperationPurgatory[DelayedDeleteRecords]( new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", config.brokerId, "DeleteRecords", config.brokerId,
config.deleteRecordsPurgatoryPurgeIntervalRequests)) config.deleteRecordsPurgatoryPurgeIntervalRequests))
val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedElectLeader](
"ElectLeader", config.brokerId))
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse( val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedRemoteFetch]( new DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", config.brokerId)) "RemoteFetch", config.brokerId))
@ -387,13 +383,6 @@ class ReplicaManager(val config: KafkaConfig,
def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = logManager.getLog(topicPartition) def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = logManager.getLog(topicPartition)
def hasDelayedElectionOperations: Boolean = delayedElectLeaderPurgatory.numDelayed != 0
def tryCompleteElection(key: DelayedOperationKey): Unit = {
val completed = delayedElectLeaderPurgatory.checkAndComplete(key)
debug("Request key %s unblocked %d ElectLeader.".format(key.keyLabel, completed))
}
def startup(): Unit = { def startup(): Unit = {
// start ISR expiration thread // start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
@ -628,10 +617,6 @@ class ReplicaManager(val config: KafkaConfig,
onlinePartition(topicPartition).flatMap(_.log) onlinePartition(topicPartition).flatMap(_.log)
} }
def getLogDir(topicPartition: TopicPartition): Option[String] = {
localLog(topicPartition).map(_.parentDir)
}
def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions() def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action) def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
@ -1490,15 +1475,6 @@ class ReplicaManager(val config: KafkaConfig,
partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager) partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager)
} }
def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition,
timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = {
val partition = getPartitionOrException(topicPartition)
partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
}
/** /**
* Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully * Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
* else returns [[None]]. * else returns [[None]].
@ -2525,7 +2501,6 @@ class ReplicaManager(val config: KafkaConfig,
delayedRemoteListOffsetsPurgatory.shutdown() delayedRemoteListOffsetsPurgatory.shutdown()
delayedProducePurgatory.shutdown() delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown() delayedDeleteRecordsPurgatory.shutdown()
delayedElectLeaderPurgatory.shutdown()
delayedShareFetchPurgatory.shutdown() delayedShareFetchPurgatory.shutdown()
if (checkpointHW) if (checkpointHW)
checkpointHighWatermarks() checkpointHighWatermarks()

View File

@ -190,7 +190,6 @@ object AbstractCoordinatorConcurrencyTest {
delayedProducePurgatoryParam = Some(producePurgatory), delayedProducePurgatoryParam = Some(producePurgatory),
delayedFetchPurgatoryParam = Some(delayedFetchPurgatoryParam), delayedFetchPurgatoryParam = Some(delayedFetchPurgatoryParam),
delayedDeleteRecordsPurgatoryParam = Some(delayedDeleteRecordsPurgatoryParam), delayedDeleteRecordsPurgatoryParam = Some(delayedDeleteRecordsPurgatoryParam),
delayedElectLeaderPurgatoryParam = Some(delayedElectLeaderPurgatoryParam),
delayedRemoteFetchPurgatoryParam = Some(delayedRemoteFetchPurgatoryParam), delayedRemoteFetchPurgatoryParam = Some(delayedRemoteFetchPurgatoryParam),
delayedRemoteListOffsetsPurgatoryParam = Some(delayedRemoteListOffsetsPurgatoryParam), delayedRemoteListOffsetsPurgatoryParam = Some(delayedRemoteListOffsetsPurgatoryParam),
threadNamePrefix = Option(this.getClass.getName)) { threadNamePrefix = Option(this.getClass.getName)) {

View File

@ -2723,8 +2723,6 @@ class ReplicaManagerTest {
"Fetch", timer, 0, false) "Fetch", timer, 0, false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", timer, 0, false) "DeleteRecords", timer, 0, false)
val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
"ElectLeader", timer, 0, false)
val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", timer, 0, false) "RemoteFetch", timer, 0, false)
val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]( val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
@ -2754,7 +2752,6 @@ class ReplicaManagerTest {
delayedProducePurgatoryParam = Some(mockProducePurgatory), delayedProducePurgatoryParam = Some(mockProducePurgatory),
delayedFetchPurgatoryParam = Some(mockFetchPurgatory), delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory), delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam = Some(mockRemoteListOffsetsPurgatory), delayedRemoteListOffsetsPurgatoryParam = Some(mockRemoteListOffsetsPurgatory),
delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory), delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
@ -3150,8 +3147,6 @@ class ReplicaManagerTest {
"Fetch", timer, 0, false) "Fetch", timer, 0, false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", timer, 0, false) "DeleteRecords", timer, 0, false)
val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
"DelayedElectLeader", timer, 0, false)
val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
"DelayedRemoteFetch", timer, 0, false) "DelayedRemoteFetch", timer, 0, false)
val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]( val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
@ -3188,7 +3183,6 @@ class ReplicaManagerTest {
delayedProducePurgatoryParam = Some(mockProducePurgatory), delayedProducePurgatoryParam = Some(mockProducePurgatory),
delayedFetchPurgatoryParam = Some(mockFetchPurgatory), delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory), delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam = Some(mockDelayedRemoteListOffsetsPurgatory), delayedRemoteListOffsetsPurgatoryParam = Some(mockDelayedRemoteListOffsetsPurgatory),
delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory), delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),

View File

@ -188,5 +188,14 @@
In Kraft mode, Zookeeper is not used, so the metrics is removed. In Kraft mode, Zookeeper is not used, so the metrics is removed.
</p> </p>
</li> </li>
<li>
<p>
Remove the metrics for leader election purgatory.
</p>
<ul>
<li><code>kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=PurgatorySize</code></li>
<li><code>kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=NumDelayedOperations</code></li>
</ul>
</li>
</ul> </ul>
</div> </div>