From 7e9df7d03db51b3b376e06029b6addc10ae16998 Mon Sep 17 00:00:00 2001 From: Gaurav Narula Date: Thu, 17 Jul 2025 03:40:00 +0100 Subject: [PATCH] KAFKA-19505: allow mocking UnifiedLog#topicId in ReplicaManagerTest (#20167) The mocked value for `UnifiedLog#topicId` was incorrectly set up which caused test failure. Reviewers: Luke Chen , PoAn Yang , Satish Duggana , Chia-Ping Tsai --- .../kafka/server/ReplicaManagerTest.scala | 40 +++++++++---------- .../scala/unit/kafka/utils/TestUtils.scala | 10 +++-- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 843a0d60342..88e0ae5d35d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2977,7 +2977,8 @@ class ReplicaManagerTest { if (enableRemoteStorage && defaultTopicRemoteLogStorageEnable) { logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") } - val mockLog = setupMockLog(path1) + val logConfig = new LogConfig(logProps) + val mockLogFn = (topicPartition: TopicPartition, topicId: Option[Uuid]) => setupMockLog(path1, logConfig, enableRemoteStorage, topicPartition, topicId) if (setupLogDirMetaProperties) { // add meta.properties file in each dir config.logDirs.stream().forEach(dir => { @@ -2991,10 +2992,7 @@ class ReplicaManagerTest { new File(new File(dir), MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false) }) } - val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None, remoteStorageSystemEnable = enableRemoteStorage) - val logConfig = new LogConfig(logProps) - when(mockLog.config).thenReturn(logConfig) - when(mockLog.remoteLogEnabled()).thenReturn(enableRemoteStorage) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), logConfig, logFn = if (shouldMockLog) Some(mockLogFn) else None, remoteStorageSystemEnable = enableRemoteStorage) val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId)) brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled) @@ -3306,8 +3304,8 @@ class ReplicaManagerTest { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false)) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - replicaManager.createPartition(tp02).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) + replicaManager.createPartition(tp02).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId2)) val partition0Replicas = Seq[Integer](0, 1).asJava val topicIds = Map(tp0.topic -> topicId, tp02.topic -> topicId2).asJava val leaderEpoch = 0 @@ -3360,7 +3358,7 @@ class ReplicaManagerTest { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog= true, remoteFetchQuotaExceeded = Some(false)) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) val topicIds = Map(tp0.topic -> topicId).asJava val leaderEpoch = 0 val delta = createLeaderDelta(topicIds.get(topic), tp0, brokerList.get(0), brokerList, brokerList) @@ -3438,7 +3436,7 @@ class ReplicaManagerTest { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM)) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) val leaderEpoch = 0 val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch) @@ -3537,7 +3535,7 @@ class ReplicaManagerTest { try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) val leaderEpoch = 0 val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch) @@ -3606,8 +3604,8 @@ class ReplicaManagerTest { try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) + replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) val leaderEpoch = 0 val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch) @@ -3723,7 +3721,7 @@ class ReplicaManagerTest { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) val partition0Replicas = Seq[Integer](0, 1).asJava // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch @@ -3762,7 +3760,7 @@ class ReplicaManagerTest { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count) @@ -3807,7 +3805,7 @@ class ReplicaManagerTest { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, Some(topicId)) val partition0Replicas = Seq[Integer](0, 1).asJava // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch @@ -3837,14 +3835,14 @@ class ReplicaManagerTest { } } - private def setupMockLog(path: String): UnifiedLog = { + private def setupMockLog(path: String, logConfig: LogConfig, enableRemoteStorage: Boolean, topicPartition: TopicPartition, topicId: Option[Uuid]): UnifiedLog = { val mockLog = mock(classOf[UnifiedLog]) - val partitionDir = new File(path, s"$topic-0") + val partitionDir = new File(path, s"$topicPartition") partitionDir.mkdir() when(mockLog.dir).thenReturn(partitionDir) when(mockLog.parentDir).thenReturn(path) - when(mockLog.topicId).thenReturn(Optional.of(topicId)).thenReturn(Optional.of(topicId2)) - when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0)).thenReturn(new TopicPartition(topic2, 0)) + when(mockLog.topicId).thenReturn(topicId.toJava) + when(mockLog.topicPartition).thenReturn(topicPartition) when(mockLog.highWatermark).thenReturn(highHW) when(mockLog.updateHighWatermark(anyLong())).thenReturn(0L) when(mockLog.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata(10)) @@ -3858,6 +3856,8 @@ class ReplicaManagerTest { when(mockLog.latestEpoch).thenReturn(Optional.of(0)) val producerStateManager = mock(classOf[ProducerStateManager]) when(mockLog.producerStateManager).thenReturn(producerStateManager) + when(mockLog.config).thenReturn(logConfig) + when(mockLog.remoteLogEnabled()).thenReturn(enableRemoteStorage) mockLog } @@ -5848,7 +5848,7 @@ class ReplicaManagerTest { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None) + replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, Some(topicId)) val partition0Replicas = Seq[Integer](0, 1).asJava val leaderEpoch = 0 val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, leaderEpoch = leaderEpoch, replicas = partition0Replicas, isr = partition0Replicas) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 861c520c666..9cc76e2e2e1 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -952,7 +952,7 @@ object TestUtils extends Logging { time: MockTime = new MockTime(), recoveryThreadsPerDataDir: Int = 4, transactionVerificationEnabled: Boolean = false, - log: Option[UnifiedLog] = None, + logFn: Option[(TopicPartition, Option[Uuid]) => UnifiedLog] = None, remoteStorageSystemEnable: Boolean = false, initialTaskDelayMs: Long = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = { val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), @@ -975,9 +975,13 @@ object TestUtils extends Logging { remoteStorageSystemEnable = remoteStorageSystemEnable, initialTaskDelayMs = initialTaskDelayMs) - if (log.isDefined) { + if (logFn.isDefined) { val spyLogManager = Mockito.spy(logManager) - Mockito.doReturn(log.get, Nil: _*).when(spyLogManager).getOrCreateLog(any(classOf[TopicPartition]), anyBoolean(), anyBoolean(), any(classOf[Optional[Uuid]]), any(classOf[Option[Uuid]])) + Mockito.doAnswer(answer => { + val topicPartition = answer.getArgument(0, classOf[TopicPartition]) + val topicId = answer.getArgument(3, classOf[Optional[Uuid]]) + logFn.get(topicPartition, OptionConverters.toScala(topicId)) + }).when(spyLogManager).getOrCreateLog(any(classOf[TopicPartition]), anyBoolean(), anyBoolean(), any(classOf[Optional[Uuid]]), any(classOf[Option[Uuid]])) spyLogManager } else logManager