KAFKA-14484: Move UnifiedLog static methods to storage (#18039)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2025-02-11 09:55:32 +01:00 committed by GitHub
parent f5dd661cb5
commit ece91e9247
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 387 additions and 361 deletions

View File

@ -85,6 +85,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.storage.log.metrics"/>
<allow pkg="org.apache.kafka.test" />
<allow pkg="com.github.benmanes.caffeine.cache" />
<allow pkg="org.apache.kafka.coordinator.transaction"/>

View File

@ -17,12 +17,11 @@
package kafka.cluster
import kafka.log.UnifiedLog
import kafka.server.MetadataCache
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.storage.internals.log.LogOffsetMetadata
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}
import java.util.concurrent.atomic.AtomicReference
@ -75,7 +74,7 @@ case class ReplicaState(
object ReplicaState {
val Empty: ReplicaState = ReplicaState(
logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
logStartOffset = UnifiedLog.UnknownOffset,
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
lastFetchLeaderLogEndOffset = 0L,
lastFetchTimeMs = 0L,
lastCaughtUpTimeMs = 0L,
@ -157,9 +156,9 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
if (isNewLeader) {
ReplicaState(
logStartOffset = UnifiedLog.UnknownOffset,
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset,
lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastFetchTimeMs = 0L,
lastCaughtUpTimeMs = lastCaughtUpTimeMs,
brokerEpoch = Option.empty

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex}
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.internals.utils.Throttler
import scala.jdk.CollectionConverters._
@ -655,7 +655,7 @@ private[log] class Cleaner(val id: Int,
legacyDeleteHorizonMs: Long,
upperBoundOffsetOfCleaningRound: Long): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
val cleaned = JUnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
try {

View File

@ -41,7 +41,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
import java.util.{Collections, OptionalLong, Properties}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{FileLock, Scheduler}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog, RemoteIndexCache}
import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@ -323,7 +323,7 @@ class LogManager(logDirs: Seq[File],
topicConfigOverrides: Map[String, LogConfig],
numRemainingSegments: ConcurrentMap[String, Integer],
isStray: UnifiedLog => Boolean): UnifiedLog = {
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrDefault(topicPartition, 0L)
val logStartOffset = logStartOffsets.getOrDefault(topicPartition, 0L)
@ -345,9 +345,9 @@ class LogManager(logDirs: Seq[File],
numRemainingSegments = numRemainingSegments,
remoteStorageSystemEnable = remoteStorageSystemEnable)
if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
if (logDir.getName.endsWith(JUnifiedLog.DELETE_DIR_SUFFIX)) {
addLogToBeDeleted(log)
} else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
} else if (logDir.getName.endsWith(JUnifiedLog.STRAY_DIR_SUFFIX)) {
addStrayLog(topicPartition, log)
warn(s"Loaded stray log: $logDir")
} else if (isStray(log)) {
@ -355,7 +355,7 @@ class LogManager(logDirs: Seq[File],
// Broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
// and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
// So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
log.renameDir(JUnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
addStrayLog(log.topicPartition, log)
warn(s"Log in ${logDir.getAbsolutePath} marked stray and renamed to ${log.dir.getAbsolutePath}")
} else {
@ -461,7 +461,7 @@ class LogManager(logDirs: Seq[File],
// Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem
// but not any topic-partition dir.
!logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
JUnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
numTotalLogs += logsToLoad.length
numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)
@ -1037,9 +1037,9 @@ class LogManager(logDirs: Seq[File],
val logDirName = {
if (isFuture)
UnifiedLog.logFutureDirName(topicPartition)
JUnifiedLog.logFutureDirName(topicPartition)
else
UnifiedLog.logDirName(topicPartition)
JUnifiedLog.logDirName(topicPartition)
}
val logDir = logDirs
@ -1213,7 +1213,7 @@ class LogManager(logDirs: Seq[File],
def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
val topicPartition = destLog.topicPartition
destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
destLog.renameDir(JUnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
// the metrics tags still contain "future", so we have to remove it.
// we will add metrics back after sourceLog remove the metrics
destLog.removeLogMetrics()
@ -1234,7 +1234,7 @@ class LogManager(logDirs: Seq[File],
try {
sourceLog.foreach { srcLog =>
srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
srcLog.renameDir(JUnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
// Now that replica in source log directory has been successfully renamed for deletion.
// Close the log, update checkpoint files, and enqueue this log to be deleted.
srcLog.close()
@ -1285,10 +1285,10 @@ class LogManager(logDirs: Seq[File],
}
if (isStray) {
// Move aside stray partitions, don't delete them
removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), shouldReinitialize = false)
removedLog.renameDir(JUnifiedLog.logStrayDirName(topicPartition), shouldReinitialize = false)
warn(s"Log for partition ${removedLog.topicPartition} is marked as stray and renamed to ${removedLog.dir.getAbsolutePath}")
} else {
removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = false)
removedLog.renameDir(JUnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = false)
addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
}

View File

@ -20,7 +20,6 @@ package kafka.log
import kafka.log.remote.RemoteLogManager
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DescribeProducersResponseData
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
@ -30,16 +29,14 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.{OffsetAndEpoch, RequestLocal}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogMetricNames, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.io.{File, IOException}
import java.lang.{Long => JLong}
@ -107,7 +104,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/* A lock that guards all modifications to the log */
private val lock = new Object
private val validatorMetricsRecorder = newValidatorMetricsRecorder(brokerTopicStats.allTopicsStats)
private val validatorMetricsRecorder = JUnifiedLog.newValidatorMetricsRecorder(brokerTopicStats.allTopicsStats)
/* The earliest offset which is part of an incomplete transaction. This is used to compute the
* last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
@ -177,7 +174,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
def remoteLogEnabled(): Boolean = {
UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic())
JUnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic())
}
/**
@ -425,15 +422,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private[log] def newMetrics(): Unit = {
val tags = (Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++
(if (isFuture) Map("is-future" -> "true") else Map.empty)).asJava
metricsGroup.newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
metricsGroup.newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
metricsGroup.newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
metricsGroup.newGauge(LogMetricNames.Size, () => size, tags)
metricNames = Map(LogMetricNames.NumLogSegments -> tags,
LogMetricNames.LogStartOffset -> tags,
LogMetricNames.LogEndOffset -> tags,
LogMetricNames.Size -> tags)
metricsGroup.newGauge(LogMetricNames.NUM_LOG_SEGMENTS, () => numberOfSegments, tags)
metricsGroup.newGauge(LogMetricNames.LOG_START_OFFSET, () => logStartOffset, tags)
metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, () => logEndOffset, tags)
metricsGroup.newGauge(LogMetricNames.SIZE, () => size, tags)
metricNames = Map(LogMetricNames.NUM_LOG_SEGMENTS -> tags,
LogMetricNames.LOG_START_OFFSET -> tags,
LogMetricNames.LOG_END_OFFSET -> tags,
LogMetricNames.SIZE -> tags)
}
val producerExpireCheck: ScheduledFuture[_] = scheduler.schedule("PeriodicProducerExpirationCheck", () => removeExpiredProducers(time.milliseconds),
@ -485,8 +481,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def reinitializeLeaderEpochCache(): Unit = lock synchronized {
leaderEpochCache = UnifiedLog.createLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, Option.apply(leaderEpochCache), scheduler)
leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, Optional.of(leaderEpochCache), scheduler)
}
private def updateHighWatermarkWithLogEndOffset(): Unit = {
@ -817,7 +813,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// we may still be able to recover if the log is empty
// one example: fetching from log start offset on the leader which is not batch aligned,
// which may happen as a result of AdminClient#deleteRecords()
val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffset
val hasFirstOffset = appendInfo.firstOffset != JUnifiedLog.UNKNOWN_OFFSET
val firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()
val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"
@ -1081,7 +1077,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
requireOffsetsMonotonic: Boolean,
leaderEpoch: Int): LogAppendInfo = {
var validBytesCount = 0
var firstOffset = UnifiedLog.UnknownOffset
var firstOffset = JUnifiedLog.UNKNOWN_OFFSET
var lastOffset = -1L
var lastLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH
var sourceCompression = CompressionType.NONE
@ -1267,6 +1263,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
val latestEpoch = leaderEpochCache.latestEpoch()
val epoch = if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]()
new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
if (remoteLogEnabled()) {
@ -1481,7 +1478,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def deleteRetentionMsBreachedSegments(): Int = {
val retentionMs = localRetentionMs(config, remoteLogEnabledAndRemoteCopyEnabled())
val retentionMs = JUnifiedLog.localRetentionMs(config, remoteLogEnabledAndRemoteCopyEnabled())
if (retentionMs < 0) return 0
val startMs = time.milliseconds
@ -1495,7 +1492,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def deleteRetentionSizeBreachedSegments(): Int = {
val retentionSize: Long = localRetentionSize(config, remoteLogEnabledAndRemoteCopyEnabled())
val retentionSize: Long = JUnifiedLog.localRetentionSize(config, remoteLogEnabledAndRemoteCopyEnabled())
if (retentionSize < 0 || size < retentionSize) return 0
var diff = size - retentionSize
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
@ -1535,7 +1532,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* The log size in bytes for all segments that are only in local log but not yet in remote log.
*/
def onlyLocalLogSegmentsSize: Long =
UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage()).collect(Collectors.toList[LogSegment]))
LogSegments.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage()).collect(Collectors.toList[LogSegment]))
/**
* The number of segments that are only in local log but not yet in remote log.
@ -1591,7 +1588,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Note that this is only required for pre-V2 message formats because these do not store the first message offset
in the header.
*/
val rollOffset = if (appendInfo.firstOffset == UnifiedLog.UnknownOffset)
val rollOffset = if (appendInfo.firstOffset == JUnifiedLog.UNKNOWN_OFFSET)
maxOffsetInMessages - Integer.MAX_VALUE
else
appendInfo.firstOffset
@ -1828,9 +1825,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment]): Unit = {
lock synchronized {
localLog.checkIfMemoryMappedBufferClosed()
val deletedSegments = UnifiedLog.replaceSegments(localLog.segments, newSegments, oldSegments, dir, topicPartition,
config, scheduler, logDirFailureChannel, logIdent)
deleteProducerSnapshots(deletedSegments.toList.asJava, asyncDelete = true)
val deletedSegments = LocalLog.replaceSegments(localLog.segments, newSegments.asJava, oldSegments.asJava, dir, topicPartition,
config, scheduler, logDirFailureChannel, logIdent, false)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
}
}
@ -1867,7 +1864,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
val result = UnifiedLog.splitOverflowedSegment(segment, localLog.segments, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
val result = LocalLog.splitOverflowedSegment(segment, localLog.segments, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
deleteProducerSnapshots(result.deletedSegments, asyncDelete = true)
result.newSegments.asScala.toList
}
@ -1878,34 +1875,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
object UnifiedLog extends Logging {
val LogFileSuffix: String = LogFileUtils.LOG_FILE_SUFFIX
val IndexFileSuffix: String = LogFileUtils.INDEX_FILE_SUFFIX
val TimeIndexFileSuffix: String = LogFileUtils.TIME_INDEX_FILE_SUFFIX
val TxnIndexFileSuffix: String = LogFileUtils.TXN_INDEX_FILE_SUFFIX
val CleanedFileSuffix: String = LogFileUtils.CLEANED_FILE_SUFFIX
val SwapFileSuffix: String = LogFileUtils.SWAP_FILE_SUFFIX
val DeleteDirSuffix: String = LogFileUtils.DELETE_DIR_SUFFIX
val StrayDirSuffix: String = LogFileUtils.STRAY_DIR_SUFFIX
val UnknownOffset: Long = LocalLog.UNKNOWN_OFFSET
def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean,
config: LogConfig,
topic: String): Boolean = {
// Remote log is enabled only for non-compact and non-internal topics
remoteStorageSystemEnable &&
!(config.compact || Topic.isInternal(topic)
|| TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic)
|| Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) &&
config.remoteStorageEnable()
}
def apply(dir: File,
config: LogConfig,
@ -1925,20 +1894,20 @@ object UnifiedLog extends Logging {
logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(dir)
val segments = new LogSegments(topicPartition)
// The created leaderEpochCache will be truncated by LogLoader if necessary
// so it is guaranteed that the epoch entries will be correct even when on-disk
// checkpoint was stale (due to async nature of LeaderEpochFileCache#truncateFromStart/End).
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
dir,
topicPartition,
logDirFailureChannel,
None,
Optional.empty,
scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
val isRemoteLogEnabled = JUnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
val offsets = new LogLoader(
dir,
topicPartition,
@ -1968,108 +1937,6 @@ object UnifiedLog extends Logging {
logOffsetsListener)
}
def logDeleteDirName(topicPartition: TopicPartition): String = LocalLog.logDeleteDirName(topicPartition)
def logFutureDirName(topicPartition: TopicPartition): String = LocalLog.logFutureDirName(topicPartition)
def logStrayDirName(topicPartition: TopicPartition): String = LocalLog.logStrayDirName(topicPartition)
def logDirName(topicPartition: TopicPartition): String = LocalLog.logDirName(topicPartition)
def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.transactionIndexFile(dir, offset, suffix)
def offsetFromFile(file: File): Long = LogFileUtils.offsetFromFile(file)
def sizeInBytes(segments: util.Collection[LogSegment]): Long = LogSegments.sizeInBytes(segments)
def parseTopicPartitionName(dir: File): TopicPartition = LocalLog.parseTopicPartitionName(dir)
/**
* Create a new LeaderEpochFileCache instance and load the epoch entries from the backing checkpoint file or
* the provided currentCache (if not empty).
*
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param currentCache The current LeaderEpochFileCache instance (if any)
* @param scheduler The scheduler for executing asynchronous tasks
* @return The new LeaderEpochFileCache instance (if created), none otherwise
*/
def createLeaderEpochCache(dir: File,
topicPartition: TopicPartition,
logDirFailureChannel: LogDirFailureChannel,
currentCache: Option[LeaderEpochFileCache],
scheduler: Scheduler): LeaderEpochFileCache = {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
currentCache.map(_.withCheckpoint(checkpointFile)).getOrElse(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler))
}
private[log] def replaceSegments(existingSegments: LogSegments,
newSegments: Seq[LogSegment],
oldSegments: Seq[LogSegment],
dir: File,
topicPartition: TopicPartition,
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String,
isRecoveredSwapFile: Boolean = false): Iterable[LogSegment] = {
LocalLog.replaceSegments(existingSegments,
newSegments.asJava,
oldSegments.asJava,
dir,
topicPartition,
config,
scheduler,
logDirFailureChannel,
logPrefix,
isRecoveredSwapFile).asScala
}
private[log] def splitOverflowedSegment(segment: LogSegment,
existingSegments: LogSegments,
dir: File,
topicPartition: TopicPartition,
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String): SplitSegmentResult = {
LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
}
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
}
// Visible for benchmarking
def newValidatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
new LogValidator.MetricsRecorder {
def recordInvalidMagic(): Unit =
allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
def recordInvalidOffset(): Unit =
allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
def recordInvalidSequence(): Unit =
allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
def recordInvalidChecksums(): Unit =
allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
def recordNoKeyCompactedTopic(): Unit =
allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
}
}
private[log] def localRetentionMs(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionMs else config.retentionMs
}
private[log] def localRetentionSize(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionBytes else config.retentionSize
}
/**
* Wraps the value of iterator.next() in an option.
* Note: this facility is a part of the Iterator class starting from scala v2.13.
@ -2087,20 +1954,9 @@ object UnifiedLog extends Logging {
}
object LogMetricNames {
val NumLogSegments: String = "NumLogSegments"
val LogStartOffset: String = "LogStartOffset"
val LogEndOffset: String = "LogEndOffset"
val Size: String = "Size"
def allMetricNames: List[String] = {
List(NumLogSegments, LogStartOffset, LogEndOffset, Size)
}
}
case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
override def logReason(toDelete: util.List[LogSegment]): Unit = {
val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabledAndRemoteCopyEnabled)
val retentionMs = JUnifiedLog.localRetentionMs(log.config, remoteLogEnabledAndRemoteCopyEnabled)
toDelete.forEach { segment =>
if (segment.largestRecordTimestamp.isPresent)
if (remoteLogEnabledAndRemoteCopyEnabled)
@ -2126,7 +1982,7 @@ case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEna
var size = log.size
toDelete.forEach { segment =>
size -= segment.size
if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabledAndRemoteCopyEnabled)} breach. " +
if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention size ${JUnifiedLog.localRetentionSize(log.config, remoteLogEnabledAndRemoteCopyEnabled)} breach. " +
s"Local log size after deletion will be $size.")
else log.info(s"Deleting segment $segment due to log retention size ${log.config.retentionSize} breach. Log size " +
s"after deletion will be $size.")

View File

@ -41,7 +41,7 @@ import org.apache.kafka.snapshot.RawSnapshotWriter
import org.apache.kafka.snapshot.SnapshotPath
import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.storage.internals
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.io.File
@ -109,7 +109,7 @@ final class KafkaMetadataLog private (
}
private def handleAndConvertLogAppendInfo(appendInfo: internals.log.LogAppendInfo): LogAppendInfo = {
if (appendInfo.firstOffset != UnifiedLog.UnknownOffset)
if (appendInfo.firstOffset != JUnifiedLog.UNKNOWN_OFFSET)
new LogAppendInfo(appendInfo.firstOffset, appendInfo.lastOffset)
else
throw new KafkaException(s"Append failed unexpectedly")

View File

@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture
import java.util.{Map => JMap}
import java.util.{Collection => JCollection}
import kafka.log.LogManager
import kafka.log.UnifiedLog
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils
import kafka.utils.Logging
@ -49,6 +48,7 @@ import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.util.timer.SystemTimer
import org.apache.kafka.storage.internals.log.UnifiedLog
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._

View File

@ -18,7 +18,6 @@ package kafka.server
import java.io.File
import java.util.concurrent.CompletableFuture
import kafka.log.UnifiedLog
import kafka.utils.{CoreUtils, Logging, Mx4jLoader}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.internals.Topic
@ -31,7 +30,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
import org.slf4j.Logger
import java.util

View File

@ -58,7 +58,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.io.File
@ -244,10 +244,10 @@ object ReplicaManager {
def createLogReadResult(e: Throwable): LogReadResult = {
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = UnifiedLog.UnknownOffset,
leaderLogStartOffset = UnifiedLog.UnknownOffset,
leaderLogEndOffset = UnifiedLog.UnknownOffset,
followerLogStartOffset = UnifiedLog.UnknownOffset,
highWatermark = JUnifiedLog.UNKNOWN_OFFSET,
leaderLogStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
leaderLogEndOffset = JUnifiedLog.UNKNOWN_OFFSET,
followerLogStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e))
@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig,
/* If the topic name is exceptionally long, we can't support altering the log directory.
* See KAFKA-4893 for details.
* TODO: fix this by implementing topic IDs. */
if (UnifiedLog.logFutureDirName(topicPartition).length > 255)
if (JUnifiedLog.logFutureDirName(topicPartition).length > 255)
throw new InvalidTopicException("The topic name is too long.")
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is offline")
@ -1746,10 +1746,10 @@ class ReplicaManager(val config: KafkaConfig,
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = UnifiedLog.UnknownOffset,
leaderLogStartOffset = UnifiedLog.UnknownOffset,
leaderLogEndOffset = UnifiedLog.UnknownOffset,
followerLogStartOffset = UnifiedLog.UnknownOffset,
highWatermark = JUnifiedLog.UNKNOWN_OFFSET,
leaderLogStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
leaderLogEndOffset = JUnifiedLog.UNKNOWN_OFFSET,
followerLogStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e)

View File

@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode
import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.log._
import kafka.utils.CoreUtils
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.ConsumerProtocolAssignment
@ -49,7 +48,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.{CorruptSnapshotException, LogFileUtils, OffsetIndex, ProducerStateManager, TimeIndex, TransactionIndex}
import org.apache.kafka.storage.internals.log.{CorruptSnapshotException, LogFileUtils, OffsetIndex, ProducerStateManager, TimeIndex, TransactionIndex, UnifiedLog => JUnifiedLog}
import org.apache.kafka.tools.api.{Decoder, StringDecoder}
import java.nio.ByteBuffer
@ -77,16 +76,16 @@ object DumpLogSegments {
val filename = file.getName
val suffix = filename.substring(filename.lastIndexOf("."))
suffix match {
case UnifiedLog.LogFileSuffix | Snapshots.SUFFIX =>
case JUnifiedLog.LOG_FILE_SUFFIX | Snapshots.SUFFIX =>
dumpLog(file, opts.shouldPrintDataLog, nonConsecutivePairsForLogFilesMap, opts.isDeepIteration,
opts.messageParser, opts.skipRecordMetadata, opts.maxBytes)
case UnifiedLog.IndexFileSuffix =>
case JUnifiedLog.INDEX_FILE_SUFFIX =>
dumpIndex(file, opts.indexSanityOnly, opts.verifyOnly, misMatchesForIndexFilesMap, opts.maxMessageSize)
case UnifiedLog.TimeIndexFileSuffix =>
case JUnifiedLog.TIME_INDEX_FILE_SUFFIX =>
dumpTimeIndex(file, opts.indexSanityOnly, opts.verifyOnly, timeIndexDumpErrors)
case LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX =>
dumpProducerIdSnapshot(file)
case UnifiedLog.TxnIndexFileSuffix =>
case JUnifiedLog.TXN_INDEX_FILE_SUFFIX =>
dumpTxnIndex(file)
case _ =>
System.err.println(s"Ignoring unknown file $file")
@ -111,7 +110,7 @@ object DumpLogSegments {
}
private def dumpTxnIndex(file: File): Unit = {
val index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)
val index = new TransactionIndex(JUnifiedLog.offsetFromFile(file), file)
for (abortedTxn <- index.allAbortedTxns.asScala) {
println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
@ -144,7 +143,7 @@ object DumpLogSegments {
misMatchesForIndexFilesMap: mutable.Map[String, List[(Long, Long)]],
maxMessageSize: Int): Unit = {
val startOffset = file.getName.split("\\.")(0).toLong
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix)
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + JUnifiedLog.LOG_FILE_SUFFIX)
val fileRecords = FileRecords.open(logFile, false)
val index = new OffsetIndex(file, startOffset, -1, false)
@ -185,9 +184,9 @@ object DumpLogSegments {
verifyOnly: Boolean,
timeIndexDumpErrors: TimeIndexDumpErrors): Unit = {
val startOffset = file.getName.split("\\.")(0).toLong
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix)
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + JUnifiedLog.LOG_FILE_SUFFIX)
val fileRecords = FileRecords.open(logFile, false)
val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.IndexFileSuffix)
val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + JUnifiedLog.INDEX_FILE_SUFFIX)
val index = new OffsetIndex(indexFile, startOffset, -1, false)
val timeIndex = new TimeIndex(file, startOffset, -1, false)
@ -268,7 +267,7 @@ object DumpLogSegments {
parser: MessageParser[_, _],
skipRecordMetadata: Boolean,
maxBytes: Int): Unit = {
if (file.getName.endsWith(UnifiedLog.LogFileSuffix)) {
if (file.getName.endsWith(JUnifiedLog.LOG_FILE_SUFFIX)) {
val startOffset = file.getName.split("\\.")(0).toLong
println(s"Log starting offset: $startOffset")
} else if (file.getName.endsWith(Snapshots.SUFFIX)) {

View File

@ -17,7 +17,6 @@
package kafka.log.remote;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.Endpoint;
@ -75,6 +74,7 @@ import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
@ -225,7 +225,7 @@ public class RemoteLogManagerTest {
private LeaderEpochCheckpointFile checkpoint;
private final AtomicLong currentLogStartOffset = new AtomicLong(0L);
private UnifiedLog mockLog = mock(UnifiedLog.class);
private kafka.log.UnifiedLog mockLog = mock(kafka.log.UnifiedLog.class);
private final MockScheduler scheduler = new MockScheduler(time);
private final Properties brokerConfig = kafka.utils.TestUtils.createDummyBrokerConfig();
@ -259,7 +259,7 @@ public class RemoteLogManagerTest {
return Duration.ofMillis(100);
}
@Override
long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
long findLogStartOffset(TopicIdPartition topicIdPartition, kafka.log.UnifiedLog log) {
return 0L;
}
};
@ -990,10 +990,10 @@ public class RemoteLogManagerTest {
return 0L == argument.getValue();
}, "Timed out waiting for updateHighestOffsetInRemoteStorage(0) get invoked for dir1 log");
UnifiedLog oldMockLog = mockLog;
kafka.log.UnifiedLog oldMockLog = mockLog;
Mockito.clearInvocations(oldMockLog);
// simulate altering log dir completes, and the new partition leader changes to the same broker in different log dir (dir2)
mockLog = mock(UnifiedLog.class);
mockLog = mock(kafka.log.UnifiedLog.class);
when(mockLog.parentDir()).thenReturn("dir2");
when(mockLog.leaderEpochCache()).thenReturn(cache);
when(mockLog.config()).thenReturn(logConfig);
@ -1639,16 +1639,16 @@ public class RemoteLogManagerTest {
File tpDir = new File(logDir, tp.toString());
Files.createDirectory(tpDir.toPath());
File txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix());
File txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TXN_INDEX_FILE_SUFFIX);
txnIdxFile.createNewFile();
when(remoteStorageManager.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class)))
.thenAnswer(ans -> {
RemoteLogSegmentMetadata metadata = ans.getArgument(0);
IndexType indexType = ans.getArgument(1);
int maxEntries = (int) (metadata.endOffset() - metadata.startOffset());
OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.IndexFileSuffix()),
OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.INDEX_FILE_SUFFIX),
metadata.startOffset(), maxEntries * 8);
TimeIndex timeIdx = new TimeIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.TimeIndexFileSuffix()),
TimeIndex timeIdx = new TimeIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.TIME_INDEX_FILE_SUFFIX),
metadata.startOffset(), maxEntries * 12);
switch (indexType) {
case OFFSET:
@ -2041,7 +2041,7 @@ public class RemoteLogManagerTest {
@Test
public void testCandidateLogSegmentsSkipsActiveSegment() {
UnifiedLog log = mock(UnifiedLog.class);
kafka.log.UnifiedLog log = mock(kafka.log.UnifiedLog.class);
LogSegment segment1 = mock(LogSegment.class);
LogSegment segment2 = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);
@ -2065,7 +2065,7 @@ public class RemoteLogManagerTest {
@Test
public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
UnifiedLog log = mock(UnifiedLog.class);
kafka.log.UnifiedLog log = mock(kafka.log.UnifiedLog.class);
LogSegment segment1 = mock(LogSegment.class);
LogSegment segment2 = mock(LogSegment.class);
LogSegment segment3 = mock(LogSegment.class);
@ -2335,7 +2335,7 @@ public class RemoteLogManagerTest {
return Duration.ofMillis(100);
}
@Override
long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
long findLogStartOffset(TopicIdPartition topicIdPartition, kafka.log.UnifiedLog log) {
return 0L;
}
};
@ -3711,7 +3711,7 @@ public class RemoteLogManagerTest {
return Duration.ofMillis(100);
}
@Override
long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
long findLogStartOffset(TopicIdPartition topicIdPartition, kafka.log.UnifiedLog log) {
return 0L;
}
};
@ -3766,7 +3766,7 @@ public class RemoteLogManagerTest {
private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);
UnifiedLog log = mock(UnifiedLog.class);
kafka.log.UnifiedLog log = mock(kafka.log.UnifiedLog.class);
when(partition.topicPartition()).thenReturn(tp);
when(partition.topic()).thenReturn(tp.topic());
when(log.remoteLogEnabled()).thenReturn(true);

View File

@ -17,7 +17,6 @@
package kafka.server
import kafka.log.UnifiedLog
import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils.connectAndReceive
import kafka.utils.TestUtils
@ -48,6 +47,7 @@ import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.quota
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Tag, Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest

View File

@ -16,7 +16,6 @@
*/
package kafka.raft
import kafka.log.UnifiedLog
import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
@ -31,7 +30,7 @@ import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import org.apache.kafka.storage.internals.log.{LogConfig, LogStartOffsetIncrementReason}
import org.apache.kafka.storage.internals.log.{LogConfig, LogStartOffsetIncrementReason, UnifiedLog}
import org.apache.kafka.test.TestUtils.assertOptional
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

View File

@ -39,7 +39,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -298,8 +298,8 @@ class PartitionLockTest extends Logging {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
log.dir, log.topicPartition, logDirFailureChannel, None, mockTime.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
log.dir, log.topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
val maxTransactionTimeout = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
val producerStateManager = new ProducerStateManager(

View File

@ -60,7 +60,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -445,8 +445,8 @@ class PartitionTest extends AbstractPartitionTest {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
log.dir, log.topicPartition, logDirFailureChannel, None, time.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
log.dir, log.topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true)
val producerStateManager = new ProducerStateManager(
@ -1336,8 +1336,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
time.sleep(500)
@ -1435,8 +1435,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
@ -1487,8 +1487,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L)
@ -1551,8 +1551,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
@ -2144,8 +2144,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId2,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
// On initialization, the replica is considered caught up and should not be removed
@ -2263,8 +2263,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
// Shrink the ISR
@ -2322,8 +2322,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
// There is a short delay before the first fetch. The follower is not yet caught up to the log end.
@ -2382,8 +2382,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
// The follower catches up to the log end immediately.
@ -2429,8 +2429,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
time.sleep(30001)
@ -2516,8 +2516,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
// This will attempt to expand the ISR
@ -3054,8 +3054,8 @@ class PartitionTest extends AbstractPartitionTest {
// in the ISR.
assertReplicaState(partition, followerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
)
// Follower fetches and updates its replica state.

View File

@ -16,12 +16,11 @@
*/
package kafka.cluster
import kafka.log.UnifiedLog
import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.LogOffsetMetadata
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.Mockito.{mock, when}
@ -128,8 +127,8 @@ class ReplicaTest {
@Test
def testInitialState(): Unit = {
assertReplicaState(
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset,
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastCaughtUpTimeMs = 0L,
lastFetchLeaderLogEndOffset = 0L,
lastFetchTimeMs = 0L,
@ -243,10 +242,10 @@ class ReplicaTest {
)
assertReplicaState(
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset,
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastCaughtUpTimeMs = resetTimeMs1,
lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset,
lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastFetchTimeMs = 0L,
brokerEpoch = Option.empty
)
@ -267,10 +266,10 @@ class ReplicaTest {
)
assertReplicaState(
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset,
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastCaughtUpTimeMs = 0L,
lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset,
lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastFetchTimeMs = 0L,
brokerEpoch = Option.empty
)

View File

@ -19,7 +19,7 @@ package kafka.log
import java.io.File
import java.nio.file.Files
import java.util.Properties
import java.util.{Optional, Properties}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
@ -28,7 +28,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@ -109,8 +109,8 @@ class LogCleanerManagerTest extends Logging {
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val segments = new LogSegments(tp)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
tpDir, topicPartition, logDirFailureChannel, None, time.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
tpDir, topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time)
val offsets = new LogLoader(
tpDir,
@ -807,7 +807,7 @@ class LogCleanerManagerTest extends Logging {
cleanupPolicy: String,
topicPartition: TopicPartition = new TopicPartition("log", 0)): UnifiedLog = {
val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy)
val partitionDir = new File(logDir, UnifiedLog.logDirName(topicPartition))
val partitionDir = new File(logDir, JUnifiedLog.logDirName(topicPartition))
UnifiedLog(
dir = partitionDir,

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
@ -42,7 +42,7 @@ import java.io.{File, RandomAccessFile}
import java.nio._
import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import java.util.Properties
import java.util.{Optional, Properties}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import scala.collection._
import scala.jdk.CollectionConverters._
@ -183,13 +183,13 @@ class LogCleanerTest extends Logging {
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024 : java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)
val config = LogConfig.fromProps(logConfig.originals, logProps)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(dir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val logSegments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, None, time.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val offsets = new LogLoader(
@ -1709,7 +1709,7 @@ class LogCleanerTest extends Logging {
// 1) Simulate recovery just after .cleaned file is created, before rename to .swap
// On recovery, clean operation is aborted. All messages should be present in the log
log.logSegments.asScala.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
log.logSegments.asScala.head.changeFileSuffixes("", JUnifiedLog.CLEANED_FILE_SUFFIX)
for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
}
@ -1725,8 +1725,8 @@ class LogCleanerTest extends Logging {
// 2) Simulate recovery just after .cleaned file is created, and a subset of them are renamed to .swap
// On recovery, clean operation is aborted. All messages should be present in the log
log.logSegments.asScala.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
log.logSegments.asScala.head.log.renameTo(new File(Utils.replaceSuffix(log.logSegments.asScala.head.log.file.getPath, UnifiedLog.CleanedFileSuffix, UnifiedLog.SwapFileSuffix)))
log.logSegments.asScala.head.changeFileSuffixes("", JUnifiedLog.CLEANED_FILE_SUFFIX)
log.logSegments.asScala.head.log.renameTo(new File(Utils.replaceSuffix(log.logSegments.asScala.head.log.file.getPath, JUnifiedLog.CLEANED_FILE_SUFFIX, JUnifiedLog.SWAP_FILE_SUFFIX)))
for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
}
@ -1742,7 +1742,7 @@ class LogCleanerTest extends Logging {
// 3) Simulate recovery just after swap file is created, before old segment files are
// renamed to .deleted. Clean operation is resumed during recovery.
log.logSegments.asScala.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
log.logSegments.asScala.head.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
}
@ -1763,7 +1763,7 @@ class LogCleanerTest extends Logging {
// 4) Simulate recovery after swap file is created and old segments files are renamed
// to .deleted. Clean operation is resumed during recovery.
log.logSegments.asScala.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
log.logSegments.asScala.head.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
log = recoverAndCheck(config, cleanedKeys)
// add some more messages and clean the log again
@ -1781,7 +1781,7 @@ class LogCleanerTest extends Logging {
// 5) Simulate recovery after a subset of swap files are renamed to regular files and old segments files are renamed
// to .deleted. Clean operation is resumed during recovery.
log.logSegments.asScala.head.timeIndex.file.renameTo(new File(Utils.replaceSuffix(log.logSegments.asScala.head.timeIndex.file.getPath, "", UnifiedLog.SwapFileSuffix)))
log.logSegments.asScala.head.timeIndex.file.renameTo(new File(Utils.replaceSuffix(log.logSegments.asScala.head.timeIndex.file.getPath, "", JUnifiedLog.SWAP_FILE_SUFFIX)))
log = recoverAndCheck(config, cleanedKeys)
// add some more messages and clean the log again

View File

@ -29,7 +29,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
@ -146,14 +146,14 @@ class LogLoaderTest {
}
}
cleanShutdownInterceptedValue = hadCleanShutdown
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrDefault(topicPartition, 0L)
val logStartOffset = logStartOffsets.getOrDefault(topicPartition, 0L)
val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, None, time.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
@ -279,7 +279,7 @@ class LogLoaderTest {
def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
// Intercept all segment read calls
val interceptedLogSegments = new LogSegments(topicPartition) {
@ -296,8 +296,8 @@ class LogLoaderTest {
super.add(wrapper)
}
}
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
maxTransactionTimeoutMs, producerStateManagerConfig, mockTime)
val logLoader = new LogLoader(
@ -414,12 +414,12 @@ class LogLoaderTest {
when(stateManager.isEmpty).thenReturn(true)
when(stateManager.firstUnstableOffset).thenReturn(Optional.empty[LogOffsetMetadata]())
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
val config = new LogConfig(new Properties())
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
@ -523,12 +523,12 @@ class LogLoaderTest {
when(stateManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
when(stateManager.maxTransactionTimeoutMs).thenReturn(maxTransactionTimeoutMs)
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
val config = new LogConfig(new Properties())
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
@ -1057,7 +1057,7 @@ class LogLoaderTest {
// Simulate recovery just after .cleaned file is created, before rename to .swap. On recovery, existing split
// operation is aborted but the recovery process itself kicks off split which should complete.
newSegments.reverse.foreach(segment => {
segment.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
segment.changeFileSuffixes("", JUnifiedLog.CLEANED_FILE_SUFFIX)
segment.truncateTo(0)
})
for (file <- logDir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
@ -1083,9 +1083,9 @@ class LogLoaderTest {
// operation is aborted but the recovery process itself kicks off split which should complete.
newSegments.reverse.foreach { segment =>
if (segment != newSegments.last)
segment.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
segment.changeFileSuffixes("", JUnifiedLog.CLEANED_FILE_SUFFIX)
else
segment.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
segment.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
segment.truncateTo(0)
}
for (file <- logDir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
@ -1110,7 +1110,7 @@ class LogLoaderTest {
// Simulate recovery right after all new segments have been renamed to .swap. On recovery, existing split operation
// is completed and the old segment must be deleted.
newSegments.reverse.foreach(segment => {
segment.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
segment.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
})
for (file <- logDir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")))
@ -1136,7 +1136,7 @@ class LogLoaderTest {
// Simulate recovery right after all new segments have been renamed to .swap and old segment has been deleted. On
// recovery, existing split operation is completed.
newSegments.reverse.foreach(_.changeFileSuffixes("", UnifiedLog.SwapFileSuffix))
newSegments.reverse.foreach(_.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX))
for (file <- logDir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
Utils.delete(file)
@ -1162,7 +1162,7 @@ class LogLoaderTest {
// Simulate recovery right after one of the new segment has been renamed to .swap and the other to .log. On
// recovery, existing split operation is completed.
newSegments.last.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
newSegments.last.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
// Truncate the old segment
segmentWithOverflow.truncateTo(0)
@ -1597,7 +1597,7 @@ class LogLoaderTest {
def testLogStartOffsetWhenRemoteStorageIsEnabled(isRemoteLogEnabled: Boolean,
expectedLogStartOffset: Long): Unit = {
val logDirFailureChannel = null
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
val logConfig = LogTestUtils.createLogConfig()
val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
when(stateManager.isEmpty).thenReturn(true)
@ -1619,8 +1619,8 @@ class LogLoaderTest {
log.logSegments.forEach(segment => segments.add(segment))
assertEquals(5, segments.firstSegment.get.baseOffset)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,

View File

@ -44,7 +44,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache}
import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.function.Executable
@ -540,7 +540,7 @@ class LogManagerTest {
assertEquals(1, invokedCount)
assertTrue(
logDir.listFiles().toSet
.exists(f => f.getName.startsWith(testTopic) && f.getName.endsWith(UnifiedLog.StrayDirSuffix))
.exists(f => f.getName.startsWith(testTopic) && f.getName.endsWith(JUnifiedLog.STRAY_DIR_SUFFIX))
)
}
@ -952,7 +952,7 @@ class LogManagerTest {
val dir: File = invocation.getArgument(0)
val topicConfigOverrides: mutable.Map[String, LogConfig] = invocation.getArgument(5)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(dir)
val config = topicConfigOverrides.getOrElse(topicPartition.topic, logConfig)
UnifiedLog(
@ -1028,7 +1028,7 @@ class LogManagerTest {
val metricTag = s"topic=${tp.topic},partition=${tp.partition}"
def verifyMetrics(): Unit = {
assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size)
assertEquals(LogMetricNames.ALL_METRIC_NAMES.size, logMetrics.size)
logMetrics.foreach { metric =>
assertTrue(metric.getMBeanName.contains(metricTag))
}
@ -1068,7 +1068,7 @@ class LogManagerTest {
val metricTag = s"topic=${tp.topic},partition=${tp.partition}"
def verifyMetrics(logCount: Int): Unit = {
assertEquals(LogMetricNames.allMetricNames.size * logCount, logMetrics.size)
assertEquals(LogMetricNames.ALL_METRIC_NAMES.size * logCount, logMetrics.size)
logMetrics.foreach { metric =>
assertTrue(metric.getMBeanName.contains(metricTag))
}

View File

@ -36,7 +36,7 @@ import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import scala.jdk.CollectionConverters._
@ -52,7 +52,7 @@ object LogTestUtils {
val ms = FileRecords.open(LogFileUtils.logFile(logDir, offset))
val idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(logDir, offset), offset, 1000)
val timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(logDir, offset), offset, 1500)
val txnIndex = new TransactionIndex(offset, UnifiedLog.transactionIndexFile(logDir, offset))
val txnIndex = new TransactionIndex(offset, LogFileUtils.transactionIndexFile(logDir, offset, ""))
new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
}
@ -209,8 +209,8 @@ object LogTestUtils {
time.sleep(config.fileDeleteDelayMs + 1)
for (file <- logDir.listFiles) {
assertFalse(file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX), "Unexpected .deleted file after recovery")
assertFalse(file.getName.endsWith(UnifiedLog.CleanedFileSuffix), "Unexpected .cleaned file after recovery")
assertFalse(file.getName.endsWith(UnifiedLog.SwapFileSuffix), "Unexpected .swap file after recovery")
assertFalse(file.getName.endsWith(JUnifiedLog.CLEANED_FILE_SUFFIX), "Unexpected .cleaned file after recovery")
assertFalse(file.getName.endsWith(JUnifiedLog.SWAP_FILE_SUFFIX), "Unexpected .swap file after recovery")
}
assertEquals(expectedKeys, keysInLog(recoveredLog))
assertFalse(hasOffsetOverflow(recoveredLog))

View File

@ -42,7 +42,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog => JUnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import org.junit.jupiter.api.Assertions._
@ -527,13 +527,6 @@ class UnifiedLogTest {
assertLsoBoundedFetches()
}
@Test
def testOffsetFromProducerSnapshotFile(): Unit = {
val offset = 23423423L
val snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset)
assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile))
}
/**
* Tests for time based log roll. This test appends messages then changes the time
* using the mock clock to force the log to roll and checks the number of segments.
@ -2495,8 +2488,8 @@ class UnifiedLogTest {
assertEquals(Some(5), log.latestEpoch)
// Ensure that after a directory rename, the epoch cache is written to the right location
val tp = UnifiedLog.parseTopicPartitionName(log.dir)
log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
val tp = JUnifiedLog.parseTopicPartitionName(log.dir)
log.renameDir(JUnifiedLog.logDeleteDirName(tp), true)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
assertEquals(Some(10), log.latestEpoch)
assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists())
@ -2516,8 +2509,8 @@ class UnifiedLogTest {
assertEquals(Some(5), log.latestEpoch)
// Ensure that after a directory rename, the partition metadata file is written to the right location.
val tp = UnifiedLog.parseTopicPartitionName(log.dir)
log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
val tp = JUnifiedLog.parseTopicPartitionName(log.dir)
log.renameDir(JUnifiedLog.logDeleteDirName(tp), true)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
assertEquals(Some(10), log.latestEpoch)
assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
@ -2539,8 +2532,8 @@ class UnifiedLogTest {
log.partitionMetadataFile.get.record(topicId)
// Ensure that after a directory rename, the partition metadata file is written to the right location.
val tp = UnifiedLog.parseTopicPartitionName(log.dir)
log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
val tp = JUnifiedLog.parseTopicPartitionName(log.dir)
log.renameDir(JUnifiedLog.logDeleteDirName(tp), true)
assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
@ -3531,7 +3524,7 @@ class UnifiedLogTest {
@Test
def testLoadPartitionDirWithNoSegmentsShouldNotThrow(): Unit = {
val dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo", 3))
val dirName = JUnifiedLog.logDeleteDirName(new TopicPartition("foo", 3))
val logDir = new File(tmpDir, dirName)
logDir.mkdirs()
val logConfig = LogTestUtils.createLogConfig()

View File

@ -19,7 +19,6 @@ package kafka.server
import kafka.cluster.Partition
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.UnifiedLog
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
@ -94,7 +93,7 @@ import org.apache.kafka.server.share.context.{FinalContext, ShareSessionContext}
import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{FutureUtils, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@ -3847,7 +3846,7 @@ class KafkaApisTest extends Logging {
any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]()
)).thenAnswer(invocation => {
val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
callback(Seq(tidp -> new FetchPartitionData(Errors.NOT_LEADER_OR_FOLLOWER, UnifiedLog.UnknownOffset, UnifiedLog.UnknownOffset, MemoryRecords.EMPTY,
callback(Seq(tidp -> new FetchPartitionData(Errors.NOT_LEADER_OR_FOLLOWER, UnifiedLog.UNKNOWN_OFFSET, UnifiedLog.UNKNOWN_OFFSET, MemoryRecords.EMPTY,
Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)))
})

View File

@ -19,7 +19,6 @@ package kafka.server
import java.io.File
import java.nio.file.Files
import java.util.{Optional, Properties}
import kafka.log.UnifiedLog
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
@ -28,6 +27,7 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

View File

@ -68,7 +68,7 @@ import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@ -2700,8 +2700,8 @@ class ReplicaManagerTest {
val maxTransactionTimeoutMs = 30000
val maxProducerIdExpirationMs = 30000
val segments = new LogSegments(tp)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
logDir, tp, mockLogDirFailureChannel, None, time.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
logDir, tp, mockLogDirFailureChannel, Optional.empty, time.scheduler)
val producerStateManager = new ProducerStateManager(tp, logDir,
maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time)
val offsets = new LogLoader(
@ -5877,7 +5877,7 @@ class ReplicaManagerTest {
assertEquals(tpId0, fetch.head._1)
val fetchInfo = fetch.head._2.info
assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
assertEquals(UnifiedLog.UnknownOffset, fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
assertEquals(JUnifiedLog.UNKNOWN_OFFSET, fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)

View File

@ -16,14 +16,14 @@
*/
package kafka.utils
import java.util.Properties
import java.util.{Optional, Properties}
import java.util.concurrent.atomic._
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import kafka.log.UnifiedLog
import kafka.utils.TestUtils.retry
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
@ -135,11 +135,11 @@ class SchedulerTest {
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT
val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), mockTime)
val offsets = new LogLoader(

View File

@ -17,7 +17,7 @@
package kafka.utils
import com.yammer.metrics.core.{Histogram, Meter}
import kafka.log._
import kafka.log.LogManager
import kafka.network.RequestChannel
import kafka.security.JaasTestUtils
import kafka.server._
@ -56,7 +56,7 @@ import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfi
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
@ -961,7 +961,7 @@ object TestUtils extends Logging {
time: MockTime = new MockTime(),
recoveryThreadsPerDataDir: Int = 4,
transactionVerificationEnabled: Boolean = false,
log: Option[UnifiedLog] = None,
log: Option[kafka.log.UnifiedLog] = None,
remoteStorageSystemEnable: Boolean = false,
initialTaskDelayMs: Long = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = {
val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
@ -1109,7 +1109,7 @@ object TestUtils extends Logging {
!util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryNames =>
partitionDirectoryNames.exists { directoryName =>
directoryName.startsWith(tp.topic + "-" + tp.partition) &&
directoryName.endsWith(UnifiedLog.DeleteDirSuffix)
directoryName.endsWith(JUnifiedLog.DELETE_DIR_SUFFIX)
}
}
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.kafka.jmh.record;
import kafka.log.UnifiedLog;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
@ -28,6 +26,7 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.server.common.RequestLocal;
import org.apache.kafka.storage.internals.log.LogValidator;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.openjdk.jmh.annotations.Param;

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import java.util.List;
public class LogMetricNames {
public static final String NUM_LOG_SEGMENTS = "NumLogSegments";
public static final String LOG_START_OFFSET = "LogStartOffset";
public static final String LOG_END_OFFSET = "LogEndOffset";
public static final String SIZE = "Size";
public static final List<String> ALL_METRIC_NAMES = List.of(NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE);
}

View File

@ -17,16 +17,22 @@
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -39,6 +45,16 @@ public class UnifiedLog {
private static final Logger LOG = LoggerFactory.getLogger(UnifiedLog.class);
public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX;
public static final String INDEX_FILE_SUFFIX = LogFileUtils.INDEX_FILE_SUFFIX;
public static final String TIME_INDEX_FILE_SUFFIX = LogFileUtils.TIME_INDEX_FILE_SUFFIX;
public static final String TXN_INDEX_FILE_SUFFIX = LogFileUtils.TXN_INDEX_FILE_SUFFIX;
public static final String CLEANED_FILE_SUFFIX = LogFileUtils.CLEANED_FILE_SUFFIX;
public static final String SWAP_FILE_SUFFIX = LogFileUtils.SWAP_FILE_SUFFIX;
public static final String DELETE_DIR_SUFFIX = LogFileUtils.DELETE_DIR_SUFFIX;
public static final String STRAY_DIR_SUFFIX = LogFileUtils.STRAY_DIR_SUFFIX;
public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET;
/**
* Rebuilds producer state until the provided lastOffset. This function may be called from the
* recovery code path, and thus must be free of all side effects, i.e. it must not update any
@ -129,8 +145,8 @@ public class UnifiedLog {
}
producerStateManager.updateMapEndOffset(lastOffset);
producerStateManager.takeSnapshot();
LOG.info(logPrefix + "Producer state recovery took " + (segmentRecoveryStart - producerStateLoadStart) + "ms for snapshot load " +
"and " + (time.milliseconds() - segmentRecoveryStart) + "ms for segment recovery from offset " + lastOffset);
LOG.info("{}Producer state recovery took {}ms for snapshot load and {}ms for segment recovery from offset {}",
logPrefix, segmentRecoveryStart - producerStateLoadStart, time.milliseconds() - segmentRecoveryStart, lastOffset);
}
}
@ -206,4 +222,105 @@ public class UnifiedLog {
}
return completedTxn;
}
public static boolean isRemoteLogEnabled(boolean remoteStorageSystemEnable, LogConfig config, String topic) {
// Remote log is enabled only for non-compact and non-internal topics
return remoteStorageSystemEnable &&
!(config.compact || Topic.isInternal(topic)
|| TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic)
|| Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) &&
config.remoteStorageEnable();
}
// Visible for benchmarking
public static LogValidator.MetricsRecorder newValidatorMetricsRecorder(BrokerTopicMetrics allTopicsStats) {
return new LogValidator.MetricsRecorder() {
public void recordInvalidMagic() {
allTopicsStats.invalidMagicNumberRecordsPerSec().mark();
}
public void recordInvalidOffset() {
allTopicsStats.invalidOffsetOrSequenceRecordsPerSec().mark();
}
public void recordInvalidSequence() {
allTopicsStats.invalidOffsetOrSequenceRecordsPerSec().mark();
}
public void recordInvalidChecksums() {
allTopicsStats.invalidMessageCrcRecordsPerSec().mark();
}
public void recordNoKeyCompactedTopic() {
allTopicsStats.noKeyCompactedTopicRecordsPerSec().mark();
}
};
}
/**
* Create a new LeaderEpochFileCache instance and load the epoch entries from the backing checkpoint file or
* the provided currentCache (if not empty).
*
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param currentCache The current LeaderEpochFileCache instance (if any)
* @param scheduler The scheduler for executing asynchronous tasks
* @return The new LeaderEpochFileCache instance
*/
public static LeaderEpochFileCache createLeaderEpochCache(File dir,
TopicPartition topicPartition,
LogDirFailureChannel logDirFailureChannel,
Optional<LeaderEpochFileCache> currentCache,
Scheduler scheduler) throws IOException {
File leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir);
LeaderEpochCheckpointFile checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel);
return currentCache.map(cache -> cache.withCheckpoint(checkpointFile))
.orElse(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler));
}
public static LogSegment createNewCleanedSegment(File dir, LogConfig logConfig, long baseOffset) throws IOException {
return LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset);
}
public static long localRetentionMs(LogConfig config, boolean remoteLogEnabledAndRemoteCopyEnabled) {
return remoteLogEnabledAndRemoteCopyEnabled ? config.localRetentionMs() : config.retentionMs;
}
public static long localRetentionSize(LogConfig config, boolean remoteLogEnabledAndRemoteCopyEnabled) {
return remoteLogEnabledAndRemoteCopyEnabled ? config.localRetentionBytes() : config.retentionSize;
}
public static String logDeleteDirName(TopicPartition topicPartition) {
return LocalLog.logDeleteDirName(topicPartition);
}
public static String logFutureDirName(TopicPartition topicPartition) {
return LocalLog.logFutureDirName(topicPartition);
}
public static String logStrayDirName(TopicPartition topicPartition) {
return LocalLog.logStrayDirName(topicPartition);
}
public static String logDirName(TopicPartition topicPartition) {
return LocalLog.logDirName(topicPartition);
}
public static File transactionIndexFile(File dir, long offset, String suffix) {
return LogFileUtils.transactionIndexFile(dir, offset, suffix);
}
public static long offsetFromFile(File file) {
return LogFileUtils.offsetFromFile(file);
}
public static long sizeInBytes(Collection<LogSegment> segments) {
return LogSegments.sizeInBytes(segments);
}
public static TopicPartition parseTopicPartitionName(File dir) throws IOException {
return LocalLog.parseTopicPartitionName(dir);
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import java.io.File;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class UnifiedLogTest {
private final File tmpDir = TestUtils.tempDirectory();
@Test
public void testOffsetFromProducerSnapshotFile() {
long offset = 23423423L;
File snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset);
assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile));
}
}

View File

@ -302,7 +302,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
// unused now, but it can be reused later as this is an utility method.
public Optional<LeaderEpochFileCache> leaderEpochFileCache(int brokerId, TopicPartition partition) {
return log(brokerId, partition).map(log -> log.leaderEpochCache());
return log(brokerId, partition).map(UnifiedLog::leaderEpochCache);
}
public List<LocalTieredStorage> remoteStorageManagers() {

View File

@ -17,7 +17,6 @@
package org.apache.kafka.common.test;
import kafka.log.UnifiedLog;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
@ -51,6 +50,7 @@ import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import java.io.File;
import java.io.IOException;
@ -345,7 +345,7 @@ public interface ClusterInstance {
topicPartitions.stream().allMatch(tp ->
Arrays.stream(Objects.requireNonNull(new File(logDir).list())).noneMatch(partitionDirectoryName ->
partitionDirectoryName.startsWith(tp.topic() + "-" + tp.partition()) &&
partitionDirectoryName.endsWith(UnifiedLog.DeleteDirSuffix())))
partitionDirectoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)))
)
), "Failed to hard-delete the delete directory");
}