mirror of https://github.com/apache/kafka.git
KAFKA-18532: Clean Partition.scala zookeeper logic (#18594)
Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
ff3de0cedc
commit
6eddaeba58
|
@ -27,7 +27,6 @@ import kafka.server.metadata.KRaftMetadataCache
|
||||||
import kafka.server.share.DelayedShareFetch
|
import kafka.server.share.DelayedShareFetch
|
||||||
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
|
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import kafka.zookeeper.ZooKeeperClientException
|
|
||||||
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
|
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
|
||||||
import org.apache.kafka.common.errors._
|
import org.apache.kafka.common.errors._
|
||||||
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
|
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
|
||||||
|
@ -767,14 +766,7 @@ class Partition(val topicPartition: TopicPartition,
|
||||||
LeaderRecoveryState.RECOVERED
|
LeaderRecoveryState.RECOVERED
|
||||||
)
|
)
|
||||||
|
|
||||||
try {
|
createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId)
|
||||||
createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId)
|
|
||||||
} catch {
|
|
||||||
case e: ZooKeeperClientException =>
|
|
||||||
stateChangeLogger.error(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " +
|
|
||||||
s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
val leaderLog = localLogOrException
|
val leaderLog = localLogOrException
|
||||||
|
|
||||||
|
@ -868,14 +860,7 @@ class Partition(val topicPartition: TopicPartition,
|
||||||
LeaderRecoveryState.of(partitionState.leaderRecoveryState)
|
LeaderRecoveryState.of(partitionState.leaderRecoveryState)
|
||||||
)
|
)
|
||||||
|
|
||||||
try {
|
createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
|
||||||
createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
|
|
||||||
} catch {
|
|
||||||
case e: ZooKeeperClientException =>
|
|
||||||
stateChangeLogger.error(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " +
|
|
||||||
s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
val followerLog = localLogOrException
|
val followerLog = localLogOrException
|
||||||
if (isNewLeaderEpoch) {
|
if (isNewLeaderEpoch) {
|
||||||
|
|
|
@ -2833,7 +2833,7 @@ class ReplicaManager(val config: KafkaConfig,
|
||||||
val leaderChangedPartitions = new mutable.HashSet[Partition]
|
val leaderChangedPartitions = new mutable.HashSet[Partition]
|
||||||
val followerChangedPartitions = new mutable.HashSet[Partition]
|
val followerChangedPartitions = new mutable.HashSet[Partition]
|
||||||
if (!localChanges.leaders.isEmpty) {
|
if (!localChanges.leaders.isEmpty) {
|
||||||
applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala, localChanges.directoryIds.asScala)
|
applyLocalLeadersDelta(leaderChangedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala, localChanges.directoryIds.asScala)
|
||||||
}
|
}
|
||||||
if (!localChanges.followers.isEmpty) {
|
if (!localChanges.followers.isEmpty) {
|
||||||
applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala, localChanges.directoryIds.asScala)
|
applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala, localChanges.directoryIds.asScala)
|
||||||
|
@ -2857,7 +2857,6 @@ class ReplicaManager(val config: KafkaConfig,
|
||||||
|
|
||||||
private def applyLocalLeadersDelta(
|
private def applyLocalLeadersDelta(
|
||||||
changedPartitions: mutable.Set[Partition],
|
changedPartitions: mutable.Set[Partition],
|
||||||
newImage: MetadataImage,
|
|
||||||
delta: TopicsDelta,
|
delta: TopicsDelta,
|
||||||
offsetCheckpoints: OffsetCheckpoints,
|
offsetCheckpoints: OffsetCheckpoints,
|
||||||
localLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo],
|
localLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo],
|
||||||
|
|
Loading…
Reference in New Issue