KAFKA-14482 Move LogLoader to storage module (#17042)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Mickael Maison 2024-09-16 18:37:49 +02:00 committed by GitHub
parent 21e67b3d21
commit f1c011a8b5
20 changed files with 1262 additions and 1085 deletions

View File

@ -342,11 +342,11 @@
<!-- storage -->
<suppress checks="CyclomaticComplexity"
files="(LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
files="(LogLoader|LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
<suppress checks="NPathComplexity"
files="(LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
files="(LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
<suppress checks="ParameterNumber"
files="(LogAppendInfo|RemoteLogManagerConfig).java"/>
files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig).java"/>
<!-- benchmarks -->
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"

View File

@ -24,27 +24,20 @@ import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, OffsetPosition}
import org.apache.kafka.storage.internals.log.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, OffsetPosition, LocalLog => JLocalLog}
import java.io.{File, IOException}
import java.io.File
import java.nio.file.Files
import java.util
import java.util.Collections.singletonList
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
import java.util.{Collections, Optional}
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable}
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
/**
* Holds the result of splitting a segment into one or more segments, see LocalLog.splitOverflowedSegment().
*
* @param deletedSegments segments deleted when splitting a segment
* @param newSegments new segments created when splitting a segment
*/
case class SplitSegmentResult(deletedSegments: Iterable[LogSegment], newSegments: Iterable[LogSegment])
/**
* An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
@ -97,9 +90,7 @@ class LocalLog(@volatile private var _dir: File,
private[log] def isFuture: Boolean = dir.getName.endsWith(LocalLog.FutureDirSuffix)
private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, msg) {
fun
}
JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
}
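A minimal usage sketch of the wrapper above, with a hypothetical caller inside LocalLog (exampleFlush is not part of this change); any IOException thrown by the body is routed through the Java helper, which marks the parent directory offline and rethrows it as a KafkaStorageException:
// hypothetical caller; maybeHandleIOException delegates to JLocalLog.maybeHandleIOException above
private[log] def exampleFlush(segment: LogSegment): Unit = {
  maybeHandleIOException(s"Error while flushing segment ${segment.baseOffset} for $topicPartition in dir ${dir.getParent}") {
    segment.flush()
  }
}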
/**
@ -248,9 +239,9 @@ class LocalLog(@volatile private var _dir: File,
* Completely delete all segments with no delay.
* @return the deleted segments
*/
private[log] def deleteAllSegments(): Iterable[LogSegment] = {
private[log] def deleteAllSegments(): util.List[LogSegment] = {
maybeHandleIOException(s"Error while deleting all segments for $topicPartition in dir ${dir.getParent}") {
val deletableSegments = new util.ArrayList(segments.values).asScala
val deletableSegments = new util.ArrayList(segments.values)
removeAndDeleteSegments(segments.values.asScala, asyncDelete = false, LogDeletion(this))
isMemoryMappedBufferClosed = true
deletableSegments
@ -286,7 +277,7 @@ class LocalLog(@volatile private var _dir: File,
toDelete.foreach { segment =>
segments.remove(segment.baseOffset)
}
LocalLog.deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
JLocalLog.deleteSegmentFiles(toDelete.asJava, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
}
}
@ -323,7 +314,7 @@ class LocalLog(@volatile private var _dir: File,
reason.logReason(List(segmentToDelete))
if (newOffset != segmentToDelete.baseOffset)
segments.remove(segmentToDelete.baseOffset)
LocalLog.deleteSegmentFiles(List(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
JLocalLog.deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
newSegment
}
@ -572,17 +563,8 @@ class LocalLog(@volatile private var _dir: File,
*/
object LocalLog extends Logging {
/** a file that is scheduled to be deleted */
private[log] val DeletedFileSuffix = LogFileUtils.DELETED_FILE_SUFFIX
/** A temporary file that is being used for log cleaning */
private[log] val CleanedFileSuffix = ".cleaned"
/** A temporary file used when swapping files into the log */
private[log] val SwapFileSuffix = ".swap"
/** a directory that is scheduled to be deleted */
private[log] val DeleteDirSuffix = "-delete"
private[log] val DeleteDirSuffix = LogFileUtils.DELETE_DIR_SUFFIX
/** a directory that is used for future partition */
private[log] val FutureDirSuffix = "-future"
@ -688,253 +670,6 @@ object LocalLog extends Logging {
new TopicPartition(topic, partition)
}
private[log] def isIndexFile(file: File): Boolean = {
val fileName = file.getName
fileName.endsWith(LogFileUtils.INDEX_FILE_SUFFIX) || fileName.endsWith(LogFileUtils.TIME_INDEX_FILE_SUFFIX) || fileName.endsWith(LogFileUtils.TXN_INDEX_FILE_SUFFIX)
}
private[log] def isLogFile(file: File): Boolean =
file.getPath.endsWith(LogFileUtils.LOG_FILE_SUFFIX)
/**
* Invokes the provided function and handles any IOException raised by the function by marking the
* provided directory offline.
*
* @param logDirFailureChannel Used to asynchronously handle log directory failure.
* @param logDir The log directory to be marked offline during an IOException.
* @param errorMsg The error message to be used when marking the log directory offline.
* @param fun The function to be executed.
* @return The value returned by the function after a successful invocation
*/
private[log] def maybeHandleIOException[T](logDirFailureChannel: LogDirFailureChannel,
logDir: String,
errorMsg: => String)(fun: => T): T = {
if (logDirFailureChannel.hasOfflineLogDir(logDir)) {
throw new KafkaStorageException(s"The log dir $logDir is already offline due to a previous IO exception.")
}
try {
fun
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(logDir, errorMsg, e)
throw new KafkaStorageException(errorMsg, e)
}
}
/**
* Split a segment into one or more segments such that there is no offset overflow in any of them. The
* resulting segments will contain the exact same messages that are present in the input segment. On successful
* completion of this method, the input segment will be deleted and will be replaced by the resulting new segments.
* See replaceSegments for recovery logic, in case the broker dies in the middle of this operation.
*
* Note that this method assumes we have already determined that the segment passed in contains records that cause
* offset overflow.
*
* The split logic overloads the use of .clean files that LogCleaner typically uses to make the process of replacing
* the input segment with multiple new segments atomic and recoverable in the event of a crash. See replaceSegments
* and completeSwapOperations for the implementation to make this operation recoverable on crashes.
*
* @param segment Segment to split
* @param existingSegments The existing segments of the log
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param config The log configuration settings
* @param scheduler The thread pool scheduler used for background actions
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param logPrefix The logging prefix
* @return List of new segments that replace the input segment
*/
private[log] def splitOverflowedSegment(segment: LogSegment,
existingSegments: LogSegments,
dir: File,
topicPartition: TopicPartition,
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String): SplitSegmentResult = {
require(isLogFile(segment.log.file), s"Cannot split file ${segment.log.file.getAbsoluteFile}")
require(segment.hasOverflow, s"Split operation is only permitted for segments with overflow, and the problem path is ${segment.log.file.getAbsoluteFile}")
info(s"${logPrefix}Splitting overflowed segment $segment")
val newSegments = ListBuffer[LogSegment]()
try {
var position = 0
val sourceRecords = segment.log
while (position < sourceRecords.sizeInBytes) {
val firstBatch = sourceRecords.batchesFrom(position).asScala.head
val newSegment = createNewCleanedSegment(dir, config, firstBatch.baseOffset)
newSegments += newSegment
val bytesAppended = newSegment.appendFromFile(sourceRecords, position)
if (bytesAppended == 0)
throw new IllegalStateException(s"Failed to append records from position $position in $segment")
position += bytesAppended
}
// prepare new segments
var totalSizeOfNewSegments = 0
newSegments.foreach { splitSegment =>
splitSegment.onBecomeInactiveSegment()
splitSegment.flush()
splitSegment.setLastModified(segment.lastModified)
totalSizeOfNewSegments += splitSegment.log.sizeInBytes
}
// size of all the new segments combined must equal size of the original segment
if (totalSizeOfNewSegments != segment.log.sizeInBytes)
throw new IllegalStateException("Inconsistent segment sizes after split" +
s" before: ${segment.log.sizeInBytes} after: $totalSizeOfNewSegments")
// replace old segment with new ones
info(s"${logPrefix}Replacing overflowed segment $segment with split segments $newSegments")
val newSegmentsToAdd = newSegments.toSeq
val deletedSegments = LocalLog.replaceSegments(existingSegments, newSegmentsToAdd, List(segment),
dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
SplitSegmentResult(deletedSegments.toSeq, newSegmentsToAdd)
} catch {
case e: Exception =>
newSegments.foreach { splitSegment =>
splitSegment.close()
splitSegment.deleteIfExists()
}
throw e
}
}
/**
* Swap one or more new segments in place and delete one or more existing segments in a crash-safe
* manner. The old segments will be asynchronously deleted.
*
* This method does not need to convert IOException to KafkaStorageException because it is either
* called before all logs are loaded or the caller will catch and handle IOException
*
* The sequence of operations is:
*
* - Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments() on
* the Log instance. If broker crashes at this point, the clean-and-swap operation is aborted and
* the .cleaned files are deleted on recovery in LogLoader.
* - New segments are renamed .swap. If the broker crashes before all segments were renamed to .swap, the
clean-and-swap operation is aborted - .cleaned as well as .swap files are deleted on recovery in
LogLoader. We detect this situation by maintaining a specific order in which files are renamed
* from .cleaned to .swap. Basically, files are renamed in descending order of offsets. On recovery,
* all .swap files whose offset is greater than the minimum-offset .clean file are deleted.
* - If the broker crashes after all new segments were renamed to .swap, the operation is completed,
* the swap operation is resumed on recovery as described in the next step.
* - Old segment files are renamed to .deleted and asynchronous delete is scheduled. If the broker
* crashes, any .deleted files left behind are deleted on recovery in LogLoader.
* replaceSegments() is then invoked to complete the swap with newSegment recreated from the
* .swap file and oldSegments containing segments which were not renamed before the crash.
* - Swap segment(s) are renamed to replace the existing segments, completing this operation.
* If the broker crashes, any .deleted files which may be left behind are deleted
* on recovery in LogLoader.
*
* @param existingSegments The existing segments of the log
* @param newSegments The new log segments to add to the log
* @param oldSegments The old log segments to delete from the log
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param config The log configuration settings
* @param scheduler The thread pool scheduler used for background actions
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param logPrefix The logging prefix
* @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
*/
private[log] def replaceSegments(existingSegments: LogSegments,
newSegments: Seq[LogSegment],
oldSegments: Seq[LogSegment],
dir: File,
topicPartition: TopicPartition,
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String,
isRecoveredSwapFile: Boolean = false): Iterable[LogSegment] = {
val sortedNewSegments = newSegments.sortBy(_.baseOffset)
// Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
// but before this method is executed. We want to filter out those segments to avoid calling deleteSegmentFiles()
// multiple times for the same segment.
val sortedOldSegments = oldSegments.filter(seg => existingSegments.contains(seg.baseOffset)).sortBy(_.baseOffset)
// need to do this in two phases to be crash safe AND do the delete asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
if (!isRecoveredSwapFile)
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(CleanedFileSuffix, SwapFileSuffix))
sortedNewSegments.reverse.foreach(existingSegments.add)
val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
// delete the old files
val deletedNotReplaced = sortedOldSegments.map { seg =>
// remove the index entry
if (seg.baseOffset != sortedNewSegments.head.baseOffset)
existingSegments.remove(seg.baseOffset)
deleteSegmentFiles(
List(seg),
asyncDelete = true,
dir,
topicPartition,
config,
scheduler,
logDirFailureChannel,
logPrefix)
if (newSegmentBaseOffsets.contains(seg.baseOffset)) Option.empty else Some(seg)
}.filter(item => item.isDefined).map(item => item.get)
// okay we are safe now, remove the swap suffix
sortedNewSegments.foreach(_.changeFileSuffixes(SwapFileSuffix, ""))
Utils.flushDir(dir.toPath)
deletedNotReplaced
}
/**
* Perform physical deletion of the index and log files for the given segment.
* Prior to the deletion, the index and log files are renamed by appending .deleted to the
* respective file name. Allows these files to be optionally deleted asynchronously.
*
* This method assumes that the file exists. It does not need to convert IOException
* (thrown from changeFileSuffixes) to KafkaStorageException because it is either called before
* all logs are loaded or the caller will catch and handle IOException.
*
* @param segmentsToDelete The segments to be deleted
* @param asyncDelete If true, the deletion of the segments is done asynchronously
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param config The log configuration settings
* @param scheduler The thread pool scheduler used for background actions
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param logPrefix The logging prefix
* @throws IOException if the file can't be renamed and still exists
*/
private[log] def deleteSegmentFiles(segmentsToDelete: immutable.Iterable[LogSegment],
asyncDelete: Boolean,
dir: File,
topicPartition: TopicPartition,
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String): Unit = {
segmentsToDelete.foreach { segment =>
if (!segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
segment.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX)
}
def deleteSegments(): Unit = {
info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
val parentDir = dir.getParent
maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
segmentsToDelete.foreach { segment =>
segment.deleteIfExists()
}
}
}
if (asyncDelete)
scheduler.scheduleOnce("delete-file", () => deleteSegments(), config.fileDeleteDelayMs)
else
deleteSegments()
}
private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
includeAbortedTxns: Boolean): FetchDataInfo = {
val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] =
@ -946,11 +681,6 @@ object LocalLog extends Logging {
abortedTransactions)
}
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LogSegment.deleteIfExists(dir, baseOffset, CleanedFileSuffix)
LogSegment.open(dir, baseOffset, logConfig, Time.SYSTEM, false, logConfig.initFileSize, logConfig.preallocate, CleanedFileSuffix)
}
/**
* Wraps the value of iterator.next() in an option.
* Note: this facility is a part of the Iterator class starting from scala v2.13.
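The static helpers deleted from this object now live in the Java org.apache.kafka.storage.internals.log.LocalLog, imported above as JLocalLog. A minimal sketch, based only on the call shapes visible in this diff, of how a Scala caller reaches them and converts the returned Java collections at the boundary; segment, segmentToDelete, existingSegments and the remaining identifiers are placeholders for values a real caller already has in scope:
// split an overflowed segment via the relocated Java helper; the result exposes java.util.List,
// so convert with asScala at the boundary (mirrors UnifiedLog.splitOverflowedSegment below)
val result = JLocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition,
  config, scheduler, logDirFailureChannel, logIdent)
val newSegments: Seq[LogSegment] = result.newSegments.asScala.toSeq
// single-segment deletion goes through the same Java entry point; the second argument is asyncDelete,
// passed positionally because named arguments are not available when calling Java methods from Scala
JLocalLog.deleteSegmentFiles(singletonList(segmentToDelete), true, dir, topicPartition,
  config, scheduler, logDirFailureChannel, logIdent)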

View File

@ -1,528 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import kafka.log.UnifiedLog.{CleanedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{CorruptIndexException, LoadedLogOffsets, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegmentOffsetOverflowException, LogSegments, ProducerStateManager}
import java.util.Optional
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Set, mutable}
import scala.jdk.CollectionConverters._
/**
* @param dir The directory from which log segments need to be loaded
* @param topicPartition The topic partition associated with the log being loaded
* @param config The configuration settings for the log being loaded
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
* @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle log
* directory failure
* @param hadCleanShutdown Boolean flag to indicate whether the associated log previously had a
* clean shutdown
* @param segments The LogSegments instance into which segments recovered from disk will be
* populated
* @param logStartOffsetCheckpoint The checkpoint of the log start offset
* @param recoveryPointCheckpoint The checkpoint of the offset at which to begin the recovery
* @param leaderEpochCache An optional LeaderEpochFileCache instance to be updated during recovery
* @param producerStateManager The ProducerStateManager instance to be updated during recovery
* @param numRemainingSegments The remaining segments to be recovered in this log keyed by recovery thread name
* @param isRemoteLogEnabled Boolean flag to indicate whether the remote storage is enabled or not
*/
class LogLoader(
dir: File,
topicPartition: TopicPartition,
config: LogConfig,
scheduler: Scheduler,
time: Time,
logDirFailureChannel: LogDirFailureChannel,
hadCleanShutdown: Boolean,
segments: LogSegments,
logStartOffsetCheckpoint: Long,
recoveryPointCheckpoint: Long,
leaderEpochCache: Optional[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
isRemoteLogEnabled: Boolean = false,
) extends Logging {
logIdent = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}] "
/**
* Loads the log segments from the log files on disk and returns the components of the loaded log.
* It also updates the provided LeaderEpochFileCache and ProducerStateManager
* to reflect the contents of the loaded log.
*
* In the context of the calling thread, this function does not need to convert IOException to
* KafkaStorageException because it is only called before all logs are loaded.
*
* @return the offsets of the Log successfully loaded from disk
*
* @throws LogSegmentOffsetOverflowException if we encounter a .swap file with messages that
* overflow index offset
*/
def load(): LoadedLogOffsets = {
// First pass: go through the files in the log directory, remove any temporary files,
// and find any interrupted swap operations
val swapFiles = removeTempFilesAndCollectSwapFiles()
// The remaining valid swap files must come from compaction or segment split operation. We can
// simply rename them to regular segment files. But, before renaming, we should figure out which
// segments are compacted/split and delete these segment files: this is done by calculating
// min/maxSwapFileOffset.
// We store segments that require renaming in this code block, and do the actual renaming later.
var minSwapFileOffset = Long.MaxValue
var maxSwapFileOffset = Long.MinValue
swapFiles.filter(f => UnifiedLog.isLogFile(new File(Utils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
val baseOffset = offsetFromFile(f)
val segment = LogSegment.open(f.getParentFile,
baseOffset,
config,
time,
false,
0,
false,
UnifiedLog.SwapFileSuffix)
info(s"Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${UnifiedLog.SwapFileSuffix} files by renaming.")
minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
maxSwapFileOffset = Math.max(segment.readNextOffset, maxSwapFileOffset)
}
// Second pass: delete segments that are between minSwapFileOffset and maxSwapFileOffset. As
// discussed above, these segments were compacted or split but haven't been renamed to .delete
// before shutting down the broker.
for (file <- dir.listFiles if file.isFile) {
try {
if (!file.getName.endsWith(SwapFileSuffix)) {
val offset = offsetFromFile(file)
if (offset >= minSwapFileOffset && offset < maxSwapFileOffset) {
info(s"Deleting segment file ${file.getName} that was compacted but has not been deleted yet.")
file.delete()
}
}
} catch {
// offsetFromFile fails for files that do not include an offset in the file name
case _: StringIndexOutOfBoundsException =>
case _: NumberFormatException =>
}
}
// Third pass: rename all swap files.
for (file <- dir.listFiles if file.isFile) {
if (file.getName.endsWith(SwapFileSuffix)) {
info(s"Recovering file ${file.getName} by renaming from ${UnifiedLog.SwapFileSuffix} files.")
file.renameTo(new File(Utils.replaceSuffix(file.getPath, UnifiedLog.SwapFileSuffix, "")))
}
}
// Fourth pass: load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
retryOnOffsetOverflow(() => {
// In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in previous
// call to loadSegmentFiles().
segments.close()
segments.clear()
loadSegmentFiles()
})
val (newRecoveryPoint: Long, nextOffset: Long) = {
if (!dir.getAbsolutePath.endsWith(UnifiedLog.DeleteDirSuffix)) {
val (newRecoveryPoint, nextOffset) = retryOnOffsetOverflow(recoverLog)
// reset the index size of the currently active log segment to allow more entries
segments.lastSegment.get.resizeIndexes(config.maxIndexSize)
(newRecoveryPoint, nextOffset)
} else {
if (segments.isEmpty) {
segments.add(
LogSegment.open(
dir,
0,
config,
time,
config.initFileSize,
false))
}
(0L, 0L)
}
}
leaderEpochCache.ifPresent(_.truncateFromEndAsyncFlush(nextOffset))
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.ifPresent(_.truncateFromStartAsyncFlush(logStartOffsetCheckpoint))
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty)
throw new IllegalStateException("Producer state must be empty during log initialization")
// Reload all snapshots into the ProducerStateManager cache; the intermediate ProducerStateManager used
// during log recovery may have deleted some files without the LogLoader.producerStateManager instance witnessing the
// deletion.
producerStateManager.removeStraySnapshots(segments.baseOffsets)
UnifiedLog.rebuildProducerState(
producerStateManager,
segments,
newLogStartOffset,
nextOffset,
config.recordVersion,
time,
reloadFromCleanShutdown = hadCleanShutdown,
logIdent)
val activeSegment = segments.lastSegment.get
new LoadedLogOffsets(
newLogStartOffset,
newRecoveryPoint,
new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size))
}
/**
* Removes any temporary files found in log directory, and creates a list of all .swap files which could be swapped
* in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than
* the smallest offset .clean file could be part of an incomplete split operation. Such .swap files are also deleted
* by this method.
*
* @return Set of .swap files that are valid to be swapped in as segment files and index files
*/
private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
val swapFiles = mutable.Set[File]()
val cleanedFiles = mutable.Set[File]()
var minCleanedFileOffset = Long.MaxValue
for (file <- dir.listFiles if file.isFile) {
if (!file.canRead)
throw new IOException(s"Could not read file $file")
val filename = file.getName
// Delete stray files marked for deletion, but skip KRaft snapshots.
// These are handled in the recovery logic in `KafkaMetadataLog`.
if (filename.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) && !filename.endsWith(Snapshots.DELETE_SUFFIX)) {
debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
} else if (filename.endsWith(CleanedFileSuffix)) {
minCleanedFileOffset = Math.min(offsetFromFile(file), minCleanedFileOffset)
cleanedFiles += file
} else if (filename.endsWith(SwapFileSuffix)) {
swapFiles += file
}
}
// KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap
// files could be part of an incomplete split operation that could not complete. See Log#splitOverflowedSegment
// for more details about the split operation.
val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
invalidSwapFiles.foreach { file =>
debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
Files.deleteIfExists(file.toPath)
}
// Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
cleanedFiles.foreach { file =>
debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
}
validSwapFiles
}
/**
* Retries the provided function whenever a LogSegmentOffsetOverflowException is raised by
* it during execution. Before every retry, the overflowed segment is split into one or more segments
* such that there is no offset overflow in any of them.
*
* @param fn The function to be executed
* @return The value returned by the function, if successful
* @throws Exception if the executed function throws any exception other than
* LogSegmentOffsetOverflowException; the same exception is raised to the caller
*/
private def retryOnOffsetOverflow[T](fn: () => T): T = {
while (true) {
try {
return fn()
} catch {
case e: LogSegmentOffsetOverflowException =>
info(s"Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
val result = UnifiedLog.splitOverflowedSegment(
e.segment,
segments,
dir,
topicPartition,
config,
scheduler,
logDirFailureChannel,
logIdent)
deleteProducerSnapshotsAsync(result.deletedSegments)
}
}
throw new IllegalStateException()
}
/**
* Loads segments from disk into the provided params.segments.
*
* This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded.
* It is possible that we encounter a segment with index offset overflow in which case the LogSegmentOffsetOverflowException
* will be thrown. Note that any segments that were opened before we encountered the exception will remain open and the
* caller is responsible for closing them appropriately, if needed.
*
* @throws LogSegmentOffsetOverflowException if the log directory contains a segment with messages that overflow the index offset
*/
private def loadSegmentFiles(): Unit = {
// load segments in ascending order because transactional data from one segment may depend on the
// segments that come before it
for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
if (isIndexFile(file)) {
// if it is an index file, make sure it has a corresponding .log file
val offset = offsetFromFile(file)
val logFile = LogFileUtils.logFile(dir, offset)
if (!logFile.exists) {
warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
Files.deleteIfExists(file.toPath)
}
} else if (isLogFile(file)) {
// if it's a log file, load the corresponding log segment
val baseOffset = offsetFromFile(file)
val timeIndexFileNewlyCreated = !LogFileUtils.timeIndexFile(dir, baseOffset).exists()
val segment = LogSegment.open(
dir,
baseOffset,
config,
time,
true,
0,
false,
"")
try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
case _: NoSuchFileException =>
if (hadCleanShutdown || segment.baseOffset < recoveryPointCheckpoint)
error(s"Could not find offset index file corresponding to log file" +
s" ${segment.log.file.getAbsolutePath}, recovering segment and rebuilding index files...")
recoverSegment(segment)
case e: CorruptIndexException =>
warn(s"Found a corrupted index file corresponding to log file" +
s" ${segment.log.file.getAbsolutePath} due to ${e.getMessage}, recovering segment and" +
" rebuilding index files...")
recoverSegment(segment)
}
segments.add(segment)
}
}
}
/**
* Just recovers the given segment, without adding it to the provided params.segments.
*
* @param segment Segment to recover
*
* @return The number of bytes truncated from the segment
*
* @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
*/
private def recoverSegment(segment: LogSegment): Int = {
val producerStateManager = new ProducerStateManager(
topicPartition,
dir,
this.producerStateManager.maxTransactionTimeoutMs(),
this.producerStateManager.producerStateManagerConfig(),
time)
UnifiedLog.rebuildProducerState(
producerStateManager,
segments,
logStartOffsetCheckpoint,
segment.baseOffset,
config.recordVersion,
time,
reloadFromCleanShutdown = false,
logIdent)
val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
// once we have recovered the segment's data, take a snapshot to ensure that we won't
// need to reload the same segment again while recovering another segment.
producerStateManager.takeSnapshot()
bytesTruncated
}
/**
* Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
* active segment, and returns the updated recovery point and next offset after recovery. Along
* the way, the method suitably updates the LeaderEpochFileCache or ProducerStateManager inside
* the provided LogComponents.
*
* This method does not need to convert IOException to KafkaStorageException because it is only
* called before all logs are loaded.
*
* @return a tuple containing (newRecoveryPoint, nextOffset).
*
* @throws LogSegmentOffsetOverflowException if we encountered a legacy segment with offset overflow
*/
private[log] def recoverLog(): (Long, Long) = {
/** return the log end offset if valid */
def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
if (segments.nonEmpty) {
val logEndOffset = segments.lastSegment.get.readNextOffset
if (logEndOffset >= logStartOffsetCheckpoint)
Some(logEndOffset)
else {
warn(s"Deleting all segments because logEndOffset ($logEndOffset) " +
s"is smaller than logStartOffset $logStartOffsetCheckpoint. " +
"This could happen if segment files were deleted from the file system.")
removeAndDeleteSegmentsAsync(segments.values.asScala)
leaderEpochCache.ifPresent(_.clearAndFlush())
producerStateManager.truncateFullyAndStartAt(logStartOffsetCheckpoint)
None
}
} else None
}
// If we have the clean shutdown marker, skip recovery.
if (!hadCleanShutdown) {
val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue)
val numUnflushed = unflushed.size
val unflushedIter = unflushed.iterator
var truncated = false
var numFlushed = 0
val threadName = Thread.currentThread().getName
numRemainingSegments.put(threadName, numUnflushed)
while (unflushedIter.hasNext && !truncated) {
val segment = unflushedIter.next()
info(s"Recovering unflushed segment ${segment.baseOffset}. $numFlushed/$numUnflushed recovered for $topicPartition.")
val truncatedBytes =
try {
recoverSegment(segment)
} catch {
case _: InvalidOffsetException =>
val startOffset = segment.baseOffset
warn(s"Found invalid offset during recovery. Deleting the" +
s" corrupt segment and creating an empty one with starting offset $startOffset")
segment.truncateTo(startOffset)
}
if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
warn(s"Corruption found in segment ${segment.baseOffset}," +
s" truncating to offset ${segment.readNextOffset}")
val unflushedRemaining = new ArrayBuffer[LogSegment]
unflushedIter.forEachRemaining(s => unflushedRemaining += s)
removeAndDeleteSegmentsAsync(unflushedRemaining)
truncated = true
// segment is truncated, so set remaining segments to 0
numRemainingSegments.put(threadName, 0)
} else {
numFlushed += 1
numRemainingSegments.put(threadName, numUnflushed - numFlushed)
}
}
}
val logEndOffsetOption = deleteSegmentsIfLogStartGreaterThanLogEnd()
if (segments.isEmpty) {
// no existing segments, create a new mutable segment beginning at logStartOffset
segments.add(
LogSegment.open(
dir,
logStartOffsetCheckpoint,
config,
time,
config.initFileSize,
config.preallocate))
}
// Update the recovery point if there was a clean shutdown and we did not make any changes to
// the segments. Otherwise, we just ensure that the recovery point is not ahead of the log end
// offset. To ensure correctness and to make it easier to reason about, it's best to only advance
// the recovery point when the log is flushed. If we advanced the recovery point here, we could
// skip recovery for unflushed segments if the broker crashed after we checkpointed the recovery
// point and before we flushed the segment.
(hadCleanShutdown, logEndOffsetOption) match {
case (true, Some(logEndOffset)) =>
(logEndOffset, logEndOffset)
case _ =>
val logEndOffset = logEndOffsetOption.getOrElse(segments.lastSegment.get.readNextOffset)
(Math.min(recoveryPointCheckpoint, logEndOffset), logEndOffset)
}
}
/**
* This method deletes the given log segments and the associated producer snapshots, by doing the
* following for each of them:
* - It removes the segment from the segment map so that it will no longer be used for reads.
* - It schedules asynchronous deletion of the segment, which allows reads to happen concurrently without
* synchronization and without the possibility of physically deleting a file while it is being
* read.
*
* This method does not need to convert IOException to KafkaStorageException because it is either
* called before all logs are loaded or the immediate caller will catch and handle IOException
*
* @param segmentsToDelete The log segments to schedule for deletion
*/
private def removeAndDeleteSegmentsAsync(segmentsToDelete: Iterable[LogSegment]): Unit = {
if (segmentsToDelete.nonEmpty) {
// Most callers hold an iterator into the `params.segments` collection and
// `removeAndDeleteSegmentAsync` mutates it by removing the deleted segment. Therefore,
// we should force materialization of the iterator here, so that results of the iteration
// remain valid and deterministic. We should also pass only the materialized view of the
// iterator to the logic that deletes the segments.
val toDelete = segmentsToDelete.toList
info(s"Deleting segments as part of log recovery: ${toDelete.mkString(",")}")
toDelete.foreach { segment =>
segments.remove(segment.baseOffset)
}
UnifiedLog.deleteSegmentFiles(
toDelete,
asyncDelete = true,
dir,
topicPartition,
config,
scheduler,
logDirFailureChannel,
logIdent)
deleteProducerSnapshotsAsync(segmentsToDelete)
}
}
private def deleteProducerSnapshotsAsync(segments: Iterable[LogSegment]): Unit = {
UnifiedLog.deleteProducerSnapshots(segments,
producerStateManager,
asyncDelete = true,
scheduler,
config,
logDirFailureChannel,
dir.getParent,
topicPartition)
}
}
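The Scala LogLoader above is deleted; loading now goes through the Java org.apache.kafka.storage.internals.log.LogLoader. A minimal sketch of the new call shape in Scala, assuming the Java constructor keeps the parameter order of the removed class (the updated PartitionLockTest later in this diff passes its arguments in the same order); the former Scala default arguments for numRemainingSegments and isRemoteLogEnabled now have to be supplied explicitly:
val offsets: LoadedLogOffsets = new LogLoader(
  dir,
  topicPartition,
  config,
  scheduler,
  time,
  logDirFailureChannel,
  true,                                    // hadCleanShutdown
  segments,                                // LogSegments instance populated by the loader
  0L,                                      // logStartOffsetCheckpoint
  0L,                                      // recoveryPointCheckpoint
  leaderEpochCache.asJava,                 // scala.Option converted to java.util.Optional
  producerStateManager,
  new ConcurrentHashMap[String, Integer],  // numRemainingSegments, previously a default argument
  false                                    // isRemoteLogEnabled, previously a default argument
).load()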

View File

@ -328,7 +328,7 @@ class LogManager(logDirs: Seq[File],
logStartOffsets: util.Map[TopicPartition, JLong],
defaultConfig: LogConfig,
topicConfigOverrides: Map[String, LogConfig],
numRemainingSegments: ConcurrentMap[String, Int],
numRemainingSegments: ConcurrentMap[String, Integer],
isStray: UnifiedLog => Boolean): UnifiedLog = {
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)
@ -421,7 +421,7 @@ class LogManager(logDirs: Seq[File],
// log dir path -> number of Remaining logs map for remainingLogsToRecover metric
val numRemainingLogs: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
// log recovery thread name -> number of remaining segments map for remainingSegmentsToRecover metric
val numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
val numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer]
def handleIOException(logDirAbsolutePath: String, e: IOException): Unit = {
offlineDirs.add((logDirAbsolutePath, e))
@ -550,7 +550,7 @@ class LogManager(logDirs: Seq[File],
}
private[log] def addLogRecoveryMetrics(numRemainingLogs: ConcurrentMap[String, Int],
numRemainingSegments: ConcurrentMap[String, Int]): Unit = {
numRemainingSegments: ConcurrentMap[String, Integer]): Unit = {
debug("Adding log recovery metrics")
for (dir <- logDirs) {
metricsGroup.newGauge("remainingLogsToRecover", () => numRemainingLogs.get(dir.getAbsolutePath),

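The value type of numRemainingSegments changes from Scala Int to java.lang.Integer because the map is now passed to the Java LogLoader, whose signature cannot refer to the Scala primitive alias. A minimal sketch of the interop point; the put call mirrors how the recovery loop reports progress:
// declare the map with java.lang.Integer so it matches the Java LogLoader parameter type
val numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer]
// Scala Int values still go in cleanly; Predef's int2Integer implicit conversion boxes them
numRemainingSegments.put(Thread.currentThread().getName, 0)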
View File

@ -39,10 +39,12 @@ import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LocalLog => JLocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, UnifiedLog => JUnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import java.io.{File, IOException}
import java.lang.{Long => JLong}
import java.nio.file.{Files, Path}
import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, ScheduledFuture}
@ -50,7 +52,7 @@ import java.util.stream.Collectors
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Seq, immutable, mutable}
import scala.collection.{Seq, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@ -551,8 +553,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def rebuildProducerState(lastOffset: Long,
producerStateManager: ProducerStateManager): Unit = lock synchronized {
localLog.checkIfMemoryMappedBufferClosed()
UnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, recordVersion, time,
reloadFromCleanShutdown = false, logIdent)
JUnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, recordVersion, time, false, logIdent)
}
@threadsafe
@ -905,7 +906,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updateHighWatermarkWithLogEndOffset()
// update the producer state
updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))
updatedProducers.values.forEach(producerAppendInfo => producerStateManager.update(producerAppendInfo))
// update the transaction index with the true last stable offset. The last offset visible
// to consumers using READ_COMMITTED will be limited by this value and the high watermark.
@ -1025,8 +1026,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
records: MemoryRecords,
origin: AppendOrigin,
requestVerificationGuard: VerificationGuard):
(mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
(util.Map[JLong, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = new util.HashMap[JLong, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
var relativePositionInSegment = appendOffsetMetadata.relativePositionInSegment
@ -1067,12 +1068,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// We cache offset metadata for the start of each transaction. This allows us to
// compute the last stable offset without relying on additional index lookups.
val firstOffsetMetadata = if (batch.isTransactional)
Some(new LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
Optional.of(new LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
else
None
Optional.empty[LogOffsetMetadata]
val maybeCompletedTxn = updateProducers(producerStateManager, batch, updatedProducers, firstOffsetMetadata, origin)
maybeCompletedTxn.foreach(completedTxns += _)
val maybeCompletedTxn = JUnifiedLog.updateProducers(producerStateManager, batch, updatedProducers, firstOffsetMetadata, origin)
maybeCompletedTxn.ifPresent(ct => completedTxns += ct)
}
relativePositionInSegment += batch.sizeInBytes
@ -1553,7 +1554,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// remove the segments for lookups
localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
}
deleteProducerSnapshots(deletable, asyncDelete = true)
deleteProducerSnapshots(deletable.toList.asJava, asyncDelete = true)
}
numToDelete
}
@ -1840,7 +1841,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
truncateFullyAndStartAt(targetOffset)
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
deleteProducerSnapshots(deletedSegments.toList.asJava, asyncDelete = true)
leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
@ -1921,7 +1922,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
localLog.checkIfMemoryMappedBufferClosed()
val deletedSegments = UnifiedLog.replaceSegments(localLog.segments, newSegments, oldSegments, dir, topicPartition,
config, scheduler, logDirFailureChannel, logIdent)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
deleteProducerSnapshots(deletedSegments.toList.asJava, asyncDelete = true)
}
}
@ -1932,8 +1933,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* Currently, it is only used by LogCleaner threads when compacting non-active segments, while holding LogCleanerManager's lock
* to ensure no other log cleaner threads or the retention thread can work on the same segment.
*/
private[log] def getFirstBatchTimestampForSegments(segments: util.Collection[LogSegment]): util.Collection[java.lang.Long] = {
segments.stream().map[java.lang.Long](s => s.getFirstBatchTimestamp).collect(Collectors.toList())
private[log] def getFirstBatchTimestampForSegments(segments: util.Collection[LogSegment]): util.Collection[JLong] = {
segments.stream().map[JLong](s => s.getFirstBatchTimestamp).collect(Collectors.toList())
}
/**
@ -1954,19 +1955,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private[log] def addSegment(segment: LogSegment): LogSegment = localLog.segments.add(segment)
private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, msg) {
fun
}
JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
}
private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
val result = UnifiedLog.splitOverflowedSegment(segment, localLog.segments, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
deleteProducerSnapshots(result.deletedSegments, asyncDelete = true)
result.newSegments.toList
result.newSegments.asScala.toList
}
private[log] def deleteProducerSnapshots(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
UnifiedLog.deleteProducerSnapshots(segments, producerStateManager, asyncDelete, scheduler, config, logDirFailureChannel, parentDir, topicPartition)
private[log] def deleteProducerSnapshots(segments: util.List[LogSegment], asyncDelete: Boolean): Unit = {
JUnifiedLog.deleteProducerSnapshots(segments, producerStateManager, asyncDelete, scheduler, config, logDirFailureChannel, parentDir, topicPartition)
}
}
@ -1979,11 +1978,11 @@ object UnifiedLog extends Logging {
val TxnIndexFileSuffix: String = LogFileUtils.TXN_INDEX_FILE_SUFFIX
val CleanedFileSuffix: String = LocalLog.CleanedFileSuffix
val CleanedFileSuffix: String = LogFileUtils.CLEANED_FILE_SUFFIX
val SwapFileSuffix: String = LocalLog.SwapFileSuffix
val SwapFileSuffix: String = LogFileUtils.SWAP_FILE_SUFFIX
val DeleteDirSuffix: String = LocalLog.DeleteDirSuffix
val DeleteDirSuffix: String = LogFileUtils.DELETE_DIR_SUFFIX
val StrayDirSuffix: String = LocalLog.StrayDirSuffix
@ -2019,7 +2018,7 @@ object UnifiedLog extends Logging {
lastShutdownClean: Boolean = true,
topicId: Option[Uuid],
keepPartitionMetadataFile: Boolean,
numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer],
remoteStorageSystemEnable: Boolean = false,
logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
// create the log directory if it doesn't exist
@ -2086,42 +2085,6 @@ object UnifiedLog extends Logging {
def parseTopicPartitionName(dir: File): TopicPartition = LocalLog.parseTopicPartitionName(dir)
private[log] def isIndexFile(file: File): Boolean = LocalLog.isIndexFile(file)
private[log] def isLogFile(file: File): Boolean = LocalLog.isLogFile(file)
private def loadProducersFromRecords(producerStateManager: ProducerStateManager, records: Records): Unit = {
val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
records.batches.forEach { batch =>
if (batch.hasProducerId) {
val maybeCompletedTxn = updateProducers(
producerStateManager,
batch,
loadedProducers,
firstOffsetMetadata = None,
origin = AppendOrigin.REPLICATION)
maybeCompletedTxn.foreach(completedTxns += _)
}
}
loadedProducers.values.foreach(producerStateManager.update)
completedTxns.foreach(producerStateManager.completeTxn)
}
private def updateProducers(producerStateManager: ProducerStateManager,
batch: RecordBatch,
producers: mutable.Map[Long, ProducerAppendInfo],
firstOffsetMetadata: Option[LogOffsetMetadata],
origin: AppendOrigin): Option[CompletedTxn] = {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
val completedTxn = appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
// Whether we wrote a control marker or a data batch, we can remove VerificationGuard since either the transaction is complete or we have a first offset.
if (batch.isTransactional)
producerStateManager.clearVerificationStateEntry(producerId)
completedTxn
}
/**
* If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance.
* Loading the epoch entries from the backing checkpoint file or the provided currentCache if not empty.
@ -2169,124 +2132,16 @@ object UnifiedLog extends Logging {
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String,
isRecoveredSwapFile: Boolean = false): Iterable[LogSegment] = {
LocalLog.replaceSegments(existingSegments,
newSegments,
oldSegments,
JLocalLog.replaceSegments(existingSegments,
newSegments.asJava,
oldSegments.asJava,
dir,
topicPartition,
config,
scheduler,
logDirFailureChannel,
logPrefix,
isRecoveredSwapFile)
}
private[log] def deleteSegmentFiles(segmentsToDelete: immutable.Iterable[LogSegment],
asyncDelete: Boolean,
dir: File,
topicPartition: TopicPartition,
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String): Unit = {
LocalLog.deleteSegmentFiles(segmentsToDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
}
/**
* Rebuilds producer state until the provided lastOffset. This function may be called from the
* recovery code path, and thus must be free of all side-effects, i.e. it must not update any
* log-specific state.
*
* @param producerStateManager The ProducerStateManager instance to be rebuilt.
* @param segments The segments of the log whose producer state is being rebuilt
* @param logStartOffset The log start offset
* @param lastOffset The last offset up to which the producer state needs to be rebuilt
* @param recordVersion The record version
* @param time The time instance used for checking the clock
* @param reloadFromCleanShutdown True if the producer state is being built after a clean shutdown,
* false otherwise.
* @param logPrefix The logging prefix
*/
private[log] def rebuildProducerState(producerStateManager: ProducerStateManager,
segments: LogSegments,
logStartOffset: Long,
lastOffset: Long,
recordVersion: RecordVersion,
time: Time,
reloadFromCleanShutdown: Boolean,
logPrefix: String): Unit = {
val offsetsToSnapshot =
if (segments.nonEmpty) {
val lastSegmentBaseOffset = segments.lastSegment.get.baseOffset
val nextLatestSegmentBaseOffset = segments.lowerSegment(lastSegmentBaseOffset).asScala.map(_.baseOffset)
Seq(nextLatestSegmentBaseOffset, Some(lastSegmentBaseOffset), Some(lastOffset))
} else {
Seq(Some(lastOffset))
}
info(s"${logPrefix}Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")
// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
// but we have to be careful not to assume too much in the presence of broker failures. The two most common
// upgrade cases in which we expect to find no snapshots are the following:
//
// 1. The broker has been upgraded, but the topic is still on the old message format.
// 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
//
// If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
// offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
// (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
// from the first segment.
if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
(!producerStateManager.latestSnapshotOffset.isPresent && reloadFromCleanShutdown)) {
// To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
// last two segments and the last offset. This should avoid the full scan in the case that the log needs
// truncation.
offsetsToSnapshot.flatten.foreach { offset =>
producerStateManager.updateMapEndOffset(offset)
producerStateManager.takeSnapshot()
}
} else {
info(s"${logPrefix}Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
val producerStateLoadStart = time.milliseconds()
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
val segmentRecoveryStart = time.milliseconds()
// Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
// offset (which would be the case on first startup) and there were active producers prior to truncation
// (which could be the case if truncating after initial loading). If there weren't, then truncating
// shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
// and we can skip the loading. This is an optimization for users who are not yet using
// idempotent/transactional features.
if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
val segmentOfLastOffset = segments.floorSegment(lastOffset)
segments.values(producerStateManager.mapEndOffset, lastOffset).forEach { segment =>
val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
producerStateManager.updateMapEndOffset(startOffset)
if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
producerStateManager.takeSnapshot()
val maxPosition = if (segmentOfLastOffset.isPresent && segmentOfLastOffset.get == segment) {
Option(segment.translateOffset(lastOffset))
.map(_.position)
.getOrElse(segment.size)
} else {
segment.size
}
val fetchDataInfo = segment.read(startOffset, Int.MaxValue, maxPosition)
if (fetchDataInfo != null)
loadProducersFromRecords(producerStateManager, fetchDataInfo.records)
}
}
producerStateManager.updateMapEndOffset(lastOffset)
producerStateManager.takeSnapshot()
info(s"${logPrefix}Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset")
}
isRecoveredSwapFile).asScala
}
private[log] def splitOverflowedSegment(segment: LogSegment,
@ -2297,39 +2152,11 @@ object UnifiedLog extends Logging {
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String): SplitSegmentResult = {
LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
}
private[log] def deleteProducerSnapshots(segments: Iterable[LogSegment],
producerStateManager: ProducerStateManager,
asyncDelete: Boolean,
scheduler: Scheduler,
config: LogConfig,
logDirFailureChannel: LogDirFailureChannel,
parentDir: String,
topicPartition: TopicPartition): Unit = {
val snapshotsToDelete = segments.flatMap { segment =>
producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset).asScala
}
def deleteProducerSnapshots(): Unit = {
LocalLog.maybeHandleIOException(logDirFailureChannel,
parentDir,
s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir") {
snapshotsToDelete.foreach { snapshot =>
snapshot.deleteIfExists()
}
}
}
if (asyncDelete)
scheduler.scheduleOnce("delete-producer-snapshot", () => deleteProducerSnapshots(), config.fileDeleteDelayMs)
else
deleteProducerSnapshots()
JLocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
}
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
JLocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
}
// Visible for benchmarking

View File

@ -40,7 +40,7 @@ import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -319,12 +319,14 @@ class PartitionLockTest extends Logging {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
true,
segments,
0L,
0L,
leaderEpochCache.asJava,
producerStateManager
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,

View File

@ -44,7 +44,7 @@ import org.mockito.invocation.InvocationOnMock
import java.lang.{Long => JLong}
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.compress.Compression
@ -61,7 +61,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -454,12 +454,14 @@ class PartitionTest extends AbstractPartitionTest {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
segments = segments,
logStartOffsetCheckpoint = 0L,
recoveryPointCheckpoint = 0L,
true,
segments,
0L,
0L,
leaderEpochCache.asJava,
producerStateManager
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,

View File

@ -20,6 +20,7 @@ package kafka.log
import java.io.File
import java.nio.channels.ClosedChannelException
import java.nio.charset.StandardCharsets
import java.util
import java.util.regex.Pattern
import java.util.Collections
import kafka.server.KafkaConfig
@ -30,7 +31,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{MemoryRecords, Record, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments}
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog => JLocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -123,7 +124,7 @@ class LocalLogTest {
log.roll()
assertEquals(2, log.segments.numberOfSegments)
assertFalse(logDir.listFiles.isEmpty)
val segmentsBeforeDelete = log.segments.values.asScala.toVector
val segmentsBeforeDelete = new util.ArrayList(log.segments.values)
val deletedSegments = log.deleteAllSegments()
assertTrue(log.segments.isEmpty)
assertEquals(segmentsBeforeDelete, deletedSegments)
@ -138,7 +139,7 @@ class LocalLogTest {
assertEquals(1, log.segments.numberOfSegments)
assertNotEquals(oldActiveSegment, log.segments.activeSegment)
assertFalse(logDir.listFiles.isEmpty)
assertTrue(oldActiveSegment.hasSuffix(LocalLog.DeletedFileSuffix))
assertTrue(oldActiveSegment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
}
@Test
@ -306,17 +307,17 @@ class LocalLogTest {
assertEquals(10L, log.segments.numberOfSegments)
val toDelete = log.segments.values.asScala.toVector
LocalLog.deleteSegmentFiles(toDelete, asyncDelete = asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
val toDelete = log.segments.values
JLocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
if (asyncDelete) {
toDelete.foreach {
toDelete.forEach {
segment =>
assertFalse(segment.deleted())
assertTrue(segment.hasSuffix(LocalLog.DeletedFileSuffix))
assertTrue(segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
}
mockTime.sleep(log.config.fileDeleteDelayMs + 1)
}
toDelete.foreach(segment => assertTrue(segment.deleted()))
toDelete.forEach(segment => assertTrue(segment.deleted()))
}
@Test
@ -339,7 +340,7 @@ class LocalLogTest {
assertEquals(1, log.segments.numberOfSegments)
assertEquals(newActiveSegment, log.segments.activeSegment)
assertNotEquals(oldActiveSegment, log.segments.activeSegment)
assertTrue(oldActiveSegment.hasSuffix(LocalLog.DeletedFileSuffix))
assertTrue(oldActiveSegment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
assertEquals(newOffset, log.segments.activeSegment.baseOffset)
assertEquals(0L, log.recoveryPoint)
assertEquals(newOffset, log.logEndOffset)

View File

@ -28,13 +28,14 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import java.lang.{Long => JLong}
import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.compat.java8.OptionConverters._
@ -119,12 +120,14 @@ class LogCleanerManagerTest extends Logging {
time.scheduler,
time,
logDirFailureChannel,
hadCleanShutdown = true,
true,
segments,
0L,
0L,
leaderEpochCache.asJava,
producerStateManager
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(tpDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, time.scheduler, time, tp, logDirFailureChannel)

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
@ -43,7 +43,7 @@ import java.nio._
import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import java.util.Properties
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import scala.collection._
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@ -200,12 +200,14 @@ class LogCleanerTest extends Logging {
time.scheduler,
time,
logDirFailureChannel,
hadCleanShutdown = true,
true,
logSegments,
0L,
0L,
leaderEpochCache.asJava,
producerStateManager
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(dir, config, logSegments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, time.scheduler, time, topicPartition, logDirFailureChannel)

View File

@ -17,10 +17,6 @@
package kafka.log
import java.io.{BufferedWriter, File, FileWriter, IOException}
import java.nio.ByteBuffer
import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util.{Optional, OptionalLong, Properties}
import kafka.server.KafkaConfig
import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils
@ -35,7 +31,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
@ -47,9 +43,13 @@ import org.mockito.{ArgumentMatchers, Mockito}
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{mock, reset, times, verify, when}
import java.io.{BufferedWriter, File, FileWriter, IOException}
import java.lang.{Long => JLong}
import java.nio.ByteBuffer
import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.{Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
import scala.collection.{Iterable, Map, mutable}
@ -136,7 +136,7 @@ class LogLoaderTest {
override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: util.Map[TopicPartition, JLong],
logStartOffsets: util.Map[TopicPartition, JLong], defaultConfig: LogConfig,
topicConfigs: Map[String, LogConfig], numRemainingSegments: ConcurrentMap[String, Int],
topicConfigs: Map[String, LogConfig], numRemainingSegments: ConcurrentMap[String, Integer],
shouldBeStrayKraftLog: UnifiedLog => Boolean): UnifiedLog = {
if (simulateError.hasError) {
simulateError.errorType match {
@ -163,7 +163,7 @@ class LogLoaderTest {
this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
leaderEpochCache.asJava, producerStateManager)
leaderEpochCache.asJava, producerStateManager, new ConcurrentHashMap[String, Integer], false)
val offsets = logLoader.load()
val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@ -382,12 +382,15 @@ class LogLoaderTest {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = false,
false,
interceptedLogSegments,
0L,
recoveryPoint,
leaderEpochCache.asJava,
producerStateManager)
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
)
val offsets = logLoader.load()
val localLog = new LocalLog(logDir, logConfig, interceptedLogSegments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@ -445,12 +448,14 @@ class LogLoaderTest {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = false,
false,
segments,
0L,
0L,
leaderEpochCache.asJava,
stateManager
stateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@ -555,12 +560,14 @@ class LogLoaderTest {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = false,
false,
segments,
0L,
0L,
leaderEpochCache.asJava,
stateManager
stateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@ -610,12 +617,14 @@ class LogLoaderTest {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
true,
segments,
0L,
0L,
leaderEpochCache.asJava,
stateManager
stateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@ -664,12 +673,14 @@ class LogLoaderTest {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
true,
segments,
0L,
0L,
leaderEpochCache.asJava,
stateManager
stateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@ -1811,13 +1822,14 @@ class LogLoaderTest {
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
true,
segments,
0L,
0L,
leaderEpochCache.asJava,
stateManager,
isRemoteLogEnabled = isRemoteLogEnabled
new ConcurrentHashMap[String, Integer],
isRemoteLogEnabled
).load()
assertEquals(expectedLogStartOffset, offsets.logStartOffset)
}

View File

@ -264,7 +264,7 @@ class LogManagerTest {
invocation.callRealMethod().asInstanceOf[UnifiedLog]
loadLogCalled = loadLogCalled + 1
}.when(logManager).loadLog(any[File], any[Boolean], any[util.Map[TopicPartition, JLong]], any[util.Map[TopicPartition, JLong]],
any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]], any[UnifiedLog => Boolean]())
any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Integer]], any[UnifiedLog => Boolean]())
val t = new Thread() {
override def run(): Unit = { logManager.startup(Set.empty) }
@ -367,7 +367,7 @@ class LogManagerTest {
assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.")
log.updateHighWatermark(log.logEndOffset)
log.logSegments.forEach(_.log.file.setLastModified(time.milliseconds))
log.logSegments.forEach(s => s.log.file.setLastModified(time.milliseconds))
time.sleep(maxLogAgeMs + 1)
assertEquals(1, log.numberOfSegments, "Now there should only be only one segment in the index.")
@ -466,7 +466,7 @@ class LogManagerTest {
val numSegments = log.numberOfSegments
assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.")
log.logSegments.forEach(_.log.file.setLastModified(time.milliseconds))
log.logSegments.forEach(s => s.log.file.setLastModified(time.milliseconds))
time.sleep(maxLogAgeMs + 1)
assertEquals(numSegments, log.numberOfSegments, "number of segments shouldn't have changed")
@ -539,7 +539,7 @@ class LogManagerTest {
true
}
logManager.loadLog(log.dir, hadCleanShutdown = true, Collections.emptyMap[TopicPartition, JLong], Collections.emptyMap[TopicPartition, JLong], logConfig, Map.empty, new ConcurrentHashMap[String, Int](), providedIsStray)
logManager.loadLog(log.dir, hadCleanShutdown = true, Collections.emptyMap[TopicPartition, JLong], Collections.emptyMap[TopicPartition, JLong], logConfig, Map.empty, new ConcurrentHashMap[String, Integer](), providedIsStray)
assertEquals(1, invokedCount)
assertTrue(
logDir.listFiles().toSet
@ -874,7 +874,7 @@ class LogManagerTest {
private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
logDirs: Seq[File],
recoveryThreadsPerDataDir: Int,
mockMap: ConcurrentHashMap[String, Int],
mockMap: ConcurrentHashMap[String, Integer],
expectedParams: Map[String, Int]): Unit = {
val logManagerClassName = classOf[LogManager].getSimpleName
// get all `remainingSegmentsToRecover` metrics
@ -942,7 +942,7 @@ class LogManagerTest {
assertEquals(2, spyLogManager.liveLogDirs.size)
val mockTime = new MockTime()
val mockMap = mock(classOf[ConcurrentHashMap[String, Int]])
val mockMap = mock(classOf[ConcurrentHashMap[String, Integer]])
val mockBrokerTopicStats = mock(classOf[BrokerTopicStats])
val expectedSegmentsPerLog = 2
@ -978,7 +978,7 @@ class LogManagerTest {
numRemainingSegments = mockMap)
}.when(spyLogManager).loadLog(any[File], any[Boolean], any[util.Map[TopicPartition, JLong]], any[util.Map[TopicPartition, JLong]],
any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]], any[UnifiedLog => Boolean]())
any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Integer]], any[UnifiedLog => Boolean]())
// do nothing for removeLogRecoveryMetrics for metrics verification
doNothing().when(spyLogManager).removeLogRecoveryMetrics()
@ -987,7 +987,7 @@ class LogManagerTest {
spyLogManager.startup(Set.empty)
// make sure log recovery metrics are added and removed
verify(spyLogManager, times(1)).addLogRecoveryMetrics(any[ConcurrentMap[String, Int]], any[ConcurrentMap[String, Int]])
verify(spyLogManager, times(1)).addLogRecoveryMetrics(any[ConcurrentMap[String, Int]], any[ConcurrentMap[String, Integer]])
verify(spyLogManager, times(1)).removeLogRecoveryMetrics()
// expected 1 log in each log dir since we created 2 partitions with 2 log dirs
@ -1016,7 +1016,7 @@ class LogManagerTest {
spyLogManager.startup(Set.empty)
// make sure log recovery metrics are added and removed once
verify(spyLogManager, times(1)).addLogRecoveryMetrics(any[ConcurrentMap[String, Int]], any[ConcurrentMap[String, Int]])
verify(spyLogManager, times(1)).addLogRecoveryMetrics(any[ConcurrentMap[String, Int]], any[ConcurrentMap[String, Integer]])
verify(spyLogManager, times(1)).removeLogRecoveryMetrics()
verifyLogRecoverMetricsRemoved(spyLogManager)

View File

@ -104,7 +104,7 @@ object LogTestUtils {
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None,
keepPartitionMetadataFile: Boolean = true,
numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer],
remoteStorageSystemEnable: Boolean = false,
remoteLogManager: Option[RemoteLogManager] = None,
logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {

View File

@ -4526,7 +4526,7 @@ class UnifiedLogTest {
logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs,
lastShutdownClean, topicId, keepPartitionMetadataFile, new ConcurrentHashMap[String, Int],
lastShutdownClean, topicId, keepPartitionMetadataFile, new ConcurrentHashMap[String, Integer],
remoteStorageSystemEnable, remoteLogManager, logOffsetsListener)
logsToClose = logsToClose :+ log
log

View File

@ -20,7 +20,7 @@ package kafka.server
import com.yammer.metrics.core.{Gauge, Meter, Timer}
import kafka.cluster.PartitionTest.MockPartitionListener
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log._
import kafka.log.{LocalLog, LogManager, LogManagerTest, UnifiedLog}
import kafka.log.remote.RemoteLogManager
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
@ -67,7 +67,7 @@ import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.log._
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@ -85,7 +85,7 @@ import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
import java.util.concurrent.{Callable, ConcurrentHashMap, CountDownLatch, TimeUnit}
import java.util.stream.IntStream
import java.util.{Collections, Optional, OptionalLong, Properties}
import scala.collection.{Map, Seq, mutable}
@ -2933,12 +2933,14 @@ class ReplicaManagerTest {
mockScheduler,
time,
mockLogDirFailureChannel,
hadCleanShutdown = true,
true,
segments,
0L,
0L,
leaderEpochCache.asJava,
producerStateManager
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockScheduler, time, tp, mockLogDirFailureChannel)

View File

@ -18,12 +18,12 @@ package kafka.utils
import java.util.Properties
import java.util.concurrent.atomic._
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import kafka.log.{LocalLog, LogLoader, UnifiedLog}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import kafka.log.{LocalLog, UnifiedLog}
import kafka.utils.TestUtils.retry
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
@ -150,12 +150,14 @@ class SchedulerTest {
scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
true,
segments,
0L,
0L,
leaderEpochCache.asJava,
producerStateManager
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition, logDirFailureChannel)

View File

@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.require;
import static org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.SWAP_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.isLogFile;
public class LocalLog {
private static final Logger LOG = LoggerFactory.getLogger(LocalLog.class);
/**
* Invokes the provided function and handles any IOException raised by the function by marking the
* provided directory offline.
*
* @param logDirFailureChannel Used to asynchronously handle log directory failure.
* @param logDir The log directory to be marked offline during an IOException.
* @param errorMsgSupplier The supplier for the error message to be used when marking the log directory offline.
* @param function The function to be executed.
* @return The value returned by the function after a successful invocation
*/
public static <T> T maybeHandleIOException(LogDirFailureChannel logDirFailureChannel,
String logDir,
Supplier<String> errorMsgSupplier,
StorageAction<T, IOException> function) {
if (logDirFailureChannel.hasOfflineLogDir(logDir)) {
throw new KafkaStorageException("The log dir " + logDir + " is already offline due to a previous IO exception.");
}
try {
return function.execute();
} catch (IOException ioe) {
String errorMsg = errorMsgSupplier.get();
logDirFailureChannel.maybeAddOfflineLogDir(logDir, errorMsg, ioe);
throw new KafkaStorageException(errorMsg, ioe);
}
}
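// Illustrative usage sketch (not part of this change; variable names are hypothetical): callers wrap
// IO-bound work so that an IOException marks the containing log directory offline and is rethrown as
// a KafkaStorageException, e.g.
//   LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir,
//       () -> "Error while flushing " + topicPartition + " in dir " + parentDir,
//       () -> { segment.flush(); return null; });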
/**
* Perform physical deletion of the index and log files for the given segments.
* Prior to the deletion, the index and log files are renamed by appending .deleted to their
* respective file names. The deletion can optionally be performed asynchronously.
* <br/>
* This method assumes that the file exists. It does not need to convert IOException
* (thrown from changeFileSuffixes) to KafkaStorageException because it is either called before
* all logs are loaded or the caller will catch and handle IOException.
*
* @param segmentsToDelete The segments to be deleted
* @param asyncDelete If true, the deletion of the segments is done asynchronously
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param config The log configuration settings
* @param scheduler The thread pool scheduler used for background actions
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param logPrefix The logging prefix
* @throws IOException if the file can't be renamed and still exists
*/
public static void deleteSegmentFiles(Collection<LogSegment> segmentsToDelete,
boolean asyncDelete,
File dir,
TopicPartition topicPartition,
LogConfig config,
Scheduler scheduler,
LogDirFailureChannel logDirFailureChannel,
String logPrefix) throws IOException {
for (LogSegment segment : segmentsToDelete) {
if (!segment.hasSuffix(DELETED_FILE_SUFFIX)) {
segment.changeFileSuffixes("", DELETED_FILE_SUFFIX);
}
}
Runnable deleteSegments = () -> {
LOG.info("{}Deleting segment files {}", logPrefix, segmentsToDelete.stream().map(LogSegment::toString).collect(Collectors.joining(", ")));
String parentDir = dir.getParent();
maybeHandleIOException(logDirFailureChannel, parentDir,
() -> "Error while deleting segments for " + topicPartition + " in dir " + parentDir,
() -> {
for (LogSegment segment : segmentsToDelete) {
segment.deleteIfExists();
}
return null;
});
};
if (asyncDelete) {
scheduler.scheduleOnce("delete-file", deleteSegments, config.fileDeleteDelayMs);
} else {
deleteSegments.run();
}
}
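// Illustrative call shape (mirrors the test usage elsewhere in this change; variable names are hypothetical):
//   LocalLog.deleteSegmentFiles(segmentsToDelete, true, log.dir, log.topicPartition, log.config,
//       log.scheduler, log.logDirFailureChannel, "");
// With asyncDelete=true the segments are first renamed with the .deleted suffix and are physically
// removed after config.fileDeleteDelayMs by the scheduled "delete-file" task.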
public static LogSegment createNewCleanedSegment(File dir, LogConfig logConfig, long baseOffset) throws IOException {
LogSegment.deleteIfExists(dir, baseOffset, CLEANED_FILE_SUFFIX);
return LogSegment.open(dir, baseOffset, logConfig, Time.SYSTEM, false, logConfig.initFileSize(), logConfig.preallocate, CLEANED_FILE_SUFFIX);
}
/**
* Split a segment into one or more segments such that there is no offset overflow in any of them. The
* resulting segments will contain the exact same messages that are present in the input segment. On successful
* completion of this method, the input segment will be deleted and will be replaced by the resulting new segments.
* See replaceSegments for recovery logic, in case the broker dies in the middle of this operation.
* <br/>
* Note that this method assumes we have already determined that the segment passed in contains records that cause
* offset overflow.
* <br/>
* The split logic overloads the use of .cleaned files that LogCleaner typically uses to make the process of replacing
* the input segment with multiple new segments atomic and recoverable in the event of a crash. See replaceSegments
* and completeSwapOperations for the implementation that makes this operation recoverable on crashes.
*
* @param segment Segment to split
* @param existingSegments The existing segments of the log
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param config The log configuration settings
* @param scheduler The thread pool scheduler used for background actions
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param logPrefix The logging prefix
* @return List of new segments that replace the input segment
*/
public static SplitSegmentResult splitOverflowedSegment(LogSegment segment,
LogSegments existingSegments,
File dir,
TopicPartition topicPartition,
LogConfig config,
Scheduler scheduler,
LogDirFailureChannel logDirFailureChannel,
String logPrefix) throws IOException {
require(isLogFile(segment.log().file()), "Cannot split file " + segment.log().file().getAbsoluteFile());
require(segment.hasOverflow(), "Split operation is only permitted for segments with overflow, and the problem path is " + segment.log().file().getAbsoluteFile());
LOG.info("{}Splitting overflowed segment {}", logPrefix, segment);
List<LogSegment> newSegments = new ArrayList<>();
try {
int position = 0;
FileRecords sourceRecords = segment.log();
while (position < sourceRecords.sizeInBytes()) {
FileLogInputStream.FileChannelRecordBatch firstBatch = sourceRecords.batchesFrom(position).iterator().next();
LogSegment newSegment = createNewCleanedSegment(dir, config, firstBatch.baseOffset());
newSegments.add(newSegment);
int bytesAppended = newSegment.appendFromFile(sourceRecords, position);
if (bytesAppended == 0)
throw new IllegalStateException("Failed to append records from position " + position + " in " + segment);
position += bytesAppended;
}
// prepare new segments
int totalSizeOfNewSegments = 0;
for (LogSegment splitSegment : newSegments) {
splitSegment.onBecomeInactiveSegment();
splitSegment.flush();
splitSegment.setLastModified(segment.lastModified());
totalSizeOfNewSegments += splitSegment.log().sizeInBytes();
}
// size of all the new segments combined must equal size of the original segment
if (totalSizeOfNewSegments != segment.log().sizeInBytes()) {
throw new IllegalStateException("Inconsistent segment sizes after split before: " + segment.log().sizeInBytes() + " after: " + totalSizeOfNewSegments);
}
// replace old segment with new ones
LOG.info("{}Replacing overflowed segment $segment with split segments {}", logPrefix, newSegments);
List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, Collections.singletonList(segment),
dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix, false);
return new SplitSegmentResult(deletedSegments, newSegments);
} catch (Exception e) {
for (LogSegment splitSegment : newSegments) {
splitSegment.close();
splitSegment.deleteIfExists();
}
throw e;
}
}
/**
* Swap one or more new segments in place and delete one or more existing segments in a crash-safe
* manner. The old segments will be asynchronously deleted.
* <br/>
* This method does not need to convert IOException to KafkaStorageException because it is either
* called before all logs are loaded or the caller will catch and handle IOException
* <br/>
* The sequence of operations is:
* <ol>
* <li>Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments() on
* the Log instance. If broker crashes at this point, the clean-and-swap operation is aborted and
* the .cleaned files are deleted on recovery in LogLoader.
* <li>New segments are renamed .swap. If the broker crashes before all segments were renamed to .swap, the
* clean-and-swap operation is aborted - .cleaned as well as .swap files are deleted on recovery in
* LogLoader. We detect this situation by maintaining a specific order in which files are renamed
* from .cleaned to .swap. Basically, files are renamed in descending order of offsets. On recovery,
* all .swap files whose offset is greater than the minimum-offset .cleaned file are deleted.
* <li>If the broker crashes after all new segments were renamed to .swap, the operation is completed,
* the swap operation is resumed on recovery as described in the next step.
* <li>Old segment files are renamed to .deleted and asynchronous delete is scheduled. If the broker
* crashes, any .deleted files left behind are deleted on recovery in LogLoader.
* replaceSegments() is then invoked to complete the swap with newSegment recreated from the
* .swap file and oldSegments containing segments which were not renamed before the crash.
* <li>Swap segment(s) are renamed to replace the existing segments, completing this operation.
* If the broker crashes, any .deleted files which may be left behind are deleted
* on recovery in LogLoader.
* </ol>
*
* @param existingSegments The existing segments of the log
* @param newSegments The new log segments to add to the log
* @param oldSegments The old log segments to delete from the log
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param config The log configuration settings
* @param scheduler The thread pool scheduler used for background actions
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param logPrefix The logging prefix
* @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
*/
public static List<LogSegment> replaceSegments(LogSegments existingSegments,
List<LogSegment> newSegments,
List<LogSegment> oldSegments,
File dir,
TopicPartition topicPartition,
LogConfig config,
Scheduler scheduler,
LogDirFailureChannel logDirFailureChannel,
String logPrefix,
boolean isRecoveredSwapFile) throws IOException {
List<LogSegment> sortedNewSegments = new ArrayList<>(newSegments);
sortedNewSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
// Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
// but before this method is executed. We want to filter out those segments to avoid calling deleteSegmentFiles()
// multiple times for the same segment.
List<LogSegment> sortedOldSegments = oldSegments.stream()
.filter(seg -> existingSegments.contains(seg.baseOffset()))
.sorted(Comparator.comparingLong(LogSegment::baseOffset))
.collect(Collectors.toList());
// need to do this in two phases to be crash safe AND do the delete asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
List<LogSegment> reversedSegmentsList = new ArrayList<>(sortedNewSegments);
Collections.reverse(reversedSegmentsList);
for (LogSegment segment : reversedSegmentsList) {
if (!isRecoveredSwapFile) {
segment.changeFileSuffixes(CLEANED_FILE_SUFFIX, SWAP_FILE_SUFFIX);
}
existingSegments.add(segment);
}
Set<Long> newSegmentBaseOffsets = sortedNewSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toSet());
// delete the old files
List<LogSegment> deletedNotReplaced = new ArrayList<>();
for (LogSegment segment : sortedOldSegments) {
// remove the index entry
if (segment.baseOffset() != sortedNewSegments.get(0).baseOffset()) {
existingSegments.remove(segment.baseOffset());
}
deleteSegmentFiles(
Collections.singletonList(segment),
true,
dir,
topicPartition,
config,
scheduler,
logDirFailureChannel,
logPrefix);
if (!newSegmentBaseOffsets.contains(segment.baseOffset())) {
deletedNotReplaced.add(segment);
}
}
// okay we are safe now, remove the swap suffix
for (LogSegment logSegment : sortedNewSegments) {
logSegment.changeFileSuffixes(SWAP_FILE_SUFFIX, "");
}
Utils.flushDir(dir.toPath());
return deletedNotReplaced;
}
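// Illustrative call shape (names are hypothetical): cleaning and splitting pass freshly written
// .cleaned segments with isRecoveredSwapFile=false, e.g.
//   LocalLog.replaceSegments(existingSegments, cleanedSegments, segmentsToReplace, dir, topicPartition,
//       config, scheduler, logDirFailureChannel, logPrefix, false);
// so they are renamed .cleaned -> .swap before being swapped in; recovery after a crash passes
// isRecoveredSwapFile=true for segments rebuilt from .swap files, skipping that rename.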
public static class SplitSegmentResult {
public final List<LogSegment> deletedSegments;
public final List<LogSegment> newSegments;
/**
* Holds the result of splitting a segment into one or more segments, see LocalLog.splitOverflowedSegment().
*
* @param deletedSegments segments deleted when splitting a segment
* @param newSegments new segments created when splitting a segment
*/
public SplitSegmentResult(List<LogSegment> deletedSegments, List<LogSegment> newSegments) {
this.deletedSegments = deletedSegments;
this.newSegments = newSegments;
}
}
}

View File

@ -51,6 +51,15 @@ public final class LogFileUtils {
*/
public static final String TXN_INDEX_FILE_SUFFIX = ".txnindex";
/** Suffix of a temporary file that is being used for log cleaning */
public static final String CLEANED_FILE_SUFFIX = ".cleaned";
/** Suffix of a temporary file used when swapping files into the log */
public static final String SWAP_FILE_SUFFIX = ".swap";
/** Suffix of a directory that is scheduled to be deleted */
public static final String DELETE_DIR_SUFFIX = "-delete";
private LogFileUtils() {
}
@ -187,4 +196,13 @@ public final class LogFileUtils {
return offsetFromFileName(file.getName());
}
public static boolean isLogFile(File file) {
return file.getPath().endsWith(LOG_FILE_SUFFIX);
}
public static boolean isIndexFile(File file) {
String fileName = file.getName();
return fileName.endsWith(INDEX_FILE_SUFFIX) || fileName.endsWith(TIME_INDEX_FILE_SUFFIX) || fileName.endsWith(TXN_INDEX_FILE_SUFFIX);
}
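// Examples (illustrative): "00000000000000000042.log" is a log file, while files ending in
// ".index", ".timeindex" or ".txnindex" (e.g. "00000000000000000042.index") are index files.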
}

View File

@ -0,0 +1,565 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidOffsetException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class LogLoader {
private static final String SNAPSHOT_DELETE_SUFFIX = ".checkpoint.deleted";
private final File dir;
private final TopicPartition topicPartition;
private final LogConfig config;
private final Scheduler scheduler;
private final Time time;
private final LogDirFailureChannel logDirFailureChannel;
private final boolean hadCleanShutdown;
private final LogSegments segments;
private final long logStartOffsetCheckpoint;
private final long recoveryPointCheckpoint;
private final Optional<LeaderEpochFileCache> leaderEpochCache;
private final ProducerStateManager producerStateManager;
private final ConcurrentMap<String, Integer> numRemainingSegments;
private final boolean isRemoteLogEnabled;
private final Logger logger;
private final String logPrefix;
/**
* @param dir The directory from which log segments need to be loaded
* @param topicPartition The topic partition associated with the log being loaded
* @param config The configuration settings for the log being loaded
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
* @param logDirFailureChannel The {@link LogDirFailureChannel} instance to asynchronously handle log directory failure
* @param hadCleanShutdown Boolean flag to indicate whether the associated log previously had a clean shutdown
* @param segments The {@link LogSegments} instance into which segments recovered from disk will be populated
* @param logStartOffsetCheckpoint The checkpoint of the log start offset
* @param recoveryPointCheckpoint The checkpoint of the offset at which to begin the recovery
* @param leaderEpochCache An optional {@link LeaderEpochFileCache} instance to be updated during recovery
* @param producerStateManager The {@link ProducerStateManager} instance to be updated during recovery
* @param numRemainingSegments The remaining segments to be recovered in this log keyed by recovery thread name
* @param isRemoteLogEnabled Boolean flag to indicate whether the remote storage is enabled or not
*/
public LogLoader(
File dir,
TopicPartition topicPartition,
LogConfig config,
Scheduler scheduler,
Time time,
LogDirFailureChannel logDirFailureChannel,
boolean hadCleanShutdown,
LogSegments segments,
long logStartOffsetCheckpoint,
long recoveryPointCheckpoint,
Optional<LeaderEpochFileCache> leaderEpochCache,
ProducerStateManager producerStateManager,
ConcurrentMap<String, Integer> numRemainingSegments,
boolean isRemoteLogEnabled) {
this.dir = dir;
this.topicPartition = topicPartition;
this.config = config;
this.scheduler = scheduler;
this.time = time;
this.logDirFailureChannel = logDirFailureChannel;
this.hadCleanShutdown = hadCleanShutdown;
this.segments = segments;
this.logStartOffsetCheckpoint = logStartOffsetCheckpoint;
this.recoveryPointCheckpoint = recoveryPointCheckpoint;
this.leaderEpochCache = leaderEpochCache;
this.producerStateManager = producerStateManager;
this.numRemainingSegments = numRemainingSegments;
this.isRemoteLogEnabled = isRemoteLogEnabled;
this.logPrefix = "[LogLoader partition=" + topicPartition + ", dir=" + dir.getParent() + "] ";
this.logger = new LogContext(logPrefix).logger(LogLoader.class);
}
/**
* Loads the log segments from the log files on disk and returns the components of the loaded log.
* Additionally, it also suitably updates the provided {@link LeaderEpochFileCache} and
* {@link ProducerStateManager} to reflect the contents of the loaded log.
* <br/>
* In the context of the calling thread, this function does not need to convert {@link IOException} to
* {@link KafkaStorageException} because it is only called before all logs are loaded.
*
* @return the offsets of the Log successfully loaded from disk
*
* @throws LogSegmentOffsetOverflowException if we encounter a .swap file with messages that
* overflow index offset
*/
public LoadedLogOffsets load() throws IOException {
// First pass: through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
Set<File> swapFiles = removeTempFilesAndCollectSwapFiles();
// The remaining valid swap files must come from compaction or segment split operation. We can
// simply rename them to regular segment files. But, before renaming, we should figure out which
// segments are compacted/split and delete these segment files: this is done by calculating
// min/maxSwapFileOffset.
// We store segments that require renaming in this code block, and do the actual renaming later.
long minSwapFileOffset = Long.MAX_VALUE;
long maxSwapFileOffset = Long.MIN_VALUE;
for (File swapFile : swapFiles) {
if (!LogFileUtils.isLogFile(new File(Utils.replaceSuffix(swapFile.getPath(), LogFileUtils.SWAP_FILE_SUFFIX, "")))) {
continue;
}
long baseOffset = LogFileUtils.offsetFromFile(swapFile);
LogSegment segment = LogSegment.open(swapFile.getParentFile(),
baseOffset,
config,
time,
false,
0,
false,
LogFileUtils.SWAP_FILE_SUFFIX);
logger.info("Found log file {} from interrupted swap operation, which is recoverable from {} files by renaming.", swapFile.getPath(), LogFileUtils.SWAP_FILE_SUFFIX);
minSwapFileOffset = Math.min(segment.baseOffset(), minSwapFileOffset);
maxSwapFileOffset = Math.max(segment.readNextOffset(), maxSwapFileOffset);
}
// Second pass: delete segments that are between minSwapFileOffset and maxSwapFileOffset. As
// discussed above, these segments were compacted or split but haven't been renamed to .delete
// before shutting down the broker.
File[] files = dir.listFiles();
if (files == null) files = new File[0];
for (File file : files) {
if (!file.isFile()) {
continue;
}
try {
if (!file.getName().endsWith(LogFileUtils.SWAP_FILE_SUFFIX)) {
long offset = LogFileUtils.offsetFromFile(file);
if (offset >= minSwapFileOffset && offset < maxSwapFileOffset) {
logger.info("Deleting segment files {} that is compacted but has not been deleted yet.", file.getName());
boolean ignore = file.delete();
}
}
} catch (StringIndexOutOfBoundsException | NumberFormatException e) {
// ignore offsetFromFile with files that do not include an offset in the file name
}
}
// Third pass: rename all swap files.
files = dir.listFiles();
if (files == null) files = new File[0];
for (File file : files) {
if (!file.isFile()) {
continue;
}
if (file.getName().endsWith(LogFileUtils.SWAP_FILE_SUFFIX)) {
logger.info("Recovering file {} by renaming from {} files.", file.getName(), LogFileUtils.SWAP_FILE_SUFFIX);
boolean ignore = file.renameTo(new File(Utils.replaceSuffix(file.getPath(), LogFileUtils.SWAP_FILE_SUFFIX, "")));
}
}
// Fourth pass: load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
retryOnOffsetOverflow(() -> {
// In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in previous
// call to loadSegmentFiles().
segments.close();
segments.clear();
loadSegmentFiles();
return null;
});
RecoveryOffsets recoveryOffsets;
if (!dir.getAbsolutePath().endsWith(LogFileUtils.DELETE_DIR_SUFFIX)) {
recoveryOffsets = retryOnOffsetOverflow(this::recoverLog);
// reset the index size of the currently active log segment to allow more entries
segments.lastSegment().get().resizeIndexes(config.maxIndexSize);
} else {
if (segments.isEmpty()) {
segments.add(LogSegment.open(dir, 0, config, time, config.initFileSize(), false));
}
recoveryOffsets = new RecoveryOffsets(0L, 0L);
}
leaderEpochCache.ifPresent(lec -> lec.truncateFromEndAsyncFlush(recoveryOffsets.nextOffset));
long newLogStartOffset = isRemoteLogEnabled
? logStartOffsetCheckpoint
: Math.max(logStartOffsetCheckpoint, segments.firstSegment().get().baseOffset());
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.ifPresent(lec -> lec.truncateFromStartAsyncFlush(logStartOffsetCheckpoint));
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty()) {
throw new IllegalStateException("Producer state must be empty during log initialization");
}
// Reload all snapshots into the ProducerStateManager cache; the intermediate ProducerStateManager used
// during log recovery may have deleted some files without the LogLoader.producerStateManager instance witnessing the
// deletion.
producerStateManager.removeStraySnapshots(segments.baseOffsets());
UnifiedLog.rebuildProducerState(
producerStateManager,
segments,
newLogStartOffset,
recoveryOffsets.nextOffset,
config.recordVersion(),
time,
hadCleanShutdown,
logPrefix);
LogSegment activeSegment = segments.lastSegment().get();
return new LoadedLogOffsets(
newLogStartOffset,
recoveryOffsets.newRecoveryPoint,
new LogOffsetMetadata(recoveryOffsets.nextOffset, activeSegment.baseOffset(), activeSegment.size()));
}
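// Typical wiring (as exercised by the tests updated in this change; variable names are hypothetical):
//   LoadedLogOffsets offsets = new LogLoader(dir, topicPartition, config, scheduler, time,
//       logDirFailureChannel, hadCleanShutdown, segments, 0L, 0L, leaderEpochCache,
//       producerStateManager, new ConcurrentHashMap<>(), false).load();
// The returned recoveryPoint and nextOffsetMetadata are then used to construct the LocalLog
// for the partition.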
/**
* Removes any temporary files found in the log directory, and creates a list of all .swap files which could be swapped
* in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than
* the smallest-offset .cleaned file could be part of an incomplete split operation. Such .swap files are also deleted
* by this method.
*
* @return Set of .swap files that are valid to be swapped in as segment files and index files
*/
private Set<File> removeTempFilesAndCollectSwapFiles() throws IOException {
Set<File> swapFiles = new HashSet<>();
Set<File> cleanedFiles = new HashSet<>();
long minCleanedFileOffset = Long.MAX_VALUE;
File[] files = dir.listFiles();
if (files == null) files = new File[0];
for (File file : files) {
if (!file.isFile()) {
continue;
}
if (!file.canRead()) {
throw new IOException("Could not read file " + file);
}
String filename = file.getName();
// Delete stray files marked for deletion, but skip KRaft snapshots.
// These are handled in the recovery logic in `KafkaMetadataLog`.
if (filename.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) && !filename.endsWith(SNAPSHOT_DELETE_SUFFIX)) {
logger.debug("Deleting stray temporary file {}", file.getAbsolutePath());
Files.deleteIfExists(file.toPath());
} else if (filename.endsWith(LogFileUtils.CLEANED_FILE_SUFFIX)) {
minCleanedFileOffset = Math.min(LogFileUtils.offsetFromFile(file), minCleanedFileOffset);
cleanedFiles.add(file);
} else if (filename.endsWith(LogFileUtils.SWAP_FILE_SUFFIX)) {
swapFiles.add(file);
}
}
// KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap
// files could be part of an incomplete split operation that could not complete. See LocalLog#splitOverflowedSegment
// for more details about the split operation.
Set<File> invalidSwapFiles = new HashSet<>();
Set<File> validSwapFiles = new HashSet<>();
for (File file : swapFiles) {
if (LogFileUtils.offsetFromFile(file) >= minCleanedFileOffset) {
invalidSwapFiles.add(file);
} else {
validSwapFiles.add(file);
}
}
for (File file : invalidSwapFiles) {
logger.debug("Deleting invalid swap file {} minCleanedFileOffset: {}", file.getAbsoluteFile(), minCleanedFileOffset);
Files.deleteIfExists(file.toPath());
}
// Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
for (File file : cleanedFiles) {
logger.debug("Deleting stray .clean file {}", file.getAbsolutePath());
Files.deleteIfExists(file.toPath());
}
return validSwapFiles;
}
/**
* Retries the provided function whenever a LogSegmentOffsetOverflowException is raised by
* it during execution. Before every retry, the overflowed segment is split into one or more segments
* such that there is no offset overflow in any of them.
*
* @param function The function to be executed
* @return The value returned by the function, if successful
* @throws IllegalStateException whenever the executed function throws any exception other than
* LogSegmentOffsetOverflowException; the same exception is raised to the caller
*/
private <T> T retryOnOffsetOverflow(StorageAction<T, IOException> function) throws IOException {
while (true) {
try {
return function.execute();
} catch (LogSegmentOffsetOverflowException lsooe) {
logger.info("Caught segment overflow error: {}. Split segment and retry.", lsooe.getMessage());
LocalLog.SplitSegmentResult result = LocalLog.splitOverflowedSegment(
lsooe.segment,
segments,
dir,
topicPartition,
config,
scheduler,
logDirFailureChannel,
logPrefix);
deleteProducerSnapshotsAsync(result.deletedSegments);
}
}
}
/**
* Loads segments from disk.
* <br/>
* This method does not need to convert {@link IOException} to {@link KafkaStorageException} because it is only
* called before all logs are loaded. It is possible that we encounter a segment with index offset overflow in
* which case the {@link LogSegmentOffsetOverflowException} will be thrown. Note that any segments that were opened
* before we encountered the exception will remain open and the caller is responsible for closing them
* appropriately, if needed.
*
* @throws LogSegmentOffsetOverflowException if the log directory contains a segment with messages that overflow the index offset
*/
private void loadSegmentFiles() throws IOException {
// load segments in ascending order because transactional data from one segment may depend on the
// segments that come before it
File[] files = dir.listFiles();
if (files == null) files = new File[0];
List<File> sortedFiles = Arrays.stream(files).filter(File::isFile).sorted().collect(Collectors.toList());
for (File file : sortedFiles) {
if (LogFileUtils.isIndexFile(file)) {
// if it is an index file, make sure it has a corresponding .log file
long offset = LogFileUtils.offsetFromFile(file);
File logFile = LogFileUtils.logFile(dir, offset);
if (!logFile.exists()) {
logger.warn("Found an orphaned index file {}, with no corresponding log file.", file.getAbsolutePath());
Files.deleteIfExists(file.toPath());
}
} else if (LogFileUtils.isLogFile(file)) {
// if it's a log file, load the corresponding log segment
long baseOffset = LogFileUtils.offsetFromFile(file);
boolean timeIndexFileNewlyCreated = !LogFileUtils.timeIndexFile(dir, baseOffset).exists();
LogSegment segment = LogSegment.open(dir, baseOffset, config, time, true, 0, false, "");
try {
segment.sanityCheck(timeIndexFileNewlyCreated);
} catch (NoSuchFileException nsfe) {
if (hadCleanShutdown || segment.baseOffset() < recoveryPointCheckpoint) {
logger.error("Could not find offset index file corresponding to log file {}, recovering segment and rebuilding index files...", segment.log().file().getAbsolutePath());
}
recoverSegment(segment);
} catch (CorruptIndexException cie) {
logger.warn("Found a corrupted index file corresponding to log file {} due to {}, recovering segment and rebuilding index files...", segment.log().file().getAbsolutePath(), cie.getMessage());
recoverSegment(segment);
}
segments.add(segment);
}
}
}
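    // Example of the cases handled above (file names assumed for illustration): an .index file with no matching
    // .log file is deleted as orphaned, while a .log file whose offset or time index is missing or corrupted fails
    // the sanity check and is re-recovered via recoverSegment before being added to the segment map.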
/**
     * Recovers the given segment without adding it to the segment map.
*
* @param segment The {@link LogSegment} to recover
* @return The number of bytes truncated from the segment
* @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
*/
private int recoverSegment(LogSegment segment) throws IOException {
ProducerStateManager producerStateManager = new ProducerStateManager(
topicPartition,
dir,
this.producerStateManager.maxTransactionTimeoutMs(),
this.producerStateManager.producerStateManagerConfig(),
time);
UnifiedLog.rebuildProducerState(
producerStateManager,
segments,
logStartOffsetCheckpoint,
segment.baseOffset(),
config.recordVersion(),
time,
false,
logPrefix);
int bytesTruncated = segment.recover(producerStateManager, leaderEpochCache);
// once we have recovered the segment's data, take a snapshot to ensure that we won't
// need to reload the same segment again while recovering another segment.
producerStateManager.takeSnapshot();
return bytesTruncated;
}
/** return the log end offset if valid */
private Optional<Long> deleteSegmentsIfLogStartGreaterThanLogEnd() throws IOException {
if (segments.nonEmpty()) {
long logEndOffset = segments.lastSegment().get().readNextOffset();
if (logEndOffset >= logStartOffsetCheckpoint) {
return Optional.of(logEndOffset);
} else {
logger.warn("Deleting all segments because logEndOffset ({}) " +
"is smaller than logStartOffset {}. " +
"This could happen if segment files were deleted from the file system.", logEndOffset, logStartOffsetCheckpoint);
removeAndDeleteSegmentsAsync(segments.values());
leaderEpochCache.ifPresent(LeaderEpochFileCache::clearAndFlush);
producerStateManager.truncateFullyAndStartAt(logStartOffsetCheckpoint);
return Optional.empty();
}
}
return Optional.empty();
}
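    /**
     * Holder for the two values produced by {@link #recoverLog()}: the updated recovery point and the
     * next offset to assign after recovery (the log end offset).
     */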
static class RecoveryOffsets {
final long newRecoveryPoint;
final long nextOffset;
RecoveryOffsets(long newRecoveryPoint, long nextOffset) {
this.newRecoveryPoint = newRecoveryPoint;
this.nextOffset = nextOffset;
}
}
/**
* Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
* active segment, and returns the updated recovery point and next offset after recovery. Along
     * the way, the method updates the {@link LeaderEpochFileCache} or {@link ProducerStateManager}
     * held by this loader, as needed.
* <br/>
* This method does not need to convert {@link IOException} to {@link KafkaStorageException} because it is only
* called before all logs are loaded.
*
* @return a {@link RecoveryOffsets} instance containing newRecoveryPoint and nextOffset.
* @throws LogSegmentOffsetOverflowException if we encountered a legacy segment with offset overflow
*/
RecoveryOffsets recoverLog() throws IOException {
// If we have the clean shutdown marker, skip recovery.
if (!hadCleanShutdown) {
Collection<LogSegment> unflushed = segments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
int numUnflushed = unflushed.size();
Iterator<LogSegment> unflushedIter = unflushed.iterator();
boolean truncated = false;
int numFlushed = 0;
String threadName = Thread.currentThread().getName();
numRemainingSegments.put(threadName, numUnflushed);
while (unflushedIter.hasNext() && !truncated) {
LogSegment segment = unflushedIter.next();
logger.info("Recovering unflushed segment {}. {} recovered for {}.", segment.baseOffset(), numFlushed / numUnflushed, topicPartition);
int truncatedBytes;
try {
truncatedBytes = recoverSegment(segment);
} catch (InvalidOffsetException | IOException ioe) {
long startOffset = segment.baseOffset();
logger.warn("Found invalid offset during recovery. Deleting the corrupt segment and creating an empty one with starting offset {}", startOffset);
truncatedBytes = segment.truncateTo(startOffset);
}
if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
logger.warn("Corruption found in segment {}, truncating to offset {}", segment.baseOffset(), segment.readNextOffset());
Collection<LogSegment> unflushedRemaining = new ArrayList<>();
unflushedIter.forEachRemaining(unflushedRemaining::add);
removeAndDeleteSegmentsAsync(unflushedRemaining);
truncated = true;
// segment is truncated, so set remaining segments to 0
numRemainingSegments.put(threadName, 0);
} else {
numFlushed += 1;
numRemainingSegments.put(threadName, numUnflushed - numFlushed);
}
}
}
Optional<Long> logEndOffsetOptional = deleteSegmentsIfLogStartGreaterThanLogEnd();
if (segments.isEmpty()) {
// no existing segments, create a new mutable segment beginning at logStartOffset
segments.add(LogSegment.open(dir, logStartOffsetCheckpoint, config, time, config.initFileSize(), config.preallocate));
}
// Update the recovery point if there was a clean shutdown and did not perform any changes to
// the segment. Otherwise, we just ensure that the recovery point is not ahead of the log end
// offset. To ensure correctness and to make it easier to reason about, it's best to only advance
// the recovery point when the log is flushed. If we advanced the recovery point here, we could
// skip recovery for unflushed segments if the broker crashed after we checkpoint the recovery
// point and before we flush the segment.
if (hadCleanShutdown && logEndOffsetOptional.isPresent()) {
return new RecoveryOffsets(logEndOffsetOptional.get(), logEndOffsetOptional.get());
} else {
long logEndOffset = logEndOffsetOptional.orElse(segments.lastSegment().get().readNextOffset());
return new RecoveryOffsets(Math.min(recoveryPointCheckpoint, logEndOffset), logEndOffset);
}
}
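    // Illustrative example (numbers assumed): after an unclean shutdown with recoveryPointCheckpoint = 500 and a
    // recovered log end offset of 450, the returned recovery point is min(500, 450) = 450, so the checkpointed
    // recovery point can never move past the actual log end offset.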
/**
* This method deletes the given log segments and the associated producer snapshots, by doing the
* following for each of them:
* <ul>
* <li>It removes the segment from the segment map so that it will no longer be used for reads.
* <li>It schedules asynchronous deletion of the segments that allows reads to happen concurrently without
* synchronization and without the possibility of physically deleting a file while it is being
* read.
* </ul>
* This method does not need to convert {@link IOException} to {@link KafkaStorageException} because it is either
* called before all logs are loaded or the immediate caller will catch and handle IOException
*
* @param segmentsToDelete The log segments to schedule for deletion
*/
private void removeAndDeleteSegmentsAsync(Collection<LogSegment> segmentsToDelete) throws IOException {
if (!segmentsToDelete.isEmpty()) {
List<LogSegment> toDelete = new ArrayList<>(segmentsToDelete);
logger.info("Deleting segments as part of log recovery: {}", toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(", ")));
toDelete.forEach(segment -> segments.remove(segment.baseOffset()));
LocalLog.deleteSegmentFiles(
toDelete,
true,
dir,
topicPartition,
config,
scheduler,
logDirFailureChannel,
logPrefix);
deleteProducerSnapshotsAsync(segmentsToDelete);
}
}
private void deleteProducerSnapshotsAsync(Collection<LogSegment> segments) throws IOException {
UnifiedLog.deleteProducerSnapshots(
segments,
producerStateManager,
true,
scheduler,
config,
logDirFailureChannel,
dir.getParent(),
topicPartition);
}
}

View File

@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class UnifiedLog {
private static final Logger LOG = LoggerFactory.getLogger(UnifiedLog.class);
/**
* Rebuilds producer state until the provided lastOffset. This function may be called from the
* recovery code path, and thus must be free of all side effects, i.e. it must not update any
* log-specific state.
*
* @param producerStateManager The {@link ProducerStateManager} instance to be rebuilt.
* @param segments The segments of the log whose producer state is being rebuilt
* @param logStartOffset The log start offset
     * @param lastOffset The last offset up to which the producer state needs to be rebuilt
* @param recordVersion The record version
* @param time The time instance used for checking the clock
* @param reloadFromCleanShutdown True if the producer state is being built after a clean shutdown, false otherwise.
* @param logPrefix The logging prefix
*/
public static void rebuildProducerState(ProducerStateManager producerStateManager,
LogSegments segments,
long logStartOffset,
long lastOffset,
RecordVersion recordVersion,
Time time,
boolean reloadFromCleanShutdown,
String logPrefix) throws IOException {
List<Optional<Long>> offsetsToSnapshot = new ArrayList<>();
if (segments.nonEmpty()) {
long lastSegmentBaseOffset = segments.lastSegment().get().baseOffset();
Optional<LogSegment> lowerSegment = segments.lowerSegment(lastSegmentBaseOffset);
Optional<Long> nextLatestSegmentBaseOffset = lowerSegment.map(LogSegment::baseOffset);
offsetsToSnapshot.add(nextLatestSegmentBaseOffset);
offsetsToSnapshot.add(Optional.of(lastSegmentBaseOffset));
}
offsetsToSnapshot.add(Optional.of(lastOffset));
LOG.info("{}Loading producer state till offset {} with message format version {}", logPrefix, lastOffset, recordVersion.value);
// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
// but we have to be careful not to assume too much in the presence of broker failures. The two most common
// upgrade cases in which we expect to find no snapshots are the following:
//
// 1. The broker has been upgraded, but the topic is still on the old message format.
// 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
//
// If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
// offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
// (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
// from the first segment.
if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
(!producerStateManager.latestSnapshotOffset().isPresent() && reloadFromCleanShutdown)) {
// To avoid an expensive scan through all the segments, we take empty snapshots from the start of the
// last two segments and the last offset. This should avoid the full scan in the case that the log needs
// truncation.
for (Optional<Long> offset : offsetsToSnapshot) {
if (offset.isPresent()) {
producerStateManager.updateMapEndOffset(offset.get());
producerStateManager.takeSnapshot();
}
}
} else {
LOG.info("{}Reloading from producer snapshot and rebuilding producer state from offset {}", logPrefix, lastOffset);
boolean isEmptyBeforeTruncation = producerStateManager.isEmpty() && producerStateManager.mapEndOffset() >= lastOffset;
long producerStateLoadStart = time.milliseconds();
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds());
long segmentRecoveryStart = time.milliseconds();
// Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
// offset (which would be the case on first startup) and there were active producers prior to truncation
// (which could be the case if truncating after initial loading). If there weren't, then truncating
// shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
            // and we can skip the loading. This is an optimization for users who are not yet using
            // idempotent/transactional features.
if (lastOffset > producerStateManager.mapEndOffset() && !isEmptyBeforeTruncation) {
Optional<LogSegment> segmentOfLastOffset = segments.floorSegment(lastOffset);
for (LogSegment segment : segments.values(producerStateManager.mapEndOffset(), lastOffset)) {
long startOffset = Utils.max(segment.baseOffset(), producerStateManager.mapEndOffset(), logStartOffset);
producerStateManager.updateMapEndOffset(startOffset);
if (offsetsToSnapshot.contains(Optional.of(segment.baseOffset()))) {
producerStateManager.takeSnapshot();
}
int maxPosition = segment.size();
if (segmentOfLastOffset.isPresent() && segmentOfLastOffset.get() == segment) {
FileRecords.LogOffsetPosition lop = segment.translateOffset(lastOffset);
maxPosition = lop != null ? lop.position : segment.size();
}
FetchDataInfo fetchDataInfo = segment.read(startOffset, Integer.MAX_VALUE, maxPosition);
if (fetchDataInfo != null) {
loadProducersFromRecords(producerStateManager, fetchDataInfo.records);
}
}
}
producerStateManager.updateMapEndOffset(lastOffset);
producerStateManager.takeSnapshot();
LOG.info(logPrefix + "Producer state recovery took " + (segmentRecoveryStart - producerStateLoadStart) + "ms for snapshot load " +
"and " + (time.milliseconds() - segmentRecoveryStart) + "ms for segment recovery from offset " + lastOffset);
}
}
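    // Note: this is the same rebuild path used during recovery; for example, LogLoader#recoverSegment above
    // rebuilds producer state up to the recovering segment's base offset before calling LogSegment#recover.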
public static void deleteProducerSnapshots(Collection<LogSegment> segments,
ProducerStateManager producerStateManager,
boolean asyncDelete,
Scheduler scheduler,
LogConfig config,
LogDirFailureChannel logDirFailureChannel,
String parentDir,
TopicPartition topicPartition) throws IOException {
List<SnapshotFile> snapshotsToDelete = new ArrayList<>();
for (LogSegment segment : segments) {
Optional<SnapshotFile> snapshotFile = producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset());
snapshotFile.ifPresent(snapshotsToDelete::add);
}
Runnable deleteProducerSnapshots = () -> deleteProducerSnapshots(snapshotsToDelete, logDirFailureChannel, parentDir, topicPartition);
if (asyncDelete) {
scheduler.scheduleOnce("delete-producer-snapshot", deleteProducerSnapshots, config.fileDeleteDelayMs);
} else {
deleteProducerSnapshots.run();
}
}
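    // In this change the recovery path (LogLoader#deleteProducerSnapshotsAsync) passes asyncDelete = true, so the
    // snapshot files are removed on the scheduler after config.fileDeleteDelayMs rather than inline.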
private static void deleteProducerSnapshots(List<SnapshotFile> snapshotsToDelete, LogDirFailureChannel logDirFailureChannel, String parentDir, TopicPartition topicPartition) {
LocalLog.maybeHandleIOException(
logDirFailureChannel,
parentDir,
() -> "Error while deleting producer state snapshots for " + topicPartition + " in dir " + parentDir,
() -> {
for (SnapshotFile snapshotFile : snapshotsToDelete) {
snapshotFile.deleteIfExists();
}
return null;
});
}
private static void loadProducersFromRecords(ProducerStateManager producerStateManager, Records records) {
Map<Long, ProducerAppendInfo> loadedProducers = new HashMap<>();
final List<CompletedTxn> completedTxns = new ArrayList<>();
records.batches().forEach(batch -> {
if (batch.hasProducerId()) {
Optional<CompletedTxn> maybeCompletedTxn = updateProducers(
producerStateManager,
batch,
loadedProducers,
Optional.empty(),
AppendOrigin.REPLICATION);
maybeCompletedTxn.ifPresent(completedTxns::add);
}
});
loadedProducers.values().forEach(producerStateManager::update);
completedTxns.forEach(producerStateManager::completeTxn);
}
public static Optional<CompletedTxn> updateProducers(ProducerStateManager producerStateManager,
RecordBatch batch,
Map<Long, ProducerAppendInfo> producers,
Optional<LogOffsetMetadata> firstOffsetMetadata,
AppendOrigin origin) {
long producerId = batch.producerId();
ProducerAppendInfo appendInfo = producers.computeIfAbsent(producerId, __ -> producerStateManager.prepareUpdate(producerId, origin));
Optional<CompletedTxn> completedTxn = appendInfo.append(batch, firstOffsetMetadata);
// Whether we wrote a control marker or a data batch, we can remove VerificationGuard since either the transaction is complete or we have a first offset.
if (batch.isTransactional()) {
producerStateManager.clearVerificationStateEntry(producerId);
}
return completedTxn;
}
}