KAFKA-18479: Remove keepPartitionMetadataFile in UnifiedLog and LogMan… (#18491)

Reviewers: Jun Rao <junrao@gmail.com>
Author: Dmitry Werner (committed by GitHub)
Date: 2025-01-16 02:59:28 +05:00
Commit: 92fd99bda1 (parent: d82f03e98b)
26 changed files with 38 additions and 123 deletions

@@ -56,7 +56,6 @@ public class LogManagerBuilder {
     private BrokerTopicStats brokerTopicStats = null;
     private LogDirFailureChannel logDirFailureChannel = null;
     private Time time = Time.SYSTEM;
-    private boolean keepPartitionMetadataFile = true;
     private boolean remoteStorageSystemEnable = false;
     private long initialTaskDelayMs = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;

@@ -145,11 +144,6 @@ public class LogManagerBuilder {
         return this;
     }

-    public LogManagerBuilder setKeepPartitionMetadataFile(boolean keepPartitionMetadataFile) {
-        this.keepPartitionMetadataFile = keepPartitionMetadataFile;
-        return this;
-    }
-
     public LogManagerBuilder setSetRemoteStorageSystemEnable(boolean remoteStorageSystemEnable) {
         this.remoteStorageSystemEnable = remoteStorageSystemEnable;
         return this;

@@ -186,7 +180,6 @@ public class LogManagerBuilder {
                               brokerTopicStats,
                               logDirFailureChannel,
                               time,
-                              keepPartitionMetadataFile,
                               remoteStorageSystemEnable,
                               initialTaskDelayMs);
     }
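With the field, setter, and constructor argument gone in the three hunks above, callers migrate by dropping one chained call. All but one test call site in this commit passed true for the flag, so removing it changes no behavior. A self-contained Scala stand-in (hypothetical types, not Kafka's real LogManagerBuilder) sketching the migration:

// Hypothetical stand-in for the trimmed builder; real names and fields differ.
final class StandInLogManagerBuilder {
  private var remoteStorageSystemEnable = false

  // setKeepPartitionMetadataFile(boolean) lived here before this commit; with
  // every production caller passing true, the knob carried no information.
  def setRemoteStorageSystemEnable(enable: Boolean): StandInLogManagerBuilder = {
    remoteStorageSystemEnable = enable
    this
  }

  def build(): String = s"LogManager(remoteStorageSystemEnable=$remoteStorageSystemEnable)"
}

object BuilderMigrationSketch {
  def main(args: Array[String]): Unit = {
    // before: new StandInLogManagerBuilder().setKeepPartitionMetadataFile(true).setRemoteStorageSystemEnable(false).build()
    // after: the chain simply loses one link.
    println(new StandInLogManagerBuilder().setRemoteStorageSystemEnable(false).build())
  }
}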

@@ -78,7 +78,6 @@ class LogManager(logDirs: Seq[File],
                  brokerTopicStats: BrokerTopicStats,
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time,
-                 val keepPartitionMetadataFile: Boolean,
                  remoteStorageSystemEnable: Boolean,
                  val initialTaskDelayMs: Long) extends Logging {

@@ -346,7 +345,6 @@ class LogManager(logDirs: Seq[File],
       logDirFailureChannel = logDirFailureChannel,
       lastShutdownClean = hadCleanShutdown,
       topicId = None,
-      keepPartitionMetadataFile = keepPartitionMetadataFile,
       numRemainingSegments = numRemainingSegments,
       remoteStorageSystemEnable = remoteStorageSystemEnable)

@@ -1074,7 +1072,6 @@ class LogManager(logDirs: Seq[File],
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
       topicId = topicId,
-      keepPartitionMetadataFile = keepPartitionMetadataFile,
       remoteStorageSystemEnable = remoteStorageSystemEnable)
     if (isFuture)

@@ -1552,8 +1549,7 @@ object LogManager {
             kafkaScheduler: Scheduler,
             time: Time,
             brokerTopicStats: BrokerTopicStats,
-            logDirFailureChannel: LogDirFailureChannel,
-            keepPartitionMetadataFile: Boolean): LogManager = {
+            logDirFailureChannel: LogDirFailureChannel): LogManager = {
     val defaultProps = config.extractLogConfigMap

     LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())

@@ -1578,7 +1574,6 @@ object LogManager {
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
       time = time,
-      keepPartitionMetadataFile = keepPartitionMetadataFile,
       interBrokerProtocolVersion = config.interBrokerProtocolVersion,
       remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
       initialTaskDelayMs = config.logInitialTaskDelayMs)

@@ -85,13 +85,6 @@ import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt}
  * @param _topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
  *                 first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log,
  *                 this field will be populated by reading the topic ID value from partition.metadata if it exists.
- * @param keepPartitionMetadataFile boolean flag to indicate whether the partition.metadata file should be kept in the
- *                                  log directory. A partition.metadata file is only created when the raft controller is used
- *                                  or the ZK controller and this broker's inter-broker protocol version is at least 2.8.
- *                                  This file will persist the topic ID on the broker. If inter-broker protocol for a ZK controller
- *                                  is downgraded below 2.8, a topic ID may be lost and a new ID generated upon re-upgrade.
- *                                  If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata
- *                                  will be deleted to avoid ID conflicts upon re-upgrade.
  * @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not.
  */
 @threadsafe

@@ -102,7 +95,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                  @volatile var leaderEpochCache: LeaderEpochFileCache,
                  val producerStateManager: ProducerStateManager,
                  @volatile private var _topicId: Option[Uuid],
-                 val keepPartitionMetadataFile: Boolean,
                  val remoteStorageSystemEnable: Boolean = false,
                  @volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging with AutoCloseable {

@@ -190,40 +182,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Initialize topic ID information for the log by maintaining the partition metadata file and setting the in-memory _topicId.
-   * Delete partition metadata file if the version does not support topic IDs.
    * Set _topicId based on a few scenarios:
-   * - Recover topic ID if present and topic IDs are supported. Ensure we do not try to assign a provided topicId that is inconsistent
+   * - Recover topic ID if present. Ensure we do not try to assign a provided topicId that is inconsistent
    *   with the ID on file.
-   * - If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+   * - If we were provided a topic ID when creating the log and one does not yet exist
    *   set _topicId and write to the partition metadata file.
-   * - Otherwise set _topicId to None
    */
   private def initializeTopicId(): Unit = {
     val partMetadataFile = partitionMetadataFile.getOrElse(
       throw new KafkaException("The partitionMetadataFile should have been initialized"))

     if (partMetadataFile.exists()) {
-      if (keepPartitionMetadataFile) {
         val fileTopicId = partMetadataFile.read().topicId
         if (_topicId.isDefined && !_topicId.contains(fileTopicId))
           throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
             s"but log already contained topic ID $fileTopicId")
         _topicId = Some(fileTopicId)
-      } else {
-        try partMetadataFile.delete()
-        catch {
-          case e: IOException =>
-            error(s"Error while trying to delete partition metadata file $partMetadataFile", e)
-        }
-      }
-    } else if (keepPartitionMetadataFile) {
+    } else {
       _topicId.foreach(partMetadataFile.record)
       scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile())
-    } else {
-      // We want to keep the file and the in-memory topic ID in sync.
-      _topicId = None
     }
   }
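With the flag gone, only two branches of initializeTopicId survive: trust the ID already on disk, or persist the provided one. A minimal, self-contained Scala sketch of that control flow (MetadataFileStub and the exception are hypothetical stand-ins, not Kafka's PartitionMetadataFile or InconsistentTopicIdException):

object TopicIdInitSketch {
  import java.util.UUID

  // Hypothetical stand-in for the partition.metadata file.
  final class MetadataFileStub(private var stored: Option[UUID] = None) {
    def exists(): Boolean = stored.isDefined
    def read(): UUID = stored.get
    def record(id: UUID): Unit = stored = Some(id)
  }

  // Mirrors the surviving branches after this commit.
  def initializeTopicId(file: MetadataFileStub, provided: Option[UUID]): Option[UUID] = {
    if (file.exists()) {
      val onDisk = file.read()
      provided.filter(_ != onDisk).foreach { id =>
        throw new IllegalStateException(s"Tried to assign topic ID $id, but file already contains $onDisk")
      }
      Some(onDisk) // the on-disk ID always wins
    } else {
      provided.foreach(file.record) // the real code also schedules an async flush
      provided
    }
  }
}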
@@ -493,7 +471,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         }
       case None =>
-        if (keepPartitionMetadataFile) {
           _topicId = Some(topicId)
           partitionMetadataFile match {
             case Some(partMetadataFile) =>

@@ -506,7 +483,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           }
         }
-        }
     }

   private def reinitializeLeaderEpochCache(): Unit = lock synchronized {
     leaderEpochCache = UnifiedLog.createLeaderEpochCache(
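The deleted guard means assigning a topic ID to a log that has none now always records it in the partition metadata file; previously keepPartitionMetadataFile = false turned the whole assignment into a no-op, which is exactly what the UnifiedLogTest case deleted further below verified. A self-contained sketch of the post-change semantics (all types are stand-ins, not Kafka's API):

object AssignTopicIdSketch {
  import java.util.UUID

  final class MetadataFileStub(private var stored: Option[UUID] = None) {
    def exists(): Boolean = stored.isDefined
    def record(id: UUID): Unit = stored = Some(id)
  }

  final class LogStub {
    val metadataFile = new MetadataFileStub
    private var topicId: Option[UUID] = None

    def assignTopicId(id: UUID): Unit = topicId match {
      case Some(existing) =>
        require(existing == id, s"log already has topic ID $existing") // consistency check, as in the real code
      case None =>
        topicId = Some(id)      // unconditional now: the flag check is gone
        metadataFile.record(id)
    }
  }

  def main(args: Array[String]): Unit = {
    val log = new LogStub
    log.assignTopicId(UUID.randomUUID())
    assert(log.metadataFile.exists()) // always persisted after this commit
  }
}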
@@ -1989,7 +1965,6 @@ object UnifiedLog extends Logging {
             logDirFailureChannel: LogDirFailureChannel,
             lastShutdownClean: Boolean = true,
             topicId: Option[Uuid],
-            keepPartitionMetadataFile: Boolean,
             numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer],
             remoteStorageSystemEnable: Boolean = false,
             logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {

@@ -2034,7 +2009,6 @@ object UnifiedLog extends Logging {
       leaderEpochCache,
       producerStateManager,
       topicId,
-      keepPartitionMetadataFile,
       remoteStorageSystemEnable,
       logOffsetsListener)
   }

@@ -620,8 +620,7 @@ object KafkaMetadataLog extends Logging {
       producerIdExpirationCheckIntervalMs = Int.MaxValue,
       logDirFailureChannel = new LogDirFailureChannel(5),
       lastShutdownClean = false,
-      topicId = Some(topicId),
-      keepPartitionMetadataFile = true
+      topicId = Some(topicId)
     )

     val metadataLog = new KafkaMetadataLog(

@@ -216,8 +216,7 @@ class BrokerServer(
       kafkaScheduler,
       time,
       brokerTopicStats,
-      logDirFailureChannel,
-      keepPartitionMetadataFile = true)
+      logDirFailureChannel)

     remoteLogManagerOpt = createRemoteLogManager()

@@ -452,8 +452,7 @@ class PartitionLockTest extends Logging {
       log.producerIdExpirationCheckIntervalMs,
       leaderEpochCache,
       producerStateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true) {
+      _topicId = None) {

       override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin,
                                   requestLocal: RequestLocal, verificationGuard: VerificationGuard): LogAppendInfo = {

@@ -3622,8 +3622,7 @@ class PartitionTest extends AbstractPartitionTest {
       log.producerIdExpirationCheckIntervalMs,
       leaderEpochCache,
       producerStateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true) {
+      _topicId = None) {

       override def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
         appendSemaphore.acquire()

@@ -117,8 +117,7 @@ abstract class AbstractLogCleanerIntegrationTest {
       producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
       producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true)
+      topicId = None)
     logMap.put(partition, log)
     this.logs += log
   }

@@ -69,8 +69,7 @@ class BrokerCompressionTest {
       producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
       producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
     )

     /* append two messages */

@@ -133,7 +133,7 @@ class LogCleanerManagerTest extends Logging {
     // the exception should be caught and the partition that caused it marked as uncleanable
     class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new BrokerTopicStats,
                                      producerIdExpirationCheckIntervalMs, leaderEpochCache,
-                                     producerStateManager, _topicId = None, keepPartitionMetadataFile = true) {
+                                     producerStateManager, _topicId = None) {
       // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
       override def getFirstBatchTimestampForSegments(segments: util.Collection[LogSegment]): util.Collection[java.lang.Long] =
         throw new IllegalStateException("Error!")

@@ -821,8 +821,7 @@ class LogCleanerManagerTest extends Logging {
       producerStateManagerConfig = producerStateManagerConfig,
       producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true)
+      topicId = None)
   }

   private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = {

@@ -875,8 +874,7 @@ class LogCleanerManagerTest extends Logging {
       producerStateManagerConfig = producerStateManagerConfig,
       producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
     )
   }

@@ -216,8 +216,7 @@ class LogCleanerTest extends Logging {
       producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
       leaderEpochCache = leaderEpochCache,
       producerStateManager = producerStateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true) {
+      _topicId = None) {

       override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment]): Unit = {
         deleteStartLatch.countDown()
         if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {

@@ -2093,8 +2092,7 @@ class LogCleanerTest extends Logging {
       producerStateManagerConfig = producerStateManagerConfig,
       producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
     )
   }

@@ -156,8 +156,7 @@ class LogConcurrencyTest {
       producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
       producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
     )
   }

@@ -127,7 +127,6 @@ class LogLoaderTest {
       brokerTopicStats = new BrokerTopicStats(),
       logDirFailureChannel = logDirFailureChannel,
       time = time,
-      keepPartitionMetadataFile = true,
       remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
       initialTaskDelayMs = config.logInitialTaskDelayMs) {

@@ -324,7 +323,7 @@ class LogLoaderTest {
         logDirFailureChannel)
       new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
         producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager,
-        None, keepPartitionMetadataFile = true)
+        None)
     }

     // Retain snapshots for the last 2 segments

@@ -447,8 +446,7 @@ class LogLoaderTest {
       producerIdExpirationCheckIntervalMs = 30000,
       leaderEpochCache = leaderEpochCache,
       producerStateManager = stateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true)
+      _topicId = None)

     verify(stateManager).updateMapEndOffset(0L)
     verify(stateManager).removeStraySnapshots(any())

@@ -557,8 +555,7 @@ class LogLoaderTest {
       producerIdExpirationCheckIntervalMs = 30000,
       leaderEpochCache = leaderEpochCache,
       producerStateManager = stateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true)
+      _topicId = None)

     verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]])
     verify(stateManager, times(2)).updateMapEndOffset(0L)

@@ -975,7 +975,6 @@ class LogManagerTest {
       // not clean shutdown
       lastShutdownClean = false,
       topicId = None,
-      keepPartitionMetadataFile = false,
       // pass mock map for verification later
       numRemainingSegments = mockMap)

@@ -1383,7 +1382,6 @@ class LogManagerTest {
       time = Time.SYSTEM,
       brokerTopicStats = new BrokerTopicStats,
       logDirFailureChannel = new LogDirFailureChannel(1),
-      keepPartitionMetadataFile = true,
       interBrokerProtocolVersion = MetadataVersion.latestTesting,
       remoteStorageSystemEnable = false,
       initialTaskDelayMs = 0)

@@ -103,7 +103,6 @@ object LogTestUtils {
             producerIdExpirationCheckIntervalMs: Int = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
             lastShutdownClean: Boolean = true,
             topicId: Option[Uuid] = None,
-            keepPartitionMetadataFile: Boolean = true,
             numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer],
             remoteStorageSystemEnable: Boolean = false,
             remoteLogManager: Option[RemoteLogManager] = None,

@@ -122,7 +121,6 @@ object LogTestUtils {
       logDirFailureChannel = new LogDirFailureChannel(10),
       lastShutdownClean = lastShutdownClean,
       topicId = topicId,
-      keepPartitionMetadataFile = keepPartitionMetadataFile,
       numRemainingSegments = numRemainingSegments,
       remoteStorageSystemEnable = remoteStorageSystemEnable,
       logOffsetsListener = logOffsetsListener

@@ -1964,26 +1964,6 @@ class UnifiedLogTest {
     log.close()
   }

-  @Test
-  def testNoOpWhenKeepPartitionMetadataFileIsFalse(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig()
-    val log = createLog(logDir, logConfig, keepPartitionMetadataFile = false)
-
-    val topicId = Uuid.randomUuid()
-    log.assignTopicId(topicId)
-
-    // We should not write to this file or set the topic ID
-    assertFalse(log.partitionMetadataFile.get.exists())
-    assertEquals(None, log.topicId)
-    log.close()
-
-    val log2 = createLog(logDir, logConfig, topicId = Some(Uuid.randomUuid()), keepPartitionMetadataFile = false)
-
-    // We should not write to this file or set the topic ID
-    assertFalse(log2.partitionMetadataFile.get.exists())
-    assertEquals(None, log2.topicId)
-    log2.close()
-  }
-
   @Test
   def testLogFailsWhenInconsistentTopicIdSet(): Unit = {
     val logConfig = LogTestUtils.createLogConfig()

@@ -4499,13 +4479,12 @@ class UnifiedLogTest {
                 producerIdExpirationCheckIntervalMs: Int = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
                 lastShutdownClean: Boolean = true,
                 topicId: Option[Uuid] = None,
-                keepPartitionMetadataFile: Boolean = true,
                 remoteStorageSystemEnable: Boolean = false,
                 remoteLogManager: Option[RemoteLogManager] = None,
                 logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
     val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
       maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs,
-      lastShutdownClean, topicId, keepPartitionMetadataFile, new ConcurrentHashMap[String, Integer],
+      lastShutdownClean, topicId, new ConcurrentHashMap[String, Integer],
       remoteStorageSystemEnable, remoteLogManager, logOffsetsListener)
     logsToClose = logsToClose :+ log
     log

@@ -2929,8 +2929,7 @@ class ReplicaManagerTest {
       producerIdExpirationCheckIntervalMs = 30000,
       leaderEpochCache = leaderEpochCache,
       producerStateManager = producerStateManager,
-      _topicId = topicId,
-      keepPartitionMetadataFile = true) {
+      _topicId = topicId) {

       override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
         assertEquals(leaderEpoch, leaderEpochFromLeader)

@@ -100,8 +100,7 @@ class DumpLogSegmentsTest {
       producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
       producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
     )
     log
   }

@@ -164,7 +164,7 @@ class SchedulerTest {
       localLog = localLog,
       brokerTopicStats, producerIdExpirationCheckIntervalMs,
       leaderEpochCache, producerStateManager,
-      _topicId = None, keepPartitionMetadataFile = true)
+      _topicId = None)
     assertTrue(scheduler.taskRunning(log.producerExpireCheck))
     log.close()
     assertFalse(scheduler.taskRunning(log.producerExpireCheck))

@@ -988,7 +988,6 @@ object TestUtils extends Logging {
       time = time,
       brokerTopicStats = new BrokerTopicStats,
       logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
-      keepPartitionMetadataFile = true,
       interBrokerProtocolVersion = interBrokerProtocolVersion,
       remoteStorageSystemEnable = remoteStorageSystemEnable,
       initialTaskDelayMs = initialTaskDelayMs)

@@ -146,7 +146,6 @@ public class ReplicaFetcherThreadBenchmark {
                 setBrokerTopicStats(brokerTopicStats).
                 setLogDirFailureChannel(logDirFailureChannel).
                 setTime(Time.SYSTEM).
-                setKeepPartitionMetadataFile(true).
                 build();

         replicaManager = new ReplicaManagerBuilder().

@@ -78,7 +78,6 @@ public class StressTestLog {
                 new LogDirFailureChannel(10),
                 true,
                 Option.empty(),
-                true,
                 new ConcurrentHashMap<>(),
                 false,
                 LogOffsetsListener.NO_OP_OFFSETS_LISTENER

@@ -314,7 +314,6 @@ public class TestLinearWriteSpeed {
                 new LogDirFailureChannel(10),
                 true,
                 Option.empty(),
-                true,
                 new CopyOnWriteMap<>(),
                 false,
                 LogOffsetsListener.NO_OP_OFFSETS_LISTENER

@@ -115,7 +115,7 @@ public class PartitionMakeFollowerBenchmark {
                 setScheduler(scheduler).
                 setBrokerTopicStats(brokerTopicStats).
                 setLogDirFailureChannel(logDirFailureChannel).
-                setTime(Time.SYSTEM).setKeepPartitionMetadataFile(true).
+                setTime(Time.SYSTEM).
                 build();

         TopicPartition tp = new TopicPartition("topic", 0);

@@ -106,7 +106,6 @@ public class UpdateFollowerFetchStateBenchmark {
                 setBrokerTopicStats(brokerTopicStats).
                 setLogDirFailureChannel(logDirFailureChannel).
                 setTime(Time.SYSTEM).
-                setKeepPartitionMetadataFile(true).
                 build();

         OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
         Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), topicPartition)).thenReturn(Optional.of(0L));

@@ -141,7 +141,6 @@ public class PartitionCreationBench {
                 setBrokerTopicStats(brokerTopicStats).
                 setLogDirFailureChannel(failureChannel).
                 setTime(Time.SYSTEM).
-                setKeepPartitionMetadataFile(true).
                 build();

         scheduler.startup();
         this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, this.time, "");