KAFKA-18486 Migrate testPartitionMetadataFile to use applyDelta in place of deprecated becomeLeaderOrFollower (#19947)
CI / build (push) Waiting to run Details

Refactor testPartitionMetadataFile to use applyDelta and share
class-level partitions

- Replace deprecated becomeLeaderOrFollower with topicsCreateDelta +
applyDelta
- Test still asserts partition exists, local log exists, and verifies
partitionMetadataFile version (0) and topicId

Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>,
 Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Jing-Jia Hung 2025-06-12 09:20:47 +08:00 committed by GitHub
parent 7c715c02c0
commit 2a7457f2dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 7 additions and 22 deletions

View File

@ -114,6 +114,7 @@ class ReplicaManagerTest {
private val topicId = Uuid.fromString("YK2ed2GaTH2JpgzUaJ8tgg") private val topicId = Uuid.fromString("YK2ed2GaTH2JpgzUaJ8tgg")
private val topicIds = scala.Predef.Map("test-topic" -> topicId) private val topicIds = scala.Predef.Map("test-topic" -> topicId)
private val topicNames = topicIds.map(_.swap) private val topicNames = topicIds.map(_.swap)
private val topicPartition = new TopicPartition(topic, 0)
private val transactionalId = "txn" private val transactionalId = "txn"
private val time = new MockTime private val time = new MockTime
private val metrics = new Metrics private val metrics = new Metrics
@ -4214,30 +4215,14 @@ class ReplicaManagerTest {
def testPartitionMetadataFile(): Unit = { def testPartitionMetadataFile(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try { try {
val brokerList = Seq[Integer](0, 1).asJava val leaderDelta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0),
val topicPartition = new TopicPartition(topic, 0) topicName = topic, topicId = topicIds(topic))
val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) val leaderImage = imageFromTopics(leaderDelta.apply())
val topicNames = topicIds.asScala.map(_.swap).asJava replicaManager.applyDelta(leaderDelta, leaderImage)
def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest = assertTrue(replicaManager.getPartition(topicPartition).isInstanceOf[HostedPartition.Online])
new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(new LeaderAndIsrRequest.PartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(epoch)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ())
assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
assertFalse(replicaManager.localLog(topicPartition).isEmpty) assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val id = topicIds.get(topicPartition.topic()) val id = topicIds(topicPartition.topic)
val log = replicaManager.localLog(topicPartition).get val log = replicaManager.localLog(topicPartition).get
assertTrue(log.partitionMetadataFile.get.exists()) assertTrue(log.partitionMetadataFile.get.exists())
val partitionMetadata = log.partitionMetadataFile.get.read() val partitionMetadata = log.partitionMetadataFile.get.read()