KAFKA-14483 Move LocalLog to storage module (#17587)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Mickael Maison 2024-10-28 13:41:46 +01:00 committed by GitHub
parent 12a60b8cd9
commit 6e88c10ed5
16 changed files with 929 additions and 818 deletions
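
This change deletes the Scala kafka.log.LocalLog (shown in full below) and points every caller at the Java org.apache.kafka.storage.internals.log.LocalLog that already lives in the storage module. As a rough sketch of what that means at a call site (mirroring the UnifiedLog.maybeHandleIOException wrapper in the hunks below; the surrounding values are placeholders taken from that method, not a standalone program):

    // before: the storage-module class had to be aliased to avoid clashing with the Scala LocalLog
    import org.apache.kafka.storage.internals.log.{LocalLog => JLocalLog}
    JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)

    // after: only the Java class remains, so it is imported and called directly
    import org.apache.kafka.storage.internals.log.LocalLog
    LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)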

View File

@@ -344,7 +344,7 @@
<suppress checks="CyclomaticComplexity"
          files="(LogLoader|LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
<suppress checks="NPathComplexity"
-         files="(LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
+         files="(LocalLog|LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
<suppress checks="ParameterNumber"
          files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig).java"/>

View File

@@ -1,720 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import kafka.utils.Logging
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, OffsetPosition, LocalLog => JLocalLog}
import java.io.File
import java.nio.file.Files
import java.util
import java.util.Collections.singletonList
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
import java.util.{Collections, Optional}
import scala.collection.mutable.ListBuffer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
/**
* An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
*
* NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
*
* @param _dir The directory in which log segments are created.
* @param config The log configuration settings
* @param segments The non-empty log segments recovered from disk
* @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
* @param nextOffsetMetadata The offset where the next message could be appended
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
* @param topicPartition The topic partition associated with this log
* @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
*/
class LocalLog(@volatile private var _dir: File,
@volatile private[log] var config: LogConfig,
private[log] val segments: LogSegments,
@volatile private[log] var recoveryPoint: Long,
@volatile private var nextOffsetMetadata: LogOffsetMetadata,
private[log] val scheduler: Scheduler,
private[log] val time: Time,
private[log] val topicPartition: TopicPartition,
private[log] val logDirFailureChannel: LogDirFailureChannel) extends Logging {
import kafka.log.LocalLog._
this.logIdent = s"[LocalLog partition=$topicPartition, dir=${dir.getParent}] "
// The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
// After memory mapped buffer is closed, no disk IO operation should be performed for this log.
@volatile private[log] var isMemoryMappedBufferClosed = false
// Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
@volatile private var _parentDir: String = dir.getParent
// Last time the log was flushed
private val lastFlushedTime = new AtomicLong(time.milliseconds)
private[log] def dir: File = _dir
private[log] def name: String = dir.getName
private[log] def parentDir: String = _parentDir
private[log] def parentDirFile: File = new File(_parentDir)
private[log] def isFuture: Boolean = dir.getName.endsWith(LocalLog.FutureDirSuffix)
private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
}
/**
* Rename the directory of the log
* @param name the new dir name
* @throws KafkaStorageException if rename fails
*/
private[log] def renameDir(name: String): Boolean = {
maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
val renamedDir = new File(dir.getParent, name)
Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
if (renamedDir != dir) {
_dir = renamedDir
_parentDir = renamedDir.getParent
segments.updateParentDir(renamedDir)
true
} else {
false
}
}
}
/**
* Update the existing configuration to the new provided configuration.
* @param newConfig the new configuration to be updated to
*/
private[log] def updateConfig(newConfig: LogConfig): Unit = {
val oldConfig = config
config = newConfig
val oldRecordVersion = oldConfig.recordVersion
val newRecordVersion = newConfig.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
}
private[log] def checkIfMemoryMappedBufferClosed(): Unit = {
if (isMemoryMappedBufferClosed)
throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
}
private[log] def updateRecoveryPoint(newRecoveryPoint: Long): Unit = {
recoveryPoint = newRecoveryPoint
}
/**
* Update recoveryPoint to provided offset and mark the log as flushed, if the offset is greater
* than the existing recoveryPoint.
*
* @param offset the offset to be updated
*/
private[log] def markFlushed(offset: Long): Unit = {
checkIfMemoryMappedBufferClosed()
if (offset > recoveryPoint) {
updateRecoveryPoint(offset)
lastFlushedTime.set(time.milliseconds)
}
}
/**
* The number of messages appended to the log since the last flush
*/
private[log] def unflushedMessages: Long = logEndOffset - recoveryPoint
/**
* Flush local log segments for all offsets up to offset-1.
* Does not update the recovery point.
*
* @param offset The offset to flush up to (non-inclusive)
*/
private[log] def flush(offset: Long): Unit = {
val currentRecoveryPoint = recoveryPoint
if (currentRecoveryPoint <= offset) {
val segmentsToFlush = segments.values(currentRecoveryPoint, offset)
segmentsToFlush.forEach(_.flush())
// If there are any new segments, we need to flush the parent directory for crash consistency.
if (segmentsToFlush.stream().anyMatch(_.baseOffset >= currentRecoveryPoint)) {
// The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here.
// Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go.
Utils.flushDirIfExists(dir.toPath)
}
}
}
/**
* The time this log is last known to have been fully flushed to disk
*/
private[log] def lastFlushTime: Long = lastFlushedTime.get
/**
* The offset metadata of the next message that will be appended to the log
*/
private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
/**
* The offset of the next message that will be appended to the log
*/
private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
/**
* Update end offset of the log, and update the recoveryPoint.
*
* @param endOffset the new end offset of the log
*/
private[log] def updateLogEndOffset(endOffset: Long): Unit = {
nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
if (recoveryPoint > endOffset) {
updateRecoveryPoint(endOffset)
}
}
/**
* Close file handlers used by log but don't write to disk.
* This is called if the log directory is offline.
*/
private[log] def closeHandlers(): Unit = {
segments.closeHandlers()
isMemoryMappedBufferClosed = true
}
/**
* Closes the segments of the log.
*/
private[log] def close(): Unit = {
maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
checkIfMemoryMappedBufferClosed()
segments.close()
}
}
/**
* Completely delete this log directory with no delay.
*/
private[log] def deleteEmptyDir(): Unit = {
maybeHandleIOException(s"Error while deleting dir for $topicPartition in dir ${dir.getParent}") {
if (segments.nonEmpty) {
throw new IllegalStateException(s"Can not delete directory when ${segments.numberOfSegments} segments are still present")
}
if (!isMemoryMappedBufferClosed) {
throw new IllegalStateException(s"Can not delete directory when memory mapped buffer for log of $topicPartition is still open.")
}
Utils.delete(dir)
}
}
/**
* Completely delete all segments with no delay.
* @return the deleted segments
*/
private[log] def deleteAllSegments(): util.List[LogSegment] = {
maybeHandleIOException(s"Error while deleting all segments for $topicPartition in dir ${dir.getParent}") {
val deletableSegments = new util.ArrayList(segments.values)
removeAndDeleteSegments(segments.values.asScala, asyncDelete = false, LogDeletion(this))
isMemoryMappedBufferClosed = true
deletableSegments
}
}
/**
* This method deletes the given log segments by doing the following for each of them:
* - It removes the segment from the segment map so that it will no longer be used for reads.
* - It renames the index and log files by appending .deleted to the respective file name
* - It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
*
* Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
* physically deleting a file while it is being read.
*
* This method does not convert IOException to KafkaStorageException, the immediate caller
* is expected to catch and handle IOException.
*
* @param segmentsToDelete The log segments to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted asynchronously
* @param reason The reason for the segment deletion
*/
private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment],
asyncDelete: Boolean,
reason: SegmentDeletionReason): Unit = {
if (segmentsToDelete.nonEmpty) {
// Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
// removing the deleted segment, we should force materialization of the iterator here, so that results of the
// iteration remain valid and deterministic. We should also pass only the materialized view of the
// iterator to the logic that actually deletes the segments.
val toDelete = segmentsToDelete.toList
reason.logReason(toDelete)
toDelete.foreach { segment =>
segments.remove(segment.baseOffset)
}
JLocalLog.deleteSegmentFiles(toDelete.asJava, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
}
}
/**
* This method deletes the given segment and creates a new segment with the given new base offset. It ensures an
* active segment exists in the log at all times during this process.
*
* Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
* physically deleting a file while it is being read.
*
* This method does not convert IOException to KafkaStorageException, the immediate caller
* is expected to catch and handle IOException.
*
* @param newOffset The base offset of the new segment
* @param segmentToDelete The old active segment to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted asynchronously
* @param reason The reason for the segment deletion
*/
private[log] def createAndDeleteSegment(newOffset: Long,
segmentToDelete: LogSegment,
asyncDelete: Boolean,
reason: SegmentDeletionReason): LogSegment = {
if (newOffset == segmentToDelete.baseOffset)
segmentToDelete.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX)
val newSegment = LogSegment.open(dir,
newOffset,
config,
time,
config.initFileSize,
config.preallocate)
segments.add(newSegment)
reason.logReason(List(segmentToDelete))
if (newOffset != segmentToDelete.baseOffset)
segments.remove(segmentToDelete.baseOffset)
JLocalLog.deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
newSegment
}
/**
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, throw an OffsetOutOfRangeException
*/
private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
val fetchDataInfo = read(offset,
maxLength = 1,
minOneMessage = false,
maxOffsetMetadata = nextOffsetMetadata,
includeAbortedTxns = false)
fetchDataInfo.fetchOffsetMetadata
}
/**
* Read messages from the log.
*
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
* @param maxOffsetMetadata The metadata of the maximum offset to be fetched
* @param includeAbortedTxns If true, aborted transactions are included
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset
* @return The fetch data information including fetch starting offset metadata and messages read.
*/
def read(startOffset: Long,
maxLength: Int,
minOneMessage: Boolean,
maxOffsetMetadata: LogOffsetMetadata,
includeAbortedTxns: Boolean): FetchDataInfo = {
maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
s"total length ${segments.sizeInBytes} bytes")
val endOffsetMetadata = nextOffsetMetadata
val endOffset = endOffsetMetadata.messageOffset
var segmentOpt = segments.floorSegment(startOffset)
// return error on attempt to read beyond the log end offset
if (startOffset > endOffset || !segmentOpt.isPresent)
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments upto $endOffset.")
if (startOffset == maxOffsetMetadata.messageOffset)
emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
else if (startOffset > maxOffsetMetadata.messageOffset)
emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
else {
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
var fetchDataInfo: FetchDataInfo = null
while (fetchDataInfo == null && segmentOpt.isPresent) {
val segment = segmentOpt.get
val baseOffset = segment.baseOffset
// 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty.
// 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit.
// 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so
// return maxPosition as empty to avoid reading beyond the max-offset
val maxPositionOpt: Optional[java.lang.Long] =
if (segment.baseOffset < maxOffsetMetadata.segmentBaseOffset)
Optional.of(segment.size)
else if (segment.baseOffset == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
Optional.of(maxOffsetMetadata.relativePositionInSegment)
else
Optional.empty()
fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage)
if (fetchDataInfo != null) {
if (includeAbortedTxns)
fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
} else segmentOpt = segments.higherSegment(baseOffset)
}
if (fetchDataInfo != null) fetchDataInfo
else {
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
}
}
}
private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
updateLogEndOffset(lastOffset + 1)
}
private def addAbortedTransactions(startOffset: Long, segment: LogSegment,
fetchInfo: FetchDataInfo): FetchDataInfo = {
val fetchSize = fetchInfo.records.sizeInBytes
val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).orElse(
segments.higherSegment(segment.baseOffset).toScala.map(s => s.baseOffset).getOrElse(logEndOffset))
val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)
new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
fetchInfo.records,
fetchInfo.firstEntryIncomplete,
Optional.of(abortedTransactions.toList.asJava))
}
private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
startingSegment: LogSegment,
accumulator: Seq[AbortedTxn] => Unit): Unit = {
val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
var segmentEntryOpt = Option(startingSegment)
while (segmentEntryOpt.isDefined) {
val segment = segmentEntryOpt.get
val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
accumulator(searchResult.abortedTransactions.asScala)
if (searchResult.isComplete)
return
segmentEntryOpt = nextOption(higherSegments)
}
}
private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
val segmentEntry = segments.floorSegment(baseOffset)
val allAbortedTxns = ListBuffer.empty[AbortedTxn]
def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
segmentEntry.ifPresent(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator))
allAbortedTxns.toList
}
/**
* Roll the log over to a new active segment starting with the current logEndOffset.
* This will trim the index to the exact size of the number of entries it currently contains.
*
* @param expectedNextOffset The expected next offset after the segment is rolled
*
* @return The newly rolled segment
*/
private[log] def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
val start = time.hiResClockMs()
checkIfMemoryMappedBufferClosed()
val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
val logFile = LogFileUtils.logFile(dir, newOffset, "")
val activeSegment = segments.activeSegment
if (segments.contains(newOffset)) {
// segment with the same base offset already exists and loaded
if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
// We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
// active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0).
warn(s"Trying to roll a new log segment with start offset $newOffset " +
s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
s" size of offset index: ${activeSegment.offsetIndex.entries}.")
val newSegment = createAndDeleteSegment(newOffset, activeSegment, asyncDelete = true, LogRoll(this))
updateLogEndOffset(nextOffsetMetadata.messageOffset)
info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
return newSegment
} else {
throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
s"segment is ${segments.get(newOffset)}.")
}
} else if (segments.nonEmpty && newOffset < activeSegment.baseOffset) {
throw new KafkaException(
s"Trying to roll a new log segment for topic partition $topicPartition with " +
s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
} else {
val offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset)
val timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset)
val txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset)
for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
Files.delete(file.toPath)
}
segments.lastSegment.ifPresent(_.onBecomeInactiveSegment())
}
val newSegment = LogSegment.open(dir,
newOffset,
config,
time,
config.initFileSize,
config.preallocate)
segments.add(newSegment)
// We need to update the segment base offset and append position data of the metadata when log rolls.
// The next offset should not change.
updateLogEndOffset(nextOffsetMetadata.messageOffset)
info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
newSegment
}
}
/**
* Delete all data in the local log and start at the new offset.
*
* @param newOffset The new offset to start the log with
* @return the list of segments that were scheduled for deletion
*/
private[log] def truncateFullyAndStartAt(newOffset: Long): Iterable[LogSegment] = {
maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
debug(s"Truncate and start at offset $newOffset")
checkIfMemoryMappedBufferClosed()
val segmentsToDelete = new util.ArrayList(segments.values).asScala
if (segmentsToDelete.nonEmpty) {
removeAndDeleteSegments(segmentsToDelete.dropRight(1), asyncDelete = true, LogTruncation(this))
// Use createAndDeleteSegment() to create new segment first and then delete the old last segment to prevent missing
// active segment during the deletion process
createAndDeleteSegment(newOffset, segmentsToDelete.last, asyncDelete = true, LogTruncation(this))
}
updateLogEndOffset(newOffset)
segmentsToDelete
}
}
/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
*
* @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
* @return the list of segments that were scheduled for deletion
*/
private[log] def truncateTo(targetOffset: Long): Iterable[LogSegment] = {
val deletableSegments = segments.filter(segment => segment.baseOffset > targetOffset).asScala
removeAndDeleteSegments(deletableSegments, asyncDelete = true, LogTruncation(this))
segments.activeSegment.truncateTo(targetOffset)
updateLogEndOffset(targetOffset)
deletableSegments
}
}
/**
* Helper functions for logs
*/
object LocalLog extends Logging {
/** a directory that is scheduled to be deleted */
private[log] val DeleteDirSuffix = LogFileUtils.DELETE_DIR_SUFFIX
/** a directory that is used for future partition */
private[log] val FutureDirSuffix = "-future"
/** a directory that is used for stray partition */
private[log] val StrayDirSuffix = "-stray"
private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix")
private[log] val UnknownOffset = -1L
/**
* Return a directory name to rename the log directory to for async deletion.
* The name will be in the following format: "topic-partitionId.uniqueId-delete".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix)
}
/**
* Return a directory name to rename the log directory to for stray partition deletion.
* The name will be in the following format: "topic-partitionId.uniqueId-stray".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
private[log] def logStrayDirName(topicPartition: TopicPartition): String = {
logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix)
}
/**
* Return a future directory name for the given topic partition. The name will be in the following
* format: topic-partition.uniqueId-future where topic, partition and uniqueId are variables.
*/
private[log] def logFutureDirName(topicPartition: TopicPartition): String = {
logDirNameWithSuffix(topicPartition, FutureDirSuffix)
}
/**
* Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix"
val prefixLength = Math.min(topicPartition.topic().length, 255 - fullSuffix.length)
s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix"
}
private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
s"${logDirName(topicPartition)}.$uniqueId$suffix"
}
/**
* Return a directory name for the given topic partition. The name will be in the following
* format: topic-partition where topic, partition are variables.
*/
private[log] def logDirName(topicPartition: TopicPartition): String = {
s"${topicPartition.topic}-${topicPartition.partition}"
}
/**
* Parse the topic and partition out of the directory name of a log
*/
private[log] def parseTopicPartitionName(dir: File): TopicPartition = {
if (dir == null)
throw new KafkaException("dir should not be null")
def exception(dir: File): KafkaException = {
new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " +
"topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
"Kafka's log directories (and children) should only contain Kafka topic data.")
}
val dirName = dir.getName
if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
throw exception(dir)
if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches ||
dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches ||
dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches)
throw exception(dir)
val name: String =
if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix))
dirName.substring(0, dirName.lastIndexOf('.'))
else dirName
val index = name.lastIndexOf('-')
val topic = name.substring(0, index)
val partitionString = name.substring(index + 1)
if (topic.isEmpty || partitionString.isEmpty)
throw exception(dir)
val partition =
try partitionString.toInt
catch { case _: NumberFormatException => throw exception(dir) }
new TopicPartition(topic, partition)
}
private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
includeAbortedTxns: Boolean): FetchDataInfo = {
val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] =
if (includeAbortedTxns) Optional.of(Collections.emptyList())
else Optional.empty()
new FetchDataInfo(fetchOffsetMetadata,
MemoryRecords.EMPTY,
false,
abortedTransactions)
}
/**
* Wraps the value of iterator.next() in an option.
* Note: this facility is a part of the Iterator class starting from scala v2.13.
*
* @param iterator
* @tparam T the type of object held within the iterator
* @return Some(iterator.next) if a next element exists, None otherwise.
*/
private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = {
if (iterator.hasNext)
Some(iterator.next())
else
None
}
}
trait SegmentDeletionReason {
def logReason(toDelete: List[LogSegment]): Unit
}
case class LogTruncation(log: LocalLog) extends SegmentDeletionReason {
override def logReason(toDelete: List[LogSegment]): Unit = {
log.info(s"Deleting segments as part of log truncation: ${toDelete.mkString(",")}")
}
}
case class LogRoll(log: LocalLog) extends SegmentDeletionReason {
override def logReason(toDelete: List[LogSegment]): Unit = {
log.info(s"Deleting segments as part of log roll: ${toDelete.mkString(",")}")
}
}
case class LogDeletion(log: LocalLog) extends SegmentDeletionReason {
override def logReason(toDelete: List[LogSegment]): Unit = {
log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}")
}
}
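
The companion-object helpers above do not simply disappear: as the following hunks show, callers switch to equivalents on the Java LocalLog and LogFileUtils (for example LocalLog.parseTopicPartitionName, LocalLog.deleteSegmentFiles, LocalLog.UNKNOWN_OFFSET and LogFileUtils.DELETE_DIR_SUFFIX), while nextOption moves into the UnifiedLog companion object. A minimal sketch of the relocated lookups, assuming a placeholder File value named dir:

    import java.io.File
    import org.apache.kafka.storage.internals.log.{LocalLog, LogFileUtils}

    val dir = new File("/tmp/mytopic-0")                  // placeholder, not from the patch
    val tp = LocalLog.parseTopicPartitionName(dir)        // was kafka.log.LocalLog.parseTopicPartitionName(dir)
    val deleteSuffix = LogFileUtils.DELETE_DIR_SUFFIX     // was LocalLog.DeleteDirSuffix
    val unknownOffset = LocalLog.UNKNOWN_OFFSET           // was LocalLog.UnknownOffset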

View File

@@ -18,7 +18,6 @@
package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
- import kafka.log.LocalLog.nextOption
import kafka.log.remote.RemoteLogManager
import kafka.utils._
import org.apache.kafka.common.errors._
@@ -41,7 +40,7 @@ import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
- import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard, LocalLog => JLocalLog, UnifiedLog => JUnifiedLog}
+ import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import java.io.{File, IOException}
@@ -1246,7 +1245,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
- localLog.collectAbortedTransactions(logStartOffset, startOffset, upperBoundOffset)
+ localLog.collectAbortedTransactions(logStartOffset, startOffset, upperBoundOffset).asScala.toList
}
/**
@@ -1559,7 +1558,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val newLocalLogStartOffset = localLog.segments.higherSegment(segmentsToDelete.last.baseOffset()).get.baseOffset()
incrementStartOffset(newLocalLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
// remove the segments for lookups
- localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
+ localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason)
}
deleteProducerSnapshots(deletable.toList.asJava, asyncDelete = true)
}
@@ -1713,7 +1712,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* @return The newly rolled segment
*/
def roll(expectedNextOffset: Option[Long] = None): LogSegment = lock synchronized {
- val newSegment = localLog.roll(expectedNextOffset)
+ val nextOffset : JLong = expectedNextOffset match {
+   case Some(offset) => offset
+   case None => 0L
+ }
+ val newSegment = localLog.roll(nextOffset)
// Take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
// offset align with the new segment offset since this ensures we can recover the segment by beginning
// with the corresponding snapshot file and scanning the segment data. Because the segment base offset
@@ -1848,7 +1851,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
truncateFullyAndStartAt(targetOffset)
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
- deleteProducerSnapshots(deletedSegments.toList.asJava, asyncDelete = true)
+ deleteProducerSnapshots(deletedSegments, asyncDelete = true)
leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
@@ -1962,7 +1965,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private[log] def addSegment(segment: LogSegment): LogSegment = localLog.segments.add(segment)
private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
- JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
+ LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
}
private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
@@ -1971,7 +1974,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
result.newSegments.asScala.toList
}
- private[log] def deleteProducerSnapshots(segments: util.List[LogSegment], asyncDelete: Boolean): Unit = {
+ private[log] def deleteProducerSnapshots(segments: util.Collection[LogSegment], asyncDelete: Boolean): Unit = {
JUnifiedLog.deleteProducerSnapshots(segments, producerStateManager, asyncDelete, scheduler, config, logDirFailureChannel, parentDir, topicPartition)
}
}
@@ -1991,14 +1994,9 @@ object UnifiedLog extends Logging {
val DeleteDirSuffix: String = LogFileUtils.DELETE_DIR_SUFFIX
- val StrayDirSuffix: String = LocalLog.StrayDirSuffix
- val FutureDirSuffix: String = LocalLog.FutureDirSuffix
- private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern
- private[log] val FutureDirPattern = LocalLog.FutureDirPattern
- val UnknownOffset: Long = LocalLog.UnknownOffset
+ val StrayDirSuffix: String = LogFileUtils.STRAY_DIR_SUFFIX
+ val UnknownOffset: Long = LocalLog.UNKNOWN_OFFSET
def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean,
config: LogConfig,
@@ -2139,7 +2137,7 @@ object UnifiedLog extends Logging {
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String,
isRecoveredSwapFile: Boolean = false): Iterable[LogSegment] = {
- JLocalLog.replaceSegments(existingSegments,
+ LocalLog.replaceSegments(existingSegments,
newSegments.asJava,
oldSegments.asJava,
dir,
@@ -2159,11 +2157,11 @@ object UnifiedLog extends Logging {
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String): SplitSegmentResult = {
- JLocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
+ LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
}
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
- JLocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
+ LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
}
// Visible for benchmarking
@@ -2194,6 +2192,21 @@ object UnifiedLog extends Logging {
if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionBytes else config.retentionSize
}
+ /**
+  * Wraps the value of iterator.next() in an option.
+  * Note: this facility is a part of the Iterator class starting from scala v2.13.
+  *
+  * @param iterator the iterator
+  * @tparam T the type of object held within the iterator
+  * @return Some(iterator.next) if a next element exists, None otherwise.
+  */
+ private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = {
+   if (iterator.hasNext)
+     Some(iterator.next())
+   else
+     None
+ }
}
object LogMetricNames {
@@ -2208,9 +2221,9 @@ object LogMetricNames {
}
case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
+ override def logReason(toDelete: util.List[LogSegment]): Unit = {
val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabledAndRemoteCopyEnabled)
- toDelete.foreach { segment =>
+ toDelete.forEach { segment =>
if (segment.largestRecordTimestamp.isPresent)
if (remoteLogEnabledAndRemoteCopyEnabled)
log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the largest " +
@@ -2231,9 +2244,9 @@ case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
}
case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
+ override def logReason(toDelete: util.List[LogSegment]): Unit = {
var size = log.size
- toDelete.foreach { segment =>
+ toDelete.forEach { segment =>
size -= segment.size
if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabledAndRemoteCopyEnabled)} breach. " +
s"Local log size after deletion will be $size.")
@@ -2244,10 +2257,10 @@ case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
}
case class StartOffsetBreach(log: UnifiedLog, remoteLogEnabled: Boolean) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
+ override def logReason(toDelete: util.List[LogSegment]): Unit = {
if (remoteLogEnabled)
- log.info(s"Deleting segments due to local log start offset ${log.localLogStartOffset()} breach: ${toDelete.mkString(",")}")
+ log.info(s"Deleting segments due to local log start offset ${log.localLogStartOffset()} breach: ${toDelete.asScala.mkString(",")}")
else
- log.info(s"Deleting segments due to log start offset ${log.logStartOffset} breach: ${toDelete.mkString(",")}")
+ log.info(s"Deleting segments due to log start offset ${log.logStartOffset} breach: ${toDelete.asScala.mkString(",")}")
}
}
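
SegmentDeletionReason is now the Java interface from the storage module, so logReason takes a java.util.List[LogSegment]; that is why the retention reasons above switch from foreach/mkString to forEach/asScala. A sketch of a custom reason written against the new signature (the class name and the println body are illustrative only, not part of the patch):

    import java.util
    import org.apache.kafka.storage.internals.log.{LogSegment, SegmentDeletionReason}

    class ExampleDeletionReason extends SegmentDeletionReason {   // hypothetical reason for illustration
      override def logReason(toDelete: util.List[LogSegment]): Unit =
        toDelete.forEach(segment => println(s"Deleting segment $segment"))   // stands in for real logging
    }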

View File

@@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
- import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+ import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

View File

@@ -63,7 +63,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
- import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+ import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

View File

@@ -31,7 +31,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{MemoryRecords, Record, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.util.{MockTime, Scheduler}
- import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog => JLocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments}
+ import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, LogTruncation, SegmentDeletionReason}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -98,10 +98,10 @@ class LocalLogTest {
private def appendRecords(records: Iterable[SimpleRecord],
log: LocalLog = log,
initialOffset: Long = 0L): Unit = {
- log.append(lastOffset = initialOffset + records.size - 1,
-   largestTimestamp = records.head.timestamp,
-   shallowOffsetOfMaxTimestamp = initialOffset,
-   records = MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*))
+ log.append(initialOffset + records.size - 1,
+   records.head.timestamp,
+   initialOffset,
+   MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*))
}
private def readRecords(log: LocalLog = log,
@@ -112,16 +112,16 @@ class LocalLogTest {
includeAbortedTxns: Boolean = false): FetchDataInfo = {
log.read(startOffset,
maxLength,
- minOneMessage = minOneMessage,
+ minOneMessage,
maxOffsetMetadata,
- includeAbortedTxns = includeAbortedTxns)
+ includeAbortedTxns)
}
@Test
def testLogDeleteSegmentsSuccess(): Unit = {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
- log.roll()
+ log.roll(0L)
assertEquals(2, log.segments.numberOfSegments)
assertFalse(logDir.listFiles.isEmpty)
val segmentsBeforeDelete = new util.ArrayList(log.segments.values)
@@ -135,7 +135,7 @@ class LocalLogTest {
@Test
def testRollEmptyActiveSegment(): Unit = {
val oldActiveSegment = log.segments.activeSegment
- log.roll()
+ log.roll(0L)
assertEquals(1, log.segments.numberOfSegments)
assertNotEquals(oldActiveSegment, log.segments.activeSegment)
assertFalse(logDir.listFiles.isEmpty)
@@ -146,7 +146,7 @@ class LocalLogTest {
def testLogDeleteDirSuccessWhenEmptyAndFailureWhenNonEmpty(): Unit ={
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
- log.roll()
+ log.roll(0L)
assertEquals(2, log.segments.numberOfSegments)
assertFalse(logDir.listFiles.isEmpty)
@@ -172,7 +172,7 @@ class LocalLogTest {
def testLogDirRenameToNewDir(): Unit = {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
- log.roll()
+ log.roll(0L)
assertEquals(2, log.segments.numberOfSegments)
val newLogDir = TestUtils.randomPartitionLogDir(tmpDir)
assertTrue(log.renameDir(newLogDir.getName))
@@ -198,7 +198,7 @@ class LocalLogTest {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
mockTime.sleep(1)
- val newSegment = log.roll()
+ val newSegment = log.roll(0L)
log.flush(newSegment.baseOffset)
log.markFlushed(newSegment.baseOffset)
assertEquals(1L, log.recoveryPoint)
@@ -263,29 +263,29 @@ class LocalLogTest {
for (offset <- 0 to 8) {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record), initialOffset = offset)
- log.roll()
+ log.roll(0L)
}
assertEquals(10L, log.segments.numberOfSegments)
class TestDeletionReason extends SegmentDeletionReason {
- private var _deletedSegments: Iterable[LogSegment] = List[LogSegment]()
+ private var _deletedSegments: util.Collection[LogSegment] = new util.ArrayList()
- override def logReason(toDelete: List[LogSegment]): Unit = {
+ override def logReason(toDelete: util.List[LogSegment]): Unit = {
- _deletedSegments = List[LogSegment]() ++ toDelete
+ _deletedSegments = new util.ArrayList(toDelete)
}
- def deletedSegments: Iterable[LogSegment] = _deletedSegments
+ def deletedSegments: util.Collection[LogSegment] = _deletedSegments
}
val reason = new TestDeletionReason()
- val toDelete = log.segments.values.asScala.toVector
+ val toDelete = new util.ArrayList(log.segments.values)
- log.removeAndDeleteSegments(toDelete, asyncDelete = asyncDelete, reason)
+ log.removeAndDeleteSegments(toDelete, asyncDelete, reason)
if (asyncDelete) {
mockTime.sleep(log.config.fileDeleteDelayMs + 1)
}
assertTrue(log.segments.isEmpty)
assertEquals(toDelete, reason.deletedSegments)
- toDelete.foreach(segment => assertTrue(segment.deleted()))
+ toDelete.forEach(segment => assertTrue(segment.deleted()))
}
@Test
@@ -302,13 +302,13 @@ class LocalLogTest {
for (offset <- 0 to 8) {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record), initialOffset = offset)
- log.roll()
+ log.roll(0L)
}
assertEquals(10L, log.segments.numberOfSegments)
val toDelete = log.segments.values
- JLocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
+ LocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
if (asyncDelete) {
toDelete.forEach {
segment =>
@@ -336,7 +336,7 @@ class LocalLogTest {
appendRecords(List(record))
val newOffset = log.segments.activeSegment.baseOffset + 1
val oldActiveSegment = log.segments.activeSegment
- val newActiveSegment = log.createAndDeleteSegment(newOffset, log.segments.activeSegment, asyncDelete = true, LogTruncation(log))
+ val newActiveSegment = log.createAndDeleteSegment(newOffset, log.segments.activeSegment, true, new LogTruncation(log.logger))
assertEquals(1, log.segments.numberOfSegments)
assertEquals(newActiveSegment, log.segments.activeSegment)
assertNotEquals(oldActiveSegment, log.segments.activeSegment)
@@ -354,7 +354,7 @@ class LocalLogTest {
for (offset <- 0 to 7) {
appendRecords(List(record), initialOffset = offset)
if (offset % 2 != 0)
- log.roll()
+ log.roll(0L)
}
for (offset <- 8 to 12) {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
@@ -362,7 +362,7 @@ class LocalLogTest {
}
assertEquals(5, log.segments.numberOfSegments)
assertNotEquals(10L, log.segments.activeSegment.baseOffset)
- val expected = log.segments.values.asScala.toVector
+ val expected = new util.ArrayList(log.segments.values)
val deleted = log.truncateFullyAndStartAt(10L)
assertEquals(expected, deleted)
assertEquals(1, log.segments.numberOfSegments)
@@ -379,7 +379,7 @@ class LocalLogTest {
for (offset <- 0 to 4) {
appendRecords(List(record), initialOffset = offset)
if (offset % 2 != 0)
- log.roll()
+ log.roll(0L)
}
assertEquals(3, log.segments.numberOfSegments)
@@ -416,15 +416,15 @@ class LocalLogTest {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record), initialOffset = offset)
if (offset % 3 == 2)
- log.roll()
+ log.roll(0L)
}
assertEquals(5, log.segments.numberOfSegments)
assertEquals(12L, log.logEndOffset)
- val expected = log.segments.values(9L, log.logEndOffset + 1).asScala.toVector
+ val expected = new util.ArrayList(log.segments.values(9L, log.logEndOffset + 1))
// Truncate to an offset before the base offset of the active segment
val deleted = log.truncateTo(7L)
- assertEquals(expected, deleted.toVector)
+ assertEquals(expected, deleted)
assertEquals(3, log.segments.numberOfSegments)
assertEquals(6L, log.segments.activeSegment.baseOffset)
assertEquals(0L, log.recoveryPoint)
@@ -444,7 +444,7 @@ class LocalLogTest {
for (i <- 0 until 5) {
val keyValues = Seq(KeyValue(i.toString, i.toString))
appendRecords(kvsToRecords(keyValues), initialOffset = i)
- log.roll()
+ log.roll(0L)
}
def nonActiveBaseOffsetsFrom(startOffset: Long): Seq[Long] = {
@@ -506,7 +506,7 @@ class LocalLogTest {
assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir),
() => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
// also test the "-delete" marker case
- val deleteMarkerDir = new File(logDir, topic + partition + "." + LocalLog.DeleteDirSuffix)
+ val deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX)
assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
() => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
}
@@ -535,7 +535,7 @@ class LocalLogTest {
() => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
// also test the "-delete" marker case
- val deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + LocalLog.DeleteDirSuffix)
+ val deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + LogFileUtils.DELETE_DIR_SUFFIX)
assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
() => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
}
@@ -549,7 +549,7 @@ class LocalLogTest {
() => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
// also test the "-delete" marker case
- val deleteMarkerDir = new File(logDir, topic + partition + "." + LocalLog.DeleteDirSuffix)
+ val deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX)
assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir), assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
() => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) () => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
} }
@ -569,14 +569,14 @@ class LocalLogTest {
val name1 = LocalLog.logDeleteDirName(new TopicPartition("foo", 3)) val name1 = LocalLog.logDeleteDirName(new TopicPartition("foo", 3))
assertTrue(name1.length <= 255) assertTrue(name1.length <= 255)
assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches()) assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches())
assertTrue(LocalLog.DeleteDirPattern.matcher(name1).matches()) assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name1).matches())
assertFalse(LocalLog.FutureDirPattern.matcher(name1).matches()) assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name1).matches())
val name2 = LocalLog.logDeleteDirName( val name2 = LocalLog.logDeleteDirName(
new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5)) new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5))
assertEquals(255, name2.length) assertEquals(255, name2.length)
assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches()) assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
assertTrue(LocalLog.DeleteDirPattern.matcher(name2).matches()) assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name2).matches())
assertFalse(LocalLog.FutureDirPattern.matcher(name2).matches()) assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name2).matches())
} }
@Test @Test
@ -598,7 +598,7 @@ class LocalLogTest {
assertEquals(1, log.segments.numberOfSegments, "Log begins with a single empty segment.") assertEquals(1, log.segments.numberOfSegments, "Log begins with a single empty segment.")
// roll active segment with the same base offset of size zero should recreate the segment // roll active segment with the same base offset of size zero should recreate the segment
log.roll(Some(0L)) log.roll(0L)
assertEquals(1, log.segments.numberOfSegments, "Expect 1 segment after roll() empty segment with base offset.") assertEquals(1, log.segments.numberOfSegments, "Expect 1 segment after roll() empty segment with base offset.")
// should be able to append records to active segment // should be able to append records to active segment
@ -614,7 +614,7 @@ class LocalLogTest {
assertEquals(keyValues1 ++ keyValues2, recordsToKvs(readResult.records.records.asScala)) assertEquals(keyValues1 ++ keyValues2, recordsToKvs(readResult.records.records.asScala))
// roll so that active segment is empty // roll so that active segment is empty
log.roll() log.roll(0L)
assertEquals(2L, log.segments.activeSegment.baseOffset, "Expect base offset of active segment to be LEO") assertEquals(2L, log.segments.activeSegment.baseOffset, "Expect base offset of active segment to be LEO")
assertEquals(2, log.segments.numberOfSegments, "Expect two segments.") assertEquals(2, log.segments.numberOfSegments, "Expect two segments.")
assertEquals(2L, log.logEndOffset) assertEquals(2L, log.logEndOffset)
@ -626,7 +626,7 @@ class LocalLogTest {
// roll active segment with the same base offset of size zero should recreate the segment // roll active segment with the same base offset of size zero should recreate the segment
{ {
val newSegment = log.roll() val newSegment = log.roll(0L)
assertEquals(0L, newSegment.baseOffset) assertEquals(0L, newSegment.baseOffset)
assertEquals(1, log.segments.numberOfSegments) assertEquals(1, log.segments.numberOfSegments)
assertEquals(0L, log.logEndOffset) assertEquals(0L, log.logEndOffset)
@ -635,7 +635,7 @@ class LocalLogTest {
appendRecords(List(KeyValue("k1", "v1").toRecord())) appendRecords(List(KeyValue("k1", "v1").toRecord()))
{ {
val newSegment = log.roll() val newSegment = log.roll(0L)
assertEquals(1L, newSegment.baseOffset) assertEquals(1L, newSegment.baseOffset)
assertEquals(2, log.segments.numberOfSegments) assertEquals(2, log.segments.numberOfSegments)
assertEquals(1L, log.logEndOffset) assertEquals(1L, log.logEndOffset)
@ -644,7 +644,7 @@ class LocalLogTest {
appendRecords(List(KeyValue("k2", "v2").toRecord()), initialOffset = 1L) appendRecords(List(KeyValue("k2", "v2").toRecord()), initialOffset = 1L)
{ {
val newSegment = log.roll(Some(1L)) val newSegment = log.roll(1L)
assertEquals(2L, newSegment.baseOffset) assertEquals(2L, newSegment.baseOffset)
assertEquals(3, log.segments.numberOfSegments) assertEquals(3, log.segments.numberOfSegments)
assertEquals(2L, log.logEndOffset) assertEquals(2L, log.logEndOffset)
@ -661,7 +661,7 @@ class LocalLogTest {
assertEquals(3, log.logEndOffset, "Expect two records in the log") assertEquals(3, log.logEndOffset, "Expect two records in the log")
// roll to create an empty active segment // roll to create an empty active segment
log.roll() log.roll(0L)
assertEquals(3L, log.segments.activeSegment.baseOffset) assertEquals(3L, log.segments.activeSegment.baseOffset)
// intentionally setup the logEndOffset to introduce an error later // intentionally setup the logEndOffset to introduce an error later
@ -669,7 +669,7 @@ class LocalLogTest {
// expect an error because of attempt to roll to a new offset (1L) that's lower than the // expect an error because of attempt to roll to a new offset (1L) that's lower than the
// base offset (3L) of the active segment // base offset (3L) of the active segment
assertThrows(classOf[KafkaException], () => log.roll()) assertThrows(classOf[KafkaException], () => log.roll(0L))
} }
@Test @Test
@ -679,7 +679,7 @@ class LocalLogTest {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record)) appendRecords(List(record))
mockTime.sleep(1) mockTime.sleep(1)
val newSegment = log.roll() val newSegment = log.roll(0L)
// simulate the directory is renamed concurrently // simulate the directory is renamed concurrently
doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir
@ -701,14 +701,14 @@ class LocalLogTest {
time, time,
config.initFileSize, config.initFileSize,
config.preallocate)) config.preallocate))
new LocalLog(_dir = dir, new LocalLog(dir,
config = config, config,
segments = segments, segments,
recoveryPoint = recoveryPoint, recoveryPoint,
nextOffsetMetadata = nextOffsetMetadata, nextOffsetMetadata,
scheduler = scheduler, scheduler,
time = time, time,
topicPartition = topicPartition, topicPartition,
logDirFailureChannel = logDirFailureChannel) logDirFailureChannel)
} }
} }

View File

@ -28,7 +28,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.{AfterEach, Test}

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._

View File

@ -31,7 +31,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile} import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}

View File

@ -20,7 +20,7 @@ package kafka.server
import com.yammer.metrics.core.{Gauge, Meter, Timer} import com.yammer.metrics.core.{Gauge, Meter, Timer}
import kafka.cluster.PartitionTest.MockPartitionListener import kafka.cluster.PartitionTest.MockPartitionListener
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.log.{LocalLog, LogManager, LogManagerTest, UnifiedLog} import kafka.log.{LogManager, LogManagerTest, UnifiedLog}
import kafka.log.remote.RemoteLogManager import kafka.log.remote.RemoteLogManager
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
@ -70,7 +70,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa
import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime} import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard} import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}

View File

@ -19,11 +19,11 @@ package kafka.utils
import java.util.Properties import java.util.Properties
import java.util.concurrent.atomic._ import java.util.concurrent.atomic._
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit} import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import kafka.log.{LocalLog, UnifiedLog} import kafka.log.UnifiedLog
import kafka.utils.TestUtils.retry import kafka.utils.TestUtils.retry
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}

View File

@ -163,7 +163,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
Given that, this bug pattern doesn't make sense for Scala code. --> Given that, this bug pattern doesn't make sense for Scala code. -->
<Or> <Or>
<Class name="kafka.log.Log"/> <Class name="kafka.log.Log"/>
<Class name="kafka.log.LocalLog$"/>
</Or> </Or>
<Bug pattern="REC_CATCH_EXCEPTION"/> <Bug pattern="REC_CATCH_EXCEPTION"/>
</Match> </Match>

View File

@ -16,10 +16,16 @@
*/ */
package org.apache.kafka.storage.internals.log; package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.FileLogInputStream; import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.server.util.Scheduler;
@ -29,25 +35,774 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.require; import static org.apache.kafka.common.utils.Utils.require;
import static org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX; import static org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX; import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETE_DIR_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.FUTURE_DIR_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.STRAY_DIR_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.SWAP_FILE_SUFFIX; import static org.apache.kafka.storage.internals.log.LogFileUtils.SWAP_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.isLogFile; import static org.apache.kafka.storage.internals.log.LogFileUtils.isLogFile;
/**
* An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
* NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
*/
public class LocalLog { public class LocalLog {
private static final Logger LOG = LoggerFactory.getLogger(LocalLog.class); private static final Logger LOG = LoggerFactory.getLogger(LocalLog.class);
public static final Pattern DELETE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.DELETE_DIR_SUFFIX);
public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.FUTURE_DIR_SUFFIX);
public static final Pattern STRAY_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.STRAY_DIR_SUFFIX);
public static final long UNKNOWN_OFFSET = -1L;
// Last time the log was flushed
private final AtomicLong lastFlushedTime;
private final String logIdent;
private final LogSegments segments;
private final Scheduler scheduler;
private final Time time;
private final TopicPartition topicPartition;
private final LogDirFailureChannel logDirFailureChannel;
private final Logger logger;
private volatile LogOffsetMetadata nextOffsetMetadata;
// The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
// After memory mapped buffer is closed, no disk IO operation should be performed for this log.
private volatile boolean isMemoryMappedBufferClosed = false;
// Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
private volatile String parentDir;
private volatile LogConfig config;
private volatile long recoveryPoint;
private File dir;
/**
* @param dir The directory in which log segments are created.
* @param config The log configuration settings
* @param segments The non-empty log segments recovered from disk
* @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
* @param nextOffsetMetadata The offset where the next message could be appended
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
* @param topicPartition The topic partition associated with this log
* @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
*/
public LocalLog(File dir,
LogConfig config,
LogSegments segments,
long recoveryPoint,
LogOffsetMetadata nextOffsetMetadata,
Scheduler scheduler,
Time time,
TopicPartition topicPartition,
LogDirFailureChannel logDirFailureChannel) {
this.dir = dir;
this.config = config;
this.segments = segments;
this.recoveryPoint = recoveryPoint;
this.nextOffsetMetadata = nextOffsetMetadata;
this.scheduler = scheduler;
this.time = time;
this.topicPartition = topicPartition;
this.logDirFailureChannel = logDirFailureChannel;
this.logIdent = "[LocalLog partition=" + topicPartition + ", dir=" + dir + "] ";
this.logger = new LogContext(logIdent).logger(LocalLog.class);
// Last time the log was flushed
this.lastFlushedTime = new AtomicLong(time.milliseconds());
this.parentDir = dir.getParent();
}
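// A minimal construction sketch (assumptions: `dir`, `config`, `segments`, `scheduler`, `time`,
// `topicPartition` and `logDirFailureChannel` already exist, and the log is empty so offsets
// start at 0). This mirrors the test helper further down in this diff.
//
//   LocalLog log = new LocalLog(dir, config, segments, 0L,
//       new LogOffsetMetadata(0L, 0L, 0), scheduler, time, topicPartition, logDirFailureChannel);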
public File dir() {
return dir;
}
public Logger logger() {
return logger;
}
public LogConfig config() {
return config;
}
public LogSegments segments() {
return segments;
}
public Scheduler scheduler() {
return scheduler;
}
public LogOffsetMetadata nextOffsetMetadata() {
return nextOffsetMetadata;
}
public TopicPartition topicPartition() {
return topicPartition;
}
public LogDirFailureChannel logDirFailureChannel() {
return logDirFailureChannel;
}
public long recoveryPoint() {
return recoveryPoint;
}
public Time time() {
return time;
}
public String name() {
return dir.getName();
}
public String parentDir() {
return parentDir;
}
public File parentDirFile() {
return new File(parentDir);
}
public boolean isFuture() {
return dir.getName().endsWith(LogFileUtils.FUTURE_DIR_SUFFIX);
}
private <T> T maybeHandleIOException(Supplier<String> errorMsgSupplier, StorageAction<T, IOException> function) {
return maybeHandleIOException(logDirFailureChannel, parentDir, errorMsgSupplier, function);
}
/**
* Rename the directory of the log
* @param name the new dir name
* @throws KafkaStorageException if rename fails
*/
public boolean renameDir(String name) {
return maybeHandleIOException(
() -> "Error while renaming dir for " + topicPartition + " in log dir " + dir.getParent(),
() -> {
File renamedDir = new File(dir.getParent(), name);
Utils.atomicMoveWithFallback(dir.toPath(), renamedDir.toPath());
if (!renamedDir.equals(dir)) {
dir = renamedDir;
parentDir = renamedDir.getParent();
segments.updateParentDir(renamedDir);
return true;
} else {
return false;
}
}
);
}
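// Sketch of the expected call pattern for async deletion (assuming a LocalLog `log`): the
// directory is first renamed to a "-delete" suffixed name produced by logDeleteDirName(), and
// the renamed directory is cleaned up later.
//
//   log.renameDir(LocalLog.logDeleteDirName(log.topicPartition()));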
/**
* Update the existing configuration to the new provided configuration.
* @param newConfig the new configuration to be updated to
*/
public void updateConfig(LogConfig newConfig) {
LogConfig oldConfig = config;
config = newConfig;
RecordVersion oldRecordVersion = oldConfig.recordVersion();
RecordVersion newRecordVersion = newConfig.recordVersion();
if (newRecordVersion.precedes(oldRecordVersion)) {
logger.warn("Record format version has been downgraded from {} to {}.", oldRecordVersion, newRecordVersion);
}
}
public void checkIfMemoryMappedBufferClosed() {
if (isMemoryMappedBufferClosed) {
throw new KafkaStorageException("The memory mapped buffer for log of " + topicPartition + " is already closed");
}
}
public void updateRecoveryPoint(long newRecoveryPoint) {
recoveryPoint = newRecoveryPoint;
}
/**
* Update recoveryPoint to the provided offset and mark the log as flushed, if the offset is greater
* than the existing recoveryPoint.
*
* @param offset the offset to be updated
*/
public void markFlushed(long offset) {
checkIfMemoryMappedBufferClosed();
if (offset > recoveryPoint) {
updateRecoveryPoint(offset);
lastFlushedTime.set(time.milliseconds());
}
}
/**
* The number of messages appended to the log since the last flush
*/
public long unflushedMessages() {
return logEndOffset() - recoveryPoint;
}
/**
* Flush local log segments for all offsets up to offset-1.
* Does not update the recovery point.
*
* @param offset The offset to flush up to (non-inclusive)
*/
public void flush(long offset) throws IOException {
long currentRecoveryPoint = recoveryPoint;
if (currentRecoveryPoint <= offset) {
Collection<LogSegment> segmentsToFlush = segments.values(currentRecoveryPoint, offset);
for (LogSegment logSegment : segmentsToFlush) {
logSegment.flush();
}
// If there are any new segments, we need to flush the parent directory for crash consistency.
if (segmentsToFlush.stream().anyMatch(s -> s.baseOffset() >= currentRecoveryPoint)) {
// The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here.
// Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go.
Utils.flushDirIfExists(dir.toPath());
}
}
}
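// Illustrative flush sequence, a sketch assuming a LocalLog `log`: flush() writes segments below
// the given offset to disk but leaves the recovery point alone; markFlushed() then advances it.
//
//   long offset = log.logEndOffset();
//   log.flush(offset);
//   log.markFlushed(offset);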
/**
* The time this log is last known to have been fully flushed to disk
*/
public long lastFlushTime() {
return lastFlushedTime.get();
}
/**
* The offset metadata of the next message that will be appended to the log
*/
public LogOffsetMetadata logEndOffsetMetadata() {
return nextOffsetMetadata;
}
/**
* The offset of the next message that will be appended to the log
*/
public long logEndOffset() {
return nextOffsetMetadata.messageOffset;
}
/**
* Update end offset of the log, and update the recoveryPoint.
*
* @param endOffset the new end offset of the log
*/
public void updateLogEndOffset(long endOffset) {
nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment().baseOffset(), segments.activeSegment().size());
if (recoveryPoint > endOffset) {
updateRecoveryPoint(endOffset);
}
}
/**
* Close file handlers used by log but don't write to disk.
* This is called if the log directory is offline.
*/
public void closeHandlers() {
segments.closeHandlers();
isMemoryMappedBufferClosed = true;
}
/**
* Closes the segments of the log.
*/
public void close() {
maybeHandleIOException(
() -> "Error while renaming dir for " + topicPartition + " in dir " + dir.getParent(),
() -> {
checkIfMemoryMappedBufferClosed();
segments.close();
return null;
}
);
}
/**
* Completely delete this log directory with no delay.
*/
public void deleteEmptyDir() {
maybeHandleIOException(
() -> "Error while deleting dir for " + topicPartition + " in dir " + dir.getParent(),
() -> {
if (!segments.isEmpty()) {
throw new IllegalStateException("Can not delete directory when " + segments.numberOfSegments() + " segments are still present");
}
if (!isMemoryMappedBufferClosed) {
throw new IllegalStateException("Can not delete directory when memory mapped buffer for log of " + topicPartition + " is still open.");
}
Utils.delete(dir);
return null;
}
);
}
/**
* Completely delete all segments with no delay.
* @return the deleted segments
*/
public List<LogSegment> deleteAllSegments() {
return maybeHandleIOException(
() -> "Error while deleting all segments for $topicPartition in dir ${dir.getParent}",
() -> {
List<LogSegment> deletableSegments = new ArrayList<>(segments.values());
removeAndDeleteSegments(
segments.values(),
false,
toDelete -> logger.info("Deleting segments as the log has been deleted: {}", toDelete.stream()
.map(LogSegment::toString)
.collect(Collectors.joining(", "))));
isMemoryMappedBufferClosed = true;
return deletableSegments;
}
);
}
/**
* This method deletes the given log segments by doing the following for each of them:
* <ul>
* <li>It removes the segment from the segment map so that it will no longer be used for reads.
* <li>It renames the index and log files by appending .deleted to the respective file name
* <li>It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
* </ul>
* Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
* physically deleting a file while it is being read.
* This method does not convert IOException to KafkaStorageException, the immediate caller
* is expected to catch and handle IOException.
*
* @param segmentsToDelete The log segments to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted asynchronously
* @param reason The reason for the segment deletion
*/
public void removeAndDeleteSegments(Collection<LogSegment> segmentsToDelete,
boolean asyncDelete,
SegmentDeletionReason reason) throws IOException {
if (!segmentsToDelete.isEmpty()) {
// Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
// removing the deleted segment, we should force materialization of the iterator here, so that results of the
// iteration remain valid and deterministic. We should also pass only the materialized view of the
// iterator to the logic that actually deletes the segments.
List<LogSegment> toDelete = new ArrayList<>(segmentsToDelete);
reason.logReason(toDelete);
toDelete.forEach(segment -> segments.remove(segment.baseOffset()));
deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
}
}
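// Sketch of how callers in this class invoke the method (see truncateFullyAndStartAt() and
// truncateTo() below); `segmentsToDrop` is an assumed collection of LogSegment, and the reason
// describes why the segments are going away.
//
//   removeAndDeleteSegments(segmentsToDrop, true, new LogTruncation(logger));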
/**
* This method deletes the given segment and creates a new segment with the given new base offset. It ensures an
* active segment exists in the log at all times during this process.
* <br/>
* Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
* physically deleting a file while it is being read.
* <br/>
* This method does not convert IOException to KafkaStorageException, the immediate caller
* is expected to catch and handle IOException.
*
* @param newOffset The base offset of the new segment
* @param segmentToDelete The old active segment to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted asynchronously
* @param reason The reason for the segment deletion
*/
public LogSegment createAndDeleteSegment(long newOffset,
LogSegment segmentToDelete,
boolean asyncDelete,
SegmentDeletionReason reason) throws IOException {
if (newOffset == segmentToDelete.baseOffset()) {
segmentToDelete.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX);
}
LogSegment newSegment = LogSegment.open(dir,
newOffset,
config,
time,
config.initFileSize(),
config.preallocate);
segments.add(newSegment);
reason.logReason(singletonList(segmentToDelete));
if (newOffset != segmentToDelete.baseOffset()) {
segments.remove(segmentToDelete.baseOffset());
}
deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
return newSegment;
}
/**
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, throw an OffsetOutOfRangeException
*/
public LogOffsetMetadata convertToOffsetMetadataOrThrow(long offset) throws IOException {
FetchDataInfo fetchDataInfo = read(offset, 1, false, nextOffsetMetadata, false);
return fetchDataInfo.fetchOffsetMetadata;
}
/**
* Read messages from the log.
*
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
* @param maxOffsetMetadata The metadata of the maximum offset to be fetched
* @param includeAbortedTxns If true, aborted transactions are included
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset
* @return The fetch data information including fetch starting offset metadata and messages read.
*/
public FetchDataInfo read(long startOffset,
int maxLength,
boolean minOneMessage,
LogOffsetMetadata maxOffsetMetadata,
boolean includeAbortedTxns) throws IOException {
return maybeHandleIOException(
() -> "Exception while reading from " + topicPartition + " in dir " + dir.getParent(),
() -> {
logger.trace("Reading maximum $maxLength bytes at offset {} from log with total length {} bytes",
startOffset, segments.sizeInBytes());
LogOffsetMetadata endOffsetMetadata = nextOffsetMetadata;
long endOffset = endOffsetMetadata.messageOffset;
Optional<LogSegment> segmentOpt = segments.floorSegment(startOffset);
// return error on attempt to read beyond the log end offset
if (startOffset > endOffset || segmentOpt.isEmpty()) {
throw new OffsetOutOfRangeException("Received request for offset " + startOffset + " for partition " + topicPartition + ", " +
"but we only have log segments upto " + endOffset + ".");
}
if (startOffset == maxOffsetMetadata.messageOffset) return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns);
if (startOffset > maxOffsetMetadata.messageOffset) return emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns);
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
FetchDataInfo fetchDataInfo = null;
while (fetchDataInfo == null && segmentOpt.isPresent()) {
LogSegment segment = segmentOpt.get();
long baseOffset = segment.baseOffset();
// 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty.
// 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit.
// 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so
// return maxPosition as empty to avoid reading beyond the max-offset
Optional<Long> maxPositionOpt;
if (segment.baseOffset() < maxOffsetMetadata.segmentBaseOffset)
maxPositionOpt = Optional.of((long) segment.size());
else if (segment.baseOffset() == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
maxPositionOpt = Optional.of((long) maxOffsetMetadata.relativePositionInSegment);
else
maxPositionOpt = Optional.empty();
fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage);
if (fetchDataInfo != null) {
if (includeAbortedTxns) {
fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo);
}
} else {
segmentOpt = segments.higherSegment(baseOffset);
}
}
if (fetchDataInfo != null) {
return fetchDataInfo;
} else {
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
return new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY);
}
}
);
}
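// A usage sketch of read() with assumed values (`log` is a constructed LocalLog, `startOffset`
// an in-range offset): fetch up to 64 KiB bounded by the current log end offset metadata,
// without aborted transactions.
//
//   FetchDataInfo info = log.read(startOffset, 64 * 1024, false, log.logEndOffsetMetadata(), false);
//   // info.fetchOffsetMetadata is the resolved start position, info.records the fetched batches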
public void append(long lastOffset, long largestTimestamp, long shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException {
segments.activeSegment().append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records);
updateLogEndOffset(lastOffset + 1);
}
FetchDataInfo addAbortedTransactions(long startOffset, LogSegment segment, FetchDataInfo fetchInfo) throws IOException {
int fetchSize = fetchInfo.records.sizeInBytes();
OffsetPosition startOffsetPosition = new OffsetPosition(
fetchInfo.fetchOffsetMetadata.messageOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
long upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
.orElse(segments.higherSegment(segment.baseOffset())
.map(LogSegment::baseOffset).orElse(logEndOffset()));
List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
Consumer<List<AbortedTxn>> accumulator = abortedTxns -> {
for (AbortedTxn abortedTxn : abortedTxns)
abortedTransactions.add(abortedTxn.asAbortedTransaction());
};
collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator);
return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
fetchInfo.records,
fetchInfo.firstEntryIncomplete,
Optional.of(abortedTransactions));
}
private void collectAbortedTransactions(long startOffset, long upperBoundOffset,
LogSegment startingSegment,
Consumer<List<AbortedTxn>> accumulator) {
Iterator<LogSegment> higherSegments = segments.higherSegments(startingSegment.baseOffset()).iterator();
Optional<LogSegment> segmentEntryOpt = Optional.of(startingSegment);
while (segmentEntryOpt.isPresent()) {
LogSegment segment = segmentEntryOpt.get();
TxnIndexSearchResult searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset);
accumulator.accept(searchResult.abortedTransactions);
if (searchResult.isComplete) return;
segmentEntryOpt = nextItem(higherSegments);
}
}
public List<AbortedTxn> collectAbortedTransactions(long logStartOffset, long baseOffset, long upperBoundOffset) {
Optional<LogSegment> segmentEntry = segments.floorSegment(baseOffset);
List<AbortedTxn> allAbortedTxns = new ArrayList<>();
segmentEntry.ifPresent(logSegment -> collectAbortedTransactions(logStartOffset, upperBoundOffset, logSegment, allAbortedTxns::addAll));
return allAbortedTxns;
}
/**
* Roll the log over to a new active segment starting with the current logEndOffset.
* This will trim the index to the exact size of the number of entries it currently contains.
*
* @param expectedNextOffset The expected next offset after the segment is rolled
*
* @return The newly rolled segment
*/
public LogSegment roll(Long expectedNextOffset) {
return maybeHandleIOException(
() -> "Error while rolling log segment for " + topicPartition + " in dir " + dir.getParent(),
() -> {
long start = time.hiResClockMs();
checkIfMemoryMappedBufferClosed();
long newOffset = Math.max(expectedNextOffset, logEndOffset());
File logFile = LogFileUtils.logFile(dir, newOffset, "");
LogSegment activeSegment = segments.activeSegment();
if (segments.contains(newOffset)) {
// segment with the same base offset already exists and loaded
if (activeSegment.baseOffset() == newOffset && activeSegment.size() == 0) {
// We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
// active segment of size zero because one of the indexes is "full" (due to _maxEntries == 0).
logger.warn("Trying to roll a new log segment with start offset {}=max(provided offset = {}, LEO = {}) " +
"while it already exists and is active with size 0. Size of time index: {}, size of offset index: {}.",
newOffset, expectedNextOffset, logEndOffset(), activeSegment.timeIndex().entries(), activeSegment.offsetIndex().entries());
LogSegment newSegment = createAndDeleteSegment(
newOffset,
activeSegment,
true,
toDelete -> logger.info("Deleting segments as part of log roll: {}", toDelete.stream()
.map(LogSegment::toString)
.collect(Collectors.joining(", "))));
updateLogEndOffset(nextOffsetMetadata.messageOffset);
logger.info("Rolled new log segment at offset {} in {} ms.", newOffset, time.hiResClockMs() - start);
return newSegment;
} else {
throw new KafkaException("Trying to roll a new log segment for topic partition " + topicPartition + " with start offset " + newOffset +
" =max(provided offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") while it already exists. Existing " +
"segment is " + segments.get(newOffset) + ".");
}
} else if (!segments.isEmpty() && newOffset < activeSegment.baseOffset()) {
throw new KafkaException(
"Trying to roll a new log segment for topic partition " + topicPartition + " with " +
"start offset " + newOffset + " =max(provided offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") lower than start offset of the active segment " + activeSegment);
} else {
File offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset);
File timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset);
File txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset);
for (File file : Arrays.asList(logFile, offsetIdxFile, timeIdxFile, txnIdxFile)) {
if (file.exists()) {
logger.warn("Newly rolled segment file {} already exists; deleting it first", file.getAbsolutePath());
Files.delete(file.toPath());
}
}
if (segments.lastSegment().isPresent()) {
segments.lastSegment().get().onBecomeInactiveSegment();
}
}
LogSegment newSegment = LogSegment.open(dir,
newOffset,
config,
time,
config.initFileSize(),
config.preallocate);
segments.add(newSegment);
// We need to update the segment base offset and append position data of the metadata when log rolls.
// The next offset should not change.
updateLogEndOffset(nextOffsetMetadata.messageOffset);
logger.info("Rolled new log segment at offset {} in {} ms.", newOffset, time.hiResClockMs() - start);
return newSegment;
}
);
}
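// Roll sketch (assuming a LocalLog `log`): the new base offset is max(expectedNextOffset,
// logEndOffset()), so passing 0L rolls at the current log end offset, which is what the updated
// LocalLogTest calls earlier in this diff rely on.
//
//   LogSegment fresh = log.roll(0L);
//   // fresh.baseOffset() == log.logEndOffset() at the time of the roll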
/**
* Delete all data in the local log and start at the new offset.
*
* @param newOffset The new offset to start the log with
* @return the list of segments that were scheduled for deletion
*/
public List<LogSegment> truncateFullyAndStartAt(long newOffset) {
return maybeHandleIOException(
() -> "Error while truncating the entire log for " + topicPartition + " in dir " + dir.getParent(),
() -> {
logger.debug("Truncate and start at offset {}", newOffset);
checkIfMemoryMappedBufferClosed();
List<LogSegment> segmentsToDelete = new ArrayList<>(segments.values());
if (!segmentsToDelete.isEmpty()) {
removeAndDeleteSegments(segmentsToDelete.subList(0, segmentsToDelete.size() - 1), true, new LogTruncation(logger));
// Use createAndDeleteSegment() to create new segment first and then delete the old last segment to prevent missing
// active segment during the deletion process
createAndDeleteSegment(newOffset, segmentsToDelete.get(segmentsToDelete.size() - 1), true, new LogTruncation(logger));
}
updateLogEndOffset(newOffset);
return segmentsToDelete;
}
);
}
/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
*
* @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
* @return the list of segments that were scheduled for deletion
*/
public Collection<LogSegment> truncateTo(long targetOffset) throws IOException {
Collection<LogSegment> deletableSegments = segments.filter(segment -> segment.baseOffset() > targetOffset);
removeAndDeleteSegments(deletableSegments, true, new LogTruncation(logger));
segments.activeSegment().truncateTo(targetOffset);
updateLogEndOffset(targetOffset);
return deletableSegments;
}
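// Truncation sketch with assumed offsets (LocalLog `log`): segments whose base offset is above
// the target are scheduled for deletion and the active segment is trimmed in place.
//
//   Collection<LogSegment> removed = log.truncateTo(7L);
//   // afterwards log.logEndOffset() == 7L and removed holds the dropped segments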
/**
* Return a directory name to rename the log directory to for async deletion.
* The name will be in the following format: "topic-partitionId.uniqueId-delete".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
public static String logDeleteDirName(TopicPartition topicPartition) {
return logDirNameWithSuffixCappedLength(topicPartition, LogFileUtils.DELETE_DIR_SUFFIX);
}
/**
* Return a directory name to rename the log directory to for stray partition deletion.
* The name will be in the following format: "topic-partitionId.uniqueId-stray".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
public static String logStrayDirName(TopicPartition topicPartition) {
return logDirNameWithSuffixCappedLength(topicPartition, LogFileUtils.STRAY_DIR_SUFFIX);
}
/**
* Return a future directory name for the given topic partition. The name will be in the following
* format: topic-partition.uniqueId-future where topic, partition and uniqueId are variables.
*/
public static String logFutureDirName(TopicPartition topicPartition) {
return logDirNameWithSuffix(topicPartition, LogFileUtils.FUTURE_DIR_SUFFIX);
}
/**
* Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
private static String logDirNameWithSuffixCappedLength(TopicPartition topicPartition, String suffix) {
String uniqueId = UUID.randomUUID().toString().replaceAll("-", "");
String fullSuffix = "-" + topicPartition.partition() + "." + uniqueId + suffix;
int prefixLength = Math.min(topicPartition.topic().length(), 255 - fullSuffix.length());
return topicPartition.topic().substring(0, prefixLength) + fullSuffix;
}
private static String logDirNameWithSuffix(TopicPartition topicPartition, String suffix) {
String uniqueId = UUID.randomUUID().toString().replaceAll("-", "");
return logDirName(topicPartition) + "." + uniqueId + suffix;
}
/**
* Return a directory name for the given topic partition. The name will be in the following
* format: topic-partition where topic, partition are variables.
*/
public static String logDirName(TopicPartition topicPartition) {
return topicPartition.topic() + "-" + topicPartition.partition();
}
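// Example names produced by the helpers above (the 32-character uniqueId is random, shown here
// as a placeholder):
//
//   logDirName(new TopicPartition("foo", 3))        -> "foo-3"
//   logDeleteDirName(new TopicPartition("foo", 3))  -> "foo-3.<32 hex chars>-delete"
//   logFutureDirName(new TopicPartition("foo", 3))  -> "foo-3.<32 hex chars>-future"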
private static KafkaException exception(File dir) throws IOException {
return new KafkaException("Found directory " + dir.getCanonicalPath() + ", '" + dir.getName() + "' is not in the form of " +
"topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
"Kafka's log directories (and children) should only contain Kafka topic data.");
}
/**
* Parse the topic and partition out of the directory name of a log
*/
public static TopicPartition parseTopicPartitionName(File dir) throws IOException {
if (dir == null) {
throw new KafkaException("dir should not be null");
}
String dirName = dir.getName();
if (dirName.isEmpty() || !dirName.contains("-")) {
throw exception(dir);
}
if (dirName.endsWith(DELETE_DIR_SUFFIX) && !DELETE_DIR_PATTERN.matcher(dirName).matches() ||
dirName.endsWith(FUTURE_DIR_SUFFIX) && !FUTURE_DIR_PATTERN.matcher(dirName).matches() ||
dirName.endsWith(STRAY_DIR_SUFFIX) && !STRAY_DIR_PATTERN.matcher(dirName).matches()) {
throw exception(dir);
}
String name = (dirName.endsWith(DELETE_DIR_SUFFIX) || dirName.endsWith(FUTURE_DIR_SUFFIX) || dirName.endsWith(STRAY_DIR_SUFFIX))
? dirName.substring(0, dirName.lastIndexOf('.'))
: dirName;
int index = name.lastIndexOf('-');
String topic = name.substring(0, index);
String partitionString = name.substring(index + 1);
if (topic.isEmpty() || partitionString.isEmpty()) {
throw exception(dir);
}
try {
return new TopicPartition(topic, Integer.parseInt(partitionString));
} catch (NumberFormatException nfe) {
throw exception(dir);
}
}
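// Parsing sketch (`logDir` is an assumed parent directory, names are illustrative): both the
// plain directory name and its "-delete" marked form resolve to the same TopicPartition.
//
//   parseTopicPartitionName(new File(logDir, "foo-3"));                             // ("foo", 3)
//   parseTopicPartitionName(new File(logDir,
//       logDeleteDirName(new TopicPartition("foo", 3))));                           // ("foo", 3)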
/**
* Wraps the value of iterator.next() in an Optional instance.
*
* @param iterator given iterator to iterate over
* @return the next element wrapped in an Optional if it exists, Optional#empty otherwise.
* @param <T> the type of object held within the iterator
*/
public static <T> Optional<T> nextItem(Iterator<T> iterator) {
return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty();
}
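// For example, nextItem(segments.higherSegments(baseOffset).iterator()) yields the next higher
// segment wrapped in an Optional, or Optional.empty() once the iterator is exhausted, as used in
// collectAbortedTransactions() above.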
private static FetchDataInfo emptyFetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, boolean includeAbortedTxns) {
Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions = includeAbortedTxns
? Optional.of(Collections.emptyList())
: Optional.empty();
return new FetchDataInfo(fetchOffsetMetadata, MemoryRecords.EMPTY, false, abortedTransactions);
}
/** /**
* Invokes the provided function and handles any IOException raised by the function by marking the * Invokes the provided function and handles any IOException raised by the function by marking the
* provided directory offline. * provided directory offline.
@ -171,19 +926,16 @@ public class LocalLog {
try { try {
int position = 0; int position = 0;
FileRecords sourceRecords = segment.log(); FileRecords sourceRecords = segment.log();
while (position < sourceRecords.sizeInBytes()) { while (position < sourceRecords.sizeInBytes()) {
FileLogInputStream.FileChannelRecordBatch firstBatch = sourceRecords.batchesFrom(position).iterator().next(); FileLogInputStream.FileChannelRecordBatch firstBatch = sourceRecords.batchesFrom(position).iterator().next();
LogSegment newSegment = createNewCleanedSegment(dir, config, firstBatch.baseOffset()); LogSegment newSegment = createNewCleanedSegment(dir, config, firstBatch.baseOffset());
newSegments.add(newSegment); newSegments.add(newSegment);
int bytesAppended = newSegment.appendFromFile(sourceRecords, position); int bytesAppended = newSegment.appendFromFile(sourceRecords, position);
if (bytesAppended == 0) if (bytesAppended == 0) {
throw new IllegalStateException("Failed to append records from position " + position + " in " + segment); throw new IllegalStateException("Failed to append records from position " + position + " in " + segment);
}
position += bytesAppended; position += bytesAppended;
} }
// prepare new segments // prepare new segments
int totalSizeOfNewSegments = 0; int totalSizeOfNewSegments = 0;
for (LogSegment splitSegment : newSegments) { for (LogSegment splitSegment : newSegments) {
@ -198,7 +950,7 @@ public class LocalLog {
} }
// replace old segment with new ones // replace old segment with new ones
LOG.info("{}Replacing overflowed segment $segment with split segments {}", logPrefix, newSegments); LOG.info("{}Replacing overflowed segment $segment with split segments {}", logPrefix, newSegments);
List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, Collections.singletonList(segment), List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, singletonList(segment),
dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix, false); dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix, false);
return new SplitSegmentResult(deletedSegments, newSegments); return new SplitSegmentResult(deletedSegments, newSegments);
} catch (Exception e) { } catch (Exception e) {
@ -269,7 +1021,7 @@ public class LocalLog {
.sorted(Comparator.comparingLong(LogSegment::baseOffset)) .sorted(Comparator.comparingLong(LogSegment::baseOffset))
.collect(Collectors.toList()); .collect(Collectors.toList());
// need to do this in two phases to be crash safe AND do the delete asynchronously // need to do this in two phases to be crash safe AND do the deletion asynchronously
// if we crash in the middle of this we complete the swap in loadSegments() // if we crash in the middle of this we complete the swap in loadSegments()
List<LogSegment> reversedSegmentsList = new ArrayList<>(sortedNewSegments); List<LogSegment> reversedSegmentsList = new ArrayList<>(sortedNewSegments);
Collections.reverse(reversedSegmentsList); Collections.reverse(reversedSegmentsList);
@ -290,7 +1042,7 @@ public class LocalLog {
existingSegments.remove(segment.baseOffset()); existingSegments.remove(segment.baseOffset());
} }
deleteSegmentFiles( deleteSegmentFiles(
Collections.singletonList(segment), singletonList(segment),
true, true,
dir, dir,
topicPartition, topicPartition,

View File

@ -60,6 +60,12 @@ public final class LogFileUtils {
/** Suffix of a directory that is scheduled to be deleted */ /** Suffix of a directory that is scheduled to be deleted */
public static final String DELETE_DIR_SUFFIX = "-delete"; public static final String DELETE_DIR_SUFFIX = "-delete";
/** Suffix of a directory that is used for future partition */
public static final String FUTURE_DIR_SUFFIX = "-future";
/** Suffix of a directory that is used for stray partition */
public static final String STRAY_DIR_SUFFIX = "-stray";
private LogFileUtils() { private LogFileUtils() {
} }

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.slf4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
public class LogTruncation implements SegmentDeletionReason {
private final Logger logger;
public LogTruncation(Logger logger) {
this.logger = logger;
}
@Override
public void logReason(List<LogSegment> toDelete) {
logger.info("Deleting segments as part of log truncation: {}", toDelete.stream()
.map(LogSegment::toString)
.collect(Collectors.joining(", ")));
}
}
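// Usage sketch (assuming an SLF4J Logger `logger` and a list `toDelete` of LogSegment): LocalLog
// passes this reason from truncateTo()/truncateFullyAndStartAt(), and the updated LocalLogTest
// constructs it the same way.
//
//   SegmentDeletionReason reason = new LogTruncation(logger);
//   reason.logReason(toDelete);   // logs "Deleting segments as part of log truncation: ..."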

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import java.util.List;
public interface SegmentDeletionReason {
void logReason(List<LogSegment> toDelete);
}
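// Because the interface has a single method, callers can also supply a reason as a lambda, as
// LocalLog does for log deletion and log rolls; a hypothetical retention reason (with an assumed
// `logger`) might look like:
//
//   SegmentDeletionReason retention = toDelete ->
//       logger.info("Deleting segments due to retention time: {}", toDelete);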