KAFKA-18545: Remove Zookeeper logic from LogManager (#18592)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Ken Huang 2025-02-04 01:16:35 +08:00 committed by GitHub
parent 4ca24a7dbf
commit 272d947f96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 3 additions and 55 deletions

View File

@ -353,8 +353,8 @@ class LogManager(logDirs: Seq[File],
addStrayLog(topicPartition, log) addStrayLog(topicPartition, log)
warn(s"Loaded stray log: $logDir") warn(s"Loaded stray log: $logDir")
} else if (isStray(log)) { } else if (isStray(log)) {
// Unlike Zookeeper mode, which tracks pending topic deletions under a ZNode, KRaft is unable to prevent a topic from being recreated before every replica has been deleted. // We are unable to prevent a topic from being recreated before every replica has been deleted.
// A KRaft broker with an offline directory may be unable to detect it still holds a to-be-deleted replica, // Broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
// and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories. // and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
// So upon a restart in which the offline directory is back online we need to clean up the old replica directory. // So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false) log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
@ -950,7 +950,6 @@ class LogManager(logDirs: Seq[File],
wasRemoteLogEnabled: Boolean): Unit = { wasRemoteLogEnabled: Boolean): Unit = {
topicConfigUpdated(topic) topicConfigUpdated(topic)
val logs = logsByTopic(topic) val logs = logsByTopic(topic)
// Combine the default properties with the overrides in zk to create the new LogConfig
val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig) val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable() val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
// We would like to validate the configuration no matter whether the logs have materialised on disk or not. // We would like to validate the configuration no matter whether the logs have materialised on disk or not.
@ -1079,11 +1078,7 @@ class LogManager(logDirs: Seq[File],
log log
} }
// When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
if (log.topicId.isEmpty) {
topicId.foreach(log.assignTopicId)
}
// Ensure topic IDs are consistent // Ensure topic IDs are consistent
topicId.foreach { topicId => topicId.foreach { topicId =>
log.topicId.foreach { logTopicId => log.topicId.foreach { logTopicId =>

View File

@ -4188,53 +4188,6 @@ class ReplicaManagerTest {
} }
} }
@Test
def testPartitionMetadataFileCreatedWithExistingLog(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
replicaManager.logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
assertTrue(replicaManager.getLog(topicPartition).isDefined)
var log = replicaManager.getLog(topicPartition).get
assertEquals(None, log.topicId)
assertFalse(log.partitionMetadataFile.get.exists())
val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
val topicNames = topicIds.asScala.map(_.swap).asJava
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(new LeaderAndIsrRequest.PartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(epoch)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val id = topicIds.get(topicPartition.topic())
log = replicaManager.localLog(topicPartition).get
assertTrue(log.partitionMetadataFile.get.exists())
val partitionMetadata = log.partitionMetadataFile.get.read()
// Current version of PartitionMetadataFile is 0.
assertEquals(0, partitionMetadata.version)
assertEquals(id, partitionMetadata.topicId)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@Test @Test
def testInconsistentIdReturnsError(): Unit = { def testInconsistentIdReturnsError(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))