KAFKA-18486 Update testExceptionWhenUnverifiedTransactionHasMultipleProducerIds (#19883)

- Replace the deprecated `becomeLeaderOrFollower` with the
metadata-based `applyDelta` method.
- Add overloaded `topicsCreateDelta` to support custom topic name and
topicId.

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Nick Guo <lansg0504@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Jing-Jia Hung 2025-06-08 00:55:20 +08:00 committed by GitHub
parent 8a7e4a1423
commit c5e06f6e7a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 21 additions and 21 deletions

View File

@ -2519,6 +2519,7 @@ class ReplicaManagerTest {
@Test
def testExceptionWhenUnverifiedTransactionHasMultipleProducerIds(): Unit = {
val localId = 1
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
val transactionalId = "txn1"
@ -2531,13 +2532,9 @@ class ReplicaManagerTest {
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1))
try {
replicaManager.becomeLeaderOrFollower(1,
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
(_, _) => ())
replicaManager.becomeLeaderOrFollower(1,
makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
(_, _) => ())
val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, partitions = List(0, 1), List.empty, topic, topicIds(topic))
val leaderImage = imageFromTopics(leaderDelta.apply())
replicaManager.applyDelta(leaderDelta, leaderImage)
// Append some transactional records with different producer IDs
val transactionalRecords = mutable.Map[TopicPartition, MemoryRecords]()
@ -4651,7 +4648,7 @@ class ReplicaManagerTest {
try {
val directoryIds = replicaManager.logManager.directoryIdsSet.toList
assertEquals(directoryIds.size, 2)
val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = directoryIds)
val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, partitions = List(0), directoryIds = directoryIds)
val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) = replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava),
@ -4698,7 +4695,7 @@ class ReplicaManagerTest {
// Test applying delta as leader
val directoryIds = replicaManager.logManager.directoryIdsSet.toList
// Make the local replica the leader
val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = directoryIds)
val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions = List(0), directoryIds = directoryIds)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
@ -4708,7 +4705,7 @@ class ReplicaManagerTest {
assertEquals(directoryIds.head, logDirIdHostingPartition0)
// Test applying delta as follower
val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = directoryIds)
val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = List(1), directoryIds = directoryIds)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
@ -4736,7 +4733,7 @@ class ReplicaManagerTest {
try {
// Make the local replica the leader
val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions = List(0), directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id
val topicIdPartition0 = new TopicIdPartition(topicId, topicPartition0)
@ -4744,7 +4741,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Make the local replica the as follower
val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = List(1), directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
@ -4790,7 +4787,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Make the local replica the as follower
val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = List(DirectoryId.LOST, DirectoryId.LOST))
val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = List(1), directoryIds = List(DirectoryId.LOST, DirectoryId.LOST))
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
@ -5787,23 +5784,26 @@ class ReplicaManagerTest {
}
}
private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partition:Int = 0, directoryIds: List[Uuid] = List.empty): TopicsDelta = {
private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
val record = partitionRecord(startId, leader, partition)
if (directoryIds.nonEmpty) {
record.setDirectories(directoryIds.asJava)
delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId))
partitions.foreach { partition =>
val record = partitionRecord(startId, leader, partition, topicId)
if (directoryIds.nonEmpty) {
record.setDirectories(directoryIds.asJava)
}
delta.replay(record)
}
delta.replay(record)
delta
}
private def partitionRecord(startId: Int, leader: Int, partition: Int = 0) = {
private def partitionRecord(startId: Int, leader: Int, partition: Int = 0, topicId: Uuid = FOO_UUID) = {
new PartitionRecord()
.setPartitionId(partition)
.setTopicId(FOO_UUID)
.setTopicId(topicId)
.setReplicas(util.Arrays.asList(startId, startId + 1))
.setIsr(util.Arrays.asList(startId, startId + 1))
.setRemovingReplicas(Collections.emptyList())