KAFKA-19505: allow mocking UnifiedLog#topicId in ReplicaManagerTest (#20167)

The mocked value for `UnifiedLog#topicId` was incorrectly set up which
caused test failure.

Reviewers: Luke Chen <showuon@gmail.com>, PoAn Yang <payang@apache.org>, Satish Duggana <satishd@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Gaurav Narula 2025-07-17 03:40:00 +01:00 committed by GitHub
parent 70824be92a
commit 7e9df7d03d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 27 additions and 23 deletions

View File

@ -2977,7 +2977,8 @@ class ReplicaManagerTest {
if (enableRemoteStorage && defaultTopicRemoteLogStorageEnable) {
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
}
val mockLog = setupMockLog(path1)
val logConfig = new LogConfig(logProps)
val mockLogFn = (topicPartition: TopicPartition, topicId: Option[Uuid]) => setupMockLog(path1, logConfig, enableRemoteStorage, topicPartition, topicId)
if (setupLogDirMetaProperties) {
// add meta.properties file in each dir
config.logDirs.stream().forEach(dir => {
@ -2991,10 +2992,7 @@ class ReplicaManagerTest {
new File(new File(dir), MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false)
})
}
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None, remoteStorageSystemEnable = enableRemoteStorage)
val logConfig = new LogConfig(logProps)
when(mockLog.config).thenReturn(logConfig)
when(mockLog.remoteLogEnabled()).thenReturn(enableRemoteStorage)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), logConfig, logFn = if (shouldMockLog) Some(mockLogFn) else None, remoteStorageSystemEnable = enableRemoteStorage)
val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId))
brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled)
@ -3306,8 +3304,8 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp02).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
replicaManager.createPartition(tp02).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId2))
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId, tp02.topic -> topicId2).asJava
val leaderEpoch = 0
@ -3360,7 +3358,7 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog= true, remoteFetchQuotaExceeded = Some(false))
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
val topicIds = Map(tp0.topic -> topicId).asJava
val leaderEpoch = 0
val delta = createLeaderDelta(topicIds.get(topic), tp0, brokerList.get(0), brokerList, brokerList)
@ -3438,7 +3436,7 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM))
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
val leaderEpoch = 0
val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
@ -3537,7 +3535,7 @@ class ReplicaManagerTest {
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
val leaderEpoch = 0
val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
@ -3606,8 +3604,8 @@ class ReplicaManagerTest {
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
val leaderEpoch = 0
val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
@ -3723,7 +3721,7 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
val partition0Replicas = Seq[Integer](0, 1).asJava
// Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
@ -3762,7 +3760,7 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
// Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
@ -3807,7 +3805,7 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId))
val partition0Replicas = Seq[Integer](0, 1).asJava
// Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
@ -3837,14 +3835,14 @@ class ReplicaManagerTest {
}
}
private def setupMockLog(path: String): UnifiedLog = {
private def setupMockLog(path: String, logConfig: LogConfig, enableRemoteStorage: Boolean, topicPartition: TopicPartition, topicId: Option[Uuid]): UnifiedLog = {
val mockLog = mock(classOf[UnifiedLog])
val partitionDir = new File(path, s"$topic-0")
val partitionDir = new File(path, s"$topicPartition")
partitionDir.mkdir()
when(mockLog.dir).thenReturn(partitionDir)
when(mockLog.parentDir).thenReturn(path)
when(mockLog.topicId).thenReturn(Optional.of(topicId)).thenReturn(Optional.of(topicId2))
when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0)).thenReturn(new TopicPartition(topic2, 0))
when(mockLog.topicId).thenReturn(topicId.toJava)
when(mockLog.topicPartition).thenReturn(topicPartition)
when(mockLog.highWatermark).thenReturn(highHW)
when(mockLog.updateHighWatermark(anyLong())).thenReturn(0L)
when(mockLog.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata(10))
@ -3858,6 +3856,8 @@ class ReplicaManagerTest {
when(mockLog.latestEpoch).thenReturn(Optional.of(0))
val producerStateManager = mock(classOf[ProducerStateManager])
when(mockLog.producerStateManager).thenReturn(producerStateManager)
when(mockLog.config).thenReturn(logConfig)
when(mockLog.remoteLogEnabled()).thenReturn(enableRemoteStorage)
mockLog
}
@ -5848,7 +5848,7 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, Some(topicId))
val partition0Replicas = Seq[Integer](0, 1).asJava
val leaderEpoch = 0
val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, leaderEpoch = leaderEpoch, replicas = partition0Replicas, isr = partition0Replicas)

View File

@ -952,7 +952,7 @@ object TestUtils extends Logging {
time: MockTime = new MockTime(),
recoveryThreadsPerDataDir: Int = 4,
transactionVerificationEnabled: Boolean = false,
log: Option[UnifiedLog] = None,
logFn: Option[(TopicPartition, Option[Uuid]) => UnifiedLog] = None,
remoteStorageSystemEnable: Boolean = false,
initialTaskDelayMs: Long = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = {
val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
@ -975,9 +975,13 @@ object TestUtils extends Logging {
remoteStorageSystemEnable = remoteStorageSystemEnable,
initialTaskDelayMs = initialTaskDelayMs)
if (log.isDefined) {
if (logFn.isDefined) {
val spyLogManager = Mockito.spy(logManager)
Mockito.doReturn(log.get, Nil: _*).when(spyLogManager).getOrCreateLog(any(classOf[TopicPartition]), anyBoolean(), anyBoolean(), any(classOf[Optional[Uuid]]), any(classOf[Option[Uuid]]))
Mockito.doAnswer(answer => {
val topicPartition = answer.getArgument(0, classOf[TopicPartition])
val topicId = answer.getArgument(3, classOf[Optional[Uuid]])
logFn.get(topicPartition, OptionConverters.toScala(topicId))
}).when(spyLogManager).getOrCreateLog(any(classOf[TopicPartition]), anyBoolean(), anyBoolean(), any(classOf[Optional[Uuid]]), any(classOf[Option[Uuid]]))
spyLogManager
} else
logManager