From 39dcdeffd7075b8cbb018c5a4bfdd17e7828d75b Mon Sep 17 00:00:00 2001
From: Justine Olshan
Date: Wed, 10 Feb 2021 18:22:15 -0500
Subject: [PATCH] MINOR: Prevent creating partition.metadata until ID can be written (#10041)

Currently the partition.metadata file is created as soon as the log is created, but clusters
running an older inter-broker protocol (IBP) never use this file. This PR defers creating the
file until the topic ID is actually written to it.

This PR also deletes the partition.metadata file on startup if the IBP version is lower than 2.8,
so that a stale ID cannot conflict with a newly generated one after a later re-upgrade.

Reviewers: Jun Rao
---
 core/src/main/scala/kafka/log/Log.scala       | 29 +++++++++++++------
 .../src/main/scala/kafka/log/LogManager.scala | 15 ++++++----
 .../main/scala/kafka/raft/RaftManager.scala   |  3 +-
 .../main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../kafka/server/PartitionMetadataFile.scala  | 14 ++++-----
 .../scala/kafka/server/ReplicaManager.scala   |  7 +++--
 .../scala/unit/kafka/log/LogManagerTest.scala |  8 ++---
 .../test/scala/unit/kafka/log/LogTest.scala   |  9 +++---
 .../kafka/server/ReplicaManagerTest.scala     | 25 +++++++---------
 .../scala/unit/kafka/utils/TestUtils.scala    |  3 +-
 .../ReplicaFetcherThreadBenchmark.java        |  3 +-
 .../PartitionMakeFollowerBenchmark.java       |  3 +-
 .../UpdateFollowerFetchStateBenchmark.java    |  3 +-
 13 files changed, 70 insertions(+), 54 deletions(-)
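
A minimal sketch of the resulting partition.metadata lifecycle (illustration only, not part of the
applied diff: java.util.UUID stands in for Kafka's Uuid, the on-disk format is schematic, and the
object and method names are invented). The boolean mirrors the keepPartitionMetadataFile flag
introduced below, which the broker derives from whether the inter-broker protocol supports topic IDs:

    import java.io.File
    import java.nio.file.Files
    import java.util.UUID

    object PartitionMetadataSketch {

      // On log load: drop the file if topic IDs are unsupported (IBP < 2.8), otherwise
      // recover the ID it holds. Nothing is ever created at this point.
      def recoverTopicId(file: File, keepPartitionMetadataFile: Boolean): Option[UUID] =
        if (!file.exists()) None
        else if (!keepPartitionMetadataFile) { file.delete(); None }
        else Some(UUID.fromString(Files.readString(file.toPath).trim))

      // On a LeaderAndIsr request that carries a real topic ID: this is the first (and only)
      // point at which the file is created and written.
      def persistTopicId(file: File, id: UUID): Unit =
        Files.writeString(file.toPath, id.toString)
    }

Under this model a downgrade below 2.8 followed by a restart removes the file, which is why the
scaladoc added below notes that a topic ID may be lost and regenerated upon re-upgrade.
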
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index aa9b73951a8..2249b5eccbb 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -242,6 +242,12 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
  * @param hadCleanShutdown boolean flag to indicate if the Log had a clean/graceful shutdown last time. true means
  *                         clean shutdown whereas false means a crash.
+ * @param keepPartitionMetadataFile boolean flag to indicate whether the partition.metadata file should be kept in the
+ *                                  log directory. A partition.metadata file is only created when the controller's
+ *                                  inter-broker protocol version is at least 2.8. This file will persist the topic ID
+ *                                  on the broker. If the inter-broker protocol is downgraded below 2.8, a topic ID may
+ *                                  be lost and a new ID generated upon re-upgrade. If the inter-broker protocol version
+ *                                  is below 2.8, partition.metadata will be deleted to avoid ID conflicts upon re-upgrade.
  */
 @threadsafe
 class Log(@volatile private var _dir: File,
@@ -256,7 +262,8 @@ class Log(@volatile private var _dir: File,
           val topicPartition: TopicPartition,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
-          private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
+          private val hadCleanShutdown: Boolean = true,
+          val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup {

   import kafka.log.Log._

@@ -307,7 +314,7 @@ class Log(@volatile private var _dir: File,
   // Visible for testing
   @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None

-  @volatile var partitionMetadataFile : Option[PartitionMetadataFile] = None
+  @volatile var partitionMetadataFile : PartitionMetadataFile = null

   @volatile var topicId : Uuid = Uuid.ZERO_UUID

@@ -341,10 +348,13 @@ class Log(@volatile private var _dir: File,
     producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
     loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)

-    // Recover topic ID if present
-    partitionMetadataFile.foreach { file =>
-      if (!file.isEmpty())
-        topicId = file.read().topicId
+    // Delete the partition metadata file if the version does not support topic IDs.
+    // Recover the topic ID if it is present and topic IDs are supported.
+    if (partitionMetadataFile.exists()) {
+      if (!keepPartitionMetadataFile)
+        partitionMetadataFile.delete()
+      else
+        topicId = partitionMetadataFile.read().topicId
     }
   }

@@ -564,7 +574,7 @@ class Log(@volatile private var _dir: File,

   private def initializePartitionMetadata(): Unit = lock synchronized {
     val partitionMetadata = PartitionMetadataFile.newFile(dir)
-    partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel))
+    partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)
   }

   private def initializeLeaderEpochCache(): Unit = lock synchronized {
@@ -2563,11 +2573,12 @@ object Log {
             maxProducerIdExpirationMs: Int,
             producerIdExpirationCheckIntervalMs: Int,
             logDirFailureChannel: LogDirFailureChannel,
-            lastShutdownClean: Boolean = true): Log = {
+            lastShutdownClean: Boolean = true,
+            keepPartitionMetadataFile: Boolean = true): Log = {
     val topicPartition = Log.parseTopicPartitionName(dir)
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean)
+      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean, keepPartitionMetadataFile)
   }

   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index b788bf0525f..acb9d34c60b 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -62,7 +62,8 @@ class LogManager(logDirs: Seq[File],
                  scheduler: Scheduler,
                  brokerTopicStats: BrokerTopicStats,
                  logDirFailureChannel: LogDirFailureChannel,
-                 time: Time) extends Logging with KafkaMetricsGroup {
+                 time: Time,
+                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {

   import LogManager._

@@ -268,7 +269,8 @@ class LogManager(logDirs: Seq[File],
         time = time,
         brokerTopicStats = brokerTopicStats,
         logDirFailureChannel = logDirFailureChannel,
-        lastShutdownClean = hadCleanShutdown)
+        lastShutdownClean = hadCleanShutdown,
+        keepPartitionMetadataFile = keepPartitionMetadataFile)

       if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
         addLogToBeDeleted(log)
@@ -824,7 +826,8 @@ class LogManager(logDirs: Seq[File],
         scheduler = scheduler,
         time = time,
         brokerTopicStats = brokerTopicStats,
-        logDirFailureChannel = logDirFailureChannel)
+        logDirFailureChannel = logDirFailureChannel,
+        keepPartitionMetadataFile = keepPartitionMetadataFile)

       if (isFuture)
         futureLogs.put(topicPartition, log)
@@ -1208,7 +1211,8 @@ object LogManager {
             kafkaScheduler: KafkaScheduler,
             time: Time,
             brokerTopicStats: BrokerTopicStats,
-            logDirFailureChannel: LogDirFailureChannel): LogManager = {
+            logDirFailureChannel: LogDirFailureChannel,
+            keepPartitionMetadataFile: Boolean): LogManager = {
     val defaultProps = LogConfig.extractLogConfigMap(config)
     LogConfig.validateValues(defaultProps)

@@ -1230,7 +1234,8 @@ object LogManager {
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
-      time = time)
+      time = time,
+      keepPartitionMetadataFile = keepPartitionMetadataFile)
   }
 }

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index f10df1f9953..b9a77b702db 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -230,7 +230,8 @@ class KafkaRaftManager[T](
       time = time,
       maxProducerIdExpirationMs = config.transactionalIdExpirationMs,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-      logDirFailureChannel = new LogDirFailureChannel(5)
+      logDirFailureChannel = new LogDirFailureChannel(5),
+      keepPartitionMetadataFile = config.usesTopicId
     )

     KafkaMetadataLog(log, topicPartition)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 6bb134f9eca..5b7f26f6208 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -246,7 +246,7 @@ class KafkaServer(

         /* start log manager */
         logManager = LogManager(config, initialOfflineDirs, new ZkConfigRepository(new AdminZkClient(zkClient)),
-          kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
+          kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, config.usesTopicId)
         brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())

diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 1adcbc3fe1b..25b1ba6129d 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -19,7 +19,7 @@ package kafka.server

 import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
-import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.nio.file.{Files, Paths}
 import java.util.regex.Pattern

 import kafka.utils.Logging
@@ -91,10 +91,6 @@ class PartitionMetadataFile(val file: File,

   private val lock = new Object()
   private val logDir = file.getParentFile.getParent
-
-  try Files.createFile(file.toPath) // create the file if it doesn't exist
-  catch { case _: FileAlreadyExistsException => }
-
   def write(topicId: Uuid): Unit = {
     lock synchronized {
       try {
@@ -138,7 +134,11 @@ class PartitionMetadataFile(val file: File,
     }
   }

-  def isEmpty(): Boolean = {
-    file.length() == 0
+  def exists(): Boolean = {
+    file.exists()
+  }
+
+  def delete(): Boolean = {
+    file.delete()
   }
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3f52e22507a..ba50c86dd40 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1424,17 +1424,18 @@ class ReplicaManager(val config: KafkaConfig,
              * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
              * we need to map this topic-partition to OfflinePartition instead.
              */
-            if (localLog(topicPartition).isEmpty)
+            val local = localLog(topicPartition)
+            if (local.isEmpty)
               markPartitionOffline(topicPartition)
             else {
               val id = topicIds.get(topicPartition.topic())
               // Ensure we have not received a request from an older protocol
               if (id != null && !id.equals(Uuid.ZERO_UUID)) {
-                val log = localLog(topicPartition).get
+                val log = local.get
                 // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
                 // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
                 if (log.topicId.equals(Uuid.ZERO_UUID)) {
-                  log.partitionMetadataFile.get.write(id)
+                  log.partitionMetadataFile.write(id)
                   log.topicId = id
                 // Warn if the topic ID in the request does not match the log.
                 } else if (!log.topicId.equals(id)) {
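
The branch added to ReplicaManager above is the only writer of the file. A minimal self-contained
sketch of that decision (illustration only, not part of the applied diff: onLeaderAndIsrTopicId is
an invented name, java.util.UUID stands in for Kafka's Uuid, and persist/warn stand in for
partitionMetadataFile.write and a warning log call):

    import java.util.UUID

    object TopicIdWritePath {
      val ZERO_UUID = new UUID(0L, 0L)

      def onLeaderAndIsrTopicId(requestId: UUID, currentId: UUID,
                                persist: UUID => Unit, warn: String => Unit): UUID =
        if (requestId == ZERO_UUID)
          currentId                      // request from an older protocol: nothing to record
        else if (currentId == ZERO_UUID) {
          persist(requestId)             // first time this broker sees the ID: create and write the file
          requestId
        } else {
          if (currentId != requestId)
            warn(s"Topic ID $requestId in the request does not match stored topic ID $currentId")
          currentId                      // keep the ID already persisted on disk
        }
    }

Because the file is now created lazily, "no topic ID persisted" is represented by the file being
absent rather than present but empty, which is why the test updates below replace isEmpty() checks
with exists() checks.
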
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 01ca38ce800..c3698dcc6ca 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -229,8 +229,8 @@ class LogManagerTest {
       s.lazyTimeIndex.get
     })

-    // there should be a log file, two indexes, one producer snapshot, partition metadata, and the leader epoch checkpoint
-    assertEquals(log.numberOfSegments * 4 + 2, log.dir.list.length, "Files should have been deleted")
+    // there should be a log file, two indexes, one producer snapshot, and the leader epoch checkpoint
+    assertEquals(log.numberOfSegments * 4 + 1, log.dir.list.length, "Files should have been deleted")
     assertEquals(0, readLog(log, offset + 1).records.sizeInBytes, "Should get empty fetch off new log.")
     assertThrows(classOf[OffsetOutOfRangeException], () => readLog(log, 0), () =>
       "Should get exception from fetching earlier.")
@@ -274,8 +274,8 @@ class LogManagerTest {
     time.sleep(log.config.fileDeleteDelayMs + 1)

     // there should be a log file, two indexes (the txn index is created lazily),
-    // and a producer snapshot file per segment, and the leader epoch checkpoint and partition metadata file.
-    assertEquals(log.numberOfSegments * 4 + 2, log.dir.list.length, "Files should have been deleted")
+    // and a producer snapshot file per segment, and the leader epoch checkpoint.
+    assertEquals(log.numberOfSegments * 4 + 1, log.dir.list.length, "Files should have been deleted")
     assertEquals(0, readLog(log, offset + 1).records.sizeInBytes, "Should get empty fetch off new log.")
     assertThrows(classOf[OffsetOutOfRangeException], () => readLog(log, 0))
     // log should still be appendable
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b39dd7aeb9a..dab9eb1d71b 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -98,7 +98,7 @@ class LogTest {
       initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
       flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
       retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
-      brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size)) {
+      brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size), keepPartitionMetadataFile = config.usesTopicId) {
       override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long], topicConfigs: Map[String, LogConfig]): Log = {
@@ -2530,7 +2530,7 @@ class LogTest {
     var log = createLog(logDir, logConfig)

     val topicId = Uuid.randomUuid()
-    log.partitionMetadataFile.get.write(topicId)
+    log.partitionMetadataFile.write(topicId)
     log.close()

     // test recovery case
@@ -3100,7 +3100,7 @@ class LogTest {
     // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
     val id = Uuid.randomUuid()
     log.topicId = id
-    log.partitionMetadataFile.get.write(id)
+    log.partitionMetadataFile.write(id)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.latestEpoch)

@@ -3115,8 +3115,7 @@ class LogTest {

     // Check the topic ID remains in memory and was copied correctly.
     assertEquals(id, log.topicId)
-    assertFalse(log.partitionMetadataFile.isEmpty)
-    assertEquals(id, log.partitionMetadataFile.get.read().topicId)
+    assertEquals(id, log.partitionMetadataFile.read().topicId)
   }

   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a21a016bd84..c31bf8efe54 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2248,9 +2248,8 @@ class ReplicaManagerTest {
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val id = topicIds.get(topicPartition.topic())
       val log = replicaManager.localLog(topicPartition).get
-      assertFalse(log.partitionMetadataFile.isEmpty)
-      assertFalse(log.partitionMetadataFile.get.isEmpty())
-      val partitionMetadata = log.partitionMetadataFile.get.read()
+      assertTrue(log.partitionMetadataFile.exists())
+      val partitionMetadata = log.partitionMetadataFile.read()

       // Current version of PartitionMetadataFile is 0.
       assertEquals(0, partitionMetadata.version)
@@ -2285,33 +2284,29 @@ class ReplicaManagerTest {
         topicIds,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version)

-      // The file has no contents if the topic does not have an associated topic ID.
+      // There is no file if the topic does not have an associated topic ID.
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val log = replicaManager.localLog(topicPartition).get
-      assertFalse(log.partitionMetadataFile.isEmpty)
-      assertTrue(log.partitionMetadataFile.get.isEmpty())
+      assertFalse(log.partitionMetadataFile.exists())

-      // The file has no contents if the topic has the default UUID.
+      // There is no file if the topic has the default UUID.
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val log2 = replicaManager.localLog(topicPartition).get
-      assertFalse(log2.partitionMetadataFile.isEmpty)
-      assertTrue(log2.partitionMetadataFile.get.isEmpty())
+      assertFalse(log2.partitionMetadataFile.exists())

-      // The file has no contents if the request is an older version
+      // There is no file if the request is an older version
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
       val log3 = replicaManager.localLog(topicPartitionFoo).get
-      assertFalse(log3.partitionMetadataFile.isEmpty)
-      assertTrue(log3.partitionMetadataFile.get.isEmpty())
+      assertFalse(log3.partitionMetadataFile.exists())

-      // The file has no contents if the request is an older version
+      // There is no file if the request is an older version
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
       val log4 = replicaManager.localLog(topicPartitionFoo).get
-      assertFalse(log4.partitionMetadataFile.isEmpty)
-      assertTrue(log4.partitionMetadataFile.get.isEmpty())
+      assertFalse(log4.partitionMetadataFile.exists())
     } finally replicaManager.shutdown(checkpointHW = false)
   }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6a7db8092f8..43df2b97f4b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1089,7 +1089,8 @@ object TestUtils extends Logging {
       scheduler = time.scheduler,
       time = time,
       brokerTopicStats = new BrokerTopicStats,
-      logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
+      logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
+      keepPartitionMetadataFile = true)
   }

   class MockAlterIsrManager extends AlterIsrManager {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 8cc30fc756e..424b8df7e76 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -134,7 +134,8 @@ public class ReplicaFetcherThreadBenchmark {
                 scheduler,
                 brokerTopicStats,
                 logDirFailureChannel,
-                Time.SYSTEM);
+                Time.SYSTEM,
+                true);

         LinkedHashMap> initialFetched = new LinkedHashMap<>();
         scala.collection.mutable.Map initialFetchStates = new scala.collection.mutable.HashMap<>();
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 9f33ceb87c7..ece6f86a0fe 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -110,7 +110,8 @@ public class PartitionMakeFollowerBenchmark {
             scheduler,
             brokerTopicStats,
             logDirFailureChannel,
-            Time.SYSTEM);
+            Time.SYSTEM,
+            true);

         TopicPartition tp = new TopicPartition("topic", 0);

diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 1230a1cbabe..a82c6a0e774 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -95,7 +95,8 @@ public class UpdateFollowerFetchStateBenchmark {
                 scheduler,
                 brokerTopicStats,
                 logDirFailureChannel,
-                Time.SYSTEM);
+                Time.SYSTEM,
+                true);
         OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
         Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), topicPartition)).thenReturn(Option.apply(0L));
         DelayedOperations delayedOperations = new DelayedOperationsMock();
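
The non-test call sites above gate the new flag on config.usesTopicId, while tests and benchmarks
simply pass true. A minimal sketch of that wiring, assuming usesTopicId is derived from the
inter-broker protocol version (the KafkaConfig definition itself is not part of this diff, so the
Config class and version encoding here are illustrative):

    object IbpGateSketch {
      // Simplified stand-in for KafkaConfig with a (major, minor) IBP version.
      final case class Config(ibpMajor: Int, ibpMinor: Int) {
        def usesTopicId: Boolean = ibpMajor > 2 || (ibpMajor == 2 && ibpMinor >= 8)
      }

      def main(args: Array[String]): Unit = {
        assert(!Config(2, 7).usesTopicId) // IBP 2.7: keepPartitionMetadataFile = false, file deleted on startup
        assert(Config(2, 8).usesTopicId)  // IBP 2.8+: keepPartitionMetadataFile = true, topic IDs are persisted
      }
    }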