MINOR: Rename `ZkVersion` to `PartitionEpoch` (#12071)

This patch does some initial cleanups in the context of KAFKA-13790. Mainly, it renames the `ZkVersion` field to `PartitionEpoch` in `LeaderAndIsrRequest`, `LeaderAndIsr`, and `Partition`.

Reviewers: Jason Gustafson <jason@confluent.io>, dengziming <dengziming1993@gmail.com>
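
For context, a minimal sketch of how the renamed API reads after this patch (illustrative only; it assumes the core module is on the classpath and uses only the names visible in the diff below, such as kafka.api.LeaderAndIsr, withPartitionEpoch and equalsIgnorePartitionEpoch):

    import kafka.api.LeaderAndIsr

    // The companion apply() seeds InitialLeaderEpoch and InitialPartitionEpoch (both 0),
    // replacing the old initialLeaderEpoch/initialZKVersion constants.
    val initial = LeaderAndIsr(leader = 0, isr = List(0, 1, 2))

    // After a successful write, callers bump the epoch on the returned state via
    // withPartitionEpoch (formerly withZkVersion).
    val committed = initial.withPartitionEpoch(initial.partitionEpoch + 1)

    // equalsIgnorePartitionEpoch (formerly equalsIgnoreZk) still treats the two states
    // as equal, since only the partition epoch differs.
    assert(initial.equalsIgnorePartitionEpoch(committed))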
Author: David Jacot, 2022-04-22 20:38:17 +02:00 (committed by GitHub)
Commit: 7c8c65fc54
Parent: d480c4aa6e
35 changed files with 214 additions and 206 deletions

View File

@@ -78,8 +78,8 @@
       "about": "The leader epoch." },
     { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
       "about": "The in-sync replica IDs." },
-    { "name": "ZkVersion", "type": "int32", "versions": "0+",
-      "about": "The ZooKeeper version." },
+    { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
+      "about": "The current epoch for the partition. The epoch is a monotonically increasing value which is incremented after every partition change. (Since the LeaderAndIsr request is only used by the legacy controller, this corresponds to the zkVersion)" },
     { "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
       "about": "The replica IDs." },
     { "name": "AddingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, "entityType": "brokerId",

View File

@@ -116,7 +116,7 @@ public class LeaderAndIsrRequestTest {
        .setLeader(0)
        .setLeaderEpoch(10)
        .setIsr(asList(0, 1))
-       .setZkVersion(10)
+       .setPartitionEpoch(10)
        .setReplicas(asList(0, 1, 2))
        .setAddingReplicas(asList(3))
        .setRemovingReplicas(asList(2)),
@@ -127,7 +127,7 @@ public class LeaderAndIsrRequestTest {
        .setLeader(1)
        .setLeaderEpoch(11)
        .setIsr(asList(1, 2, 3))
-       .setZkVersion(11)
+       .setPartitionEpoch(11)
        .setReplicas(asList(1, 2, 3))
        .setAddingReplicas(emptyList())
        .setRemovingReplicas(emptyList()),
@@ -138,7 +138,7 @@ public class LeaderAndIsrRequestTest {
        .setLeader(2)
        .setLeaderEpoch(11)
        .setIsr(asList(2, 3, 4))
-       .setZkVersion(11)
+       .setPartitionEpoch(11)
        .setReplicas(asList(2, 3, 4))
        .setAddingReplicas(emptyList())
        .setRemovingReplicas(emptyList())

View File

@@ -48,7 +48,7 @@ public class LeaderAndIsrResponseTest {
        .setLeader(1)
        .setLeaderEpoch(10)
        .setIsr(Collections.singletonList(10))
-       .setZkVersion(20)
+       .setPartitionEpoch(20)
        .setReplicas(Collections.singletonList(10))
        .setIsNew(false));
    partitionStates.add(new LeaderAndIsrPartitionState()
@@ -58,7 +58,7 @@ public class LeaderAndIsrResponseTest {
        .setLeader(1)
        .setLeaderEpoch(10)
        .setIsr(Collections.singletonList(10))
-       .setZkVersion(20)
+       .setPartitionEpoch(20)
        .setReplicas(Collections.singletonList(10))
        .setIsNew(false));
    Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid());

View File

@@ -2207,7 +2207,7 @@ public class RequestResponseTest {
        .setLeader(2)
        .setLeaderEpoch(1)
        .setIsr(isr)
-       .setZkVersion(2)
+       .setPartitionEpoch(2)
        .setReplicas(replicas)
        .setIsNew(false));
    partitionStates.add(new LeaderAndIsrPartitionState()
@@ -2217,7 +2217,7 @@ public class RequestResponseTest {
        .setLeader(1)
        .setLeaderEpoch(1)
        .setIsr(isr)
-       .setZkVersion(2)
+       .setPartitionEpoch(2)
        .setReplicas(replicas)
        .setIsNew(false));
    partitionStates.add(new LeaderAndIsrPartitionState()
@@ -2227,7 +2227,7 @@ public class RequestResponseTest {
        .setLeader(0)
        .setLeaderEpoch(1)
        .setIsr(isr)
-       .setZkVersion(2)
+       .setPartitionEpoch(2)
        .setReplicas(replicas)
        .setIsNew(false));

View File

@@ -20,35 +20,40 @@ package kafka.api
 import org.apache.kafka.metadata.LeaderRecoveryState
 object LeaderAndIsr {
-  val initialLeaderEpoch: Int = 0
-  val initialZKVersion: Int = 0
+  val InitialLeaderEpoch: Int = 0
+  val InitialPartitionEpoch: Int = 0
   val NoLeader: Int = -1
   val NoEpoch: Int = -1
   val LeaderDuringDelete: Int = -2
   val EpochDuringDelete: Int = -2
   def apply(leader: Int, isr: List[Int]): LeaderAndIsr = {
-    LeaderAndIsr(leader, initialLeaderEpoch, isr, LeaderRecoveryState.RECOVERED, initialZKVersion)
+    LeaderAndIsr(leader, InitialLeaderEpoch, isr, LeaderRecoveryState.RECOVERED, InitialPartitionEpoch)
   }
   def duringDelete(isr: List[Int]): LeaderAndIsr = LeaderAndIsr(LeaderDuringDelete, isr)
 }
-case class LeaderAndIsr(leader: Int,
+case class LeaderAndIsr(
+  leader: Int,
   leaderEpoch: Int,
   isr: List[Int],
   leaderRecoveryState: LeaderRecoveryState,
-  zkVersion: Int) {
-  def withZkVersion(zkVersion: Int): LeaderAndIsr = copy(zkVersion = zkVersion)
+  // The current epoch for the partition for KRaft controllers. The current ZK version for the
+  // legacy controllers. The epoch is a monotonically increasing value which is incremented
+  // after every partition change.
+  partitionEpoch: Int
+) {
+  def withPartitionEpoch(partitionEpoch: Int): LeaderAndIsr = copy(partitionEpoch = partitionEpoch)
   def newLeader(leader: Int): LeaderAndIsr = newLeaderAndIsr(leader, isr)
   def newLeaderAndIsr(leader: Int, isr: List[Int]): LeaderAndIsr = {
-    LeaderAndIsr(leader, leaderEpoch + 1, isr, leaderRecoveryState, zkVersion)
+    LeaderAndIsr(leader, leaderEpoch + 1, isr, leaderRecoveryState, partitionEpoch)
   }
   def newRecoveringLeaderAndIsr(leader: Int, isr: List[Int]): LeaderAndIsr = {
-    LeaderAndIsr(leader, leaderEpoch + 1, isr, LeaderRecoveryState.RECOVERING, zkVersion)
+    LeaderAndIsr(leader, leaderEpoch + 1, isr, LeaderRecoveryState.RECOVERING, partitionEpoch)
   }
   def newEpoch: LeaderAndIsr = newLeaderAndIsr(leader, isr)
@@ -57,18 +62,20 @@ case class LeaderAndIsr(leader: Int,
     if (leader == LeaderAndIsr.NoLeader) None else Some(leader)
   }
-  def equalsIgnoreZk(other: LeaderAndIsr): Boolean = {
+  def equalsIgnorePartitionEpoch(other: LeaderAndIsr): Boolean = {
     if (this == other) {
       true
     } else if (other == null) {
       false
     } else {
-      leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr) &&
+      leader == other.leader &&
+        leaderEpoch == other.leaderEpoch &&
+        isr.equals(other.isr) &&
         leaderRecoveryState == other.leaderRecoveryState
     }
   }
   override def toString: String = {
-    s"LeaderAndIsr(leader=$leader, leaderEpoch=$leaderEpoch, isr=$isr, leaderRecoveryState=$leaderRecoveryState, zkVersion=$zkVersion)"
+    s"LeaderAndIsr(leader=$leader, leaderEpoch=$leaderEpoch, isr=$isr, leaderRecoveryState=$leaderRecoveryState, partitionEpoch=$partitionEpoch)"
   }
 }

View File

@@ -251,8 +251,9 @@ class Partition(val topicPartition: TopicPartition,
   // lock to prevent the follower replica log update while checking if the log dir could be replaced with future log.
   private val futureLogLock = new Object()
-  private var zkVersion: Int = LeaderAndIsr.initialZKVersion
-  @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  // The current epoch for the partition for KRaft controllers. The current ZK version for the legacy controllers.
+  @volatile private var partitionEpoch: Int = LeaderAndIsr.InitialPartitionEpoch
+  @volatile private var leaderEpoch: Int = LeaderAndIsr.InitialLeaderEpoch - 1
   // start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
@@ -535,7 +536,7 @@ class Partition(val topicPartition: TopicPartition,
   def getLeaderEpoch: Int = this.leaderEpoch
-  def getZkVersion: Int = this.zkVersion
+  def getPartitionEpoch: Int = this.partitionEpoch
   /**
    * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
@@ -587,7 +588,7 @@ class Partition(val topicPartition: TopicPartition,
      //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
      leaderEpoch = partitionState.leaderEpoch
      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
-     zkVersion = partitionState.zkVersion
+     partitionEpoch = partitionState.partitionEpoch
      // In the case of successive leader elections in a short time period, a follower may have
      // entries in its log from a later epoch than any entry in the new leader's log. In order
@@ -666,7 +667,7 @@ class Partition(val topicPartition: TopicPartition,
      leaderEpoch = partitionState.leaderEpoch
      leaderEpochStartOffsetOpt = None
-     zkVersion = partitionState.zkVersion
+     partitionEpoch = partitionState.partitionEpoch
      if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
        false
@@ -1356,7 +1357,7 @@ class Partition(val topicPartition: TopicPartition,
    // Alternatively, if the update fails, no harm is done since the expanded ISR puts
    // a stricter requirement for advancement of the HW.
    val isrToSend = partitionState.isr + newInSyncReplicaId
-   val newLeaderAndIsr = LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, partitionState.leaderRecoveryState, zkVersion)
+   val newLeaderAndIsr = LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, partitionState.leaderRecoveryState, partitionEpoch)
    val updatedState = PendingExpandIsr(partitionState.isr, newInSyncReplicaId, newLeaderAndIsr)
    partitionState = updatedState
    updatedState
@@ -1367,7 +1368,7 @@ class Partition(val topicPartition: TopicPartition,
    // erroneously advance the HW if the `AlterPartition` were to fail. Hence the "maximal ISR"
    // for `PendingShrinkIsr` is the the current ISR.
    val isrToSend = partitionState.isr -- outOfSyncReplicaIds
-   val newLeaderAndIsr = LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, partitionState.leaderRecoveryState, zkVersion)
+   val newLeaderAndIsr = LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, partitionState.leaderRecoveryState, partitionEpoch)
    val updatedState = PendingShrinkIsr(partitionState.isr, outOfSyncReplicaIds, newLeaderAndIsr)
    partitionState = updatedState
    updatedState
@@ -1463,19 +1464,19 @@ class Partition(val topicPartition: TopicPartition,
      debug(s"Ignoring new ISR $leaderAndIsr since we have a stale leader epoch $leaderEpoch.")
      isrChangeListener.markFailed()
      false
-   } else if (leaderAndIsr.zkVersion < zkVersion) {
-     debug(s"Ignoring new ISR $leaderAndIsr since we have a newer version $zkVersion.")
+   } else if (leaderAndIsr.partitionEpoch < partitionEpoch) {
+     debug(s"Ignoring new ISR $leaderAndIsr since we have a newer version $partitionEpoch.")
      isrChangeListener.markFailed()
      false
    } else {
      // This is one of two states:
-     // 1) leaderAndIsr.zkVersion > zkVersion: Controller updated to new version with proposedIsrState.
-     // 2) leaderAndIsr.zkVersion == zkVersion: No update was performed since proposed and actual state are the same.
+     // 1) leaderAndIsr.partitionEpoch > partitionEpoch: Controller updated to new version with proposedIsrState.
+     // 2) leaderAndIsr.partitionEpoch == partitionEpoch: No update was performed since proposed and actual state are the same.
      // In both cases, we want to move from Pending to Committed state to ensure new updates are processed.
      partitionState = CommittedPartitionState(leaderAndIsr.isr.toSet, leaderAndIsr.leaderRecoveryState)
-     zkVersion = leaderAndIsr.zkVersion
-     info(s"ISR updated to ${partitionState.isr.mkString(",")} and version updated to $zkVersion")
+     partitionEpoch = leaderAndIsr.partitionEpoch
+     info(s"ISR updated to ${partitionState.isr.mkString(",")} and version updated to $partitionEpoch")
      proposedIsrState match {
        case PendingExpandIsr(_, _, _) => isrChangeListener.markExpand()

View File

@@ -384,7 +384,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
        .setLeader(leaderAndIsr.leader)
        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-       .setZkVersion(leaderAndIsr.zkVersion)
+       .setPartitionEpoch(leaderAndIsr.partitionEpoch)
        .setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
        .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
        .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
@@ -443,7 +443,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
        .setLeader(updatedLeaderAndIsr.leader)
        .setLeaderEpoch(updatedLeaderAndIsr.leaderEpoch)
        .setIsr(updatedLeaderAndIsr.isr.map(Integer.valueOf).asJava)
-       .setZkVersion(updatedLeaderAndIsr.zkVersion)
+       .setZkVersion(updatedLeaderAndIsr.partitionEpoch)
        .setReplicas(replicas.map(Integer.valueOf).asJava)
        .setOfflineReplicas(offlineReplicas.map(Integer.valueOf).asJava)
      updateMetadataRequestPartitionInfoMap.put(partition, partitionStateInfo)

View File

@@ -1231,7 +1231,7 @@ class KafkaController(val config: KafkaConfig,
          val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)
          controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
          finalLeaderIsrAndControllerEpoch = Some(leaderIsrAndControllerEpoch)
-         info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}, zkVersion=${leaderAndIsr.zkVersion}")
+         info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}, zkVersion=${leaderAndIsr.partitionEpoch}")
          true
        case Some(Left(e)) => throw e
        case None => false
@@ -2298,7 +2298,7 @@ class KafkaController(val config: KafkaConfig,
            .setLeaderEpoch(leaderAndIsr.leaderEpoch)
            .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
            .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-           .setPartitionEpoch(leaderAndIsr.zkVersion)
+           .setPartitionEpoch(leaderAndIsr.partitionEpoch)
        )
      }
    }
@@ -2366,7 +2366,7 @@ class KafkaController(val config: KafkaConfig,
        } else if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
          partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
          None
-       } else if (newLeaderAndIsr.equalsIgnoreZk(currentLeaderAndIsr)) {
+       } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
          // If a partition is already in the desired state, just return it
          partitionResponses(tp) = Right(currentLeaderAndIsr)
          None
@@ -2388,7 +2388,7 @@ class KafkaController(val config: KafkaConfig,
      case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
        isrOrError match {
          case Right(updatedIsr) =>
-           debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] and zkVersion updated to [${updatedIsr.zkVersion}]")
+           debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] and zkVersion updated to [${updatedIsr.partitionEpoch}]")
            partitionResponses(partition) = Right(updatedIsr)
            Some(partition -> updatedIsr)
          case Left(e) =>
@@ -2681,7 +2681,7 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo
    leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
    leaderAndIsrInfo.append(",LeaderRecoveryState:" + leaderAndIsr.leaderRecoveryState)
    leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
-   leaderAndIsrInfo.append(",ZkVersion:" + leaderAndIsr.zkVersion)
+   leaderAndIsrInfo.append(",ZkVersion:" + leaderAndIsr.partitionEpoch)
    leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
    leaderAndIsrInfo.toString()
  }

View File

@@ -230,7 +230,7 @@ class DefaultAlterIsrManager(
        .setPartitionIndex(item.topicPartition.partition)
        .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
        .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
-       .setPartitionEpoch(item.leaderAndIsr.zkVersion)
+       .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
      if (ibpVersion >= KAFKA_3_2_IV0) {
        partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)

View File

@@ -63,7 +63,7 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex
    controllerEpoch: Int
  ): CompletableFuture[LeaderAndIsr]= {
    debug(s"Writing new ISR ${leaderAndIsr.isr} to ZooKeeper with version " +
-     s"${leaderAndIsr.zkVersion} for partition $topicPartition")
+     s"${leaderAndIsr.partitionEpoch} for partition $topicPartition")
    val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition,
      leaderAndIsr, controllerEpoch)
@@ -78,7 +78,7 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex
      // We rely on Partition#isrState being properly set to the pending ISR at this point since we are synchronously
      // applying the callback
-     future.complete(leaderAndIsr.withZkVersion(newVersion))
+     future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
    } else {
      future.completeExceptionally(new InvalidUpdateVersionException(
        s"ISR update $leaderAndIsr for partition $topicPartition with controller epoch $controllerEpoch " +

View File

@@ -31,7 +31,7 @@ object ReplicationUtils extends Logging {
    val newLeaderData = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(newLeaderAndIsr, controllerEpoch))
    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
    val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData,
-     newLeaderAndIsr.zkVersion, Some(checkLeaderAndIsrZkData))
+     newLeaderAndIsr.partitionEpoch, Some(checkLeaderAndIsrZkData))
    updatePersistentPath
  }

View File

@@ -199,7 +199,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
      val path = TopicPartitionStateZNode.path(partition)
      val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
-     SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition))
+     SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.partitionEpoch, Some(partition))
    }
    retryRequestsUntilConnected(setDataRequests.toSeq, expectedControllerEpochZkVersion)
  }
@@ -271,7 +271,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
      val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition]
      setDataResponse.resultCode match {
        case Code.OK =>
-         val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
+         val updatedLeaderAndIsr = leaderAndIsrs(partition).withPartitionEpoch(setDataResponse.stat.getVersion)
          Some(partition -> Right(updatedLeaderAndIsr))
        case Code.BADVERSION =>
          // Update the buffer for partitions to retry

View File

@@ -549,7 +549,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
        .setLeader(brokerId)
        .setLeaderEpoch(Int.MaxValue)
        .setIsr(List(brokerId).asJava)
-       .setZkVersion(2)
+       .setPartitionEpoch(2)
        .setReplicas(Seq(brokerId).asJava)
        .setIsNew(false)).asJava,
      getTopicIds().asJava,
@@ -562,7 +562,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
        .setTopicName(tp.topic)
        .setPartitionStates(Seq(new StopReplicaPartitionState()
          .setPartitionIndex(tp.partition)
-         .setLeaderEpoch(LeaderAndIsr.initialLeaderEpoch + 2)
+         .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch + 2)
          .setDeletePartition(true)).asJava)
    ).asJava
    new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion, brokerId, Int.MaxValue,

View File

@@ -124,7 +124,7 @@ class AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed")
    assertEquals(leaderEpoch, partition.getLeaderEpoch)
@@ -134,7 +134,7 @@ class AbstractPartitionTest {
      .setLeader(brokerId + 1)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true), offsetCheckpoints, None), "Expected become follower transition to succeed")
    assertEquals(leaderEpoch, partition.getLeaderEpoch)

View File

@@ -94,7 +94,7 @@ class AssignmentStateTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(6)
      .setIsr(isr.asJava)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas.asJava)
      .setIsNew(false)
    if (adding.nonEmpty)

View File

@@ -139,7 +139,7 @@ class PartitionLockTest extends Logging {
      .setLeader(replicas.get(0))
      .setLeaderEpoch(1)
      .setIsr(replicas)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true)
    val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
@@ -335,7 +335,7 @@ class PartitionLockTest extends Logging {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed")

View File

@@ -261,7 +261,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(2)
      .setLeaderEpoch(1)
      .setIsr(List[Integer](0, 1, 2, brokerId).asJava)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
      .setIsNew(false)
    assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None))
@@ -570,7 +570,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(leader)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas.map(Int.box).asJava)
      .setIsNew(true)
@@ -644,7 +644,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(follower2)
      .setLeaderEpoch(leaderEpoch + 1)
      .setIsr(isr)
-     .setZkVersion(4)
+     .setPartitionEpoch(4)
      .setReplicas(replicas.map(Int.box).asJava)
      .setIsNew(false)
@@ -656,7 +656,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(leader)
      .setLeaderEpoch(leaderEpoch + 2)
      .setIsr(isr)
-     .setZkVersion(5)
+     .setPartitionEpoch(5)
      .setReplicas(replicas.map(Int.box).asJava)
      .setIsNew(false)
@@ -782,7 +782,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed")
    assertEquals(leaderEpoch, partition.getLeaderEpoch)
@@ -851,7 +851,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(1)
      .setLeaderEpoch(1)
      .setIsr(List[Integer](0, 1, 2, brokerId).asJava)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
      .setIsNew(false)
    partition.makeFollower(partitionState, offsetCheckpoints, None)
@@ -862,7 +862,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(1)
      .setLeaderEpoch(4)
      .setIsr(List[Integer](0, 1, 2, brokerId).asJava)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
      .setIsNew(false)
    assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None))
@@ -873,7 +873,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(1)
      .setLeaderEpoch(4)
      .setIsr(List[Integer](0, 1, 2, brokerId).asJava)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
    assertFalse(partition.makeFollower(partitionState, offsetCheckpoints, None))
  }
@@ -900,7 +900,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(leader)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true)
    assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, None), "Expected first makeLeader() to return 'leader changed'")
@@ -936,7 +936,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(follower2)
      .setLeaderEpoch(leaderEpoch + 1)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(false)
    partition.makeFollower(followerState, offsetCheckpoints, None)
@@ -946,7 +946,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(leader)
      .setLeaderEpoch(leaderEpoch + 2)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(false)
    assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, None),
@@ -1015,7 +1015,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(leader)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true)
    partition.makeLeader(leaderState, offsetCheckpoints, None)
@@ -1042,7 +1042,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true),
      offsetCheckpoints, None), "Expected become leader transition to succeed")
@@ -1096,7 +1096,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true),
      offsetCheckpoints, None), "Expected become leader transition to succeed")
@@ -1149,7 +1149,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas.map(Int.box).asJava)
      .setIsNew(true),
      offsetCheckpoints, None), "Expected become leader transition to succeed")
@@ -1210,7 +1210,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(true),
      offsetCheckpoints, None), "Expected become leader transition to succeed")
@@ -1282,16 +1282,16 @@ class PartitionTest extends AbstractPartitionTest {
    alterIsrManager.failIsrUpdate(Errors.NETWORK_EXCEPTION)
    assertEquals(0, isrChangeListener.shrinks.get)
    assertEquals(1, isrChangeListener.failures.get)
-   assertEquals(1, partition.getZkVersion)
+   assertEquals(1, partition.getPartitionEpoch)
    assertEquals(alterIsrManager.isrUpdates.size, 1)
    assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
    assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
    assertEquals(0L, partition.localLogOrException.highWatermark)
    // The shrink succeeds after retrying
-   alterIsrManager.completeIsrUpdate(newZkVersion = 2)
+   alterIsrManager.completeIsrUpdate(newPartitionEpoch = 2)
    assertEquals(1, isrChangeListener.shrinks.get)
-   assertEquals(2, partition.getZkVersion)
+   assertEquals(2, partition.getPartitionEpoch)
    assertEquals(alterIsrManager.isrUpdates.size, 0)
    assertEquals(Set(brokerId), partition.partitionState.isr)
    assertEquals(Set(brokerId), partition.partitionState.maximalIsr)
@@ -1344,9 +1344,9 @@ class PartitionTest extends AbstractPartitionTest {
    // After the ISR shrink completes, the ISR state should be updated and the
    // high watermark should be advanced
-   alterIsrManager.completeIsrUpdate(newZkVersion = 2)
+   alterIsrManager.completeIsrUpdate(newPartitionEpoch = 2)
    assertEquals(1, isrChangeListener.shrinks.get)
-   assertEquals(2, partition.getZkVersion)
+   assertEquals(2, partition.getPartitionEpoch)
    assertEquals(alterIsrManager.isrUpdates.size, 0)
    assertEquals(Set(brokerId), partition.partitionState.isr)
    assertEquals(Set(brokerId), partition.partitionState.maximalIsr)
@@ -1772,7 +1772,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(6)
      .setIsr(replicas)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(false)
    partition.makeLeader(leaderState, offsetCheckpoints, None)
@@ -1790,7 +1790,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(replicas)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(false)
    partition.makeLeader(leaderState, offsetCheckpoints, Some(topicId))
@@ -1834,7 +1834,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(replicas)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(false)
    partition.makeFollower(leaderState, offsetCheckpoints, Some(topicId))
@@ -1911,7 +1911,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(6)
      .setIsr(isr)
-     .setZkVersion(1)
+     .setPartitionEpoch(1)
      .setReplicas(replicas)
      .setIsNew(false)
    partition.makeLeader(leaderState, offsetCheckpoints, None)
@@ -2090,7 +2090,7 @@ class PartitionTest extends AbstractPartitionTest {
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr.map(Int.box).asJava)
-     .setZkVersion(zkVersion)
+     .setPartitionEpoch(zkVersion)
      .setReplicas(replicas.map(Int.box).asJava)
      .setIsNew(isNew),
      offsetCheckpoints,
@@ -2102,7 +2102,7 @@ class PartitionTest extends AbstractPartitionTest {
    assertEquals(leaderEpoch, partition.getLeaderEpoch)
    assertEquals(isr.toSet, partition.partitionState.isr)
    assertEquals(isr.toSet, partition.partitionState.maximalIsr)
-   assertEquals(zkVersion, partition.getZkVersion)
+   assertEquals(zkVersion, partition.getPartitionEpoch)
    newLeader
  }

View File

@@ -257,7 +257,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val tp = new TopicPartition("t", 0)
    val assignment = Map(tp.partition -> Seq(0))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-   waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+   waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
      "failed to get expected partition state upon topic creation")
  }
@@ -271,7 +271,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val tp = new TopicPartition("t", 0)
    val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers.take(1))
-   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
+   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch,
      "failed to get expected partition state upon topic creation")
  }
@@ -286,7 +286,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
      tp1 -> ReplicaAssignment(Seq(0), Seq(), Seq()))
    TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
    zkClient.setTopicAssignment(tp0.topic, Some(Uuid.randomUuid()), expandedAssignment, firstControllerEpochZkVersion)
-   waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+   waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
      "failed to get expected partition state upon topic partition expansion")
    TestUtils.waitForPartitionMetadata(servers, tp1.topic, tp1.partition)
  }
@@ -306,7 +306,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    servers(otherBrokerId).shutdown()
    servers(otherBrokerId).awaitShutdown()
    zkClient.setTopicAssignment(tp0.topic, Some(Uuid.randomUuid()), expandedAssignment, firstControllerEpochZkVersion)
-   waitForPartitionState(tp1, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
+   waitForPartitionState(tp1, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch,
      "failed to get expected partition state upon topic partition expansion")
    TestUtils.waitForPartitionMetadata(Seq(servers(controllerId)), tp1.topic, tp1.partition)
  }
@@ -325,7 +325,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val reassignment = Map(tp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
-   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
+   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 3,
      "failed to get expected partition state after partition reassignment")
    TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
      "failed to get updated partition assignment on topic znode after partition reassignment")
@@ -364,7 +364,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val reassignment = Map(tp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
-   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
+   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 3,
      "with an offline log directory on the target broker, the partition reassignment stalls")
    TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
      "failed to get updated partition assignment on topic znode after partition reassignment")
@@ -389,7 +389,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    servers(otherBrokerId).awaitShutdown()
    val controller = getController()
    zkClient.setOrCreatePartitionReassignment(reassignment, controller.kafkaController.controllerContext.epochZkVersion)
-   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
      "failed to get expected partition state during partition reassignment with offline replica")
    TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress,
      "partition reassignment path should remain while reassignment in progress")
@@ -407,10 +407,10 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    servers(otherBrokerId).shutdown()
    servers(otherBrokerId).awaitShutdown()
    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
-   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
      "failed to get expected partition state during partition reassignment with offline replica")
    servers(otherBrokerId).startup()
-   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4,
+   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
      "failed to get expected partition state after partition reassignment")
    TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
      "failed to get updated partition assignment on topic znode after partition reassignment")
@@ -426,7 +426,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val tp = new TopicPartition("t", 0)
    val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-   preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch)
+   preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.InitialLeaderEpoch)
  }
  @Test
@@ -437,8 +437,8 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val tp = new TopicPartition("t", 0)
    val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-   preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch)
-   preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch + 2)
+   preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.InitialLeaderEpoch)
+   preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.InitialLeaderEpoch + 2)
  }
  @Test
@@ -454,7 +454,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    zkClient.createPreferredReplicaElection(Set(tp))
    TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path),
      "failed to remove preferred replica leader election path after giving up")
-   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
      "failed to get expected partition state upon broker shutdown")
  }
@@ -468,10 +468,10 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
    servers(otherBrokerId).shutdown()
    servers(otherBrokerId).awaitShutdown()
-   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+   waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
      "failed to get expected partition state upon broker shutdown")
    servers(otherBrokerId).startup()
-   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2,
+   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
      "failed to get expected partition state upon broker startup")
  }
@@ -483,14 +483,14 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val tp = new TopicPartition("t", 0)
    val assignment = Map(tp.partition -> Seq(otherBrokerId))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch,
      "failed to get expected partition state upon topic creation")
    servers(otherBrokerId).shutdown()
    servers(otherBrokerId).awaitShutdown()
    TestUtils.waitUntilTrue(() => {
      val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
      leaderIsrAndControllerEpochMap.contains(tp) &&
-       isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
+       isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.InitialLeaderEpoch + 1) &&
        leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
    }, "failed to get expected partition state after entire isr went offline")
  }
@@ -503,14 +503,14 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val tp = new TopicPartition("t", 0)
    val assignment = Map(tp.partition -> Seq(otherBrokerId))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+   waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch,
      "failed to get expected partition state upon topic creation")
    servers(otherBrokerId).shutdown()
    servers(otherBrokerId).awaitShutdown()
    TestUtils.waitUntilTrue(() => {
      val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
      leaderIsrAndControllerEpochMap.contains(tp) &&
-       isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
+       isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.InitialLeaderEpoch + 1) &&
        leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
    }, "failed to get expected partition state after entire isr went offline")
  }
@@ -669,7 +669,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
    val assignment = Map(tp.partition -> Seq(0, 1))
    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-   waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+   waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
      "failed to get expected partition state upon topic creation")
    // Wait until the event thread is idle
@@ -966,7 +966,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
TestUtils.createTopic(zkClient, tp1.topic(), assignment1, servers) TestUtils.createTopic(zkClient, tp1.topic(), assignment1, servers)
// Test that the first topic has its ID added correctly // Test that the first topic has its ID added correctly
waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
assertNotEquals(None, controller.controllerContext.topicIds.get("t1")) assertNotEquals(None, controller.controllerContext.topicIds.get("t1"))
val topicId1 = controller.controllerContext.topicIds("t1") val topicId1 = controller.controllerContext.topicIds("t1")
@ -977,7 +977,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
TestUtils.createTopic(zkClient, tp2.topic(), assignment2, servers) TestUtils.createTopic(zkClient, tp2.topic(), assignment2, servers)
// Test that the second topic has its ID added correctly // Test that the second topic has its ID added correctly
waitForPartitionState(tp2, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp2, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
assertNotEquals(None, controller.controllerContext.topicIds.get("t2")) assertNotEquals(None, controller.controllerContext.topicIds.get("t2"))
val topicId2 = controller.controllerContext.topicIds("t2") val topicId2 = controller.controllerContext.topicIds("t2")
@ -1002,7 +1002,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
TestUtils.createTopic(zkClient, tp1.topic(), assignment1, servers) TestUtils.createTopic(zkClient, tp1.topic(), assignment1, servers)
// Test that the first topic has no topic ID added. // Test that the first topic has no topic ID added.
waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
assertEquals(None, controller.controllerContext.topicIds.get("t1")) assertEquals(None, controller.controllerContext.topicIds.get("t1"))
@ -1011,7 +1011,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
TestUtils.createTopic(zkClient, tp2.topic(), assignment2, servers) TestUtils.createTopic(zkClient, tp2.topic(), assignment2, servers)
// Test that the second topic has no topic ID added. // Test that the second topic has no topic ID added.
waitForPartitionState(tp2, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp2, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
assertEquals(None, controller.controllerContext.topicIds.get("t2")) assertEquals(None, controller.controllerContext.topicIds.get("t2"))
@ -1028,7 +1028,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
servers = makeServers(1) servers = makeServers(1)
adminZkClient.createTopic(tp.topic, 1, 1) adminZkClient.createTopic(tp.topic, 1, 1)
waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic()) val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
assertTrue(topicIdAfterCreate.isDefined) assertTrue(topicIdAfterCreate.isDefined)
@ -1054,7 +1054,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0)) servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
adminZkClient.createTopic(tp.topic, 1, 1) adminZkClient.createTopic(tp.topic, 1, 1)
waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic()) val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
assertEquals(None, topicIdAfterCreate) assertEquals(None, topicIdAfterCreate)
@ -1080,7 +1080,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
val tp = new TopicPartition("t", 0) val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId)) val assignment = Map(tp.partition -> Seq(controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
val topicId = controller.controllerContext.topicIds.get("t").get val topicId = controller.controllerContext.topicIds.get("t").get
@ -1099,7 +1099,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
val tp = new TopicPartition("t", 0) val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId)) val assignment = Map(tp.partition -> Seq(controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
val emptyTopicId = controller.controllerContext.topicIds.get("t") val emptyTopicId = controller.controllerContext.topicIds.get("t")
assertEquals(None, emptyTopicId) assertEquals(None, emptyTopicId)
@ -1119,7 +1119,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
val tp = new TopicPartition("t", 0) val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId)) val assignment = Map(tp.partition -> Seq(controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
val topicId = controller.controllerContext.topicIds.get("t").get val topicId = controller.controllerContext.topicIds.get("t").get
@ -1139,7 +1139,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
val tp = new TopicPartition("t", 0) val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId)) val assignment = Map(tp.partition -> Seq(controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
assertEquals(None, zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)) assertEquals(None, zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))
assertEquals(None, controller.controllerContext.topicIds.get(tp.topic)) assertEquals(None, controller.controllerContext.topicIds.get(tp.topic))
@ -1181,7 +1181,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
// Only the remaining brokers will have the replicas for the partition // Only the remaining brokers will have the replicas for the partition
val assignment = Map(tp.partition -> remainingBrokers.map(_.config.brokerId)) val assignment = Map(tp.partition -> remainingBrokers.map(_.config.brokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, firstControllerEpoch, remainingBrokers(0).config.brokerId, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, remainingBrokers(0).config.brokerId, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic()) val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
assertEquals(None, topicIdAfterCreate) assertEquals(None, topicIdAfterCreate)
@ -1232,7 +1232,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0)) servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
// use create topic with ZK client directly, without topic ID // use create topic with ZK client directly, without topic ID
adminZkClient.createTopic(tp.topic, 1, 1) adminZkClient.createTopic(tp.topic, 1, 1)
waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic()) val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
val id = servers.head.kafkaController.controllerContext.topicIds.get(tp.topic) val id = servers.head.kafkaController.controllerContext.topicIds.get(tp.topic)
@ -1244,7 +1244,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
servers(0).shutdown() servers(0).shutdown()
servers(0).awaitShutdown() servers(0).awaitShutdown()
servers = makeServers(1) servers = makeServers(1)
waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon controller restart") "failed to get expected partition state upon controller restart")
val topicIdAfterUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic()) val topicIdAfterUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
assertEquals(topicIdAfterUpgrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic), assertEquals(topicIdAfterUpgrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
@ -1256,7 +1256,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
servers(0).shutdown() servers(0).shutdown()
servers(0).awaitShutdown() servers(0).awaitShutdown()
servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0)) servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic creation") "failed to get expected partition state upon topic creation")
val topicIdAfterDowngrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic()) val topicIdAfterDowngrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
assertTrue(topicIdAfterDowngrade.isDefined) assertTrue(topicIdAfterDowngrade.isDefined)
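Every hunk in this test file applies the same mechanical rename: the lowercase constant LeaderAndIsr.initialLeaderEpoch becomes LeaderAndIsr.InitialLeaderEpoch (and, in later files, initialZKVersion becomes InitialPartitionEpoch). A minimal, hedged Scala sketch of what the renamed companion-object constants presumably look like; the concrete values are an assumption and are not shown in this diff:

    object LeaderAndIsr {
      val InitialLeaderEpoch: Int = 0     // formerly initialLeaderEpoch; value assumed
      val InitialPartitionEpoch: Int = 0  // formerly initialZKVersion; value assumed
      val NoLeader: Int = -1              // referenced unchanged by the hunks above; value assumed
    }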


@ -171,7 +171,7 @@ class PartitionStateMachineTest {
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0)))) TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId) val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withPartitionEpoch(2)
when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
.thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty)) .thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
@ -206,7 +206,7 @@ class PartitionStateMachineTest {
.thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), .thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0)))) TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId)) val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withPartitionEpoch(2)
when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
.thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty)) .thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
@ -260,7 +260,7 @@ class PartitionStateMachineTest {
when(mockZkClient.getLogConfigs(Set.empty, config.originals())) when(mockZkClient.getLogConfigs(Set.empty, config.originals()))
.thenReturn((Map(partition.topic -> LogConfig()), Map.empty[String, Exception])) .thenReturn((Map(partition.topic -> LogConfig()), Map.empty[String, Exception]))
val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId) val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withPartitionEpoch(2)
when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
.thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty)) .thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
@ -337,7 +337,7 @@ class PartitionStateMachineTest {
} else { } else {
leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId)) leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
} }
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withPartitionEpoch(2)
when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) when(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
.thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty)) .thenReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))


@ -213,7 +213,7 @@ class ReplicaStateMachineTest {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId)) val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr.zkVersion + 1) val updatedLeaderAndIsr = adjustedLeaderAndIsr.withPartitionEpoch(adjustedLeaderAndIsr.partitionEpoch + 1)
val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch) val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
when(mockZkClient.getTopicPartitionStatesRaw(partitions)).thenReturn( when(mockZkClient.getTopicPartitionStatesRaw(partitions)).thenReturn(
Seq(GetDataResponse(Code.OK, null, Some(partition), Seq(GetDataResponse(Code.OK, null, Some(partition),
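The hunk above shows the pattern repeated across the test updates in this patch: where a test previously bumped the ZooKeeper version via withZkVersion/zkVersion, it now bumps the partition epoch via withPartitionEpoch/partitionEpoch. A hedged sketch of that call pattern; the helper name bumpPartitionEpoch is illustrative and not part of the patch:

    import kafka.api.LeaderAndIsr

    // Adjust leader/ISR and bump the partition epoch, mirroring
    // adjustedLeaderAndIsr.withPartitionEpoch(adjustedLeaderAndIsr.partitionEpoch + 1) above.
    def bumpPartitionEpoch(current: LeaderAndIsr, leader: Int, isr: List[Int]): LeaderAndIsr =
      current.newLeaderAndIsr(leader, isr).withPartitionEpoch(current.partitionEpoch + 1)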


@ -147,9 +147,9 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness {
.setPartitionIndex(tp.partition) .setPartitionIndex(tp.partition)
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId2) .setLeader(brokerId2)
.setLeaderEpoch(LeaderAndIsr.initialLeaderEpoch + 1) .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch + 1)
.setIsr(Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava) .setIsr(Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava)
.setZkVersion(LeaderAndIsr.initialZKVersion) .setPartitionEpoch(LeaderAndIsr.InitialPartitionEpoch)
.setReplicas(Seq(0, 1).map(Integer.valueOf).asJava) .setReplicas(Seq(0, 1).map(Integer.valueOf).asJava)
.setIsNew(false) .setIsNew(false)
) )
@ -177,9 +177,9 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness {
.setPartitionIndex(tp.partition) .setPartitionIndex(tp.partition)
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId2) .setLeader(brokerId2)
.setLeaderEpoch(LeaderAndIsr.initialLeaderEpoch + 1) .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch + 1)
.setIsr(Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava) .setIsr(Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava)
.setZkVersion(LeaderAndIsr.initialZKVersion) .setZkVersion(LeaderAndIsr.InitialPartitionEpoch)
.setReplicas(Seq(0, 1).map(Integer.valueOf).asJava)) .setReplicas(Seq(0, 1).map(Integer.valueOf).asJava))
val liveBrokers = brokerAndEpochs.map { case (broker, _) => val liveBrokers = brokerAndEpochs.map { case (broker, _) =>
val securityProtocol = SecurityProtocol.PLAINTEXT val securityProtocol = SecurityProtocol.PLAINTEXT
@ -220,7 +220,7 @@ class BrokerEpochIntegrationTest extends QuorumTestHarness {
.setTopicName(tp.topic()) .setTopicName(tp.topic())
.setPartitionStates(Seq(new StopReplicaPartitionState() .setPartitionStates(Seq(new StopReplicaPartitionState()
.setPartitionIndex(tp.partition()) .setPartitionIndex(tp.partition())
.setLeaderEpoch(LeaderAndIsr.initialLeaderEpoch + 2) .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch + 2)
.setDeletePartition(true)).asJava) .setDeletePartition(true)).asJava)
).asJava ).asJava
val requestBuilder = new StopReplicaRequest.Builder( val requestBuilder = new StopReplicaRequest.Builder(


@ -1758,7 +1758,7 @@ class KafkaApisTest {
@Test @Test
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndLeaderEpoch(): Unit = { def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndLeaderEpoch(): Unit = {
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag( shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
LeaderAndIsr.initialLeaderEpoch + 2, deletePartition = true) LeaderAndIsr.InitialLeaderEpoch + 2, deletePartition = true)
} }
@Test @Test
@ -1776,7 +1776,7 @@ class KafkaApisTest {
@Test @Test
def shouldNotResignCoordinatorsIfStopReplicaReceivedWithoutDeleteFlag(): Unit = { def shouldNotResignCoordinatorsIfStopReplicaReceivedWithoutDeleteFlag(): Unit = {
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag( shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
LeaderAndIsr.initialLeaderEpoch + 2, deletePartition = false) LeaderAndIsr.InitialLeaderEpoch + 2, deletePartition = false)
} }
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch: Int, def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch: Int,
@ -3094,7 +3094,7 @@ class KafkaApisTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(asList(0, 1)) .setIsr(asList(0, 1))
.setZkVersion(2) .setPartitionEpoch(2)
.setReplicas(asList(0, 1, 2)) .setReplicas(asList(0, 1, 2))
.setIsNew(false) .setIsNew(false)
).asJava ).asJava


@ -149,9 +149,9 @@ class LeaderElectionTest extends QuorumTestHarness {
.setPartitionIndex(partitionId) .setPartitionIndex(partitionId)
.setControllerEpoch(2) .setControllerEpoch(2)
.setLeader(brokerId2) .setLeader(brokerId2)
.setLeaderEpoch(LeaderAndIsr.initialLeaderEpoch) .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch)
.setIsr(Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava) .setIsr(Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava)
.setZkVersion(LeaderAndIsr.initialZKVersion) .setPartitionEpoch(LeaderAndIsr.InitialPartitionEpoch)
.setReplicas(Seq(0, 1).map(Integer.valueOf).asJava) .setReplicas(Seq(0, 1).map(Integer.valueOf).asJava)
.setIsNew(false) .setIsNew(false)
) )


@ -407,7 +407,7 @@ class ReplicaManagerConcurrencyTest {
.partitionChanges .partitionChanges
.get(partitionId) .get(partitionId)
leaderAndIsr.withZkVersion(registration.partitionEpoch) leaderAndIsr.withPartitionEpoch(registration.partitionEpoch)
} }
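The KRaft-flavoured test above also shows why the rename fits: the value fed into LeaderAndIsr is taken directly from the partition epoch of the PartitionRegistration in the metadata image rather than from a ZooKeeper znode version. A hedged sketch of that mapping; the helper name is illustrative:

    import kafka.api.LeaderAndIsr
    import org.apache.kafka.metadata.PartitionRegistration

    // Mirrors leaderAndIsr.withPartitionEpoch(registration.partitionEpoch) above;
    // the PartitionRegistration package is assumed from the metadata module.
    def epochFromImage(leaderAndIsr: LeaderAndIsr, registration: PartitionRegistration): LeaderAndIsr =
      leaderAndIsr.withPartitionEpoch(registration.partitionEpoch)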
private def toList(ints: Array[Int]): util.List[Integer] = { private def toList(ints: Array[Int]): util.List[Integer] = {


@ -237,7 +237,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(false)).asJava, .setIsNew(false)).asJava,
topicIds, topicIds,
@ -260,7 +260,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(false)).asJava, .setIsNew(false)).asJava,
topicIds, topicIds,
@ -297,7 +297,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(epoch) .setLeaderEpoch(epoch)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -357,7 +357,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()), Collections.singletonMap(topic, Uuid.randomUuid()),
@ -423,7 +423,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -483,7 +483,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -590,7 +590,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -667,7 +667,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(false)).asJava, .setIsNew(false)).asJava,
topicIds.asJava, topicIds.asJava,
@ -724,7 +724,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(replicas) .setIsr(replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true) .setIsNew(true)
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -846,7 +846,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(replicas) .setIsr(replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true) .setIsNew(true)
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -917,7 +917,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(replicas) .setIsr(replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true) .setIsNew(true)
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -999,7 +999,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true), .setIsNew(true),
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
@ -1009,7 +1009,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setIsNew(true) .setIsNew(true)
).asJava, ).asJava,
@ -1216,7 +1216,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(false)).asJava, .setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId), Collections.singletonMap(topic, topicId),
@ -1274,7 +1274,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(false)).asJava, .setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId), Collections.singletonMap(topic, topicId),
@ -1322,7 +1322,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(false)).asJava, .setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId), Collections.singletonMap(topic, topicId),
@ -1380,7 +1380,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(false)).asJava, .setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId), Collections.singletonMap(topic, topicId),
@ -1439,7 +1439,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(false)).asJava, .setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId), Collections.singletonMap(topic, topicId),
@ -1532,7 +1532,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -1579,7 +1579,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -1627,7 +1627,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -1648,7 +1648,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(2) .setLeaderEpoch(2)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -1684,7 +1684,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -1706,7 +1706,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(2) .setLeaderEpoch(2)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -1740,7 +1740,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -1784,7 +1784,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -1827,7 +1827,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(1) .setLeaderEpoch(1)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds.asJava, topicIds.asJava,
@ -2073,7 +2073,7 @@ class ReplicaManagerTest {
.setLeader(leaderBrokerId) .setLeader(leaderBrokerId)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(aliveBrokerIds.asJava) .setIsr(aliveBrokerIds.asJava)
.setZkVersion(zkVersion) .setPartitionEpoch(zkVersion)
.setReplicas(aliveBrokerIds.asJava) .setReplicas(aliveBrokerIds.asJava)
.setIsNew(isNew) .setIsNew(isNew)
} }
@ -2282,7 +2282,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true), .setIsNew(true),
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
@ -2292,7 +2292,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setIsNew(true) .setIsNew(true)
).asJava, ).asJava,
@ -2313,7 +2313,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement) .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true), .setIsNew(true),
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
@ -2323,7 +2323,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement) .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setIsNew(true) .setIsNew(true)
).asJava, ).asJava,
@ -2370,7 +2370,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true), .setIsNew(true),
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
@ -2380,7 +2380,7 @@ class ReplicaManagerTest {
.setLeader(1) .setLeader(1)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setIsNew(true) .setIsNew(true)
).asJava, ).asJava,
@ -2401,7 +2401,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement) .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setIsr(partition0Replicas) .setIsr(partition0Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) .setReplicas(partition0Replicas)
.setIsNew(true), .setIsNew(true),
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
@ -2411,7 +2411,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement) .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setIsNew(true) .setIsNew(true)
).asJava, ).asJava,
@ -2758,7 +2758,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(epoch) .setLeaderEpoch(epoch)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds, topicIds,
@ -2803,7 +2803,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(epoch) .setLeaderEpoch(epoch)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds, topicIds,
@ -2840,7 +2840,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds, topicIds,
@ -2903,7 +2903,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(epoch) .setLeaderEpoch(epoch)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds, topicIds,
@ -2944,7 +2944,7 @@ class ReplicaManagerTest {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(epoch) .setLeaderEpoch(epoch)
.setIsr(brokerList) .setIsr(brokerList)
.setZkVersion(0) .setPartitionEpoch(0)
.setReplicas(brokerList) .setReplicas(brokerList)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
topicIds, topicIds,
@ -3027,7 +3027,7 @@ class ReplicaManagerTest {
.setLeader(leaderAndIsr.leader) .setLeader(leaderAndIsr.leader)
.setLeaderEpoch(leaderAndIsr.leaderEpoch) .setLeaderEpoch(leaderAndIsr.leaderEpoch)
.setIsr(leaderAndIsr.isr.map(Int.box).asJava) .setIsr(leaderAndIsr.isr.map(Int.box).asJava)
.setZkVersion(leaderAndIsr.zkVersion) .setPartitionEpoch(leaderAndIsr.partitionEpoch)
.setReplicas(replicas.map(Int.box).asJava) .setReplicas(replicas.map(Int.box).asJava)
.setIsNew(isNew) .setIsNew(isNew)
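Throughout ReplicaManagerTest the edit is uniform: setZkVersion becomes setPartitionEpoch on LeaderAndIsrPartitionState, with the value left untouched. A hedged Scala sketch of the renamed builder chain; the values are illustrative and carry no meaning:

    import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
    import scala.jdk.CollectionConverters._

    // Builds a throwaway partition state the way the tests above do.
    def examplePartitionState(partitionEpoch: Int): LeaderAndIsrPartitionState =
      new LeaderAndIsrPartitionState()
        .setTopicName("t")
        .setPartitionIndex(0)
        .setControllerEpoch(0)
        .setLeader(0)
        .setLeaderEpoch(0)
        .setIsr(List(0, 1).map(Int.box).asJava)
        .setPartitionEpoch(partitionEpoch)  // previously setZkVersion(partitionEpoch)
        .setReplicas(List(0, 1).map(Int.box).asJava)
        .setIsNew(true)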


@ -251,7 +251,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setLeader(brokerId) .setLeader(brokerId)
.setLeaderEpoch(Int.MaxValue) .setLeaderEpoch(Int.MaxValue)
.setIsr(List(brokerId).asJava) .setIsr(List(brokerId).asJava)
.setZkVersion(2) .setPartitionEpoch(2)
.setReplicas(Seq(brokerId).asJava) .setReplicas(Seq(brokerId).asJava)
.setIsNew(true)).asJava, .setIsNew(true)).asJava,
getTopicIds().asJava, getTopicIds().asJava,
@ -263,7 +263,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setTopicName(tp.topic()) .setTopicName(tp.topic())
.setPartitionStates(Seq(new StopReplicaPartitionState() .setPartitionStates(Seq(new StopReplicaPartitionState()
.setPartitionIndex(tp.partition()) .setPartitionIndex(tp.partition())
.setLeaderEpoch(LeaderAndIsr.initialLeaderEpoch + 2) .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch + 2)
.setDeletePartition(true)).asJava) .setDeletePartition(true)).asJava)
).asJava ).asJava
new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion, brokerId, new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion, brokerId,


@ -53,13 +53,13 @@ class StopReplicaRequestTest extends BaseRequestTest {
.setTopicName(tp0.topic()) .setTopicName(tp0.topic())
.setPartitionStates(Seq(new StopReplicaPartitionState() .setPartitionStates(Seq(new StopReplicaPartitionState()
.setPartitionIndex(tp0.partition()) .setPartitionIndex(tp0.partition())
.setLeaderEpoch(LeaderAndIsr.initialLeaderEpoch + 2) .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch + 2)
.setDeletePartition(true)).asJava), .setDeletePartition(true)).asJava),
new StopReplicaTopicState() new StopReplicaTopicState()
.setTopicName(tp1.topic()) .setTopicName(tp1.topic())
.setPartitionStates(Seq(new StopReplicaPartitionState() .setPartitionStates(Seq(new StopReplicaPartitionState()
.setPartitionIndex(tp1.partition()) .setPartitionIndex(tp1.partition())
.setLeaderEpoch(LeaderAndIsr.initialLeaderEpoch + 2) .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch + 2)
.setDeletePartition(true)).asJava) .setDeletePartition(true)).asJava)
).asJava ).asJava


@ -1274,10 +1274,10 @@ object TestUtils extends Logging {
future future
} }
def completeIsrUpdate(newZkVersion: Int): Unit = { def completeIsrUpdate(newPartitionEpoch: Int): Unit = {
if (inFlight.compareAndSet(true, false)) { if (inFlight.compareAndSet(true, false)) {
val item = isrUpdates.dequeue() val item = isrUpdates.dequeue()
item.future.complete(item.leaderAndIsr.withZkVersion(newZkVersion)) item.future.complete(item.leaderAndIsr.withPartitionEpoch(newPartitionEpoch))
} else { } else {
fail("Expected an in-flight ISR update, but there was none") fail("Expected an in-flight ISR update, but there was none")
} }


@ -919,13 +919,13 @@ class KafkaZkClientTest extends QuorumTestHarness {
stat stat
} }
private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] = private def leaderIsrAndControllerEpochs(state: Int, partitionEpoch: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
Map( Map(
topicPartition10 -> LeaderIsrAndControllerEpoch( topicPartition10 -> LeaderIsrAndControllerEpoch(
LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), LeaderRecoveryState.RECOVERED, zkVersion = zkVersion), LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), LeaderRecoveryState.RECOVERED, partitionEpoch = partitionEpoch),
controllerEpoch = 4), controllerEpoch = 4),
topicPartition11 -> LeaderIsrAndControllerEpoch( topicPartition11 -> LeaderIsrAndControllerEpoch(
LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), LeaderRecoveryState.RECOVERED, zkVersion = zkVersion), LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), LeaderRecoveryState.RECOVERED, partitionEpoch = partitionEpoch),
controllerEpoch = 4)) controllerEpoch = 4))
val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] = val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
@ -934,8 +934,8 @@ class KafkaZkClientTest extends QuorumTestHarness {
val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] =
initialLeaderIsrAndControllerEpochs.map { case (k, v) => k -> v.leaderAndIsr } initialLeaderIsrAndControllerEpochs.map { case (k, v) => k -> v.leaderAndIsr }
private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] = private def leaderIsrs(state: Int, partitionEpoch: Int): Map[TopicPartition, LeaderAndIsr] =
leaderIsrAndControllerEpochs(state, zkVersion).map { case (k, v) => k -> v.leaderAndIsr } leaderIsrAndControllerEpochs(state, partitionEpoch).map { case (k, v) => k -> v.leaderAndIsr }
private def checkUpdateLeaderAndIsrResult( private def checkUpdateLeaderAndIsrResult(
expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr], expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
@ -1000,26 +1000,26 @@ class KafkaZkClientTest extends QuorumTestHarness {
// successful updates // successful updates
checkUpdateLeaderAndIsrResult( checkUpdateLeaderAndIsrResult(
leaderIsrs(state = 1, zkVersion = 1), leaderIsrs(state = 1, partitionEpoch = 1),
mutable.ArrayBuffer.empty, mutable.ArrayBuffer.empty,
Map.empty, Map.empty,
zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4, controllerEpochZkVersion)) zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, partitionEpoch = 0),controllerEpoch = 4, controllerEpochZkVersion))
// Try to update with wrong ZK version // Try to update with wrong ZK version
checkUpdateLeaderAndIsrResult( checkUpdateLeaderAndIsrResult(
Map.empty, Map.empty,
ArrayBuffer(topicPartition10, topicPartition11), ArrayBuffer(topicPartition10, topicPartition11),
Map.empty, Map.empty,
zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4, controllerEpochZkVersion)) zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, partitionEpoch = 0),controllerEpoch = 4, controllerEpochZkVersion))
// Trigger successful, to be retried and failed partitions in same call // Trigger successful, to be retried and failed partitions in same call
val mixedState = Map( val mixedState = Map(
topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), LeaderRecoveryState.RECOVERED, zkVersion = 1), topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), LeaderRecoveryState.RECOVERED, partitionEpoch = 1),
topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), LeaderRecoveryState.RECOVERED, zkVersion = 0), topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), LeaderRecoveryState.RECOVERED, partitionEpoch = 0),
topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), LeaderRecoveryState.RECOVERED, zkVersion = 0)) topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), LeaderRecoveryState.RECOVERED, partitionEpoch = 0))
checkUpdateLeaderAndIsrResult( checkUpdateLeaderAndIsrResult(
leaderIsrs(state = 2, zkVersion = 2).filter { case (tp, _) => tp == topicPartition10 }, leaderIsrs(state = 2, partitionEpoch = 2).filter { case (tp, _) => tp == topicPartition10 },
ArrayBuffer(topicPartition11), ArrayBuffer(topicPartition11),
Map( Map(
topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")), topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
@ -1030,7 +1030,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch], leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
topicPartition: TopicPartition, topicPartition: TopicPartition,
response: GetDataResponse): Unit = { response: GetDataResponse): Unit = {
val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.partitionEpoch
assertEquals(Code.OK, response.resultCode) assertEquals(Code.OK, response.resultCode)
assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path) assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
assertEquals(Some(topicPartition), response.ctx) assertEquals(Some(topicPartition), response.ctx)
@ -1106,20 +1106,20 @@ class KafkaZkClientTest extends QuorumTestHarness {
assertEquals( assertEquals(
expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)), expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), controllerEpochZkVersion).map { zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, partitionEpoch = 0), controllerEpochZkVersion).map {
eraseMetadataAndStat}.toList) eraseMetadataAndStat}.toList)
// Mismatch controller epoch zkVersion // Mismatch controller epoch zkVersion
assertThrows(classOf[ControllerMovedException], () => zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), controllerEpochZkVersion + 1)) assertThrows(classOf[ControllerMovedException], () => zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, partitionEpoch = 0), controllerEpochZkVersion + 1))
val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11) val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
assertEquals(2, getResponses.size) assertEquals(2, getResponses.size)
topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)} topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, partitionEpoch = 0), tp, r)}
// Other ZK client can also write the state of a partition // Other ZK client can also write the state of a partition
assertEquals( assertEquals(
expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)), expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1), controllerEpochZkVersion).map { otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, partitionEpoch = 1), controllerEpochZkVersion).map {
eraseMetadataAndStat}.toList) eraseMetadataAndStat}.toList)
} }
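The fixtures above also exercise the renamed constructor parameter: LeaderAndIsr now takes partitionEpoch where it previously took zkVersion, alongside the LeaderRecoveryState. A hedged sketch mirroring those fixtures; the values are illustrative and the LeaderRecoveryState import path is assumed:

    import kafka.api.LeaderAndIsr
    import org.apache.kafka.metadata.LeaderRecoveryState

    object PartitionStateFixture {
      val example: LeaderAndIsr = LeaderAndIsr(
        leader = 1,
        leaderEpoch = 0,
        isr = List(1, 2),
        LeaderRecoveryState.RECOVERED,
        partitionEpoch = 0)
    }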


@ -166,7 +166,7 @@ public class ReplicaFetcherThreadBenchmark {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(replicas) .setIsr(replicas)
.setZkVersion(1) .setPartitionEpoch(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true); .setIsNew(true);


@ -158,7 +158,7 @@ public class PartitionMakeFollowerBenchmark {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(replicas) .setIsr(replicas)
.setZkVersion(1) .setPartitionEpoch(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true); .setIsNew(true);
return partition.makeFollower(partitionState, offsetCheckpoints, topicId); return partition.makeFollower(partitionState, offsetCheckpoints, topicId);


@ -118,7 +118,7 @@ public class UpdateFollowerFetchStateBenchmark {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(replicas) .setIsr(replicas)
.setZkVersion(1) .setPartitionEpoch(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true); .setIsNew(true);
IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class); IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);


@ -222,7 +222,7 @@ public class PartitionCreationBench {
.setLeader(0) .setLeader(0)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setIsr(inSync) .setIsr(inSync)
.setZkVersion(1) .setPartitionEpoch(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true); .setIsNew(true);


@ -194,7 +194,7 @@ public class PartitionRegistration {
setLeader(leader). setLeader(leader).
setLeaderEpoch(leaderEpoch). setLeaderEpoch(leaderEpoch).
setIsr(Replicas.toList(isr)). setIsr(Replicas.toList(isr)).
setZkVersion(partitionEpoch). setPartitionEpoch(partitionEpoch).
setReplicas(Replicas.toList(replicas)). setReplicas(Replicas.toList(replicas)).
setAddingReplicas(Replicas.toList(addingReplicas)). setAddingReplicas(Replicas.toList(addingReplicas)).
setRemovingReplicas(Replicas.toList(removingReplicas)). setRemovingReplicas(Replicas.toList(removingReplicas)).


@ -85,7 +85,7 @@ public class PartitionRegistrationTest {
setLeader(1). setLeader(1).
setLeaderEpoch(123). setLeaderEpoch(123).
setIsr(Arrays.asList(1, 2)). setIsr(Arrays.asList(1, 2)).
setZkVersion(456). setPartitionEpoch(456).
setReplicas(Arrays.asList(1, 2, 3)). setReplicas(Arrays.asList(1, 2, 3)).
setAddingReplicas(Collections.emptyList()). setAddingReplicas(Collections.emptyList()).
setRemovingReplicas(Collections.emptyList()). setRemovingReplicas(Collections.emptyList()).
@ -98,7 +98,7 @@ public class PartitionRegistrationTest {
setLeader(2). setLeader(2).
setLeaderEpoch(234). setLeaderEpoch(234).
setIsr(Arrays.asList(2, 3, 4)). setIsr(Arrays.asList(2, 3, 4)).
setZkVersion(567). setPartitionEpoch(567).
setReplicas(Arrays.asList(2, 3, 4)). setReplicas(Arrays.asList(2, 3, 4)).
setAddingReplicas(Collections.emptyList()). setAddingReplicas(Collections.emptyList()).
setRemovingReplicas(Collections.emptyList()). setRemovingReplicas(Collections.emptyList()).