KAFKA-18486 Migrate tests to use applyDelta instead of becomeLeaderOrFollower for testInconsistentIdReturnsError and others (#20014)

This change continues the migration effort for KAFKA-18486 by replacing
usages of the deprecated `becomeLeaderOrFollower` API with `applyDelta`
in several test cases; all six follow the same pattern, sketched after
the list below.

#### Updated tests:
- `testInconsistentIdReturnsError`
- `testMaybeAddLogDirFetchers`
- `testMaybeAddLogDirFetchersPausingCleaning`
- `testSuccessfulBuildRemoteLogAuxStateMetrics`
- `testVerificationForTransactionalPartitionsOnly`
- `testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate`
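
Each migrated test follows the same shape: instead of building a `LeaderAndIsrRequest` and passing it to `becomeLeaderOrFollower`, it now builds a topics delta with the test-local helpers (`topicsCreateDelta` or `createLeaderDelta`), derives a metadata image from it via `imageFromTopics`, and applies both with `applyDelta`. A condensed before/after sketch, drawn from the hunks below (an illustration of the pattern, not a compilable excerpt):

```scala
// Before: drive leadership through the deprecated request-based API
rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
  Seq(new LeaderAndIsrRequest.PartitionState()
    .setTopicName(topic)
    .setPartitionIndex(0)
    .setLeader(0)
    .setLeaderEpoch(0)
    .setIsr(Seq[Integer](0).asJava)
    .setReplicas(Seq[Integer](0).asJava)
    .setIsNew(false)).asJava,
  Collections.singletonMap(topic, topicId),
  Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())

// After: apply a KRaft metadata delta and the image derived from it
val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0),
  topicName = topic, topicId = topicIds(topic))
val image = imageFromTopics(delta.apply())
rm.applyDelta(delta, image)
val partition = rm.getPartitionOrException(topicPartition)
```

One behavioral difference is worth noting: error signaling changes. `becomeLeaderOrFollower` reported an inconsistent topic ID as `Errors.INCONSISTENT_TOPIC_ID` in its response, whereas `applyDelta` throws an `IllegalStateException`, so `testInconsistentIdReturnsError` now asserts on the thrown exception instead of a response error.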

Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Jing-Jia Hung authored 2025-06-25 08:02:27 -04:00, committed by GitHub
parent 33a1648c44
commit 5e23df0c8d
1 changed file with 75 additions and 178 deletions


@@ -55,7 +55,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
 import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
-import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
+import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
@@ -133,7 +133,6 @@ class ReplicaManagerTest {
   // Constants defined for readability
   private val zkVersion = 0
-  private val correlationId = 0
   private val controllerEpoch = 0
   private val brokerEpoch = 0L
@@ -312,38 +311,26 @@ class ReplicaManagerTest {
       alterPartitionManager = alterPartitionManager)
     try {
-      val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
-        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
+      val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = topic, topicId = topicIds(topic))
+      val image = imageFromTopics(delta.apply())
+      rm.applyDelta(delta, image)
+      val partition = rm.getPartitionOrException(topicPartition)
-      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(Seq[Integer](0).asJava)
-          .setPartitionEpoch(0)
-          .setReplicas(Seq[Integer](0).asJava)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
-      appendRecords(rm, new TopicPartition(topic, 0),
+      appendRecords(rm, topicPartition,
         MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("first message".getBytes()), new SimpleRecord("second message".getBytes())))
-      logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), dir2.getAbsolutePath)
+      logManager.maybeUpdatePreferredLogDir(topicPartition, dir2.getAbsolutePath)
       partition.createLogIfNotExists(isNew = true, isFutureReplica = true,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
       // this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error
       rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), _ => None)
-      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset)))
+      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(topicPartition).foreach(s => assertEquals(0L, s.fetchOffset)))
       // make sure alter log dir thread has processed the data
       rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.doWork())
       assertEquals(Set.empty, rm.replicaAlterLogDirsManager.failedPartitions.partitions())
       // the future log becomes the current log, so the partition state should get removed
-      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => assertEquals(None, t.fetchState(new TopicPartition(topic, 0))))
+      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => assertEquals(None, t.fetchState(topicPartition)))
     } finally {
       rm.shutdown(checkpointHW = false)
     }
@@ -362,7 +349,6 @@ class ReplicaManagerTest {
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
     mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
     when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
-    val tp0 = new TopicPartition(topic, 0)
     val rm = new ReplicaManager(
       metrics = metrics,
       config = config,
@@ -375,28 +361,13 @@ class ReplicaManagerTest {
       alterPartitionManager = alterPartitionManager)
     try {
-      val partition = rm.createPartition(tp0)
-      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
-        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), Option.apply(topicId))
+      val delta = topicsCreateDelta(startId = 0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicId)
+      val image = imageFromTopics(delta.apply())
+      rm.applyDelta(delta, image)
+      val partition = rm.getPartitionOrException(topicPartition)
-      val response = rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(Seq[Integer](0).asJava)
-          .setPartitionEpoch(0)
-          .setReplicas(Seq[Integer](0).asJava)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
-      // expect the errorCounts only has 1 entry with Errors.NONE
-      val errorCounts = response.errorCounts()
-      assertEquals(1, response.errorCounts().size())
-      assertNotNull(errorCounts.get(Errors.NONE))
-      spyLogManager.maybeUpdatePreferredLogDir(tp0, dir2.getAbsolutePath)
+      spyLogManager.maybeUpdatePreferredLogDir(topicPartition, dir2.getAbsolutePath)
       if (futureLogCreated) {
         // create future log before maybeAddLogDirFetchers invoked
@@ -404,7 +375,7 @@ class ReplicaManagerTest {
           new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
       } else {
         val mockLog = mock(classOf[UnifiedLog])
-        when(spyLogManager.getLog(tp0, isFuture = true)).thenReturn(Option.apply(mockLog))
+        when(spyLogManager.getLog(topicPartition, isFuture = true)).thenReturn(Option.apply(mockLog))
         when(mockLog.topicId).thenReturn(Optional.of(topicId))
         when(mockLog.parentDir).thenReturn(dir2.getAbsolutePath)
       }
@@ -1225,65 +1196,51 @@ class ReplicaManagerTest {
     }
   }
-  @Test
-  def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
-    verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(new Properties, expectTruncation = false)
-  }
   /**
    * If a partition becomes a follower and the leader is unchanged it should check for truncation
    * if the epoch has increased by more than one (which suggests it has missed an update). For
    * IBP version 2.7 onwards, we don't require this since we can truncate at any time based
    * on diverging epochs returned in fetch responses.
+   * This test assumes IBP >= 2.7 behavior, so `expectTruncation` is set to false and truncation is not expected.
    */
-  private def verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(extraProps: Properties,
-                                                                             expectTruncation: Boolean): Unit = {
-    val topicPartition = 0
+  @Test
+  def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
+    val extraProps = new Properties
     val followerBrokerId = 0
     val leaderBrokerId = 1
-    val controllerId = 0
-    val controllerEpoch = 0
    var leaderEpoch = 1
     val leaderEpochIncrement = 2
-    val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
     val countDownLatch = new CountDownLatch(1)
     val offsetFromLeader = 5
     // Prepare the mocked components for the test
     val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
-      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
-      expectTruncation = expectTruncation, localLogOffset = Optional.of(10), offsetFromLeader = offsetFromLeader, extraProps = extraProps, topicId = Optional.of(topicId))
+      topicPartition.partition(), leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
+      expectTruncation = false, localLogOffset = Optional.of(10), offsetFromLeader = offsetFromLeader, extraProps = extraProps, topicId = Optional.of(topicId))
     try {
       // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
-      val tp = new TopicPartition(topic, topicPartition)
-      val partition = replicaManager.createPartition(tp)
+      val partition = replicaManager.createPartition(topicPartition)
       val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
-      partition.makeFollower(
-        leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
-        offsetCheckpoints,
-        None)
+      val followerDelta = topicsCreateDelta(startId = followerBrokerId, isStartIdLeader = false, partitions = List(topicPartition.partition()), List.empty, topic, topicIds(topic), leaderEpoch)
+      replicaManager.applyDelta(followerDelta, imageFromTopics(followerDelta.apply()))
+      // Verify log created and partition is hosted
+      val localLog = replicaManager.localLog(topicPartition)
+      assertTrue(localLog.isDefined, "Log should be created for follower after applyDelta")
+      val hostedPartition = replicaManager.getPartition(topicPartition)
+      assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online])
      // Make local partition a follower - because epoch increased by more than 1, truncation should
       // trigger even though leader does not change
       leaderEpoch += leaderEpochIncrement
-      val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(
-        controllerId, controllerEpoch, brokerEpoch,
-        Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(followerBrokerId, "host1", 0),
-          new Node(leaderBrokerId, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
-        (_, followers) => assertEquals(followerBrokerId, followers.head.partitionId))
+      val epochJumpDelta = topicsCreateDelta(startId = followerBrokerId, isStartIdLeader = false, partitions = List(topicPartition.partition()), List.empty, topic, topicIds(topic), leaderEpoch)
+      replicaManager.applyDelta(epochJumpDelta, imageFromTopics(epochJumpDelta.apply()))
       assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
-      // Truncation should have happened once
-      if (expectTruncation) {
-        verify(mockLogMgr).truncateTo(Map(tp -> offsetFromLeader), isFuture = false)
-      }
-      verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(tp), any())
+      verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), any())
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -1859,16 +1816,16 @@ class ReplicaManagerTest {
     val producerEpoch = 0.toShort
     val sequence = 0
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
     val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1))
+    val brokerList = Seq[Integer](0, 1).asJava
     try {
-      replicaManager.becomeLeaderOrFollower(1,
-        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
-        (_, _) => ())
+      val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList)
+      val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 1, replicas = brokerList, isr = brokerList)
+      val image0 = imageFromTopics(leaderDelta0.apply())
+      replicaManager.applyDelta(leaderDelta0, image0)
-      replicaManager.becomeLeaderOrFollower(1,
-        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
-        (_, _) => ())
+      val image1 = imageFromTopics(leaderDelta1.apply())
+      replicaManager.applyDelta(leaderDelta1, image1)
       // If we supply no transactional ID and idempotent records, we do not verify.
       val idempotentRecords = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, sequence,
@@ -3651,8 +3608,6 @@ class ReplicaManagerTest {
   @Test
   def testSuccessfulBuildRemoteLogAuxStateMetrics(): Unit = {
-    val tp0 = new TopicPartition(topic, 0)
     val remoteLogManager = mock(classOf[RemoteLogManager])
     val remoteLogSegmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
     when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(
@@ -3664,40 +3619,25 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
     try {
       val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
-      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
-      val topicIds = Map(tp0.topic -> topicId).asJava
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(0)
-            .setLeader(1)
-            .setLeaderEpoch(0)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
-      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
-      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      assertEquals(0, brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, brokerTopicStats.topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate.count)
       // Verify aggregate metrics
       assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
       assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+      val leaderDelta = createLeaderDelta(topicId, topicPartition, leaderId = 1, replicas = partition0Replicas, isr = partition0Replicas)
+      val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
       // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
-      waitUntilTrue(() => brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count > 0,
-        "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
-      assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      waitUntilTrue(() => brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count > 0,
+        "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, brokerTopicStats.topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate.count)
       // Verify aggregate metrics
       waitUntilTrue(() => brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count > 0,
         "Should have all topic buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
@@ -3881,41 +3821,35 @@ class ReplicaManagerTest {
   def testInconsistentIdReturnsError(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
     try {
-      val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
-      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val topicNames = topicIds.asScala.map(_.swap).asJava
+      val invalidTopicId = Uuid.randomUuid()
-      val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
+      val initialDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic))
+      val initialImage = imageFromTopics(initialDelta.apply())
+      replicaManager.applyDelta(initialDelta, initialImage)
-      def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest =
-        new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-          Seq(new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(topic)
-            .setPartitionIndex(0)
-            .setControllerEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(epoch)
-            .setIsr(brokerList)
-            .setPartitionEpoch(0)
-            .setReplicas(brokerList)
-            .setIsNew(true)).asJava,
-          topicIds,
-          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
-      val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
+      val updateDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic), leaderEpoch = 1)
+      val updateImage = imageFromTopics(updateDelta.apply())
+      replicaManager.applyDelta(updateDelta, updateImage)
       // Send request with inconsistent ID.
-      val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
-      assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition))
+      val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = invalidTopicId, leaderEpoch = 1)
+      val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply())
+      val exception1 = assertThrows(classOf[IllegalStateException], () => {
+        replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1)
+      })
+      assertEquals(s"Topic ${topic}-0 exists, but its ID is ${topicId}, not ${invalidTopicId} as expected", exception1.getMessage)
+      val inconsistentDelta2 = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = invalidTopicId, leaderEpoch = 2)
+      val inconsistentImage2 = imageFromTopics(inconsistentDelta2.apply())
+      val exception2 = assertThrows(classOf[IllegalStateException], () => {
+        replicaManager.applyDelta(inconsistentDelta2, inconsistentImage2)
+      })
+      assertEquals(s"Topic ${topic}-0 exists, but its ID is ${topicId}, not ${invalidTopicId} as expected", exception2.getMessage)
-      val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(2, invalidTopicIds), (_, _) => ())
-      assertEquals(Errors.INCONSISTENT_TOPIC_ID, response4.partitionErrors(invalidTopicNames).get(topicPartition))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -3984,43 +3918,6 @@ class ReplicaManagerTest {
     }
   }
-  private def makeLeaderAndIsrRequest(
-    topicId: Uuid,
-    topicPartition: TopicPartition,
-    replicas: Seq[Int],
-    leaderAndIsr: LeaderAndIsr,
-    isNew: Boolean = true,
-    brokerEpoch: Int = 0,
-    controllerId: Int = 0,
-    controllerEpoch: Int = 0
-  ): LeaderAndIsrRequest = {
-    val partitionState = new LeaderAndIsrRequest.PartitionState()
-      .setTopicName(topicPartition.topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leaderAndIsr.leader)
-      .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-      .setIsr(leaderAndIsr.isr)
-      .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-      .setReplicas(replicas.map(Int.box).asJava)
-      .setIsNew(isNew)
-
-    def mkNode(replicaId: Int): Node = {
-      new Node(replicaId, s"host-$replicaId", 9092)
-    }
-
-    val nodes = Set(mkNode(controllerId)) ++ replicas.map(mkNode).toSet
-
-    new LeaderAndIsrRequest.Builder(
-      controllerId,
-      controllerEpoch,
-      brokerEpoch,
-      Seq(partitionState).asJava,
-      Map(topicPartition.topic -> topicId).asJava,
-      nodes.asJava
-    ).build()
-  }
-
   @Test
   def testActiveProducerState(): Unit = {
     val brokerId = 0