diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java index 44711760421..53f6c0dd305 100644 --- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java @@ -56,7 +56,6 @@ public class LogManagerBuilder { private BrokerTopicStats brokerTopicStats = null; private LogDirFailureChannel logDirFailureChannel = null; private Time time = Time.SYSTEM; - private boolean keepPartitionMetadataFile = true; private boolean remoteStorageSystemEnable = false; private long initialTaskDelayMs = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT; @@ -145,11 +144,6 @@ public class LogManagerBuilder { return this; } - public LogManagerBuilder setKeepPartitionMetadataFile(boolean keepPartitionMetadataFile) { - this.keepPartitionMetadataFile = keepPartitionMetadataFile; - return this; - } - public LogManagerBuilder setRemoteStorageSystemEnable(boolean remoteStorageSystemEnable) { this.remoteStorageSystemEnable = remoteStorageSystemEnable; return this; @@ -186,7 +180,6 @@ public class LogManagerBuilder { brokerTopicStats, logDirFailureChannel, time, - keepPartitionMetadataFile, remoteStorageSystemEnable, initialTaskDelayMs); } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index c9e771a5a55..effd7135799 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -78,7 +78,6 @@ class LogManager(logDirs: Seq[File], brokerTopicStats: BrokerTopicStats, logDirFailureChannel: LogDirFailureChannel, time: Time, - val keepPartitionMetadataFile: Boolean, remoteStorageSystemEnable: Boolean, val initialTaskDelayMs: Long) extends Logging { @@ -346,7 +345,6 @@ class LogManager(logDirs: Seq[File], logDirFailureChannel = logDirFailureChannel, lastShutdownClean = hadCleanShutdown, topicId = None, - keepPartitionMetadataFile = keepPartitionMetadataFile, numRemainingSegments = numRemainingSegments, remoteStorageSystemEnable = remoteStorageSystemEnable) @@ -1074,7 +1072,6 @@ class LogManager(logDirs: Seq[File], brokerTopicStats = brokerTopicStats, logDirFailureChannel = logDirFailureChannel, topicId = topicId, - keepPartitionMetadataFile = keepPartitionMetadataFile, remoteStorageSystemEnable = remoteStorageSystemEnable) if (isFuture) @@ -1552,8 +1549,7 @@ object LogManager { kafkaScheduler: Scheduler, time: Time, brokerTopicStats: BrokerTopicStats, - logDirFailureChannel: LogDirFailureChannel, - keepPartitionMetadataFile: Boolean): LogManager = { + logDirFailureChannel: LogDirFailureChannel): LogManager = { val defaultProps = config.extractLogConfigMap LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) @@ -1578,7 +1574,6 @@ object LogManager { brokerTopicStats = brokerTopicStats, logDirFailureChannel = logDirFailureChannel, time = time, - keepPartitionMetadataFile = keepPartitionMetadataFile, interBrokerProtocolVersion = config.interBrokerProtocolVersion, remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), initialTaskDelayMs = config.logInitialTaskDelayMs) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index f92bc6610ea..af4b9e05d7b 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -85,13 +85,6 @@ import 
scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt} * @param _topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when * first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log, * this field will be populated by reading the topic ID value from partition.metadata if it exists. - * @param keepPartitionMetadataFile boolean flag to indicate whether the partition.metadata file should be kept in the - * log directory. A partition.metadata file is only created when the raft controller is used - * or the ZK controller and this broker's inter-broker protocol version is at least 2.8. - * This file will persist the topic ID on the broker. If inter-broker protocol for a ZK controller - * is downgraded below 2.8, a topic ID may be lost and a new ID generated upon re-upgrade. - * If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata - * will be deleted to avoid ID conflicts upon re-upgrade. * @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not. */ @threadsafe @@ -102,7 +95,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, @volatile var leaderEpochCache: LeaderEpochFileCache, val producerStateManager: ProducerStateManager, @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean, val remoteStorageSystemEnable: Boolean = false, @volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging with AutoCloseable { @@ -190,40 +182,26 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Initialize topic ID information for the log by maintaining the partition metadata file and setting the in-memory _topicId. - * Delete partition metadata file if the version does not support topic IDs. * Set _topicId based on a few scenarios: - * - Recover topic ID if present and topic IDs are supported. Ensure we do not try to assign a provided topicId that is inconsistent + * - Recover topic ID if present. Ensure we do not try to assign a provided topicId that is inconsistent * with the ID on file. - * - If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist + * - If we were provided a topic ID when creating the log and one does not yet exist * set _topicId and write to the partition metadata file. 
- * - Otherwise set _topicId to None */ private def initializeTopicId(): Unit = { val partMetadataFile = partitionMetadataFile.getOrElse( throw new KafkaException("The partitionMetadataFile should have been initialized")) if (partMetadataFile.exists()) { - if (keepPartitionMetadataFile) { - val fileTopicId = partMetadataFile.read().topicId - if (_topicId.isDefined && !_topicId.contains(fileTopicId)) - throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," + - s"but log already contained topic ID $fileTopicId") + val fileTopicId = partMetadataFile.read().topicId + if (_topicId.isDefined && !_topicId.contains(fileTopicId)) + throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," + + s"but log already contained topic ID $fileTopicId") - _topicId = Some(fileTopicId) - - } else { - try partMetadataFile.delete() - catch { - case e: IOException => - error(s"Error while trying to delete partition metadata file $partMetadataFile", e) - } - } - } else if (keepPartitionMetadataFile) { + _topicId = Some(fileTopicId) + } else { _topicId.foreach(partMetadataFile.record) scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile()) - } else { - // We want to keep the file and the in-memory topic ID in sync. - _topicId = None } } @@ -493,17 +471,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, } case None => - if (keepPartitionMetadataFile) { - _topicId = Some(topicId) - partitionMetadataFile match { - case Some(partMetadataFile) => - if (!partMetadataFile.exists()) { - partMetadataFile.record(topicId) - scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile()) - } - case _ => warn(s"The topic id $topicId will not be persisted to the partition metadata file " + - "since the partition is deleted") - } + _topicId = Some(topicId) + partitionMetadataFile match { + case Some(partMetadataFile) => + if (!partMetadataFile.exists()) { + partMetadataFile.record(topicId) + scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile()) + } + case _ => warn(s"The topic id $topicId will not be persisted to the partition metadata file " + + "since the partition is deleted") } } } @@ -1989,7 +1965,6 @@ object UnifiedLog extends Logging { logDirFailureChannel: LogDirFailureChannel, lastShutdownClean: Boolean = true, topicId: Option[Uuid], - keepPartitionMetadataFile: Boolean, numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer], remoteStorageSystemEnable: Boolean = false, logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = { @@ -2034,7 +2009,6 @@ object UnifiedLog extends Logging { leaderEpochCache, producerStateManager, topicId, - keepPartitionMetadataFile, remoteStorageSystemEnable, logOffsetsListener) } diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index d3ab1f25ff3..cd3f1db2d98 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -620,8 +620,7 @@ object KafkaMetadataLog extends Logging { producerIdExpirationCheckIntervalMs = Int.MaxValue, logDirFailureChannel = new LogDirFailureChannel(5), lastShutdownClean = false, - topicId = Some(topicId), - keepPartitionMetadataFile = true + topicId = Some(topicId) ) val metadataLog = new KafkaMetadataLog( diff --git 
a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 36f1232427e..43160315b6d 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -216,8 +216,7 @@ class BrokerServer( kafkaScheduler, time, brokerTopicStats, - logDirFailureChannel, - keepPartitionMetadataFile = true) + logDirFailureChannel) remoteLogManagerOpt = createRemoteLogManager() diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 4695506863c..c8800cf054e 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -452,8 +452,7 @@ class PartitionLockTest extends Logging { log.producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager, - _topicId = None, - keepPartitionMetadataFile = true) { + _topicId = None) { override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin, requestLocal: RequestLocal, verificationGuard: VerificationGuard): LogAppendInfo = { diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index d05b3ee14a1..3b5220d9441 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -3622,8 +3622,7 @@ class PartitionTest extends AbstractPartitionTest { log.producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager, - _topicId = None, - keepPartitionMetadataFile = true) { + _topicId = None) { override def appendAsFollower(records: MemoryRecords): LogAppendInfo = { appendSemaphore.acquire() diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index e0a6724d081..38937012065 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -117,8 +117,7 @@ abstract class AbstractLogCleanerIntegrationTest { producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), - topicId = None, - keepPartitionMetadataFile = true) + topicId = None) logMap.put(partition, log) this.logs += log } diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 87f56a9c250..cf28d1cef68 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -69,8 +69,7 @@ class BrokerCompressionTest { producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), - topicId = None, - keepPartitionMetadataFile = true + topicId = None ) /* append two messages */ diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
index 796536780b1..2315acc3fb3 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -133,7 +133,7 @@ class LogCleanerManagerTest extends Logging { // the exception should be caught and the partition that caused it marked as uncleanable class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new BrokerTopicStats, producerIdExpirationCheckIntervalMs, leaderEpochCache, - producerStateManager, _topicId = None, keepPartitionMetadataFile = true) { + producerStateManager, _topicId = None) { // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog() override def getFirstBatchTimestampForSegments(segments: util.Collection[LogSegment]): util.Collection[java.lang.Long] = throw new IllegalStateException("Error!") @@ -821,8 +821,7 @@ class LogCleanerManagerTest extends Logging { producerStateManagerConfig = producerStateManagerConfig, producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), - topicId = None, - keepPartitionMetadataFile = true) + topicId = None) } private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = { @@ -875,8 +874,7 @@ class LogCleanerManagerTest extends Logging { producerStateManagerConfig = producerStateManagerConfig, producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), - topicId = None, - keepPartitionMetadataFile = true + topicId = None ) } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 9100cc7af21..152f0b66b2c 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -216,8 +216,7 @@ class LogCleanerTest extends Logging { producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs, leaderEpochCache = leaderEpochCache, producerStateManager = producerStateManager, - _topicId = None, - keepPartitionMetadataFile = true) { + _topicId = None) { override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment]): Unit = { deleteStartLatch.countDown() if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) { @@ -2093,8 +2092,7 @@ class LogCleanerTest extends Logging { producerStateManagerConfig = producerStateManagerConfig, producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), - topicId = None, - keepPartitionMetadataFile = true + topicId = None ) } diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala index d6d2b066506..342ef145b6d 100644 --- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala @@ -156,8 +156,7 @@ class LogConcurrencyTest { producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), - topicId = None, - keepPartitionMetadataFile = true + topicId = None ) } diff --git 
a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 6d82ae2d291..a2b49685b43 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -127,7 +127,6 @@ class LogLoaderTest { brokerTopicStats = new BrokerTopicStats(), logDirFailureChannel = logDirFailureChannel, time = time, - keepPartitionMetadataFile = true, remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), initialTaskDelayMs = config.logInitialTaskDelayMs) { @@ -324,7 +323,7 @@ class LogLoaderTest { logDirFailureChannel) new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats, producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager, - None, keepPartitionMetadataFile = true) + None) } // Retain snapshots for the last 2 segments @@ -447,8 +446,7 @@ class LogLoaderTest { producerIdExpirationCheckIntervalMs = 30000, leaderEpochCache = leaderEpochCache, producerStateManager = stateManager, - _topicId = None, - keepPartitionMetadataFile = true) + _topicId = None) verify(stateManager).updateMapEndOffset(0L) verify(stateManager).removeStraySnapshots(any()) @@ -557,8 +555,7 @@ class LogLoaderTest { producerIdExpirationCheckIntervalMs = 30000, leaderEpochCache = leaderEpochCache, producerStateManager = stateManager, - _topicId = None, - keepPartitionMetadataFile = true) + _topicId = None) verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]]) verify(stateManager, times(2)).updateMapEndOffset(0L) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 28adaa35fab..b7dd63daa45 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -975,7 +975,6 @@ class LogManagerTest { // not clean shutdown lastShutdownClean = false, topicId = None, - keepPartitionMetadataFile = false, // pass mock map for verification later numRemainingSegments = mockMap) @@ -1383,7 +1382,6 @@ class LogManagerTest { time = Time.SYSTEM, brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(1), - keepPartitionMetadataFile = true, interBrokerProtocolVersion = MetadataVersion.latestTesting, remoteStorageSystemEnable = false, initialTaskDelayMs = 0) diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index e98028ab86f..bf2b71676fa 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -103,7 +103,6 @@ object LogTestUtils { producerIdExpirationCheckIntervalMs: Int = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, lastShutdownClean: Boolean = true, topicId: Option[Uuid] = None, - keepPartitionMetadataFile: Boolean = true, numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer], remoteStorageSystemEnable: Boolean = false, remoteLogManager: Option[RemoteLogManager] = None, @@ -122,7 +121,6 @@ object LogTestUtils { logDirFailureChannel = new LogDirFailureChannel(10), lastShutdownClean = lastShutdownClean, topicId = topicId, - keepPartitionMetadataFile = keepPartitionMetadataFile, numRemainingSegments = numRemainingSegments, remoteStorageSystemEnable = remoteStorageSystemEnable, logOffsetsListener = logOffsetsListener diff --git 
a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index feb2a9770ec..e66418d35cf 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -1964,26 +1964,6 @@ class UnifiedLogTest { log.close() } - @Test - def testNoOpWhenKeepPartitionMetadataFileIsFalse(): Unit = { - val logConfig = LogTestUtils.createLogConfig() - val log = createLog(logDir, logConfig, keepPartitionMetadataFile = false) - - val topicId = Uuid.randomUuid() - log.assignTopicId(topicId) - // We should not write to this file or set the topic ID - assertFalse(log.partitionMetadataFile.get.exists()) - assertEquals(None, log.topicId) - log.close() - - val log2 = createLog(logDir, logConfig, topicId = Some(Uuid.randomUuid()), keepPartitionMetadataFile = false) - - // We should not write to this file or set the topic ID - assertFalse(log2.partitionMetadataFile.get.exists()) - assertEquals(None, log2.topicId) - log2.close() - } - @Test def testLogFailsWhenInconsistentTopicIdSet(): Unit = { val logConfig = LogTestUtils.createLogConfig() @@ -4499,13 +4479,12 @@ class UnifiedLogTest { producerIdExpirationCheckIntervalMs: Int = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, lastShutdownClean: Boolean = true, topicId: Option[Uuid] = None, - keepPartitionMetadataFile: Boolean = true, remoteStorageSystemEnable: Boolean = false, remoteLogManager: Option[RemoteLogManager] = None, logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = { val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint, maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs, - lastShutdownClean, topicId, keepPartitionMetadataFile, new ConcurrentHashMap[String, Integer], + lastShutdownClean, topicId, new ConcurrentHashMap[String, Integer], remoteStorageSystemEnable, remoteLogManager, logOffsetsListener) logsToClose = logsToClose :+ log log diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a3081f17ed3..0aa837f03e5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2929,8 +2929,7 @@ class ReplicaManagerTest { producerIdExpirationCheckIntervalMs = 30000, leaderEpochCache = leaderEpochCache, producerStateManager = producerStateManager, - _topicId = topicId, - keepPartitionMetadataFile = true) { + _topicId = topicId) { override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = { assertEquals(leaderEpoch, leaderEpochFromLeader) diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 4557fce99eb..87896f85156 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -100,8 +100,7 @@ class DumpLogSegmentsTest { producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), - topicId = None, - keepPartitionMetadataFile = true + topicId = None ) log } diff --git 
a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index 7afa2178f73..c5671926dbf 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -164,7 +164,7 @@ class SchedulerTest { localLog = localLog, brokerTopicStats, producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager, - _topicId = None, keepPartitionMetadataFile = true) + _topicId = None) assertTrue(scheduler.taskRunning(log.producerExpireCheck)) log.close() assertFalse(scheduler.taskRunning(log.producerExpireCheck)) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index b2d630ca357..03a8400c33e 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -988,7 +988,6 @@ object TestUtils extends Logging { time = time, brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size), - keepPartitionMetadataFile = true, interBrokerProtocolVersion = interBrokerProtocolVersion, remoteStorageSystemEnable = remoteStorageSystemEnable, initialTaskDelayMs = initialTaskDelayMs) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index ae0a582d0a0..16e54582a29 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -146,7 +146,6 @@ public class ReplicaFetcherThreadBenchmark { setBrokerTopicStats(brokerTopicStats). setLogDirFailureChannel(logDirFailureChannel). setTime(Time.SYSTEM). - setKeepPartitionMetadataFile(true). build(); replicaManager = new ReplicaManagerBuilder(). 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java index 439f98f0400..3b9eae7aeaa 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java @@ -78,7 +78,6 @@ public class StressTestLog { new LogDirFailureChannel(10), true, Option.empty(), - true, new ConcurrentHashMap<>(), false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java index b6777fea46c..cb22efe8808 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java @@ -314,7 +314,6 @@ public class TestLinearWriteSpeed { new LogDirFailureChannel(10), true, Option.empty(), - true, new CopyOnWriteMap<>(), false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index dd636829910..3750bb47c46 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -115,7 +115,7 @@ public class PartitionMakeFollowerBenchmark { setScheduler(scheduler). setBrokerTopicStats(brokerTopicStats). setLogDirFailureChannel(logDirFailureChannel). - setTime(Time.SYSTEM).setKeepPartitionMetadataFile(true). + setTime(Time.SYSTEM). build(); TopicPartition tp = new TopicPartition("topic", 0); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index 13b3c0a1fe4..e339a9e783e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -106,7 +106,6 @@ public class UpdateFollowerFetchStateBenchmark { setBrokerTopicStats(brokerTopicStats). setLogDirFailureChannel(logDirFailureChannel). setTime(Time.SYSTEM). - setKeepPartitionMetadataFile(true). build(); OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class); Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), topicPartition)).thenReturn(Optional.of(0L)); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 4a50e05433c..179bcafdfa7 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -141,7 +141,6 @@ public class PartitionCreationBench { setBrokerTopicStats(brokerTopicStats). setLogDirFailureChannel(failureChannel). setTime(Time.SYSTEM). - setKeepPartitionMetadataFile(true). build(); scheduler.startup(); this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, this.time, "");
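
Note on the change: with keepPartitionMetadataFile removed, UnifiedLog unconditionally maintains partition.metadata. Per the scaladoc deleted above, the flag only existed so that ZK-mode brokers on inter-broker protocol versions below 2.8 could delete the file and leave the in-memory topic ID unset; in KRaft mode it was always true. The following is a minimal sketch, not the Kafka implementation — Uuid and PartitionMetadataFile here are hypothetical in-memory stand-ins, and the asynchronous flush that Kafka schedules is reduced to a comment — illustrating the two branches that remain in initializeTopicId after this patch: validate-and-recover when the file exists, record-if-provided when it does not.

    // Minimal Scala sketch of the simplified topic-ID initialization.
    // All types are simplified stand-ins, not Kafka's own classes.
    object TopicIdInitSketch {
      case class Uuid(value: String)

      class InconsistentTopicIdException(msg: String) extends RuntimeException(msg)

      // Hypothetical in-memory stand-in for Kafka's PartitionMetadataFile.
      class PartitionMetadataFile {
        private var stored: Option[Uuid] = None
        def exists(): Boolean = stored.isDefined
        def read(): Uuid = stored.get
        def record(id: Uuid): Unit = stored = Some(id)
      }

      def initializeTopicId(file: PartitionMetadataFile, provided: Option[Uuid]): Option[Uuid] = {
        if (file.exists()) {
          // The file wins: validate any provided ID against it, then recover it.
          val fileTopicId = file.read()
          provided.foreach { id =>
            if (id != fileTopicId)
              throw new InconsistentTopicIdException(
                s"Tried to assign topic ID $id but log already contained topic ID $fileTopicId")
          }
          Some(fileTopicId)
        } else {
          // No file yet: persist the provided ID, if any. Kafka schedules an
          // asynchronous flush at this point; this sketch records synchronously.
          provided.foreach(file.record)
          provided
        }
      }
    }

Call sites migrate mechanically, as the test and benchmark hunks above show: drop the keepPartitionMetadataFile / setKeepPartitionMetadataFile argument and leave everything else unchanged.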