KAFKA-18486 Remove ReplicaManager#becomeLeaderOrFollower from testFencedErrorCausedByBecomeLeader and other similar methods (#19966)
CI / build (push) Waiting to run Details

The included tests are as follows:

- testFencedErrorCausedByBecomeLeader
- testFetchBeyondHighWatermark
- testFetchFollowerNotAllowedForOlderClients
- testFetchFromFollowerShouldNotRunPreferLeaderSelect
- testFetchFromLeaderAlwaysAllowed
- testFetchMessagesWhenNotFollowerForOnePartition
- testFetchRequestRateMetrics
- testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined
- testFollowerFetchWithDefaultSelectorNoForcedHwPropagation
- testFollowerStateNotUpdatedIfLogReadFails

I removed `testFetchMessagesWithInconsistentTopicId ` as it's no longer
needed, the "topicId" is now mandatory and cannot be null in our new
implementation.

Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Lan Ding
<isDing_L@163.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nick Guo 2025-06-16 21:43:41 +08:00 committed by GitHub
parent 86419e9b8a
commit fd70290633
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 65 additions and 261 deletions

View File

@ -512,37 +512,21 @@ class ReplicaManagerTest {
} }
} }
@Test @ParameterizedTest
def testFencedErrorCausedByBecomeLeader(): Unit = { @ValueSource(ints = Array(0, 1, 10))
testFencedErrorCausedByBecomeLeader(0) def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
testFencedErrorCausedByBecomeLeader(1) val localId = 0
testFencedErrorCausedByBecomeLeader(10)
}
private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try { try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0) val topicPartition = new TopicPartition(topic, 0)
replicaManager.createPartition(topicPartition) replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false, .createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None) new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, topicName = topic, topicId = topicIds(topic))
Seq(new LeaderAndIsrRequest.PartitionState() val leaderImage = imageFromTopics(leaderDelta.apply())
.setTopicName(topic) replicaManager.applyDelta(leaderDelta, leaderImage)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(epoch)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size) assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size)
@ -554,7 +538,12 @@ class ReplicaManagerTest {
// make sure the future log is created // make sure the future log is created
replicaManager.futureLocalLogOrException(topicPartition) replicaManager.futureLocalLogOrException(topicPartition)
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size) assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
(1 to loopEpochChange).foreach(epoch => replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(epoch), (_, _) => ())) (1 to loopEpochChange).foreach(
epoch => {
val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, topicName = topic, topicId = topicIds(topic), leaderEpoch = epoch)
replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply()))
}
)
// wait for the ReplicaAlterLogDirsThread to complete // wait for the ReplicaAlterLogDirsThread to complete
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads() replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
@ -974,25 +963,16 @@ class ReplicaManagerTest {
try { try {
val brokerList = Seq[Integer](0, 1, 2).asJava val brokerList = Seq[Integer](0, 1, 2).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0)) val tp = new TopicPartition(topic, 0)
val partition = rm.createPartition(tp)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader. // Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, replicas = brokerList, isr = brokerList)
Seq(new LeaderAndIsrRequest.PartitionState() val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
.setTopicName(topic) rm.applyDelta(leaderDelta, leaderMetadataImage)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
rm.getPartitionOrException(new TopicPartition(topic, 0)) rm.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException .localLogOrException
@ -1030,6 +1010,7 @@ class ReplicaManagerTest {
@Test @Test
def testFollowerStateNotUpdatedIfLogReadFails(): Unit = { def testFollowerStateNotUpdatedIfLogReadFails(): Unit = {
val localId = 0
val maxFetchBytes = 1024 * 1024 val maxFetchBytes = 1024 * 1024
val aliveBrokersIds = Seq(0, 1) val aliveBrokersIds = Seq(0, 1)
val leaderEpoch = 5 val leaderEpoch = 5
@ -1038,25 +1019,11 @@ class ReplicaManagerTest {
try { try {
val tp = new TopicPartition(topic, 0) val tp = new TopicPartition(topic, 0)
val tidp = new TopicIdPartition(topicId, tp) val tidp = new TopicIdPartition(topicId, tp)
val replicas = aliveBrokersIds.toList.map(Int.box).asJava
// Broker 0 becomes leader of the partition // Broker 0 becomes leader of the partition
val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
.setTopicName(topic) val leaderImage = imageFromTopics(leaderDelta.apply())
.setPartitionIndex(0) replicaManager.applyDelta(leaderDelta, leaderImage)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(leaderEpoch)
.setIsr(replicas)
.setPartitionEpoch(0)
.setReplicas(replicas)
.setIsNew(true)
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(leaderAndIsrPartitionState).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse.error)
// Follower replica state is initialized, but initial state is not known // Follower replica state is initialized, but initial state is not known
assertTrue(replicaManager.onlinePartition(tp).isDefined) assertTrue(replicaManager.onlinePartition(tp).isDefined)
@ -1129,6 +1096,7 @@ class ReplicaManagerTest {
@Test @Test
def testFetchMessagesWithInconsistentTopicId(): Unit = { def testFetchMessagesWithInconsistentTopicId(): Unit = {
val localId = 0
val maxFetchBytes = 1024 * 1024 val maxFetchBytes = 1024 * 1024
val aliveBrokersIds = Seq(0, 1) val aliveBrokersIds = Seq(0, 1)
val leaderEpoch = 5 val leaderEpoch = 5
@ -1137,25 +1105,11 @@ class ReplicaManagerTest {
try { try {
val tp = new TopicPartition(topic, 0) val tp = new TopicPartition(topic, 0)
val tidp = new TopicIdPartition(topicId, tp) val tidp = new TopicIdPartition(topicId, tp)
val replicas = aliveBrokersIds.toList.map(Int.box).asJava
// Broker 0 becomes leader of the partition // Broker 0 becomes leader of the partition
val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState() val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
.setTopicName(topic) val leaderImage = imageFromTopics(leaderDelta.apply())
.setPartitionIndex(0) replicaManager.applyDelta(leaderDelta, leaderImage)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(leaderEpoch)
.setIsr(replicas)
.setPartitionEpoch(0)
.setReplicas(replicas)
.setIsNew(true)
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(leaderAndIsrPartitionState).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse.error)
assertEquals(Some(topicId), replicaManager.getPartitionOrException(tp).topicId) assertEquals(Some(topicId), replicaManager.getPartitionOrException(tp).topicId)
@ -1195,54 +1149,6 @@ class ReplicaManagerTest {
val fetch2 = successfulFetch.headOption.filter(_._1 == zeroTidp).map(_._2) val fetch2 = successfulFetch.headOption.filter(_._1 == zeroTidp).map(_._2)
assertTrue(fetch2.isDefined) assertTrue(fetch2.isDefined)
assertEquals(Errors.NONE, fetch2.get.error) assertEquals(Errors.NONE, fetch2.get.error)
// Next create a topic without a topic ID written in the log.
val tp2 = new TopicPartition("noIdTopic", 0)
val tidp2 = new TopicIdPartition(Uuid.randomUuid(), tp2)
// Broker 0 becomes leader of the partition
val leaderAndIsrPartitionState2 = new LeaderAndIsrRequest.PartitionState()
.setTopicName("noIdTopic")
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(leaderEpoch)
.setIsr(replicas)
.setPartitionEpoch(0)
.setReplicas(replicas)
.setIsNew(true)
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(leaderAndIsrPartitionState2).asJava,
Collections.emptyMap(),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
assertEquals(None, replicaManager.getPartitionOrException(tp2).topicId)
// Fetch messages simulating the request containing a topic ID. We should not have an error.
fetchPartitions(
replicaManager,
replicaId = 1,
fetchInfos = Seq(tidp2 -> validFetchPartitionData),
responseCallback = callback
)
val fetch3 = successfulFetch.headOption.filter(_._1 == tidp2).map(_._2)
assertTrue(fetch3.isDefined)
assertEquals(Errors.NONE, fetch3.get.error)
// Fetch messages simulating the request not containing a topic ID. We should not have an error.
val zeroTidp2 = new TopicIdPartition(Uuid.ZERO_UUID, tidp2.topicPartition)
fetchPartitions(
replicaManager,
replicaId = 1,
fetchInfos = Seq(zeroTidp2 -> validFetchPartitionData),
responseCallback = callback
)
val fetch4 = successfulFetch.headOption.filter(_._1 == zeroTidp2).map(_._2)
assertTrue(fetch4.isDefined)
assertEquals(Errors.NONE, fetch4.get.error)
} finally { } finally {
replicaManager.shutdown(checkpointHW = false) replicaManager.shutdown(checkpointHW = false)
} }
@ -1257,6 +1163,7 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2)) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
try { try {
val leaderEpoch = 0
// Create 2 partitions, assign replica 0 as the leader for both a different follower (1 and 2) for each // Create 2 partitions, assign replica 0 as the leader for both a different follower (1 and 2) for each
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1) val tp1 = new TopicPartition(topic, 1)
@ -1267,34 +1174,14 @@ class ReplicaManagerTest {
replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](0, 2).asJava val partition1Replicas = Seq[Integer](0, 2).asJava
val topicIds = Map(tp0.topic -> topicId, tp1.topic -> topicId).asJava
val leaderEpoch = 0 val leaderDelta0 = createLeaderDelta(topicIds(topic), tp0, 0, partition0Replicas, partition0Replicas)
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
Seq( replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp0.topic) val leaderDelta1 = createLeaderDelta(topicIds(topic), tp1, 0, partition1Replicas, partition1Replicas)
.setPartitionIndex(tp0.partition) val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
.setControllerEpoch(0) replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
.setLeader(leaderEpoch)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true),
new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp1.topic)
.setPartitionIndex(tp1.partition)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(leaderEpoch)
.setIsr(partition1Replicas)
.setPartitionEpoch(0)
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
// Append a couple of messages. // Append a couple of messages.
for (i <- 1 to 2) { for (i <- 1 to 2) {
@ -1642,27 +1529,13 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
propsModifier = props => props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, classOf[MockReplicaSelector].getName)) propsModifier = props => props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, classOf[MockReplicaSelector].getName))
try { try {
val leaderBrokerId = 0
val followerBrokerId = 1
val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0) val tidp0 = new TopicIdPartition(topicId, tp0)
// Make this replica the follower // Make this replica the follower
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val followerDelta = createFollowerDelta(topicId, tp0, 0, 1, 1)
Seq(new LeaderAndIsrRequest.PartitionState() val followerMetadataImage = imageFromTopics(followerDelta.apply())
.setTopicName(topic) replicaManager.applyDelta(followerDelta, followerMetadataImage)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(1)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())
val metadata = new DefaultClientMetadata("rack-a", "client-id", val metadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default") InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default")
@ -1687,13 +1560,13 @@ class ReplicaManagerTest {
@Test @Test
def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined(): Unit = { def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined(): Unit = {
val localId = 0
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
propsModifier = props => props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, "org.apache.kafka.common.replica.RackAwareReplicaSelector")) propsModifier = props => props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, "org.apache.kafka.common.replica.RackAwareReplicaSelector"))
try { try {
val leaderBrokerId = 0 val leaderBrokerId = 0
val followerBrokerId = 1 val followerBrokerId = 1
val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0) val tidp0 = new TopicIdPartition(topicId, tp0)
@ -1707,20 +1580,9 @@ class ReplicaManagerTest {
// Make this replica the leader // Make this replica the leader
val leaderEpoch = 1 val leaderEpoch = 1
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
Seq(new LeaderAndIsrRequest.PartitionState() val leaderImage = imageFromTopics(leaderDelta.apply())
.setTopicName(topic) replicaManager.applyDelta(leaderDelta, leaderImage)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(leaderEpoch)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())
// The leader must record the follower's fetch offset to make it eligible for follower fetch selection // The leader must record the follower's fetch offset to make it eligible for follower fetch selection
val followerFetchData = new PartitionData(topicId, 0L, 0L, Int.MaxValue, Optional.of(Int.box(leaderEpoch)), Optional.empty[Integer]) val followerFetchData = new PartitionData(topicId, 0L, 0L, Int.MaxValue, Optional.of(Int.box(leaderEpoch)), Optional.empty[Integer])
@ -1767,27 +1629,15 @@ class ReplicaManagerTest {
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Optional.of(topicId)) leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Optional.of(topicId))
try { try {
val brokerList = Seq[Integer](0, 1).asJava
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0) val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
// Make this replica the follower // Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val followerDelta = createFollowerDelta(topicId, tp0, 1, 0, 1)
Seq(new LeaderAndIsrRequest.PartitionState() val followerImage = imageFromTopics(followerDelta.apply())
.setTopicName(topic) replicaManager.applyDelta(followerDelta, followerImage)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(1)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
val simpleRecords = Seq(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)) val simpleRecords = Seq(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
val appendResult = appendRecords(replicaManager, tp0, val appendResult = appendRecords(replicaManager, tp0,
@ -1873,21 +1723,10 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0) val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val followerDelta = createFollowerDelta(topicId, tp0, 0, 1)
Seq(new LeaderAndIsrRequest.PartitionState() val followerImage = imageFromTopics(followerDelta.apply())
.setTopicName(tp0.topic) replicaManager.applyDelta(followerDelta, followerImage)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
// Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+) // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
@ -1909,6 +1748,7 @@ class ReplicaManagerTest {
@Test @Test
def testFetchRequestRateMetrics(): Unit = { def testFetchRequestRateMetrics(): Unit = {
val localId = 0
val mockTimer = new MockTimer(time) val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
@ -1917,22 +1757,10 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0) val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, topicName = topic, topicId = topicIds(topic), leaderEpoch = 1)
Seq(new LeaderAndIsrRequest.PartitionState() val leaderImage = imageFromTopics(leaderDelta.apply())
.setTopicName(tp0.topic) replicaManager.applyDelta(leaderDelta, leaderImage)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(1)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
def assertMetricCount(expected: Int): Unit = { def assertMetricCount(expected: Int): Unit = {
assertEquals(expected, replicaManager.brokerTopicStats.allTopicsStats.totalFetchRequestRate.count) assertEquals(expected, replicaManager.brokerTopicStats.allTopicsStats.totalFetchRequestRate.count)
@ -2026,6 +1854,7 @@ class ReplicaManagerTest {
@Test @Test
def testFetchFromLeaderAlwaysAllowed(): Unit = { def testFetchFromLeaderAlwaysAllowed(): Unit = {
val localId = 0
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1)) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1))
try { try {
@ -2033,22 +1862,10 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0) val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, topicName = topic, topicId = topicIds(topic), leaderEpoch = 1)
Seq(new LeaderAndIsrRequest.PartitionState() val leaderImage = imageFromTopics(leaderDelta.apply())
.setTopicName(tp0.topic) replicaManager.applyDelta(leaderDelta, leaderImage)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(1)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
var partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, var partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100,
@ -2828,6 +2645,7 @@ class ReplicaManagerTest {
allLogs.put(topicPartitionObj, mockLog) allLogs.put(topicPartitionObj, mockLog)
when(mockLogMgr.allLogs).thenReturn(allLogs.values.asScala) when(mockLogMgr.allLogs).thenReturn(allLogs.values.asScala)
when(mockLogMgr.isLogDirOnline(anyString)).thenReturn(true) when(mockLogMgr.isLogDirOnline(anyString)).thenReturn(true)
when(mockLogMgr.directoryId(anyString)).thenReturn(None)
val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId) val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId)) val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId))
@ -3414,8 +3232,6 @@ class ReplicaManagerTest {
.setTopicId(topicIds.get(topic)) .setTopicId(topicIds.get(topic))
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setRemovingReplicas(util.List.of())
.setAddingReplicas(util.List.of())
.setLeader(partition1Replicas.get(0)) .setLeader(partition1Replicas.get(0))
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0) .setPartitionEpoch(0)
@ -3431,8 +3247,6 @@ class ReplicaManagerTest {
.setTopicId(topicIds.get(topic)) .setTopicId(topicIds.get(topic))
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setRemovingReplicas(util.List.of())
.setAddingReplicas(util.List.of())
.setLeader(partition1Replicas.get(1)) .setLeader(partition1Replicas.get(1))
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement) .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setPartitionEpoch(0) .setPartitionEpoch(0)
@ -3479,8 +3293,6 @@ class ReplicaManagerTest {
.setTopicId(topicIds.get(topic)) .setTopicId(topicIds.get(topic))
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setRemovingReplicas(util.List.of())
.setAddingReplicas(util.List.of())
.setLeader(partition1Replicas.get(0)) .setLeader(partition1Replicas.get(0))
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0) .setPartitionEpoch(0)
@ -3496,8 +3308,6 @@ class ReplicaManagerTest {
.setTopicId(topicIds.get(topic)) .setTopicId(topicIds.get(topic))
.setIsr(partition1Replicas) .setIsr(partition1Replicas)
.setReplicas(partition1Replicas) .setReplicas(partition1Replicas)
.setRemovingReplicas(util.List.of())
.setAddingReplicas(util.List.of())
.setLeader(partition1Replicas.get(1)) .setLeader(partition1Replicas.get(1))
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement) .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setPartitionEpoch(0) .setPartitionEpoch(0)
@ -4321,8 +4131,6 @@ class ReplicaManagerTest {
.setTopicId(topicId) .setTopicId(topicId)
.setReplicas(effectiveReplicas) .setReplicas(effectiveReplicas)
.setIsr(effectiveIsr) .setIsr(effectiveIsr)
.setRemovingReplicas(Collections.emptyList())
.setAddingReplicas(Collections.emptyList())
.setLeader(leaderId) .setLeader(leaderId)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0) .setPartitionEpoch(0)
@ -4349,8 +4157,6 @@ class ReplicaManagerTest {
.setTopicId(topicId) .setTopicId(topicId)
.setReplicas(util.Arrays.asList(followerId, leaderId)) .setReplicas(util.Arrays.asList(followerId, leaderId))
.setIsr(util.Arrays.asList(followerId, leaderId)) .setIsr(util.Arrays.asList(followerId, leaderId))
.setRemovingReplicas(Collections.emptyList())
.setAddingReplicas(Collections.emptyList())
.setLeader(leaderId) .setLeader(leaderId)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0) .setPartitionEpoch(0)
@ -5539,13 +5345,13 @@ class ReplicaManagerTest {
} }
} }
private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID): TopicsDelta = { private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1 val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY) val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId)) delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId))
partitions.foreach { partition => partitions.foreach { partition =>
val record = partitionRecord(startId, leader, partition, topicId) val record = partitionRecord(startId, leader, partition, topicId, leaderEpoch)
if (directoryIds.nonEmpty) { if (directoryIds.nonEmpty) {
record.setDirectories(directoryIds.asJava) record.setDirectories(directoryIds.asJava)
} }
@ -5555,16 +5361,14 @@ class ReplicaManagerTest {
delta delta
} }
private def partitionRecord(startId: Int, leader: Int, partition: Int = 0, topicId: Uuid = FOO_UUID) = { private def partitionRecord(startId: Int, leader: Int, partition: Int = 0, topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0) = {
new PartitionRecord() new PartitionRecord()
.setPartitionId(partition) .setPartitionId(partition)
.setTopicId(topicId) .setTopicId(topicId)
.setReplicas(util.Arrays.asList(startId, startId + 1)) .setReplicas(util.Arrays.asList(startId, startId + 1))
.setIsr(util.Arrays.asList(startId, startId + 1)) .setIsr(util.Arrays.asList(startId, startId + 1))
.setRemovingReplicas(Collections.emptyList())
.setAddingReplicas(Collections.emptyList())
.setLeader(leader) .setLeader(leader)
.setLeaderEpoch(0) .setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0) .setPartitionEpoch(0)
} }