From cae984816015e656893f17c4b53ac569fe53008e Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Sun, 17 Aug 2025 18:14:09 +0800 Subject: [PATCH] KAFKA-19447 Replace PartitionState with PartitionRegistration in makeFollower/makeLeader (#20335) Follow-up to [KAFKA-18486](https://issues.apache.org/jira/browse/KAFKA-18486) * Replace PartitionState with PartitionRegistration in makeFollower/makeLeader * Remove PartitionState.java since it is no longer referenced Reviewers: TaiJuWu , Ken Huang , Chia-Ping Tsai --- .../main/scala/kafka/cluster/Partition.scala | 89 +- .../scala/kafka/server/ReplicaManager.scala | 6 +- .../kafka/cluster/AbstractPartitionTest.scala | 34 +- .../kafka/cluster/AssignmentStateTest.scala | 96 +- .../kafka/cluster/PartitionLockTest.scala | 33 +- .../unit/kafka/cluster/PartitionTest.scala | 1049 +++++++++-------- .../kafka/server/ReplicaManagerTest.scala | 58 +- .../ReplicaFetcherThreadBenchmark.java | 24 +- .../PartitionMakeFollowerBenchmark.java | 14 +- .../UpdateFollowerFetchStateBenchmark.java | 19 +- .../jmh/server/PartitionCreationBench.java | 25 +- .../kafka/metadata/PartitionRegistration.java | 17 - .../metadata/PartitionRegistrationTest.java | 46 - .../apache/kafka/common/PartitionState.java | 178 --- 14 files changed, 741 insertions(+), 947 deletions(-) delete mode 100644 server-common/src/main/java/org/apache/kafka/common/PartitionState.java diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index d896217b2fe..3b45a08b067 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -36,13 +36,12 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} -import org.apache.kafka.common.{PartitionState => JPartitionState} import org.apache.kafka.common.utils.Time -import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache} +import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, PartitionRegistration} import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.log.remote.TopicPartitionLog import org.apache.kafka.server.log.remote.storage.RemoteLogManager -import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetsListener, LogOffsetSnapshot, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.replica.Replica @@ -731,7 +730,8 @@ class Partition(val topicPartition: TopicPartition, * from the time when this broker was the leader last time) and setting the new leader and ISR. * If the leader replica id does not change, return false to indicate the replica manager. 
*/ - def makeLeader(partitionState: JPartitionState, + def makeLeader(partitionRegistration: PartitionRegistration, + isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetDirectoryId: Option[Uuid] = None): Boolean = { @@ -739,23 +739,23 @@ class Partition(val topicPartition: TopicPartition, // Partition state changes are expected to have a partition epoch larger or equal // to the current partition epoch. The latter is allowed because the partition epoch // is also updated by the AlterPartition response so the new epoch might be known - // before a LeaderAndIsr request is received or before an update is received via + // before a partitionRegistration is received or before an update is received via // the metadata log. - if (partitionState.partitionEpoch < partitionEpoch) { - stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since the leader is already at a newer partition epoch $partitionEpoch.") + if (partitionRegistration.partitionEpoch < partitionEpoch) { + stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId, " + + s"partition registration $partitionRegistration and isNew=$isNew since the leader is already at a newer partition epoch $partitionEpoch.") return false } val currentTimeMs = time.milliseconds val isNewLeader = !isLeader - val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch - val replicas = partitionState.replicas.asScala.map(_.toInt) - val isr = partitionState.isr.asScala.map(_.toInt).toSet - val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt) - val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt) + val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch + val replicas = partitionRegistration.replicas + val isr = partitionRegistration.isr.toSet + val addingReplicas = partitionRegistration.addingReplicas + val removingReplicas = partitionRegistration.removingReplicas - if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING.value) { + if (partitionRegistration.leaderRecoveryState == LeaderRecoveryState.RECOVERING) { stateChangeLogger.info(s"The topic partition $topicPartition was marked as RECOVERING. 
" + "Marking the topic partition as RECOVERED.") } @@ -771,7 +771,7 @@ class Partition(val topicPartition: TopicPartition, LeaderRecoveryState.RECOVERED ) - createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId) + createLogInAssignedDirectoryId(isNew, highWatermarkCheckpoints, topicId, targetDirectoryId) val leaderLog = localLogOrException @@ -780,8 +780,8 @@ class Partition(val topicPartition: TopicPartition, if (isNewLeaderEpoch) { val leaderEpochStartOffset = leaderLog.logEndOffset stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId starts at " + - s"leader epoch ${partitionState.leaderEpoch} from offset $leaderEpochStartOffset " + - s"with partition epoch ${partitionState.partitionEpoch}, high watermark ${leaderLog.highWatermark}, " + + s"leader epoch ${partitionRegistration.leaderEpoch} from offset $leaderEpochStartOffset " + + s"with partition epoch ${partitionRegistration.partitionEpoch}, high watermark ${leaderLog.highWatermark}, " + s"ISR ${isr.mkString("[", ",", "]")}, adding replicas ${addingReplicas.mkString("[", ",", "]")} and " + s"removing replicas ${removingReplicas.mkString("[", ",", "]")} ${if (isUnderMinIsr) "(under-min-isr)" else ""}. " + s"Previous leader $leaderReplicaIdOpt and previous leader epoch was $leaderEpoch.") @@ -791,7 +791,7 @@ class Partition(val topicPartition: TopicPartition, // to ensure that these followers can truncate to the right offset, we must cache the new // leader epoch and the start offset since it should be larger than any epoch that a follower // would try to query. - leaderLog.assignEpochStartOffset(partitionState.leaderEpoch, leaderEpochStartOffset) + leaderLog.assignEpochStartOffset(partitionRegistration.leaderEpoch, leaderEpochStartOffset) // Initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and // lastFetchLeaderLogEndOffset. @@ -800,23 +800,23 @@ class Partition(val topicPartition: TopicPartition, currentTimeMs, leaderEpochStartOffset, isNewLeader, - partitionState.isr.contains(replica.brokerId) + isr.contains(replica.brokerId) ) } // We update the leader epoch and the leader epoch start offset iff the // leader epoch changed. - leaderEpoch = partitionState.leaderEpoch + leaderEpoch = partitionRegistration.leaderEpoch leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset) } else { - stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since it is already the leader with leader epoch $leaderEpoch. " + + stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId, " + + s"partition registration $partitionRegistration and isNew=$isNew since it is already the leader with leader epoch $leaderEpoch. " + s"Current high watermark ${leaderLog.highWatermark}, ISR ${isr.mkString("[", ",", "]")}, " + s"adding replicas ${addingReplicas.mkString("[", ",", "]")} and " + s"removing replicas ${removingReplicas.mkString("[", ",", "]")}.") } - partitionEpoch = partitionState.partitionEpoch + partitionEpoch = partitionRegistration.partitionEpoch leaderReplicaIdOpt = Some(localBrokerId) // We may need to increment high watermark since ISR could be down to 1. @@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition, * replica manager that state is already correct and the become-follower steps can * be skipped. 
*/ - def makeFollower(partitionState: JPartitionState, + def makeFollower(partitionRegistration: PartitionRegistration, + isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = None): Boolean = { inWriteLock(leaderIsrUpdateLock) { - if (partitionState.partitionEpoch < partitionEpoch) { - stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since the follower is already at a newer partition epoch $partitionEpoch.") + if (partitionRegistration.partitionEpoch < partitionEpoch) { + stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId, " + + s"partition registration $partitionRegistration and isNew=$isNew since the follower is already at a newer partition epoch $partitionEpoch.") return false } - val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch + val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch // The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet // the under min isr condition during the makeFollower process and emits the wrong metric. - leaderReplicaIdOpt = Option(partitionState.leader) - leaderEpoch = partitionState.leaderEpoch + leaderReplicaIdOpt = Option(partitionRegistration.leader) + leaderEpoch = partitionRegistration.leaderEpoch leaderEpochStartOffsetOpt = None - partitionEpoch = partitionState.partitionEpoch + partitionEpoch = partitionRegistration.partitionEpoch updateAssignmentAndIsr( - replicas = partitionState.replicas.asScala.iterator.map(_.toInt).toSeq, + replicas = partitionRegistration.replicas, isLeader = false, isr = Set.empty, - addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt), - removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt), - LeaderRecoveryState.of(partitionState.leaderRecoveryState) + addingReplicas = partitionRegistration.addingReplicas, + removingReplicas = partitionRegistration.removingReplicas, + partitionRegistration.leaderRecoveryState ) - createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId) + createLogInAssignedDirectoryId(isNew, highWatermarkCheckpoints, topicId, targetLogDirectoryId) val followerLog = localLogOrException if (isNewLeaderEpoch) { val leaderEpochEndOffset = followerLog.logEndOffset - stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " + - s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " + - s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " + + stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionRegistration.leaderEpoch} from " + + s"offset $leaderEpochEndOffset with partition epoch ${partitionRegistration.partitionEpoch} and " + + s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionRegistration.leader}. 
" + s"Previous leader $leaderReplicaIdOpt and previous leader epoch was $leaderEpoch.") } else { - stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.") + stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId, " + + s"partition registration $partitionRegistration and isNew=$isNew since it is already a follower with leader epoch $leaderEpoch.") } // We must restart the fetchers when the leader epoch changed regardless of @@ -885,11 +886,11 @@ class Partition(val topicPartition: TopicPartition, } } - private def createLogInAssignedDirectoryId(partitionState: JPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { + private def createLogInAssignedDirectoryId(isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { targetLogDirectoryId match { case Some(directoryId) => if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) { - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) + createLogIfNotExists(isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) } else { warn(s"Skipping creation of log because there are potentially offline log " + s"directories and log may already exist there. directoryId=$directoryId, " + @@ -897,7 +898,7 @@ class Partition(val topicPartition: TopicPartition, } case None => - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId) + createLogIfNotExists(isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId) } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8fa705ef8c4..496e50208db 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2409,9 +2409,8 @@ class ReplicaManager(val config: KafkaConfig, localLeaders.foreachEntry { (tp, info) => getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => try { - val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew) val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) - partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) + partition.makeLeader(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) changedPartitions.add(partition) } catch { @@ -2451,9 +2450,8 @@ class ReplicaManager(val config: KafkaConfig, // - This also ensures that the local replica is created even if the leader // is unavailable. This is required to ensure that we include the partition's // high watermark in the checkpoint file (see KAFKA-1647). 
- val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew) val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) - val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) + val isNewLeaderEpoch = partition.makeFollower(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) if (isInControlledShutdown && (info.partition.leader == NO_LEADER || !info.partition.isr.contains(config.brokerId))) { diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala index c15f62df51a..d475c6e4291 100644 --- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala @@ -19,11 +19,10 @@ package kafka.cluster import kafka.log.LogManager import kafka.utils.TestUtils import kafka.utils.TestUtils.MockAlterPartitionManager -import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid} import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.common.PartitionState import org.apache.kafka.common.utils.Utils -import org.apache.kafka.metadata.{MetadataCache, MockConfigRepository} +import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, MockConfigRepository, PartitionRegistration} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.util.MockTime @@ -119,28 +118,25 @@ class AbstractPartitionTest { isLeader: Boolean): Partition = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val replicas = java.util.List.of[Integer](brokerId, remoteReplicaId) + val replicas = Array(brokerId, remoteReplicaId) val isr = replicas + val partitionRegistrationBuilder = new PartitionRegistration.Builder() + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) if (isLeader) { - assertTrue(partition.makeLeader(new PartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") + val partitionRegistration = partitionRegistrationBuilder.setLeader(brokerId).build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) } else { - assertTrue(partition.makeFollower(new PartitionState() - .setLeader(remoteReplicaId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), offsetCheckpoints, None), "Expected become follower transition to succeed") + val partitionRegistration = partitionRegistrationBuilder.setLeader(remoteReplicaId).build() + assertTrue(partition.makeFollower(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become follower transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(None, partition.leaderLogIfLocal) + assertTrue(partition.leaderLogIfLocal.isEmpty) } partition diff --git 
a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala index 0f995661d3f..6172afd286d 100644 --- a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala @@ -16,13 +16,13 @@ */ package kafka.cluster -import org.apache.kafka.common.PartitionState +import org.apache.kafka.common.DirectoryId +import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} import java.util - import scala.jdk.CollectionConverters._ object AssignmentStateTest { @@ -30,54 +30,54 @@ object AssignmentStateTest { def parameters: util.stream.Stream[Arguments] = util.List.of[Arguments]( Arguments.of( - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(false)), + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId, brokerId + 1, brokerId + 2), + Array.emptyIntArray, Array.emptyIntArray, util.List.of[Int], Boolean.box(false)), Arguments.of( - util.List.of[Integer](brokerId, brokerId + 1), - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer], util.List.of[Integer], util.List.of[Int], Boolean.box(true)), + Array(brokerId, brokerId + 1), + Array(brokerId, brokerId + 1, brokerId + 2), + Array.emptyIntArray, Array.emptyIntArray, util.List.of[Int], Boolean.box(true)), Arguments.of( - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId + 3, brokerId + 4), - util.List.of[Integer](brokerId + 1), + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId + 3, brokerId + 4), + Array(brokerId + 1), util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId + 3, brokerId + 4), - util.List.of[Integer], + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId + 3, brokerId + 4), + Array.emptyIntArray, util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer], - util.List.of[Integer](brokerId + 1), + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId, brokerId + 1, brokerId + 2), + Array.emptyIntArray, + Array(brokerId + 1), util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - util.List.of[Integer](brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId), - util.List.of[Integer], + Array(brokerId + 1, brokerId + 2), + Array(brokerId + 1, brokerId + 2), + Array(brokerId), + Array.emptyIntArray, util.List.of(brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4), - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - 
util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - util.List.of[Integer], + Array(brokerId + 2, brokerId + 3, brokerId + 4), + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId + 3, brokerId + 4, brokerId + 5), + Array.emptyIntArray, util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - util.List.of[Integer](brokerId + 2, brokerId + 3, brokerId + 4), - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - util.List.of[Integer], + Array(brokerId + 2, brokerId + 3, brokerId + 4), + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId + 3, brokerId + 4, brokerId + 5), + Array.emptyIntArray, util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(false)), Arguments.of( - util.List.of[Integer](brokerId + 2, brokerId + 3), - util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2), - util.List.of[Integer](brokerId + 3, brokerId + 4, brokerId + 5), - util.List.of[Integer], + Array(brokerId + 2, brokerId + 3), + Array(brokerId, brokerId + 1, brokerId + 2), + Array(brokerId + 3, brokerId + 4, brokerId + 5), + Array.emptyIntArray, util.List.of(brokerId, brokerId + 1, brokerId + 2), Boolean.box(true)) ).stream() } @@ -86,31 +86,29 @@ class AssignmentStateTest extends AbstractPartitionTest { @ParameterizedTest @MethodSource(Array("parameters")) - def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer], - adding: util.List[Integer], removing: util.List[Integer], + def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int], + adding: Array[Int], removing: Array[Int], original: util.List[Int], isUnderReplicated: Boolean): Unit = { - val leaderState = new PartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(6) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(false) - if (!adding.isEmpty) - leaderState.setAddingReplicas(adding) - if (!removing.isEmpty) - leaderState.setRemovingReplicas(removing) - - val isReassigning = !adding.isEmpty || !removing.isEmpty + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .setAddingReplicas(adding) + .setRemovingReplicas(removing) + .build() // set the original replicas as the URP calculation will need them if (!original.isEmpty) partition.assignmentState = SimpleAssignmentState(original.asScala) // do the test - partition.makeLeader(leaderState, offsetCheckpoints, None) + partition.makeLeader(partitionRegistration, isNew = false, offsetCheckpoints, None) + val isReassigning = !adding.isEmpty || !removing.isEmpty assertEquals(isReassigning, partition.isReassigning) - if (!adding.isEmpty) - adding.forEach(r => assertTrue(partition.isAddingReplica(r))) + adding.foreach(r => assertTrue(partition.isAddingReplica(r))) if (adding.contains(brokerId)) assertTrue(partition.isAddingLocalReplica) else diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 7e4800ce5b1..fe262360a32 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -18,7 +18,6 @@ package kafka.cluster import java.lang.{Long => JLong} -import java.util import java.util.{Optional, Properties} import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean @@ 
-28,11 +27,10 @@ import kafka.utils._ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.FetchRequest -import org.apache.kafka.common.PartitionState import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid} import org.apache.kafka.coordinator.transaction.TransactionLogConfig -import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache, MockConfigRepository} +import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, MockConfigRepository, PartitionRegistration} import org.apache.kafka.server.common.{RequestLocal, TopicIdPartition} import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} @@ -141,18 +139,20 @@ class PartitionLockTest extends Logging { def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { val active = new AtomicBoolean(true) val replicaToCheck = 3 - val firstReplicaSet = util.List.of[Integer](3, 4, 5) - val secondReplicaSet = util.List.of[Integer](1, 2, 3) - def partitionState(replicas: util.List[Integer]) = new PartitionState() - .setLeader(replicas.get(0)) + val firstReplicaSet = Array(3, 4, 5) + val secondReplicaSet = Array(1, 2, 3) + def partitionRegistration(replicas: Array[Int]) = new PartitionRegistration.Builder() + .setLeader(replicas(0)) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(1) .setIsr(replicas) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) // Update replica set synchronously first to avoid race conditions - partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints, None) + partition.makeLeader(partitionRegistration(secondReplicaSet), isNew = true, offsetCheckpoints, None) assertTrue(partition.getReplica(replicaToCheck).isDefined, s"Expected replica $replicaToCheck to be defined") val future = executorService.submit((() => { @@ -165,7 +165,7 @@ class PartitionLockTest extends Logging { secondReplicaSet } - partition.makeLeader(partitionState(replicas), offsetCheckpoints, None) + partition.makeLeader(partitionRegistration(replicas), isNew = true, offsetCheckpoints, None) i += 1 Thread.sleep(1) // just to avoid tight loop @@ -344,17 +344,20 @@ class PartitionLockTest extends Logging { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) - val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava + val replicas = (0 to numReplicaFetchers).map(i => brokerId + i).toArray val isr = replicas - replicas.forEach(replicaId => when(metadataCache.getAliveBrokerEpoch(replicaId)).thenReturn(Optional.of(1L))) + replicas.foreach(replicaId => when(metadataCache.getAliveBrokerEpoch(replicaId)).thenReturn(Optional.of(1L))) - assertTrue(partition.makeLeader(new PartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true), offsetCheckpoints, Some(topicId)), "Expected become leader transition to succeed") + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + 
.build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, Some(topicId)), "Expected become leader transition to succeed") partition } diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index e309943427c..8e512ad4d01 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -27,13 +27,11 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{AlterPartitionResponse, FetchRequest, ListOffsetsRequest, RequestHeader} -import org.apache.kafka.common.{PartitionState => JPartitionState} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid} -import org.apache.kafka.metadata.MetadataCache +import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, PartitionRegistration} import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.replica.Replica -import org.apache.kafka.metadata.LeaderRecoveryState import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers @@ -189,7 +187,7 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 10 val logStartOffset = 0L val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true) - addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](remoteReplicaId)) + addBrokerEpochToMockMetadataCache(metadataCache, Array(remoteReplicaId)) def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = { new FetchResponseData.EpochEndOffset() @@ -310,20 +308,21 @@ class PartitionTest extends AbstractPartitionTest { def testReplicaFetchToFollower(): Unit = { val followerId = brokerId + 1 val leaderId = brokerId + 2 - val replicas = util.List.of[Integer](brokerId, followerId, leaderId) - val isr = util.List.of[Integer](brokerId, followerId, leaderId) + val replicas = Array(brokerId, followerId, leaderId) + val isr = Array(brokerId, followerId, leaderId) val leaderEpoch = 8 val partitionEpoch = 1 - assertTrue(partition.makeFollower(new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(leaderId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(partitionEpoch) .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None - )) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeFollower(partitionRegistration, isNew = true, offsetCheckpoints, None)) def assertFetchFromReplicaFails[T <: ApiException]( expectedExceptionClass: Class[T], @@ -351,21 +350,22 @@ class PartitionTest extends AbstractPartitionTest { val validReplica = brokerId + 1 val addingReplica1 = brokerId + 2 val addingReplica2 = brokerId + 3 - val replicas = util.List.of[Integer](leader, validReplica) - val isr = util.List.of[Integer](leader, validReplica) + val replicas = Array(leader, validReplica) + val isr = Array(leader, validReplica) val leaderEpoch = 8 val partitionEpoch = 1 - addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](leader, addingReplica1, addingReplica2)) + addBrokerEpochToMockMetadataCache(metadataCache, Array(leader, 
addingReplica1, addingReplica2)) - assertTrue(partition.makeLeader(new JPartitionState() + var partitionRegistration = new PartitionRegistration.Builder() .setLeader(leader) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(partitionEpoch) .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, topicId - )) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, topicId)) assertThrows(classOf[UnknownLeaderEpochException], () => { fetchFollower( @@ -388,20 +388,21 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(None, partition.getReplica(addingReplica2).map(_.stateSnapshot.logEndOffset)) // The replicas are added as part of a reassignment - val newReplicas = util.List.of[Integer](leader, validReplica, addingReplica1, addingReplica2) + val newReplicas = Array(leader, validReplica, addingReplica1, addingReplica2) val newPartitionEpoch = partitionEpoch + 1 - val addingReplicas = util.List.of[Integer](addingReplica1, addingReplica2) + val addingReplicas = Array(addingReplica1, addingReplica2) - assertFalse(partition.makeLeader(new JPartitionState() + partitionRegistration = new PartitionRegistration.Builder() .setLeader(leader) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(newPartitionEpoch) .setReplicas(newReplicas) .setAddingReplicas(addingReplicas) - .setIsNew(true), - offsetCheckpoints, None - )) + .setDirectories(DirectoryId.unassignedArray(newReplicas.length)) + .build() + assertFalse(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None)) // Now the fetches are allowed assertEquals(0L, fetchFollower( @@ -427,6 +428,7 @@ class PartitionTest extends AbstractPartitionTest { val appendSemaphore = new Semaphore(0) val mockTime = new MockTime() val prevLeaderEpoch = 0 + val replicas = Array(0, 1, 2, brokerId) partition = new Partition( topicPartition, @@ -479,14 +481,17 @@ class PartitionTest extends AbstractPartitionTest { } partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, None) - var partitionState = new JPartitionState() + var partitionRegistration = new PartitionRegistration.Builder() .setLeader(2) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(prevLeaderEpoch) - .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) + .setIsr(replicas) .setPartitionEpoch(1) - .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) - .setIsNew(false) - assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeFollower(partitionRegistration, isNew = false, offsetCheckpoints, None)) + val appendThread = new Thread { override def run(): Unit = { @@ -504,14 +509,16 @@ class PartitionTest extends AbstractPartitionTest { appendThread.start() TestUtils.waitUntilTrue(() => appendSemaphore.hasQueuedThreads, "follower log append is not called.") - partitionState = new JPartitionState() + partitionRegistration = new PartitionRegistration.Builder() .setLeader(2) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(prevLeaderEpoch + 1) - .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) + .setIsr(replicas) .setPartitionEpoch(2) - .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) - .setIsNew(false)
- assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeFollower(partitionRegistration, isNew = false, offsetCheckpoints, None)) appendSemaphore.release() appendThread.join() @@ -661,7 +668,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) - addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](remoteReplicaId)) + addBrokerEpochToMockMetadataCache(metadataCache, Array(remoteReplicaId)) + def sendFetch(leaderEpoch: Option[Int]): LogReadInfo = { fetchFollower( @@ -799,8 +807,8 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = util.List.of[Integer](leader, follower1, follower2) - val isr = util.List.of[Integer](leader, follower2) + val replicas = Array(leader, follower1, follower2) + val isr = Array(leader, follower2) val leaderEpoch = 8 val batch1 = TestUtils.records(records = List( new SimpleRecord(10, "k1".getBytes, "v1".getBytes), @@ -810,15 +818,17 @@ class PartitionTest extends AbstractPartitionTest { new SimpleRecord(21,"k5".getBytes, "v3".getBytes))) addBrokerEpochToMockMetadataCache(metadataCache, replicas) - val leaderState = new JPartitionState() + val leaderRegistration = new PartitionRegistration.Builder() .setLeader(leader) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(leaderRegistration, isNew = true, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") - assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch") assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR") @@ -880,26 +890,28 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(Right(None), fetchOffsetsForTimestamp(30, Some(IsolationLevel.READ_UNCOMMITTED))) // Make into a follower - val followerState = new JPartitionState() + val followerRegistration = new PartitionRegistration.Builder() .setLeader(follower2) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch + 1) .setIsr(isr) .setPartitionEpoch(4) .setReplicas(replicas) - .setIsNew(false) - - assertTrue(partition.makeFollower(followerState, offsetCheckpoints, None)) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeFollower(followerRegistration, isNew = false, offsetCheckpoints, None)) // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition - val newLeaderState = new JPartitionState() + val newLeaderRegistration = new PartitionRegistration.Builder() .setLeader(leader) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch + 2) .setIsr(isr) .setPartitionEpoch(5) .setReplicas(replicas) - .setIsNew(false) - - assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, None)) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + 
assertTrue(partition.makeLeader(newLeaderRegistration, isNew = false, offsetCheckpoints, None)) // Try to get offsets as a client fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match { @@ -970,16 +982,19 @@ class PartitionTest extends AbstractPartitionTest { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val log = partition.localLogOrException val epoch = 1 + val replicas = Array(0, 1, 2, brokerId) // Start off as follower - val partitionState = new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(epoch) - .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) + .setIsr(replicas) .setPartitionEpoch(1) - .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) - .setIsNew(false) - partition.makeFollower(partitionState, offsetCheckpoints, None) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + partition.makeFollower(partitionRegistration, isNew = false, offsetCheckpoints, None) val initialLogStartOffset = 5L partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false) @@ -1044,18 +1059,22 @@ class PartitionTest extends AbstractPartitionTest { @Test def testListOffsetIsolationLevels(): Unit = { val leaderEpoch = 5 - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader(new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") + assertEquals(leaderEpoch, partition.getLeaderEpoch) val records = createTransactionalRecords(util.List.of( @@ -1129,34 +1148,42 @@ class PartitionTest extends AbstractPartitionTest { @Test def testMakeFollowerWithNoLeaderIdChange(): Unit = { + val replicas = Array(0, 1, 2, brokerId) // Start off as follower - var partitionState = new JPartitionState() + var partitionRegistration = new PartitionRegistration.Builder() .setLeader(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(1) - .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) + .setIsr(replicas) .setPartitionEpoch(1) - .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) - .setIsNew(false) - partition.makeFollower(partitionState, offsetCheckpoints, None) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + partition.makeFollower(partitionRegistration, isNew = false, offsetCheckpoints, None) // Request with same leader and epoch increases by only 1, do become-follower steps - partitionState = new JPartitionState() + partitionRegistration = new PartitionRegistration.Builder() .setLeader(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(4) - .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) + .setIsr(replicas) .setPartitionEpoch(1) - 
.setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) - .setIsNew(false) - assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None)) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeFollower(partitionRegistration, isNew = false, offsetCheckpoints, None)) // Request with same leader and same epoch, skip become-follower steps - partitionState = new JPartitionState() + partitionRegistration = new PartitionRegistration.Builder() .setLeader(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(4) - .setIsr(util.List.of[Integer](0, 1, 2, brokerId)) + .setIsr(replicas) .setPartitionEpoch(1) - .setReplicas(util.List.of[Integer](0, 1, 2, brokerId)) - assertFalse(partition.makeFollower(partitionState, offsetCheckpoints, None)) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertFalse(partition.makeFollower(partitionRegistration, isNew = false, offsetCheckpoints, None)) } @Test @@ -1164,8 +1191,8 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = util.List.of[Integer](leader, follower1, follower2) - val isr = util.List.of[Integer](leader, follower2) + val replicas = Array(leader, follower1, follower2) + val isr = Array(leader, follower2) val leaderEpoch = 8 val batch1 = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes))) @@ -1176,14 +1203,16 @@ class PartitionTest extends AbstractPartitionTest { new SimpleRecord("k7".getBytes, "v2".getBytes))) addBrokerEpochToMockMetadataCache(metadataCache, replicas) - val leaderState = new JPartitionState() + val leaderRegistration = new PartitionRegistration.Builder() .setLeader(leader) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true) - assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(leaderRegistration, isNew = true, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch") assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR") @@ -1202,23 +1231,27 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(lastOffsetOfFirstBatch + 1, partition.log.get.highWatermark, "Expected leader's HW") // current leader becomes follower and then leader again (without any new records appended) - val followerState = new JPartitionState() + val followerRegistration = new PartitionRegistration.Builder() .setLeader(follower2) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch + 1) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(false) - partition.makeFollower(followerState, offsetCheckpoints, None) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + partition.makeFollower(followerRegistration, isNew = false, offsetCheckpoints, None) - val newLeaderState = new JPartitionState() + val newLeaderRegistration = new PartitionRegistration.Builder() .setLeader(leader) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) 
.setLeaderEpoch(leaderEpoch + 2) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(false) - assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, topicId), + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(newLeaderRegistration, isNew = false, offsetCheckpoints, topicId), "Expected makeLeader() to return 'leader changed' after makeFollower()") val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset @@ -1285,25 +1318,28 @@ class PartitionTest extends AbstractPartitionTest { val leader = brokerId val follower1 = brokerId + 1 val follower2 = brokerId + 2 - val replicas = util.List.of[Integer](leader, follower1, follower2) - val isr = util.List.of[Integer](leader) + val replicas = Array(leader, follower1, follower2) + val isr = Array(leader) val leaderEpoch = 8 assertFalse(partition.isAtMinIsr) // Make isr set to only have leader to trigger AtMinIsr (default min isr config is 1) - val leaderState = new JPartitionState() + val leaderRegistration = new PartitionRegistration.Builder() .setLeader(leader) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true) - partition.makeLeader(leaderState, offsetCheckpoints, None) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + partition.makeLeader(leaderRegistration, isNew = true, offsetCheckpoints, None) assertTrue(partition.isAtMinIsr) } @Test def testIsUnderMinIsr(): Unit = { + val replicas = Array(brokerId, brokerId + 1) configRepository.setTopicConfig(topicPartition.topic, TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") partition = new Partition(topicPartition, replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT, @@ -1316,27 +1352,29 @@ class PartitionTest extends AbstractPartitionTest { logManager, alterPartitionManager) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None) - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(0) - .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) - .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) - .setPartitionEpoch(1) - .setIsNew(true), - offsetCheckpoints, - topicId = None) + + var leaderRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setIsr(replicas) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .setPartitionEpoch(1) + .build() + partition.makeLeader(leaderRegistration, isNew = true, offsetCheckpoints, None) assertFalse(partition.isUnderMinIsr) - val LeaderState = new JPartitionState() + leaderRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(1) - .setIsr(util.List.of[Integer](brokerId)) + .setIsr(Array(brokerId)) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) .setPartitionEpoch(2) - .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) - .setIsNew(false) - - partition.makeLeader(LeaderState, offsetCheckpoints, None) + .build() + partition.makeLeader(leaderRegistration, isNew = false, offsetCheckpoints, None) assertTrue(partition.isUnderMinIsr) } @@ -1347,22 +1385,23 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId 
= brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val replicas = Array(brokerId, remoteBrokerId) val isr = replicas addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val initializeTimeMs = time.milliseconds() - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = initializeTimeMs, @@ -1409,22 +1448,22 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) + val replicas = Array(brokerId, remoteBrokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val initializeTimeMs = time.milliseconds() - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(util.List.of[Integer](brokerId)) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") - + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(Array(brokerId)) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") doAnswer(_ => { // simulate topic is deleted at the moment partition.delete() @@ -1443,20 +1482,21 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + 
assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) assertReplicaState(partition, remoteBrokerId, @@ -1493,20 +1533,21 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) assertReplicaState(partition, remoteBrokerId, @@ -1555,20 +1596,21 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(Set(brokerId), partition.partitionState.isr) assertReplicaState(partition, remoteBrokerId, @@ -1609,8 +1651,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val shrinkedIsr = util.List.of[Integer](brokerId) + val replicas = Array(brokerId, remoteBrokerId) + val shrinkedIsr = Array(brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1628,22 +1670,19 @@ class PartitionTest extends AbstractPartitionTest { ) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue( - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - 
.setIsr(replicas) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(false), - offsetCheckpoints, - None - ), - "Expected become leader transition to succeed" - ) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + var partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(replicas) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = false, offsetCheckpoints, None), "Expected become leader transition to succeed") + + assertEquals(replicas.toSet, partition.partitionState.isr) + assertEquals(replicas.toSet, partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) @@ -1669,24 +1708,20 @@ class PartitionTest extends AbstractPartitionTest { seedLogData(log, numRecords = 10, leaderEpoch) // Controller shrinks the ISR after - assertFalse( - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(shrinkedIsr) - .setPartitionEpoch(2) - .setReplicas(replicas) - .setIsNew(false), - offsetCheckpoints, - None - ), - "Expected to stay leader" - ) + partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(shrinkedIsr) + .setPartitionEpoch(2) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertFalse(partition.makeLeader(partitionRegistration, isNew = false, offsetCheckpoints, None), "Expected to stay leader") assertTrue(partition.isLeader) - assertEquals(util.Set.copyOf(shrinkedIsr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(shrinkedIsr).asScala, partition.partitionState.maximalIsr) + assertEquals(shrinkedIsr.toSet, partition.partitionState.isr) + assertEquals(shrinkedIsr.toSet, partition.partitionState.maximalIsr) assertEquals(Set.empty, partition.getOutOfSyncReplicas(partition.replicaLagTimeMaxMs)) // In the case of unfenced, the HWM doesn't increase, otherwise the HWM increases because the @@ -1705,8 +1740,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1730,17 +1765,18 @@ class PartitionTest extends AbstractPartitionTest { ) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + 
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(isr.toSet, partition.partitionState.maximalIsr) markRemoteReplicaEligible(true) @@ -1756,16 +1792,16 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(replicas.toSet, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) // Controller rejects the expansion because the broker is fenced or offline. alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA) // The leader reverts back to the previous ISR. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(isr.toSet, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -1776,8 +1812,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is not triggered because the follower is fenced. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(isr.toSet, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -1788,8 +1824,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(replicas.toSet, partition.partitionState.maximalIsr) assertTrue(partition.partitionState.isInflight) assertEquals(1, alterPartitionManager.isrUpdates.size) @@ -1797,8 +1833,8 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1) // ISR is committed. 
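+    // Once committed, isr and maximalIsr converge on the full replica set and no update remains in flight.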
-    assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr)
-    assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr)
+    assertEquals(replicas.toSet, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
     assertFalse(partition.partitionState.isInflight)
     assertEquals(0, alterPartitionManager.isrUpdates.size)
   }
 
@@ -1811,8 +1847,8 @@ class PartitionTest extends AbstractPartitionTest {
     val leaderEpoch = 5
     val remoteBrokerId1 = brokerId + 1
     val remoteBrokerId2 = brokerId + 2
-    val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2)
-    val isr = util.List.of[Integer](brokerId, remoteBrokerId2)
+    val replicas = Array(brokerId, remoteBrokerId1, remoteBrokerId2)
+    val isr = Array(brokerId, remoteBrokerId2)
 
     val metadataCache: KRaftMetadataCache = mock(classOf[KRaftMetadataCache])
 
@@ -1834,20 +1870,21 @@
     )
 
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
-    assertTrue(partition.makeLeader(
-      new JPartitionState()
-        .setLeader(brokerId)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(isr)
-        .setPartitionEpoch(1)
-        .setReplicas(replicas)
-        .setIsNew(true),
-      offsetCheckpoints, None), "Expected become leader transition to succeed")
-    assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr)
-    assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr)
+    val partitionRegistration = new PartitionRegistration.Builder()
+      .setLeader(brokerId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(isr)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas)
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+      .build()
+    assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr.toSet, partition.partitionState.isr)
+    assertEquals(isr.toSet, partition.partitionState.maximalIsr)
 
     // Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail.
-    addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](brokerId, remoteBrokerId2))
+    addBrokerEpochToMockMetadataCache(metadataCache, Array(brokerId, remoteBrokerId2))
     // Create a race case where the replica epoch gets bumped right after the previous fetch succeeded.
     val wrongReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) - 1
     when(metadataCache.getAliveBrokerEpoch(remoteBrokerId1)).thenReturn(Optional.of(wrongReplicaEpoch), Optional.of(defaultBrokerEpoch(remoteBrokerId1)))
@@ -1865,8 +1902,8 @@
     )
 
     // Expansion is not triggered.
-    assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr)
-    assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr)
+    assertEquals(isr.toSet, partition.partitionState.isr)
+    assertEquals(isr.toSet, partition.partitionState.maximalIsr)
     assertEquals(0, alterPartitionManager.isrUpdates.size)
 
     // Fetch again, this time with the correct default broker epoch.
@@ -1883,8 +1920,8 @@
     )
 
     // Expansion is triggered.
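+    // The pending AlterPartition update must carry the current broker epoch for every ISR member.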
- assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(replicas.toSet, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) val isrUpdate = alterPartitionManager.isrUpdates.head isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => @@ -1903,8 +1940,8 @@ class PartitionTest extends AbstractPartitionTest { seedLogData(log, numRecords = 10, leaderEpoch = 4) val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId1) - val isr = util.List.of[Integer](brokerId, remoteBrokerId1) + val replicas = Array(brokerId, remoteBrokerId1) + val isr = Array(brokerId, remoteBrokerId1) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -1922,17 +1959,18 @@ class PartitionTest extends AbstractPartitionTest { ) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(isr.toSet, partition.partitionState.maximalIsr) val expectedReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) fetchFollower(partition, @@ -1966,8 +2004,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) val partition = new Partition( @@ -1984,17 +2022,18 @@ class PartitionTest extends AbstractPartitionTest { ) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + 
assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(isr.toSet, partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset and // to check if an expansion is possible. @@ -2008,16 +2047,16 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(replicas.toSet, partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) // Controller rejects the expansion because the broker is in controlled shutdown. alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA) // The leader reverts back to the previous ISR. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(isr.toSet, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -2028,8 +2067,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is not triggered because the follower is fenced. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(isr.toSet, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -2040,8 +2079,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(replicas.toSet, partition.partitionState.maximalIsr) assertTrue(partition.partitionState.isInflight) assertEquals(1, alterPartitionManager.isrUpdates.size) @@ -2049,8 +2088,8 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager.completeIsrUpdate(newPartitionEpoch= 1) // ISR is committed. 
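+    // Committing the update clears the in-flight flag and drains the pending ISR updates.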
- assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(replicas.toSet, partition.partitionState.isr) + assertEquals(replicas.toSet, partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) } @@ -2062,8 +2101,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId, remoteBrokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId, remoteBrokerId) val topicId = Uuid.randomUuid() assertTrue(makeLeader( @@ -2114,8 +2153,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 val remoteBrokerId2 = brokerId + 2 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) - val isr = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) + val replicas = Array(brokerId, remoteBrokerId1, remoteBrokerId2) + val isr = Array(brokerId, remoteBrokerId1, remoteBrokerId2) val initializeTimeMs = time.milliseconds() val metadataCache = mock(classOf[KRaftMetadataCache]) @@ -2135,15 +2174,16 @@ class PartitionTest extends AbstractPartitionTest { ) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(0L, partition.localLogOrException.highWatermark) fetchFollower(partition, replicaId = remoteBrokerId1, fetchOffset = log.logEndOffset) @@ -2195,8 +2235,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId1 = brokerId + 1 val remoteBrokerId2 = brokerId + 2 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId1, remoteBrokerId2) - val isr = util.List.of[Integer](brokerId, remoteBrokerId1) + val replicas = Array(brokerId, remoteBrokerId1, remoteBrokerId2) + val isr = Array(brokerId, remoteBrokerId1) addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -2214,15 +2254,16 @@ class PartitionTest extends AbstractPartitionTest { ) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), - offsetCheckpoints, None), "Expected become leader transition to succeed") + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + 
.setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertTrue(partition.isUnderMinIsr) assertEquals(0L, partition.localLogOrException.highWatermark) @@ -2249,8 +2290,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId, remoteBrokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() assertTrue(makeLeader( @@ -2304,8 +2345,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId, remoteBrokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -2362,8 +2403,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId, remoteBrokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -2408,8 +2449,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId, remoteBrokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId, remoteBrokerId) val initializeTimeMs = time.milliseconds() assertTrue(makeLeader( @@ -2493,8 +2534,8 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 - val replicas = util.List.of[Integer](brokerId, remoteBrokerId) - val isr = util.List.of[Integer](brokerId) + val replicas = Array(brokerId, remoteBrokerId) + val isr = Array(brokerId) addBrokerEpochToMockMetadataCache(metadataCache, replicas) assertTrue(makeLeader( @@ -2585,8 +2626,8 @@ class PartitionTest extends AbstractPartitionTest { val follower1 = brokerId + 1 val follower2 = brokerId + 2 val follower3 = brokerId + 3 - val replicas = util.List.of[Integer](brokerId, follower1, follower2, follower3) - val isr = util.List.of[Integer](brokerId, follower1, follower2) + val replicas = Array(brokerId, follower1, follower2, follower3) + val isr = Array(brokerId, follower1, follower2) val partitionEpoch = 1 addBrokerEpochToMockMetadataCache(metadataCache, replicas) @@ -2640,8 +2681,8 @@ class PartitionTest extends AbstractPartitionTest { val follower1 = brokerId + 1 val follower2 = brokerId + 2 val follower3 = brokerId + 3 - val replicas = util.List.of[Integer](brokerId, follower1, follower2, follower3) - val isr = util.List.of[Integer](brokerId, follower1, follower2) + val replicas = Array(brokerId, follower1, follower2, follower3) + val isr = Array(brokerId, follower1, follower2) addBrokerEpochToMockMetadataCache(metadataCache, replicas) doNothing().when(delayedOperations).checkAndCompleteAll() @@ -2678,15 +2719,17 @@ class PartitionTest 
extends AbstractPartitionTest { when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition)) .thenReturn(Optional.of(long2Long(4L))) - val replicas = util.List.of[Integer](brokerId, brokerId + 1) - val leaderState = new JPartitionState() + val replicas = Array(brokerId, brokerId + 1) + val leaderRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(6) .setIsr(replicas) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(false) - partition.makeLeader(leaderState, offsetCheckpoints, None) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + partition.makeLeader(leaderRegistration, isNew = false, offsetCheckpoints, None) assertEquals(4, partition.localLogOrException.highWatermark) } @@ -2694,15 +2737,17 @@ class PartitionTest extends AbstractPartitionTest { def testTopicIdAndPartitionMetadataFileForLeader(): Unit = { val leaderEpoch = 5 val topicId = Uuid.randomUuid() - val replicas = util.List.of[Integer](brokerId, brokerId + 1) - val leaderState = new JPartitionState() + val replicas = Array(brokerId, brokerId + 1) + val leaderRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(replicas) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(false) - partition.makeLeader(leaderState, offsetCheckpoints, Some(topicId)) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + partition.makeLeader(leaderRegistration, isNew = false, offsetCheckpoints, Some(topicId)) checkTopicId(topicId, partition) @@ -2725,10 +2770,10 @@ class PartitionTest extends AbstractPartitionTest { // Calling makeLeader with a new topic ID should not overwrite the old topic ID. We should get an InconsistentTopicIdException. // This scenario should not occur, since the topic ID check will fail. - assertThrows(classOf[InconsistentTopicIdException], () => partition2.makeLeader(leaderState, offsetCheckpoints, Some(Uuid.randomUuid()))) + assertThrows(classOf[InconsistentTopicIdException], () => partition2.makeLeader(leaderRegistration, isNew = false, offsetCheckpoints, Some(Uuid.randomUuid()))) // Calling makeLeader with no topic ID should not overwrite the old topic ID. We should get the original log. 
-    partition2.makeLeader(leaderState, offsetCheckpoints, None)
+    partition2.makeLeader(leaderRegistration, isNew = false, offsetCheckpoints, None)
     checkTopicId(topicId, partition2)
   }
 
@@ -2736,15 +2781,17 @@ class PartitionTest extends AbstractPartitionTest {
   def testTopicIdAndPartitionMetadataFileForFollower(): Unit = {
     val leaderEpoch = 5
     val topicId = Uuid.randomUuid()
-    val replicas = util.List.of[Integer](brokerId, brokerId + 1)
-    val leaderState = new JPartitionState()
+    val replicas = Array(brokerId, brokerId + 1)
+    val leaderRegistration = new PartitionRegistration.Builder()
       .setLeader(brokerId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(leaderEpoch)
       .setIsr(replicas)
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(false)
-    partition.makeFollower(leaderState, offsetCheckpoints, Some(topicId))
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+      .build()
+    partition.makeFollower(leaderRegistration, isNew = false, offsetCheckpoints, Some(topicId))
 
     checkTopicId(topicId, partition)
 
@@ -2767,10 +2814,10 @@ class PartitionTest extends AbstractPartitionTest {
 
     // Calling makeFollower with a new topic ID should not overwrite the old topic ID. We should get an InconsistentTopicIdException.
     // This scenario should not occur, since the topic ID check will fail.
-    assertThrows(classOf[InconsistentTopicIdException], () => partition2.makeFollower(leaderState, offsetCheckpoints, Some(Uuid.randomUuid())))
+    assertThrows(classOf[InconsistentTopicIdException], () => partition2.makeFollower(leaderRegistration, isNew = false, offsetCheckpoints, Some(Uuid.randomUuid())))
 
     // Calling makeFollower with no topic ID should not overwrite the old topic ID. We should get the original log.
-    partition2.makeFollower(leaderState, offsetCheckpoints, None)
+    partition2.makeFollower(leaderRegistration, isNew = false, offsetCheckpoints, None)
     checkTopicId(topicId, partition2)
   }
 
@@ -2809,21 +2856,20 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testUnderReplicatedPartitionsCorrectSemantics(): Unit = {
-    val replicas = util.List.of[Integer](brokerId, brokerId + 1, brokerId + 2)
-    val isr = util.List.of[Integer](brokerId, brokerId + 1)
-
-    var leaderState = new JPartitionState()
+    val replicas = Array(brokerId, brokerId + 1, brokerId + 2)
+    val isr = Array(brokerId, brokerId + 1)
+    val leaderRegistrationBuilder = new PartitionRegistration.Builder()
      .setLeader(brokerId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(6)
       .setIsr(isr)
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(false)
-    partition.makeLeader(leaderState, offsetCheckpoints, None)
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+    partition.makeLeader(leaderRegistrationBuilder.build(), isNew = false, offsetCheckpoints, None)
     assertTrue(partition.isUnderReplicated)
 
-    leaderState = leaderState.setIsr(replicas)
-    partition.makeLeader(leaderState, offsetCheckpoints, None)
+    partition.makeLeader(leaderRegistrationBuilder.setIsr(replicas).build(), isNew = false, offsetCheckpoints, None)
     assertFalse(partition.isUnderReplicated)
   }
 
@@ -3009,20 +3055,20 @@ class PartitionTest extends AbstractPartitionTest {
   def testDoNotResetReplicaStateIfLeaderEpochIsNotBumped(): Unit = {
     val leaderId = brokerId
     val followerId = brokerId + 1
-    val replicas = util.List.of[Integer](leaderId, followerId)
+    val replicas = Array(leaderId, followerId)
     val leaderEpoch = 8
     val topicId = Uuid.randomUuid()
 
     addBrokerEpochToMockMetadataCache(metadataCache, replicas)
 
-    val initialLeaderState = new JPartitionState()
+    val leaderRegistrationBuilder = new PartitionRegistration.Builder()
       .setLeader(leaderId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(leaderEpoch)
-      .setIsr(util.List.of[Integer](leaderId))
+      .setIsr(Array(leaderId))
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(true)
-
-    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+    assertTrue(partition.makeLeader(leaderRegistrationBuilder.build(), isNew = true, offsetCheckpoints, Some(topicId)))
     assertEquals(1, partition.getPartitionEpoch)
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
     assertEquals(Set(leaderId), partition.partitionState.isr)
@@ -3046,15 +3092,7 @@ class PartitionTest extends AbstractPartitionTest {
     // makeLeader is called again with the same leader epoch but with
     // a newer partition epoch. This can happen in KRaft when a partition
     // is reassigned. The leader epoch is not bumped when we add replicas.
-    val updatedLeaderState = new JPartitionState()
-      .setLeader(leaderId)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(util.List.of[Integer](leaderId))
-      .setPartitionEpoch(2)
-      .setReplicas(replicas)
-      .setIsNew(false)
-
-    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId)))
+    assertFalse(partition.makeLeader(leaderRegistrationBuilder.setPartitionEpoch(2).build(), isNew = false, offsetCheckpoints, Some(topicId)))
     assertEquals(2, partition.getPartitionEpoch)
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
     assertEquals(Set(leaderId), partition.partitionState.isr)
@@ -3071,19 +3109,19 @@ class PartitionTest extends AbstractPartitionTest {
   def testDoNotUpdateEpochStartOffsetIfLeaderEpochIsNotBumped(): Unit = {
     val leaderId = brokerId
     val followerId = brokerId + 1
-    val replicas = util.List.of[Integer](leaderId, followerId)
+    val replicas = Array(leaderId, followerId)
     val leaderEpoch = 8
     val topicId = Uuid.randomUuid()
 
-    val initialLeaderState = new JPartitionState()
+    val leaderRegistrationBuilder = new PartitionRegistration.Builder()
       .setLeader(leaderId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(leaderEpoch)
-      .setIsr(util.List.of[Integer](leaderId))
+      .setIsr(Array(leaderId))
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(true)
-
-    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+    assertTrue(partition.makeLeader(leaderRegistrationBuilder.build(), isNew = true, offsetCheckpoints, Some(topicId)))
     assertEquals(1, partition.getPartitionEpoch)
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
     assertEquals(Set(leaderId), partition.partitionState.isr)
@@ -3100,15 +3138,7 @@ class PartitionTest extends AbstractPartitionTest {
     // makeLeader is called again with the same leader epoch but with
     // a newer partition epoch.
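+    // Only the partition epoch changes, so the builder from the initial state is reused.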
-    val updatedLeaderState = new JPartitionState()
-      .setLeader(leaderId)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(util.List.of[Integer](leaderId))
-      .setPartitionEpoch(2)
-      .setReplicas(replicas)
-      .setIsNew(false)
-
-    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId)))
+    assertFalse(partition.makeLeader(leaderRegistrationBuilder.setPartitionEpoch(2).build(), isNew = false, offsetCheckpoints, Some(topicId)))
     assertEquals(2, partition.getPartitionEpoch)
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
     assertEquals(Set(leaderId), partition.partitionState.isr)
@@ -3119,33 +3149,25 @@ class PartitionTest extends AbstractPartitionTest {
   @Test
   def testIgnoreLeaderPartitionStateChangeWithOlderPartitionEpoch(): Unit = {
     val leaderId = brokerId
-    val replicas = util.List.of[Integer](leaderId)
+    val replicas = Array(leaderId)
     val leaderEpoch = 8
     val topicId = Uuid.randomUuid()
 
-    val initialLeaderState = new JPartitionState()
+    val leaderRegistrationBuilder = new PartitionRegistration.Builder()
       .setLeader(leaderId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(leaderEpoch)
-      .setIsr(util.List.of[Integer](leaderId))
+      .setIsr(Array(leaderId))
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(true)
-
-    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+    assertTrue(partition.makeLeader(leaderRegistrationBuilder.build(), isNew = true, offsetCheckpoints, Some(topicId)))
     assertEquals(1, partition.getPartitionEpoch)
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
 
     // makeLeader is called again with the same leader epoch but with
     // an older partition epoch.
-    val updatedLeaderState = new JPartitionState()
-      .setLeader(leaderId)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(util.List.of[Integer](leaderId))
-      .setPartitionEpoch(0)
-      .setReplicas(replicas)
-      .setIsNew(false)
-
-    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId)))
+    assertFalse(partition.makeLeader(leaderRegistrationBuilder.setPartitionEpoch(0).build(), isNew = false, offsetCheckpoints, Some(topicId)))
     assertEquals(1, partition.getPartitionEpoch)
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
   }
@@ -3154,33 +3176,25 @@ class PartitionTest extends AbstractPartitionTest {
   def testIgnoreFollowerPartitionStateChangeWithOlderPartitionEpoch(): Unit = {
     val leaderId = brokerId
     val followerId = brokerId + 1
-    val replicas = util.List.of[Integer](leaderId, followerId)
+    val replicas = Array(leaderId, followerId)
     val leaderEpoch = 8
     val topicId = Uuid.randomUuid()
 
-    val initialFollowerState = new JPartitionState()
+    val followerRegistrationBuilder = new PartitionRegistration.Builder()
       .setLeader(followerId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(leaderEpoch)
-      .setIsr(util.List.of[Integer](leaderId, followerId))
+      .setIsr(Array(leaderId, followerId))
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(true)
-
-    assertTrue(partition.makeFollower(initialFollowerState, offsetCheckpoints, Some(topicId)))
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
    assertTrue(partition.makeFollower(followerRegistrationBuilder.build(), isNew = true, offsetCheckpoints, Some(topicId)))
     assertEquals(1, partition.getPartitionEpoch)
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
 
     // makeFollower is called again with the same leader epoch and the
     // same partition epoch, so the state change is ignored.
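+    // Reuse the follower registration builder instead of constructing a second state object.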
-    val updatedFollowerState = new JPartitionState()
-      .setLeader(followerId)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(util.List.of[Integer](leaderId, followerId))
-      .setPartitionEpoch(1)
-      .setReplicas(replicas)
-      .setIsNew(true)
-
-    assertFalse(partition.makeFollower(updatedFollowerState, offsetCheckpoints, Some(topicId)))
+    assertFalse(partition.makeFollower(followerRegistrationBuilder.build(), isNew = true, offsetCheckpoints, Some(topicId)))
     assertEquals(1, partition.getPartitionEpoch)
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
   }
@@ -3190,62 +3204,58 @@ class PartitionTest extends AbstractPartitionTest {
     val localReplica = brokerId
     val remoteReplica1 = brokerId + 1
     val remoteReplica2 = brokerId + 2
-    val replicas = util.List.of[Integer](localReplica, remoteReplica1, remoteReplica2)
+    val replicas = Array(localReplica, remoteReplica1, remoteReplica2)
     val topicId = Uuid.randomUuid()
 
     // The local replica is the leader.
-    val initialLeaderState = new JPartitionState()
+    val leaderRegistrationBuilder = new PartitionRegistration.Builder()
       .setLeader(localReplica)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(1)
       .setIsr(replicas)
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(true)
-
-    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+    assertTrue(partition.makeLeader(leaderRegistrationBuilder.build(), isNew = true, offsetCheckpoints, Some(topicId)))
     assertEquals(1, partition.getPartitionEpoch)
     assertEquals(1, partition.getLeaderEpoch)
     assertEquals(Some(localReplica), partition.leaderReplicaIdOpt)
-    assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.isr)
     assertEquals(Seq(remoteReplica1, remoteReplica2), partition.remoteReplicas.map(_.brokerId).toSeq)
-    assertEquals(replicas.asScala, partition.assignmentState.replicas)
+    assertEquals(replicas.toSeq, partition.assignmentState.replicas)
 
     // The local replica becomes a follower.
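+    // Rebuild from the same builder, moving leadership to remoteReplica1 and bumping both epochs.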
- val updatedLeaderState = new JPartitionState() + val updatedLeaderRegistration = leaderRegistrationBuilder .setLeader(remoteReplica1) .setLeaderEpoch(2) - .setIsr(replicas) .setPartitionEpoch(2) - .setReplicas(replicas) - .setIsNew(false) - - assertTrue(partition.makeFollower(updatedLeaderState, offsetCheckpoints, Some(topicId))) + .build() + assertTrue(partition.makeFollower(updatedLeaderRegistration, isNew = false, offsetCheckpoints, Some(topicId))) assertEquals(2, partition.getPartitionEpoch) assertEquals(2, partition.getLeaderEpoch) assertEquals(Some(remoteReplica1), partition.leaderReplicaIdOpt) assertEquals(Set.empty, partition.partitionState.isr) assertEquals(Seq.empty, partition.remoteReplicas.map(_.brokerId).toSeq) - assertEquals(replicas.asScala, partition.assignmentState.replicas) + assertEquals(replicas.toSeq, partition.assignmentState.replicas) } @Test def testAddAndRemoveListeners(): Unit = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas addBrokerEpochToMockMetadataCache(metadataCache, replicas) - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(0) - .setIsr(isr) - .setReplicas(replicas) - .setPartitionEpoch(1) - .setIsNew(true), - offsetCheckpoints, - topicId = None) - + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setIsr(isr) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .setPartitionEpoch(1) + .build() + partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None) val listener1 = new MockPartitionListener() val listener2 = new MockPartitionListener() @@ -3302,18 +3312,19 @@ class PartitionTest extends AbstractPartitionTest { @Test def testAddListenerFailsWhenPartitionIsDeleted(): Unit = { + val replicas = Array(brokerId, brokerId + 1) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(0) - .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) - .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) - .setPartitionEpoch(1) - .setIsNew(true), - offsetCheckpoints, - topicId = None) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setIsr(replicas) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .setPartitionEpoch(1) + .build() + partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None) partition.delete() @@ -3324,19 +3335,19 @@ class PartitionTest extends AbstractPartitionTest { def testPartitionListenerWhenLogOffsetsChanged(): Unit = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - val replicas = util.List.of[Integer](brokerId, brokerId + 1) - val isr = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) + val isr = Array(brokerId, brokerId + 1) addBrokerEpochToMockMetadataCache(metadataCache, replicas) - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(0) - .setIsr(isr) - 
.setReplicas(replicas) - .setPartitionEpoch(1) - .setIsNew(true), - offsetCheckpoints, - topicId = None) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setIsr(isr) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .setPartitionEpoch(1) + .build() + partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None) val listener = new MockPartitionListener() assertTrue(partition.maybeAddListener(listener)) @@ -3366,18 +3377,19 @@ class PartitionTest extends AbstractPartitionTest { @Test def testPartitionListenerWhenPartitionFailed(): Unit = { + val replicas = Array(brokerId, brokerId + 1) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(0) - .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) - .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) - .setPartitionEpoch(1) - .setIsNew(true), - offsetCheckpoints, - topicId = None) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setIsr(replicas) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .setPartitionEpoch(1) + .build() + partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None) val listener = new MockPartitionListener() assertTrue(partition.maybeAddListener(listener)) @@ -3389,18 +3401,19 @@ class PartitionTest extends AbstractPartitionTest { @Test def testPartitionListenerWhenPartitionIsDeleted(): Unit = { + val replicas = Array(brokerId, brokerId + 1) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = topicId) - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(0) - .setIsr(util.List.of[Integer](brokerId, brokerId + 1)) - .setReplicas(util.List.of[Integer](brokerId, brokerId + 1)) - .setPartitionEpoch(1) - .setIsNew(true), - offsetCheckpoints, - topicId = None) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setIsr(replicas) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .setPartitionEpoch(1) + .build() + partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None) val listener = new MockPartitionListener() assertTrue(partition.maybeAddListener(listener)) @@ -3416,20 +3429,20 @@ class PartitionTest extends AbstractPartitionTest { partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = topicId) assertTrue(partition.log.isDefined) - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas val epoch = 0 addBrokerEpochToMockMetadataCache(metadataCache, replicas) - partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(epoch) - .setIsr(isr) - .setReplicas(replicas) - .setPartitionEpoch(1) - .setIsNew(true), - offsetCheckpoints, - topicId = None) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(epoch) 
+ .setIsr(isr) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .setPartitionEpoch(1) + .build() + partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None) val listener = new MockPartitionListener() assertTrue(partition.maybeAddListener(listener)) @@ -3484,19 +3497,22 @@ class PartitionTest extends AbstractPartitionTest { @Test def testMaybeStartTransactionVerification(): Unit = { val leaderEpoch = 5 - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas val producerId = 22L partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - assertTrue(partition.makeLeader(new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) val idempotentRecords = createIdempotentRecords(util.List.of( @@ -3540,8 +3556,8 @@ class PartitionTest extends AbstractPartitionTest { private def makeLeader( topicId: Option[Uuid], leaderEpoch: Int, - isr: util.List[Integer], - replicas: util.List[Integer], + isr: Array[Int], + replicas: Array[Int], partitionEpoch: Int, isNew: Boolean, partition: Partition = partition @@ -3552,23 +3568,22 @@ class PartitionTest extends AbstractPartitionTest { offsetCheckpoints, topicId ) - val newLeader = partition.makeLeader( - new JPartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(partitionEpoch) - .setReplicas(replicas) - .setIsNew(isNew), - offsetCheckpoints, - topicId - ) + val partitionRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(partitionEpoch) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + val newLeader = partition.makeLeader(partitionRegistration, isNew = isNew, offsetCheckpoints, topicId) assertTrue(partition.isLeader) assertFalse(partition.partitionState.isInflight) assertEquals(topicId, partition.topicId) assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(isr.toSet, partition.partitionState.isr) + assertEquals(isr.toSet, partition.partitionState.maximalIsr) assertEquals(partitionEpoch, partition.getPartitionEpoch) newLeader } @@ -3707,8 +3722,8 @@ class PartitionTest extends AbstractPartitionTest { ) } - private def addBrokerEpochToMockMetadataCache(metadataCache: MetadataCache, brokers: util.List[Integer]): Unit = { - brokers.forEach { broker => + private def addBrokerEpochToMockMetadataCache(metadataCache: MetadataCache, brokers: Array[Int]): Unit = { + brokers.foreach { broker => when(metadataCache.getAliveBrokerEpoch(broker)).thenReturn(Optional.of(defaultBrokerEpoch(broker))) } } @@ -3734,22 +3749,24 @@ class 
PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) val leaderEpoch = 1 - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas - val leaderAndIsrPartitionState = new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(isNew) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() val topicId = Uuid.randomUuid() val targetDirectory = DirectoryId.random() when(spyLogManager.hasOfflineLogDirs()).thenReturn(true) when(spyLogManager.onlineLogDirId(targetDirectory)).thenReturn(true) // When - val res = partition.makeLeader(leaderAndIsrPartitionState, offsetCheckpoints, Some(topicId), Some(targetDirectory)) + val res = partition.makeLeader(partitionRegistration, isNew = isNew, offsetCheckpoints, Some(topicId), Some(targetDirectory)) // Then assertTrue(res) @@ -3777,22 +3794,24 @@ class PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) val leaderEpoch = 1 - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas - val leaderAndIsrPartitionState = new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(isNew) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() val topicId = Uuid.randomUuid() val targetDirectory = DirectoryId.random() when(spyLogManager.hasOfflineLogDirs()).thenReturn(true) when(spyLogManager.onlineLogDirId(targetDirectory)).thenReturn(true) // When - val res = partition.makeFollower(leaderAndIsrPartitionState, offsetCheckpoints, Some(topicId), Some(targetDirectory)) + val res = partition.makeFollower(partitionRegistration, isNew = isNew, offsetCheckpoints, Some(topicId), Some(targetDirectory)) // Then assertTrue(res) @@ -3820,22 +3839,24 @@ class PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) val leaderEpoch = 1 - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas - val leaderAndIsrPartitionState = new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(isNew) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() val topicId = Uuid.randomUuid() val targetDirectory = DirectoryId.random() when(spyLogManager.hasOfflineLogDirs()).thenReturn(false) when(spyLogManager.onlineLogDirId(targetDirectory)).thenReturn(false) // When - val res = partition.makeLeader(leaderAndIsrPartitionState, offsetCheckpoints, Some(topicId), Some(targetDirectory)) + val res = partition.makeLeader(partitionRegistration, isNew = isNew, offsetCheckpoints, Some(topicId), Some(targetDirectory)) // Then assertTrue(res) @@ -3863,22 +3884,24 @@ class PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) val leaderEpoch = 1 - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val 
replicas = Array(brokerId, brokerId + 1) val isr = replicas - val leaderAndIsrPartitionState = new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(isNew) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() val topicId = Uuid.randomUuid() val targetDirectory = DirectoryId.random() when(spyLogManager.hasOfflineLogDirs()).thenReturn(false) when(spyLogManager.onlineLogDirId(targetDirectory)).thenReturn(false) // When - val res = partition.makeFollower(leaderAndIsrPartitionState, offsetCheckpoints, Some(topicId), Some(targetDirectory)) + val res = partition.makeFollower(partitionRegistration, isNew = isNew, offsetCheckpoints, Some(topicId), Some(targetDirectory)) // Then assertTrue(res) @@ -3906,22 +3929,24 @@ class PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) val leaderEpoch = 1 - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas - val leaderAndIsrPartitionState = new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(isNew) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() val topicId = Uuid.randomUuid() val targetDirectory = DirectoryId.UNASSIGNED when(spyLogManager.hasOfflineLogDirs()).thenReturn(true) when(spyLogManager.onlineLogDirId(targetDirectory)).thenReturn(false) // When - val res = partition.makeLeader(leaderAndIsrPartitionState, offsetCheckpoints, Some(topicId), Some(targetDirectory)) + val res = partition.makeLeader(partitionRegistration, isNew, offsetCheckpoints, Some(topicId), Some(targetDirectory)) // Then assertTrue(res) @@ -3950,22 +3975,24 @@ class PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) val leaderEpoch = 1 - val replicas = util.List.of[Integer](brokerId, brokerId + 1) + val replicas = Array(brokerId, brokerId + 1) val isr = replicas - val leaderAndIsrPartitionState = new JPartitionState() + val partitionRegistration = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(isNew) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() val topicId = Uuid.randomUuid() val targetDirectory = DirectoryId.UNASSIGNED when(spyLogManager.hasOfflineLogDirs()).thenReturn(true) when(spyLogManager.onlineLogDirId(targetDirectory)).thenReturn(false) // When - val res = partition.makeFollower(leaderAndIsrPartitionState, offsetCheckpoints, Some(topicId), Some(targetDirectory)) + val res = partition.makeFollower(partitionRegistration, isNew, offsetCheckpoints, Some(topicId), Some(targetDirectory)) // Then assertTrue(res) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index c5393117bdc..2483a1f85c0 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.Monitorable 
import org.apache.kafka.common.metrics.PluginMetrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.{PartitionState => JPartitionState} import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView @@ -56,7 +55,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig} import org.apache.kafka.image._ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER -import org.apache.kafka.metadata.MetadataCache +import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, PartitionRegistration} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition} import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs} @@ -77,7 +76,7 @@ import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test} @@ -98,7 +97,7 @@ import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, Cou import java.util.function.{BiConsumer, Consumer} import java.util.stream.IntStream import java.util.{Collections, Optional, OptionalLong, Properties} -import scala.collection.{Map, Seq, mutable} +import scala.collection.{mutable, Map, Seq} import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.{RichOption, RichOptional} @@ -135,7 +134,7 @@ class ReplicaManagerTest { private val quotaAvailableThrottleTime = 0 // Constants defined for readability - private val zkVersion = 0 + private val partitionEpoch = 0 private val brokerEpoch = 0L // These metrics are static and once we remove them after each test, they won't be created and verified anymore @@ -1253,7 +1252,7 @@ class ReplicaManagerTest { val leaderBrokerId = 1 val leaderEpoch = 1 val leaderEpochIncrement = 2 - val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId) + val aliveBrokerIds = Array(followerBrokerId, leaderBrokerId) val countDownLatch = new CountDownLatch(1) // Prepare the mocked components for the test @@ -1267,8 +1266,8 @@ class ReplicaManagerTest { val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - partition.makeLeader( - leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds), + partition.makeLeader(partitionRegistration(leaderBrokerId, leaderEpoch, aliveBrokerIds, partitionEpoch, aliveBrokerIds), + isNew = false, offsetCheckpoints, None) @@ -2654,20 +2653,21 @@ class ReplicaManagerTest { (replicaManager, mockLogMgr) } - private def leaderAndIsrPartitionState(topicPartition: TopicPartition, - leaderEpoch: Int, - leaderBrokerId: Int, - aliveBrokerIds: Seq[Integer], - isNew: Boolean = false): JPartitionState = { - new JPartitionState() - .setTopicName(topic) - .setPartitionIndex(topicPartition.partition) - .setLeader(leaderBrokerId) + private def partitionRegistration(leader: Int, + leaderEpoch: Int, + isr: Array[Int], + partitionEpoch: Int, + replicas: Array[Int]): PartitionRegistration = { + new PartitionRegistration.Builder() + .setLeader(leader) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(leaderEpoch) - .setIsr(aliveBrokerIds.asJava) - .setPartitionEpoch(zkVersion) - .setReplicas(aliveBrokerIds.asJava) - .setIsNew(isNew) + .setIsr(isr) + .setPartitionEpoch(partitionEpoch) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + } private class CallbackResult[T] { @@ -4157,6 +4157,7 @@ class ReplicaManagerTest { val localId = 1 val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, "foo") val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + val aliveBrokerIds = Array(1, 2) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) try { @@ -4164,7 +4165,8 @@ class ReplicaManagerTest { assertEquals(directoryIds.size, 2) val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, partitions = List(0), directoryIds = directoryIds) val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) = replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get - partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)), + partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds), + isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None) @@ -5716,13 +5718,15 @@ class ReplicaManagerTest { val localId = 1 val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME) val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + val aliveBrokerIds = Array(1, 2) val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) val directoryIds = rm.logManager.directoryIdsSet.toList assertEquals(directoryIds.size, 2) val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds) val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get - partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)), + partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds), + isNew = false, new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) @@ -5743,15 +5747,17 @@ class ReplicaManagerTest { val localId = 1 val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME) val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + val aliveBrokerIds = Array(1, 2) val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) val directoryIds = rm.logManager.directoryIdsSet.toList assertEquals(directoryIds.size, 2) val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds) val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get - partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)), - new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), - None) + partition.makeLeader(partitionRegistration(localId, 1, aliveBrokerIds, partitionEpoch, aliveBrokerIds), + isNew = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), + None) def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = { assert(responseStatus.values.head.errorCode == Errors.NONE.code) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 0a69a431c0e..c7f5f8da0b8 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -35,7 +35,7 @@ import kafka.server.metadata.KRaftMetadataCache; import kafka.utils.TestUtils; import org.apache.kafka.clients.FetchSessionHandler; -import org.apache.kafka.common.PartitionState; +import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -52,7 +52,9 @@ import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.MockConfigRepository; +import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.network.BrokerEndPoint; @@ -163,19 +165,21 @@ public class ReplicaFetcherThreadBenchmark { for (int i = 0; i < partitionCount; i++) { TopicPartition tp = new TopicPartition("topic", i); - List replicas = List.of(0, 1, 2); - PartitionState partitionState = new PartitionState() - .setLeader(0) - .setLeaderEpoch(0) - .setIsr(replicas) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true); + int[] replicas = {0, 1, 2}; + PartitionRegistration partitionRegistration = new PartitionRegistration.Builder() + .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setIsr(replicas) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build(); OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Optional.of(0L); Partition partition = 
replicaManager.createPartition(tp); - partition.makeFollower(partitionState, checkpoints, topicId, Option.empty()); + partition.makeFollower(partitionRegistration, true, checkpoints, topicId, Option.empty()); pool.put(tp, partition); initialFetchStates.put(tp, new InitialFetchState(topicId, new BrokerEndPoint(3, "host", 3000), 0, 0)); BaseRecords fetched = new BaseRecords() { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index 8dfc70ca7c5..9273f2139da 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -24,7 +24,7 @@ import kafka.log.LogManager; import kafka.server.AlterPartitionManager; import kafka.server.builders.LogManagerBuilder; -import org.apache.kafka.common.PartitionState; +import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; @@ -32,8 +32,10 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.MetadataCache; import org.apache.kafka.metadata.MockConfigRepository; +import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints; import org.apache.kafka.storage.internals.log.CleanerConfig; @@ -78,7 +80,7 @@ import scala.jdk.javaapi.OptionConverters; public class PartitionMakeFollowerBenchmark { private final File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler"); - private final List replicas = List.of(0, 1, 2); + private final int[] replicas = {0, 1, 2}; private final OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class); private final DelayedOperations delayedOperations = Mockito.mock(DelayedOperations.class); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -148,13 +150,15 @@ public class PartitionMakeFollowerBenchmark { @Benchmark public boolean testMakeFollower() { - PartitionState partitionState = new PartitionState() + PartitionRegistration partitionRegistration = new PartitionRegistration.Builder() .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(0) .setIsr(replicas) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true); - return partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty()); + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build(); + return partition.makeFollower(partitionRegistration, true, offsetCheckpoints, topicId, Option.empty()); } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index 665fc5f4596..bfa9119c1df 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -24,12 +24,14 @@ import kafka.log.LogManager; import kafka.server.AlterPartitionManager; import kafka.server.builders.LogManagerBuilder; -import org.apache.kafka.common.PartitionState; +import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.MetadataCache; import org.apache.kafka.metadata.MockConfigRepository; +import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.replica.Replica; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints; @@ -54,7 +56,6 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import java.io.File; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -109,24 +110,24 @@ public class UpdateFollowerFetchStateBenchmark { DelayedOperations delayedOperations = new DelayedOperationsMock(); // one leader, plus two followers - List replicas = new ArrayList<>(); - replicas.add(0); - replicas.add(1); - replicas.add(2); - PartitionState partitionState = new PartitionState() + int[] replicas = {0, 1, 2}; + PartitionRegistration partitionRegistration = new PartitionRegistration.Builder() .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(0) .setIsr(replicas) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true); + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build(); AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class); AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class); partition = new Partition(topicPartition, 100, 0, () -> -1, Time.SYSTEM, alterPartitionListener, delayedOperations, Mockito.mock(MetadataCache.class), logManager, alterPartitionManager, topicId); - partition.makeLeader(partitionState, offsetCheckpoints, topicId, Option.empty()); + + partition.makeLeader(partitionRegistration, true, offsetCheckpoints, topicId, Option.empty()); replica1 = partition.getReplica(1).get(); replica2 = partition.getReplica(2).get(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index c32ae47c3ec..be73b492115 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -27,14 +27,16 @@ import kafka.server.builders.ReplicaManagerBuilder; import kafka.server.metadata.KRaftMetadataCache; import kafka.utils.TestUtils; -import org.apache.kafka.common.PartitionState; +import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.metadata.ConfigRepository; +import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.MockConfigRepository; +import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.server.util.KafkaScheduler; 
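A side effect of the registration-based API, visible in the benchmark changes above and below: replica and ISR assignments are now primitive int[] arrays rather than boxed List<Integer> collections. Call sites that still hold a boxed list need a small conversion; a one-method sketch, where brokerIds is a hypothetical caller-held list:

    import java.util.List;

    public class BoxedToPrimitive {
        // Converts a boxed broker-id list into the int[] form expected by
        // PartitionRegistration.Builder#setReplicas / #setIsr.
        public static int[] toReplicaArray(List<Integer> brokerIds) {
            return brokerIds.stream().mapToInt(Integer::intValue).toArray();
        }
    }

For example, toReplicaArray(List.of(0, 1, 2)) yields {0, 1, 2}, matching the literal int[] arrays the benchmarks now use directly.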
import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints; @@ -181,28 +183,23 @@ public class PartitionCreationBench { topicPartitions.add(new TopicPartition(topicName, partitionNum)); } - List replicas = new ArrayList<>(); - replicas.add(0); - replicas.add(1); - replicas.add(2); + int[] replicas = {0, 1, 2}; OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Optional.of(0L); for (TopicPartition topicPartition : topicPartitions) { final Partition partition = this.replicaManager.createPartition(topicPartition); - List inSync = new ArrayList<>(); - inSync.add(0); - inSync.add(1); - inSync.add(2); + int[] isr = {0, 1, 2}; - PartitionState partitionState = new PartitionState() + PartitionRegistration partitionRegistration = new PartitionRegistration.Builder() .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(0) - .setIsr(inSync) + .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(true); - - partition.makeFollower(partitionState, checkpoints, topicId, Option.empty()); + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build(); + partition.makeFollower(partitionRegistration, true, checkpoints, topicId, Option.empty()); } } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java index 1d97db0b82e..e02012d19ae 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java @@ -18,8 +18,6 @@ package org.apache.kafka.metadata; import org.apache.kafka.common.DirectoryId; -import org.apache.kafka.common.PartitionState; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InvalidReplicaDirectoriesException; import org.apache.kafka.common.metadata.PartitionChangeRecord; @@ -410,21 +408,6 @@ public class PartitionRegistration { return new ApiMessageAndVersion(record, options.metadataVersion().partitionRecordVersion()); } - public PartitionState toLeaderAndIsrPartitionState(TopicPartition tp, boolean isNew) { - return new PartitionState(). - setTopicName(tp.topic()). - setPartitionIndex(tp.partition()). - setLeader(leader). - setLeaderEpoch(leaderEpoch). - setIsr(Replicas.toList(isr)). - setPartitionEpoch(partitionEpoch). - setReplicas(Replicas.toList(replicas)). - setAddingReplicas(Replicas.toList(addingReplicas)). - setRemovingReplicas(Replicas.toList(removingReplicas)). - setLeaderRecoveryState(leaderRecoveryState.value()). 
- setIsNew(isNew); - } - @Override public int hashCode() { return Objects.hash(Arrays.hashCode(replicas), Arrays.hashCode(isr), Arrays.hashCode(removingReplicas), diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index 4774507bee9..0fe7025c178 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -18,8 +18,6 @@ package org.apache.kafka.metadata; import org.apache.kafka.common.DirectoryId; -import org.apache.kafka.common.PartitionState; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; @@ -94,50 +92,6 @@ public class PartitionRegistrationTest { assertEquals(registrationA, registrationB); } - @Test - public void testToLeaderAndIsrPartitionState() { - PartitionRegistration a = new PartitionRegistration.Builder(). - setReplicas(new int[]{1, 2, 3}). - setDirectories(new Uuid[]{ - Uuid.fromString("NSmkU0ieQuy2IHN59Ce0Bw"), - Uuid.fromString("Y8N9gnSKSLKKFCioX2laGA"), - Uuid.fromString("Oi7nvb8KQPyaGEqr4JtCRw") - }). - setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(123).setPartitionEpoch(456).build(); - PartitionRegistration b = new PartitionRegistration.Builder(). - setReplicas(new int[]{2, 3, 4}). - setDirectories(new Uuid[]{ - Uuid.fromString("tAn3q03aQAWEYkNajXm3lA"), - Uuid.fromString("zgj8rqatTmWMyWBsRZyiVg"), - Uuid.fromString("bAAlGAz1TN2doZjtWlvhRQ") - }). - setIsr(new int[]{2, 3, 4}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(234).setPartitionEpoch(567).build(); - assertEquals(new PartitionState(). - setTopicName("foo"). - setPartitionIndex(1). - setLeader(1). - setLeaderEpoch(123). - setIsr(List.of(1, 2)). - setPartitionEpoch(456). - setReplicas(List.of(1, 2, 3)). - setAddingReplicas(List.of()). - setRemovingReplicas(List.of()). - setIsNew(true).toString(), - a.toLeaderAndIsrPartitionState(new TopicPartition("foo", 1), true).toString()); - assertEquals(new PartitionState(). - setTopicName("bar"). - setPartitionIndex(0). - setLeader(2). - setLeaderEpoch(234). - setIsr(List.of(2, 3, 4)). - setPartitionEpoch(567). - setReplicas(List.of(2, 3, 4)). - setAddingReplicas(List.of()). - setRemovingReplicas(List.of()). - setIsNew(false).toString(), - b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), false).toString()); - } - @Test public void testMergePartitionChangeRecordWithReassignmentData() { Uuid dir1 = Uuid.fromString("FbRuu7CeQtq5YFreEzg16g"); diff --git a/server-common/src/main/java/org/apache/kafka/common/PartitionState.java b/server-common/src/main/java/org/apache/kafka/common/PartitionState.java deleted file mode 100644 index 7c104f05a66..00000000000 --- a/server-common/src/main/java/org/apache/kafka/common/PartitionState.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.kafka.common;
-
-import org.apache.kafka.common.protocol.MessageUtil;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class PartitionState {
-    String topicName = "";
-    int partitionIndex = 0;
-    int leader = 0;
-    int leaderEpoch = 0;
-    List<Integer> isr = new ArrayList<>(0);
-    int partitionEpoch = 0;
-    List<Integer> replicas = new ArrayList<>(0);
-    List<Integer> addingReplicas = new ArrayList<>(0);
-    List<Integer> removingReplicas = new ArrayList<>(0);
-    boolean isNew = false;
-    byte leaderRecoveryState = (byte) 0;
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || getClass() != o.getClass()) return false;
-        PartitionState that = (PartitionState) o;
-        return partitionIndex == that.partitionIndex &&
-            leader == that.leader &&
-            leaderEpoch == that.leaderEpoch &&
-            partitionEpoch == that.partitionEpoch &&
-            isNew == that.isNew &&
-            leaderRecoveryState == that.leaderRecoveryState &&
-            Objects.equals(topicName, that.topicName) &&
-            Objects.equals(isr, that.isr) &&
-            Objects.equals(replicas, that.replicas) &&
-            Objects.equals(addingReplicas, that.addingReplicas) &&
-            Objects.equals(removingReplicas, that.removingReplicas);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(topicName, partitionIndex, leader, leaderEpoch, isr, partitionEpoch,
-            replicas, addingReplicas, removingReplicas, isNew, leaderRecoveryState);
-    }
-
-    @Override
-    public String toString() {
-        return "PartitionState("
-            + "topicName='" + topicName + "'"
-            + ", partitionIndex=" + partitionIndex
-            + ", leader=" + leader
-            + ", leaderEpoch=" + leaderEpoch
-            + ", isr=" + MessageUtil.deepToString(isr.iterator())
-            + ", partitionEpoch=" + partitionEpoch
-            + ", replicas=" + MessageUtil.deepToString(replicas.iterator())
-            + ", addingReplicas=" + MessageUtil.deepToString(addingReplicas.iterator())
-            + ", removingReplicas=" + MessageUtil.deepToString(removingReplicas.iterator())
-            + ", isNew=" + (isNew ? "true" : "false")
-            + ", leaderRecoveryState=" + leaderRecoveryState
-            + ")";
-    }
-
-    public String topicName() {
-        return this.topicName;
-    }
-
-    public int partitionIndex() {
-        return this.partitionIndex;
-    }
-
-    public int leader() {
-        return this.leader;
-    }
-
-    public int leaderEpoch() {
-        return this.leaderEpoch;
-    }
-
-    public List<Integer> isr() {
-        return this.isr;
-    }
-
-    public int partitionEpoch() {
-        return this.partitionEpoch;
-    }
-
-    public List<Integer> replicas() {
-        return this.replicas;
-    }
-
-    public List<Integer> addingReplicas() {
-        return this.addingReplicas;
-    }
-
-    public List<Integer> removingReplicas() {
-        return this.removingReplicas;
-    }
-
-    public boolean isNew() {
-        return this.isNew;
-    }
-
-    public byte leaderRecoveryState() {
-        return this.leaderRecoveryState;
-    }
-
-    public PartitionState setTopicName(String v) {
-        this.topicName = v;
-        return this;
-    }
-
-    public PartitionState setPartitionIndex(int v) {
-        this.partitionIndex = v;
-        return this;
-    }
-
-    public PartitionState setLeader(int v) {
-        this.leader = v;
-        return this;
-    }
-
-    public PartitionState setLeaderEpoch(int v) {
-        this.leaderEpoch = v;
-        return this;
-    }
-
-    public PartitionState setIsr(List<Integer> v) {
-        this.isr = v;
-        return this;
-    }
-
-    public PartitionState setPartitionEpoch(int v) {
-        this.partitionEpoch = v;
-        return this;
-    }
-
-    public PartitionState setReplicas(List<Integer> v) {
-        this.replicas = v;
-        return this;
-    }
-
-    public PartitionState setAddingReplicas(List<Integer> v) {
-        this.addingReplicas = v;
-        return this;
-    }
-
-    public PartitionState setRemovingReplicas(List<Integer> v) {
-        this.removingReplicas = v;
-        return this;
-    }
-
-    public PartitionState setIsNew(boolean v) {
-        this.isNew = v;
-        return this;
-    }
-
-    public PartitionState setLeaderRecoveryState(byte v) {
-        this.leaderRecoveryState = v;
-        return this;
-    }
-}
\ No newline at end of file