mirror of https://github.com/apache/kafka.git

KAFKA-19447: Replace PartitionState with PartitionRegistration in makeFollower/makeLeader (#20335)

Follow-up to [KAFKA-18486](https://issues.apache.org/jira/browse/KAFKA-18486)
* Replace PartitionState with PartitionRegistration in makeFollower/makeLeader
* Remove PartitionState.java since it is no longer referenced

Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

This commit is contained in:
parent bf0e6ba700
commit cae9848160
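The heart of the change is the new Partition.makeLeader/makeFollower signatures in the first diff below: callers now hand over the PartitionRegistration from the metadata image together with an explicit isNew flag, instead of converting it to a PartitionState first. A minimal sketch of the new leader-side call pattern, adapted from the updated tests in this commit (the broker ids, epochs and the helper name becomeLeader are illustrative only, not part of the change):

import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints

// Sketch only: `partition` and `offsetCheckpoints` come from the surrounding broker or test fixture.
def becomeLeader(partition: kafka.cluster.Partition,
                 offsetCheckpoints: OffsetCheckpoints,
                 topicId: Option[Uuid]): Boolean = {
  val replicas = Array(0, 1, 2)
  // Build the registration that previously would have been converted into a PartitionState.
  val registration = new PartitionRegistration.Builder()
    .setLeader(0)
    .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
    .setLeaderEpoch(5)
    .setIsr(replicas)
    .setPartitionEpoch(1)
    .setReplicas(replicas)
    .setDirectories(DirectoryId.unassignedArray(replicas.length))
    .build()
  // isNew is now an explicit parameter instead of PartitionState.isNew.
  partition.makeLeader(registration, isNew = true, offsetCheckpoints, topicId)
}

The same builder, with setAddingReplicas/setRemovingReplicas, replaces the old mutable setters on PartitionState, as the updated AssignmentStateTest further down shows.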
@@ -36,13 +36,12 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.common.{PartitionState => JPartitionState}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
+import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, PartitionRegistration}
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
-import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetsListener, LogOffsetSnapshot, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
 import org.apache.kafka.server.replica.Replica
@@ -731,7 +730,8 @@ class Partition(val topicPartition: TopicPartition,
  * from the time when this broker was the leader last time) and setting the new leader and ISR.
  * If the leader replica id does not change, return false to indicate the replica manager.
  */
-def makeLeader(partitionState: JPartitionState,
+def makeLeader(partitionRegistration: PartitionRegistration,
+isNew: Boolean,
 highWatermarkCheckpoints: OffsetCheckpoints,
 topicId: Option[Uuid],
 targetDirectoryId: Option[Uuid] = None): Boolean = {
@@ -739,23 +739,23 @@ class Partition(val topicPartition: TopicPartition,
 // Partition state changes are expected to have a partition epoch larger or equal
 // to the current partition epoch. The latter is allowed because the partition epoch
 // is also updated by the AlterPartition response so the new epoch might be known
-// before a LeaderAndIsr request is received or before an update is received via
+// before a partitionRegistration is received or before an update is received via
 // the metadata log.
-if (partitionState.partitionEpoch < partitionEpoch) {
-stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " +
-s"and partition state $partitionState since the leader is already at a newer partition epoch $partitionEpoch.")
+if (partitionRegistration.partitionEpoch < partitionEpoch) {
+stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId, " +
+s"partition registration $partitionRegistration and isNew=$isNew since the leader is already at a newer partition epoch $partitionEpoch.")
 return false
 }

 val currentTimeMs = time.milliseconds
 val isNewLeader = !isLeader
-val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
-val replicas = partitionState.replicas.asScala.map(_.toInt)
-val isr = partitionState.isr.asScala.map(_.toInt).toSet
-val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
-val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
+val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch
+val replicas = partitionRegistration.replicas
+val isr = partitionRegistration.isr.toSet
+val addingReplicas = partitionRegistration.addingReplicas
+val removingReplicas = partitionRegistration.removingReplicas

-if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING.value) {
+if (partitionRegistration.leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
 stateChangeLogger.info(s"The topic partition $topicPartition was marked as RECOVERING. " +
 "Marking the topic partition as RECOVERED.")
 }
@@ -771,7 +771,7 @@ class Partition(val topicPartition: TopicPartition,
 LeaderRecoveryState.RECOVERED
 )

-createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId)
+createLogInAssignedDirectoryId(isNew, highWatermarkCheckpoints, topicId, targetDirectoryId)

 val leaderLog = localLogOrException

@@ -780,8 +780,8 @@ class Partition(val topicPartition: TopicPartition,
 if (isNewLeaderEpoch) {
 val leaderEpochStartOffset = leaderLog.logEndOffset
 stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId starts at " +
-s"leader epoch ${partitionState.leaderEpoch} from offset $leaderEpochStartOffset " +
-s"with partition epoch ${partitionState.partitionEpoch}, high watermark ${leaderLog.highWatermark}, " +
+s"leader epoch ${partitionRegistration.leaderEpoch} from offset $leaderEpochStartOffset " +
+s"with partition epoch ${partitionRegistration.partitionEpoch}, high watermark ${leaderLog.highWatermark}, " +
 s"ISR ${isr.mkString("[", ",", "]")}, adding replicas ${addingReplicas.mkString("[", ",", "]")} and " +
 s"removing replicas ${removingReplicas.mkString("[", ",", "]")} ${if (isUnderMinIsr) "(under-min-isr)" else ""}. " +
 s"Previous leader $leaderReplicaIdOpt and previous leader epoch was $leaderEpoch.")
@@ -791,7 +791,7 @@ class Partition(val topicPartition: TopicPartition,
 // to ensure that these followers can truncate to the right offset, we must cache the new
 // leader epoch and the start offset since it should be larger than any epoch that a follower
 // would try to query.
-leaderLog.assignEpochStartOffset(partitionState.leaderEpoch, leaderEpochStartOffset)
+leaderLog.assignEpochStartOffset(partitionRegistration.leaderEpoch, leaderEpochStartOffset)

 // Initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and
 // lastFetchLeaderLogEndOffset.
@@ -800,23 +800,23 @@ class Partition(val topicPartition: TopicPartition,
 currentTimeMs,
 leaderEpochStartOffset,
 isNewLeader,
-partitionState.isr.contains(replica.brokerId)
+isr.contains(replica.brokerId)
 )
 }

 // We update the leader epoch and the leader epoch start offset iff the
 // leader epoch changed.
-leaderEpoch = partitionState.leaderEpoch
+leaderEpoch = partitionRegistration.leaderEpoch
 leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
 } else {
-stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " +
-s"and partition state $partitionState since it is already the leader with leader epoch $leaderEpoch. " +
+stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId, " +
+s"partition registration $partitionRegistration and isNew=$isNew since it is already the leader with leader epoch $leaderEpoch. " +
 s"Current high watermark ${leaderLog.highWatermark}, ISR ${isr.mkString("[", ",", "]")}, " +
 s"adding replicas ${addingReplicas.mkString("[", ",", "]")} and " +
 s"removing replicas ${removingReplicas.mkString("[", ",", "]")}.")
 }

-partitionEpoch = partitionState.partitionEpoch
+partitionEpoch = partitionRegistration.partitionEpoch
 leaderReplicaIdOpt = Some(localBrokerId)

 // We may need to increment high watermark since ISR could be down to 1.
@@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition,
  * replica manager that state is already correct and the become-follower steps can
  * be skipped.
  */
-def makeFollower(partitionState: JPartitionState,
+def makeFollower(partitionRegistration: PartitionRegistration,
+isNew: Boolean,
 highWatermarkCheckpoints: OffsetCheckpoints,
 topicId: Option[Uuid],
 targetLogDirectoryId: Option[Uuid] = None): Boolean = {
 inWriteLock(leaderIsrUpdateLock) {
-if (partitionState.partitionEpoch < partitionEpoch) {
-stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " +
-s"and partition state $partitionState since the follower is already at a newer partition epoch $partitionEpoch.")
+if (partitionRegistration.partitionEpoch < partitionEpoch) {
+stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId, " +
+s"partition registration $partitionRegistration and isNew=$isNew since the follower is already at a newer partition epoch $partitionEpoch.")
 return false
 }

-val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
+val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch
 // The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet
 // the under min isr condition during the makeFollower process and emits the wrong metric.
-leaderReplicaIdOpt = Option(partitionState.leader)
-leaderEpoch = partitionState.leaderEpoch
+leaderReplicaIdOpt = Option(partitionRegistration.leader)
+leaderEpoch = partitionRegistration.leaderEpoch
 leaderEpochStartOffsetOpt = None
-partitionEpoch = partitionState.partitionEpoch
+partitionEpoch = partitionRegistration.partitionEpoch

 updateAssignmentAndIsr(
-replicas = partitionState.replicas.asScala.iterator.map(_.toInt).toSeq,
+replicas = partitionRegistration.replicas,
 isLeader = false,
 isr = Set.empty,
-addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
-removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt),
-LeaderRecoveryState.of(partitionState.leaderRecoveryState)
+addingReplicas = partitionRegistration.addingReplicas,
+removingReplicas = partitionRegistration.removingReplicas,
+partitionRegistration.leaderRecoveryState
 )

-createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+createLogInAssignedDirectoryId(isNew, highWatermarkCheckpoints, topicId, targetLogDirectoryId)

 val followerLog = localLogOrException
 if (isNewLeaderEpoch) {
 val leaderEpochEndOffset = followerLog.logEndOffset
-stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
-s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " +
-s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " +
+stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionRegistration.leaderEpoch} from " +
+s"offset $leaderEpochEndOffset with partition epoch ${partitionRegistration.partitionEpoch} and " +
+s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionRegistration.leader}. " +
 s"Previous leader $leaderReplicaIdOpt and previous leader epoch was $leaderEpoch.")
 } else {
-stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " +
-s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.")
+stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId, " +
+s"partition registration $partitionRegistration and isNew=$isNew since it is already a follower with leader epoch $leaderEpoch.")
 }

 // We must restart the fetchers when the leader epoch changed regardless of
@@ -885,11 +886,11 @@ class Partition(val topicPartition: TopicPartition,
 }
 }

-private def createLogInAssignedDirectoryId(partitionState: JPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+private def createLogInAssignedDirectoryId(isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
 case Some(directoryId) =>
 if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
-createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+createLogIfNotExists(isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
 } else {
 warn(s"Skipping creation of log because there are potentially offline log " +
 s"directories and log may already exist there. directoryId=$directoryId, " +
@@ -897,7 +898,7 @@ class Partition(val topicPartition: TopicPartition,
 }

 case None =>
-createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
+createLogIfNotExists(isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
 }
 }

@@ -2409,9 +2409,8 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.foreachEntry { (tp, info) =>
 getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
 try {
-val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
 val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
-partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
+partition.makeLeader(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)

 changedPartitions.add(partition)
 } catch {
@@ -2451,9 +2450,8 @@ class ReplicaManager(val config: KafkaConfig,
 // - This also ensures that the local replica is created even if the leader
 // is unavailable. This is required to ensure that we include the partition's
 // high watermark in the checkpoint file (see KAFKA-1647).
-val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
 val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
-val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
+val isNewLeaderEpoch = partition.makeFollower(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)

 if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
 !info.partition.isr.contains(config.brokerId))) {
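The two ReplicaManager call sites above show the follower path taking the same shape: the PartitionRegistration from the topics delta (info.partition) and the isNew flag are passed straight into Partition.makeFollower, with no intermediate PartitionState. A hedged sketch of the equivalent standalone call, reusing the imports from the sketch at the top (the hand-built registration and the helper name becomeFollower are illustrative; inside the broker the registration comes from the metadata delta):

// Sketch only: mirrors the new makeFollower call; leader and epoch values are illustrative.
def becomeFollower(partition: kafka.cluster.Partition,
                   offsetCheckpoints: OffsetCheckpoints,
                   topicId: Option[Uuid]): Boolean = {
  val replicas = Array(0, 1, 2)
  val registration = new PartitionRegistration.Builder()
    .setLeader(1) // another broker leads; this broker only follows
    .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
    .setLeaderEpoch(5)
    .setIsr(replicas)
    .setPartitionEpoch(1)
    .setReplicas(replicas)
    .setDirectories(DirectoryId.unassignedArray(replicas.length))
    .build()
  // The return value reports whether the leader epoch advanced; ReplicaManager stores it
  // as isNewLeaderEpoch and uses it to decide whether fetchers must be restarted.
  partition.makeFollower(registration, isNew = false, offsetCheckpoints, topicId)
}

Note that, unlike the old PartitionState path, the replica sets and the recovery state are read directly from the registration, so the LeaderRecoveryState.of(...) conversion in updateAssignmentAndIsr is no longer needed.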
@@ -19,11 +19,10 @@ package kafka.cluster
 import kafka.log.LogManager
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.MockAlterPartitionManager
-import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
 import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.PartitionState
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.metadata.{MetadataCache, MockConfigRepository}
+import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, MockConfigRepository, PartitionRegistration}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.util.MockTime
@@ -119,28 +118,25 @@ class AbstractPartitionTest {
 isLeader: Boolean): Partition = {
 partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)

-val replicas = java.util.List.of[Integer](brokerId, remoteReplicaId)
+val replicas = Array(brokerId, remoteReplicaId)
 val isr = replicas

-if (isLeader) {
-assertTrue(partition.makeLeader(new PartitionState()
-.setLeader(brokerId)
+val partitionRegistrationBuilder = new PartitionRegistration.Builder()
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(leaderEpoch)
 .setIsr(isr)
 .setPartitionEpoch(1)
 .setReplicas(replicas)
-.setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed")
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+if (isLeader) {
+val partitionRegistration = partitionRegistrationBuilder.setLeader(brokerId).build()
+assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed")
 assertEquals(leaderEpoch, partition.getLeaderEpoch)
 } else {
-assertTrue(partition.makeFollower(new PartitionState()
-.setLeader(remoteReplicaId)
-.setLeaderEpoch(leaderEpoch)
-.setIsr(isr)
-.setPartitionEpoch(1)
-.setReplicas(replicas)
-.setIsNew(true), offsetCheckpoints, None), "Expected become follower transition to succeed")
+val partitionRegistration = partitionRegistrationBuilder.setLeader(remoteReplicaId).build()
+assertTrue(partition.makeFollower(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become follower transition to succeed")
 assertEquals(leaderEpoch, partition.getLeaderEpoch)
-assertEquals(None, partition.leaderLogIfLocal)
+assertTrue(partition.leaderLogIfLocal.isEmpty)
 }

 partition
@@ -16,13 +16,13 @@
 */
 package kafka.cluster

-import org.apache.kafka.common.PartitionState
+import org.apache.kafka.common.DirectoryId
+import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}

 import java.util

 import scala.jdk.CollectionConverters._

 object AssignmentStateTest {
@@ -30,54 +30,54 @@ object AssignmentStateTest {

 def parameters: util.stream.Stream[Arguments] = util.List.of[Arguments](
 Arguments.of(
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(false)),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array.emptyIntArray, Array.emptyIntArray, util.List.of[Int], Boolean.box(false)),
 Arguments.of(
-util.List.of[Integer](brokerId, brokerId + 1),
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(true)),
+Array(brokerId, brokerId + 1),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array.emptyIntArray, Array.emptyIntArray, util.List.of[Int], Boolean.box(true)),
 Arguments.of(
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId + 3, brokerId + 4),
-util.List.of[Integer](brokerId + 1),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId + 3, brokerId + 4),
+Array(brokerId + 1),
 util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
 Arguments.of(
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId + 3, brokerId + 4),
-util.List.of[Integer],
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId + 3, brokerId + 4),
+Array.emptyIntArray,
 util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
 Arguments.of(
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer],
-util.List.of[Integer](brokerId + 1),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array.emptyIntArray,
+Array(brokerId + 1),
 util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
 Arguments.of(
-util.List.of[Integer](brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId),
-util.List.of[Integer],
+Array(brokerId + 1, brokerId + 2),
+Array(brokerId + 1, brokerId + 2),
+Array(brokerId),
+Array.emptyIntArray,
 util.List.of(brokerId + 1, brokerId + 2), Boolean.box(false)),
 Arguments.of(
-util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4),
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
-util.List.of[Integer],
+Array(brokerId + 2, brokerId + 3, brokerId + 4),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId + 3, brokerId + 4, brokerId + 5),
+Array.emptyIntArray,
 util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
 Arguments.of(
-util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4),
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
-util.List.of[Integer],
+Array(brokerId + 2, brokerId + 3, brokerId + 4),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId + 3, brokerId + 4, brokerId + 5),
+Array.emptyIntArray,
 util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)),
 Arguments.of(
-util.List.of[Integer](brokerId + 2, brokerId + 3),
-util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2),
-util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5),
-util.List.of[Integer],
+Array(brokerId + 2, brokerId + 3),
+Array(brokerId, brokerId + 1, brokerId + 2),
+Array(brokerId + 3, brokerId + 4, brokerId + 5),
+Array.emptyIntArray,
 util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(true))
 ).stream()
 }
@@ -86,31 +86,29 @@ class AssignmentStateTest extends AbstractPartitionTest {

 @ParameterizedTest
 @MethodSource(Array("parameters"))
-def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer],
-adding: util.List[Integer], removing: util.List[Integer],
+def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int],
+adding: Array[Int], removing: Array[Int],
 original: util.List[Int], isUnderReplicated: Boolean): Unit = {
-val leaderState = new PartitionState()
+val partitionRegistration = new PartitionRegistration.Builder()
 .setLeader(brokerId)
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(6)
 .setIsr(isr)
 .setPartitionEpoch(1)
 .setReplicas(replicas)
-.setIsNew(false)
-if (!adding.isEmpty)
-leaderState.setAddingReplicas(adding)
-if (!removing.isEmpty)
-leaderState.setRemovingReplicas(removing)
-
-val isReassigning = !adding.isEmpty || !removing.isEmpty
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+.setAddingReplicas(adding)
+.setRemovingReplicas(removing)
+.build()

 // set the original replicas as the URP calculation will need them
 if (!original.isEmpty)
 partition.assignmentState = SimpleAssignmentState(original.asScala)
 // do the test
-partition.makeLeader(leaderState, offsetCheckpoints, None)
+partition.makeLeader(partitionRegistration, isNew = false, offsetCheckpoints, None)
+val isReassigning = !adding.isEmpty || !removing.isEmpty
 assertEquals(isReassigning, partition.isReassigning)
 if (!adding.isEmpty)
-adding.forEach(r => assertTrue(partition.isAddingReplica(r)))
+adding.foreach(r => assertTrue(partition.isAddingReplica(r)))
 if (adding.contains(brokerId))
 assertTrue(partition.isAddingLocalReplica)
 else
@@ -18,7 +18,6 @@
 package kafka.cluster

-import java.lang.{Long => JLong}
 import java.util
 import java.util.{Optional, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
@@ -28,11 +27,10 @@ import kafka.utils._
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.FetchRequest
-import org.apache.kafka.common.PartitionState
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache, MockConfigRepository}
+import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, MockConfigRepository, PartitionRegistration}
 import org.apache.kafka.server.common.{RequestLocal, TopicIdPartition}
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
@@ -141,18 +139,20 @@ class PartitionLockTest extends Logging {
 def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
 val active = new AtomicBoolean(true)
 val replicaToCheck = 3
-val firstReplicaSet = util.List.of[Integer](3, 4, 5)
-val secondReplicaSet = util.List.of[Integer](1, 2, 3)
-def partitionState(replicas: util.List[Integer]) = new PartitionState()
-.setLeader(replicas.get(0))
+val firstReplicaSet = Array(3, 4, 5)
+val secondReplicaSet = Array(1, 2, 3)
+def partitionRegistration(replicas: Array[Int]) = new PartitionRegistration.Builder()
+.setLeader(replicas(0))
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(1)
 .setIsr(replicas)
 .setPartitionEpoch(1)
 .setReplicas(replicas)
-.setIsNew(true)
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+.build()
 val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
 // Update replica set synchronously first to avoid race conditions
-partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints, None)
+partition.makeLeader(partitionRegistration(secondReplicaSet), isNew = true, offsetCheckpoints, None)
 assertTrue(partition.getReplica(replicaToCheck).isDefined, s"Expected replica $replicaToCheck to be defined")

 val future = executorService.submit((() => {
@@ -165,7 +165,7 @@ class PartitionLockTest extends Logging {
 secondReplicaSet
 }

-partition.makeLeader(partitionState(replicas), offsetCheckpoints, None)
+partition.makeLeader(partitionRegistration(replicas), isNew = true, offsetCheckpoints, None)

 i += 1
 Thread.sleep(1) // just to avoid tight loop
@@ -344,17 +344,20 @@ class PartitionLockTest extends Logging {

 partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))

-val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
+val replicas = (0 to numReplicaFetchers).map(i => brokerId + i).toArray
 val isr = replicas
-replicas.forEach(replicaId => when(metadataCache.getAliveBrokerEpoch(replicaId)).thenReturn(Optional.of(1L)))
+replicas.foreach(replicaId => when(metadataCache.getAliveBrokerEpoch(replicaId)).thenReturn(Optional.of(1L)))

-assertTrue(partition.makeLeader(new PartitionState()
+val partitionRegistration = new PartitionRegistration.Builder()
 .setLeader(brokerId)
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(leaderEpoch)
 .setIsr(isr)
 .setPartitionEpoch(1)
 .setReplicas(replicas)
-.setIsNew(true), offsetCheckpoints, Some(topicId)), "Expected become leader transition to succeed")
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+.build()
+assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, Some(topicId)), "Expected become leader transition to succeed")

 partition
 }
File diff suppressed because it is too large (not shown here).
@@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.Monitorable
 import org.apache.kafka.common.metrics.PluginMetrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.{PartitionState => JPartitionState}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
@@ -56,7 +55,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
 import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
-import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, PartitionRegistration}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
@@ -77,7 +76,7 @@ import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -98,7 +97,7 @@ import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, Cou
 import java.util.function.{BiConsumer, Consumer}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, OptionalLong, Properties}
-import scala.collection.{Map, Seq, mutable}
+import scala.collection.{mutable, Map, Seq}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOption, RichOptional}

@@ -135,7 +134,7 @@ class ReplicaManagerTest {
 private val quotaAvailableThrottleTime = 0

 // Constants defined for readability
-private val zkVersion = 0
+private val partitionEpoch = 0
 private val brokerEpoch = 0L

 // These metrics are static and once we remove them after each test, they won't be created and verified anymore
@@ -1253,7 +1252,7 @@ class ReplicaManagerTest {
 val leaderBrokerId = 1
 val leaderEpoch = 1
 val leaderEpochIncrement = 2
-val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
+val aliveBrokerIds = Array(followerBrokerId, leaderBrokerId)
 val countDownLatch = new CountDownLatch(1)

 // Prepare the mocked components for the test
@@ -1267,8 +1266,8 @@ class ReplicaManagerTest {

 val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
 partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
-partition.makeLeader(
-leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
+partition.makeLeader(partitionRegistration(leaderBrokerId, leaderEpoch, aliveBrokerIds, partitionEpoch, aliveBrokerIds),
+isNew = false,
 offsetCheckpoints,
 None)

@@ -2654,20 +2653,21 @@ class ReplicaManagerTest {
 (replicaManager, mockLogMgr)
 }

-private def leaderAndIsrPartitionState(topicPartition: TopicPartition,
+private def partitionRegistration(leader: Int,
 leaderEpoch: Int,
-leaderBrokerId: Int,
-aliveBrokerIds: Seq[Integer],
-isNew: Boolean = false): JPartitionState = {
-new JPartitionState()
-.setTopicName(topic)
-.setPartitionIndex(topicPartition.partition)
-.setLeader(leaderBrokerId)
+isr: Array[Int],
+partitionEpoch: Int,
+replicas: Array[Int]): PartitionRegistration = {
+new PartitionRegistration.Builder()
+.setLeader(leader)
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(leaderEpoch)
-.setIsr(aliveBrokerIds.asJava)
-.setPartitionEpoch(zkVersion)
-.setReplicas(aliveBrokerIds.asJava)
-.setIsNew(isNew)
+.setIsr(isr)
+.setPartitionEpoch(partitionEpoch)
+.setReplicas(replicas)
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+.build()
+
 }

 private class CallbackResult[T] {
@@ -4157,6 +4157,7 @@ class ReplicaManagerTest {
 val localId = 1
 val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, "foo")
 val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+val aliveBrokerIds = Array(1, 2)

 val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
 try {
@@ -4164,7 +4165,8 @@ class ReplicaManagerTest {
 assertEquals(directoryIds.size, 2)
 val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, partitions = List(0), directoryIds = directoryIds)
 val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) = replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
-partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
+partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds),
+isNew = false,
 new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava),
 None)

@@ -5716,13 +5718,15 @@ class ReplicaManagerTest {
 val localId = 1
 val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
 val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+val aliveBrokerIds = Array(1, 2)

 val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
 val directoryIds = rm.logManager.directoryIdsSet.toList
 assertEquals(directoryIds.size, 2)
 val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
 val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
-partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
+partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds),
+isNew = false,
 new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
 None)

@@ -5743,13 +5747,15 @@ class ReplicaManagerTest {
 val localId = 1
 val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
 val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+val aliveBrokerIds = Array(1, 2)

 val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
 val directoryIds = rm.logManager.directoryIdsSet.toList
 assertEquals(directoryIds.size, 2)
 val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
 val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
-partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
+partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds),
+isNew = false,
 new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
 None)

@@ -35,7 +35,7 @@ import kafka.server.metadata.KRaftMetadataCache;
 import kafka.utils.TestUtils;

 import org.apache.kafka.clients.FetchSessionHandler;
-import org.apache.kafka.common.PartitionState;
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -52,7 +52,9 @@ import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MockConfigRepository;
+import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.network.BrokerEndPoint;
@@ -163,19 +165,21 @@ public class ReplicaFetcherThreadBenchmark {
 for (int i = 0; i < partitionCount; i++) {
 TopicPartition tp = new TopicPartition("topic", i);

-List<Integer> replicas = List.of(0, 1, 2);
-PartitionState partitionState = new PartitionState()
+int[] replicas = {0, 1, 2};
+PartitionRegistration partitionRegistration = new PartitionRegistration.Builder()
 .setLeader(0)
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(0)
 .setIsr(replicas)
 .setPartitionEpoch(1)
 .setReplicas(replicas)
-.setIsNew(true);
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+.build();

 OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Optional.of(0L);
 Partition partition = replicaManager.createPartition(tp);

-partition.makeFollower(partitionState, checkpoints, topicId, Option.empty());
+partition.makeFollower(partitionRegistration, true, checkpoints, topicId, Option.empty());
 pool.put(tp, partition);
 initialFetchStates.put(tp, new InitialFetchState(topicId, new BrokerEndPoint(3, "host", 3000), 0, 0));
 BaseRecords fetched = new BaseRecords() {
@@ -24,7 +24,7 @@ import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.builders.LogManagerBuilder;

-import org.apache.kafka.common.PartitionState;
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
@@ -32,8 +32,10 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.metadata.MockConfigRepository;
+import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
@@ -78,7 +80,7 @@ import scala.jdk.javaapi.OptionConverters;
 public class PartitionMakeFollowerBenchmark {
 private final File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
 private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
-private final List<Integer> replicas = List.of(0, 1, 2);
+private final int[] replicas = {0, 1, 2};
 private final OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
 private final DelayedOperations delayedOperations = Mockito.mock(DelayedOperations.class);
 private final ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -148,13 +150,15 @@ public class PartitionMakeFollowerBenchmark {

 @Benchmark
 public boolean testMakeFollower() {
-PartitionState partitionState = new PartitionState()
+PartitionRegistration partitionRegistration = new PartitionRegistration.Builder()
 .setLeader(0)
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(0)
 .setIsr(replicas)
 .setPartitionEpoch(1)
 .setReplicas(replicas)
-.setIsNew(true);
-return partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty());
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+.build();
+return partition.makeFollower(partitionRegistration, true, offsetCheckpoints, topicId, Option.empty());
 }
 }
@@ -24,12 +24,14 @@ import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.builders.LogManagerBuilder;

-import org.apache.kafka.common.PartitionState;
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.metadata.MockConfigRepository;
+import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.replica.Replica;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
@@ -54,7 +56,6 @@ import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;

 import java.io.File;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -109,24 +110,24 @@ public class UpdateFollowerFetchStateBenchmark {
 DelayedOperations delayedOperations = new DelayedOperationsMock();

 // one leader, plus two followers
-List<Integer> replicas = new ArrayList<>();
-replicas.add(0);
-replicas.add(1);
-replicas.add(2);
-PartitionState partitionState = new PartitionState()
+int[] replicas = {0, 1, 2};
+PartitionRegistration partitionRegistration = new PartitionRegistration.Builder()
 .setLeader(0)
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(0)
 .setIsr(replicas)
 .setPartitionEpoch(1)
 .setReplicas(replicas)
-.setIsNew(true);
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+.build();
 AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class);
 AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
 partition = new Partition(topicPartition, 100,
 0, () -> -1, Time.SYSTEM,
 alterPartitionListener, delayedOperations,
 Mockito.mock(MetadataCache.class), logManager, alterPartitionManager, topicId);
-partition.makeLeader(partitionState, offsetCheckpoints, topicId, Option.empty());
+
+partition.makeLeader(partitionRegistration, true, offsetCheckpoints, topicId, Option.empty());
 replica1 = partition.getReplica(1).get();
 replica2 = partition.getReplica(2).get();
 }
@@ -27,14 +27,16 @@ import kafka.server.builders.ReplicaManagerBuilder;
 import kafka.server.metadata.KRaftMetadataCache;
 import kafka.utils.TestUtils;

-import org.apache.kafka.common.PartitionState;
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.ConfigRepository;
+import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MockConfigRepository;
+import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
@@ -181,28 +183,23 @@ public class PartitionCreationBench {
 topicPartitions.add(new TopicPartition(topicName, partitionNum));
 }

-List<Integer> replicas = new ArrayList<>();
-replicas.add(0);
-replicas.add(1);
-replicas.add(2);
+int[] replicas = {0, 1, 2};

 OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Optional.of(0L);
 for (TopicPartition topicPartition : topicPartitions) {
 final Partition partition = this.replicaManager.createPartition(topicPartition);
-List<Integer> inSync = new ArrayList<>();
-inSync.add(0);
-inSync.add(1);
-inSync.add(2);
+int[] isr = {0, 1, 2};

-PartitionState partitionState = new PartitionState()
+PartitionRegistration partitionRegistration = new PartitionRegistration.Builder()
 .setLeader(0)
+.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
 .setLeaderEpoch(0)
-.setIsr(inSync)
+.setIsr(isr)
 .setPartitionEpoch(1)
 .setReplicas(replicas)
-.setIsNew(true);
-
-partition.makeFollower(partitionState, checkpoints, topicId, Option.empty());
+.setDirectories(DirectoryId.unassignedArray(replicas.length))
+.build();
+partition.makeFollower(partitionRegistration, true, checkpoints, topicId, Option.empty());
 }
 }
 }
@@ -18,8 +18,6 @@
 package org.apache.kafka.metadata;

 import org.apache.kafka.common.DirectoryId;
-import org.apache.kafka.common.PartitionState;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidReplicaDirectoriesException;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -410,21 +408,6 @@ public class PartitionRegistration {
 return new ApiMessageAndVersion(record, options.metadataVersion().partitionRecordVersion());
 }

-public PartitionState toLeaderAndIsrPartitionState(TopicPartition tp, boolean isNew) {
-return new PartitionState().
-setTopicName(tp.topic()).
-setPartitionIndex(tp.partition()).
-setLeader(leader).
-setLeaderEpoch(leaderEpoch).
-setIsr(Replicas.toList(isr)).
-setPartitionEpoch(partitionEpoch).
-setReplicas(Replicas.toList(replicas)).
-setAddingReplicas(Replicas.toList(addingReplicas)).
-setRemovingReplicas(Replicas.toList(removingReplicas)).
-setLeaderRecoveryState(leaderRecoveryState.value()).
-setIsNew(isNew);
-}
-
 @Override
 public int hashCode() {
 return Objects.hash(Arrays.hashCode(replicas), Arrays.hashCode(isr), Arrays.hashCode(removingReplicas),
@@ -18,8 +18,6 @@
 package org.apache.kafka.metadata;

 import org.apache.kafka.common.DirectoryId;
-import org.apache.kafka.common.PartitionState;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
@@ -94,50 +92,6 @@ public class PartitionRegistrationTest {
 assertEquals(registrationA, registrationB);
 }

-@Test
-public void testToLeaderAndIsrPartitionState() {
-PartitionRegistration a = new PartitionRegistration.Builder().
-setReplicas(new int[]{1, 2, 3}).
-setDirectories(new Uuid[]{
-Uuid.fromString("NSmkU0ieQuy2IHN59Ce0Bw"),
-Uuid.fromString("Y8N9gnSKSLKKFCioX2laGA"),
-Uuid.fromString("Oi7nvb8KQPyaGEqr4JtCRw")
-}).
-setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(123).setPartitionEpoch(456).build();
-PartitionRegistration b = new PartitionRegistration.Builder().
-setReplicas(new int[]{2, 3, 4}).
-setDirectories(new Uuid[]{
-Uuid.fromString("tAn3q03aQAWEYkNajXm3lA"),
-Uuid.fromString("zgj8rqatTmWMyWBsRZyiVg"),
-Uuid.fromString("bAAlGAz1TN2doZjtWlvhRQ")
-}).
-setIsr(new int[]{2, 3, 4}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(234).setPartitionEpoch(567).build();
-assertEquals(new PartitionState().
-setTopicName("foo").
-setPartitionIndex(1).
-setLeader(1).
-setLeaderEpoch(123).
-setIsr(List.of(1, 2)).
-setPartitionEpoch(456).
-setReplicas(List.of(1, 2, 3)).
-setAddingReplicas(List.of()).
-setRemovingReplicas(List.of()).
-setIsNew(true).toString(),
-a.toLeaderAndIsrPartitionState(new TopicPartition("foo", 1), true).toString());
-assertEquals(new PartitionState().
-setTopicName("bar").
-setPartitionIndex(0).
-setLeader(2).
-setLeaderEpoch(234).
-setIsr(List.of(2, 3, 4)).
-setPartitionEpoch(567).
-setReplicas(List.of(2, 3, 4)).
-setAddingReplicas(List.of()).
-setRemovingReplicas(List.of()).
-setIsNew(false).toString(),
-b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), false).toString());
-}
-
 @Test
 public void testMergePartitionChangeRecordWithReassignmentData() {
 Uuid dir1 = Uuid.fromString("FbRuu7CeQtq5YFreEzg16g");
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.kafka.common;
-
-import org.apache.kafka.common.protocol.MessageUtil;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class PartitionState {
-String topicName = "";
-int partitionIndex = 0;
-int leader = 0;
-int leaderEpoch = 0;
-List<Integer> isr = new ArrayList<>(0);
-int partitionEpoch = 0;
-List<Integer> replicas = new ArrayList<>(0);
-List<Integer> addingReplicas = new ArrayList<>(0);
-List<Integer> removingReplicas = new ArrayList<>(0);
-boolean isNew = false;
-byte leaderRecoveryState = (byte) 0;
-
-@Override
-public boolean equals(Object o) {
-if (o == null || getClass() != o.getClass()) return false;
-PartitionState that = (PartitionState) o;
-return partitionIndex == that.partitionIndex &&
-leader == that.leader &&
-leaderEpoch == that.leaderEpoch &&
-partitionEpoch == that.partitionEpoch &&
-isNew == that.isNew &&
-leaderRecoveryState == that.leaderRecoveryState &&
-Objects.equals(topicName, that.topicName) &&
-Objects.equals(isr, that.isr) &&
-Objects.equals(replicas, that.replicas) &&
-Objects.equals(addingReplicas, that.addingReplicas) &&
-Objects.equals(removingReplicas, that.removingReplicas);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(topicName, partitionIndex, leader, leaderEpoch, isr, partitionEpoch,
-replicas, addingReplicas, removingReplicas, isNew, leaderRecoveryState);
-}
-
-@Override
-public String toString() {
-return "PartitionState("
-+ "topicName='" + topicName + "'"
-+ ", partitionIndex=" + partitionIndex
-+ ", leader=" + leader
-+ ", leaderEpoch=" + leaderEpoch
-+ ", isr=" + MessageUtil.deepToString(isr.iterator())
-+ ", partitionEpoch=" + partitionEpoch
-+ ", replicas=" + MessageUtil.deepToString(replicas.iterator())
-+ ", addingReplicas=" + MessageUtil.deepToString(addingReplicas.iterator())
-+ ", removingReplicas=" + MessageUtil.deepToString(removingReplicas.iterator())
-+ ", isNew=" + (isNew ? "true" : "false")
-+ ", leaderRecoveryState=" + leaderRecoveryState
-+ ")";
-}
-
-public String topicName() {
-return this.topicName;
-}
-
-public int partitionIndex() {
-return this.partitionIndex;
-}
-
-public int leader() {
-return this.leader;
-}
-
-public int leaderEpoch() {
-return this.leaderEpoch;
-}
-
-public List<Integer> isr() {
-return this.isr;
-}
-
-public int partitionEpoch() {
-return this.partitionEpoch;
-}
-
-public List<Integer> replicas() {
-return this.replicas;
-}
-
-public List<Integer> addingReplicas() {
-return this.addingReplicas;
-}
-
-public List<Integer> removingReplicas() {
-return this.removingReplicas;
-}
-
-public boolean isNew() {
-return this.isNew;
-}
-
-public byte leaderRecoveryState() {
-return this.leaderRecoveryState;
-}
-
-public PartitionState setTopicName(String v) {
-this.topicName = v;
-return this;
-}
-
-public PartitionState setPartitionIndex(int v) {
-this.partitionIndex = v;
-return this;
-}
-
-public PartitionState setLeader(int v) {
-this.leader = v;
-return this;
-}
-
-public PartitionState setLeaderEpoch(int v) {
-this.leaderEpoch = v;
-return this;
-}
-
-public PartitionState setIsr(List<Integer> v) {
-this.isr = v;
-return this;
-}
-
-public PartitionState setPartitionEpoch(int v) {
-this.partitionEpoch = v;
-return this;
-}
-
-public PartitionState setReplicas(List<Integer> v) {
-this.replicas = v;
-return this;
-}
-
-public PartitionState setAddingReplicas(List<Integer> v) {
-this.addingReplicas = v;
-return this;
-}
-
-public PartitionState setRemovingReplicas(List<Integer> v) {
-this.removingReplicas = v;
-return this;
-}
-
-public PartitionState setIsNew(boolean v) {
-this.isNew = v;
-return this;
-}
-
-public PartitionState setLeaderRecoveryState(byte v) {
-this.leaderRecoveryState = v;
-return this;
-}
-}