diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 492422cd564..78ab9c097d8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -344,7 +344,7 @@
+ files="(LocalLog|LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
deleted file mode 100644
index 09aaed13d68..00000000000
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ /dev/null
@@ -1,720 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import kafka.utils.Logging
-import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
-import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.storage.internals.log.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, OffsetPosition, LocalLog => JLocalLog}
-
-import java.io.File
-import java.nio.file.Files
-import java.util
-import java.util.Collections.singletonList
-import java.util.concurrent.atomic.AtomicLong
-import java.util.regex.Pattern
-import java.util.{Collections, Optional}
-import scala.collection.mutable.ListBuffer
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOptional
-
-/**
- * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
- * New log segments are created according to a configurable policy that controls the size in bytes or time interval
- * for a given segment.
- *
- * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
- *
- * @param _dir The directory in which log segments are created.
- * @param config The log configuration settings
- * @param segments The non-empty log segments recovered from disk
- * @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
- * @param nextOffsetMetadata The offset where the next message could be appended
- * @param scheduler The thread pool scheduler used for background actions
- * @param time The time instance used for checking the clock
- * @param topicPartition The topic partition associated with this log
- * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
- */
-class LocalLog(@volatile private var _dir: File,
- @volatile private[log] var config: LogConfig,
- private[log] val segments: LogSegments,
- @volatile private[log] var recoveryPoint: Long,
- @volatile private var nextOffsetMetadata: LogOffsetMetadata,
- private[log] val scheduler: Scheduler,
- private[log] val time: Time,
- private[log] val topicPartition: TopicPartition,
- private[log] val logDirFailureChannel: LogDirFailureChannel) extends Logging {
-
- import kafka.log.LocalLog._
-
- this.logIdent = s"[LocalLog partition=$topicPartition, dir=${dir.getParent}] "
-
- // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
- // After memory mapped buffer is closed, no disk IO operation should be performed for this log.
- @volatile private[log] var isMemoryMappedBufferClosed = false
-
- // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
- @volatile private var _parentDir: String = dir.getParent
-
- // Last time the log was flushed
- private val lastFlushedTime = new AtomicLong(time.milliseconds)
-
- private[log] def dir: File = _dir
-
- private[log] def name: String = dir.getName
-
- private[log] def parentDir: String = _parentDir
-
- private[log] def parentDirFile: File = new File(_parentDir)
-
- private[log] def isFuture: Boolean = dir.getName.endsWith(LocalLog.FutureDirSuffix)
-
- private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
- JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
- }
-
- /**
- * Rename the directory of the log
- * @param name the new dir name
- * @throws KafkaStorageException if rename fails
- */
- private[log] def renameDir(name: String): Boolean = {
- maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
- val renamedDir = new File(dir.getParent, name)
- Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
- if (renamedDir != dir) {
- _dir = renamedDir
- _parentDir = renamedDir.getParent
- segments.updateParentDir(renamedDir)
- true
- } else {
- false
- }
- }
- }
-
- /**
-   * Update the existing configuration to the newly provided configuration.
- * @param newConfig the new configuration to be updated to
- */
- private[log] def updateConfig(newConfig: LogConfig): Unit = {
- val oldConfig = config
- config = newConfig
- val oldRecordVersion = oldConfig.recordVersion
- val newRecordVersion = newConfig.recordVersion
- if (newRecordVersion.precedes(oldRecordVersion))
- warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
- }
-
- private[log] def checkIfMemoryMappedBufferClosed(): Unit = {
- if (isMemoryMappedBufferClosed)
- throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
- }
-
- private[log] def updateRecoveryPoint(newRecoveryPoint: Long): Unit = {
- recoveryPoint = newRecoveryPoint
- }
-
- /**
-   * Update recoveryPoint to the provided offset and mark the log as flushed, if the offset is greater
- * than the existing recoveryPoint.
- *
- * @param offset the offset to be updated
- */
- private[log] def markFlushed(offset: Long): Unit = {
- checkIfMemoryMappedBufferClosed()
- if (offset > recoveryPoint) {
- updateRecoveryPoint(offset)
- lastFlushedTime.set(time.milliseconds)
- }
- }
-
- /**
- * The number of messages appended to the log since the last flush
- */
- private[log] def unflushedMessages: Long = logEndOffset - recoveryPoint
-
- /**
- * Flush local log segments for all offsets up to offset-1.
- * Does not update the recovery point.
- *
- * @param offset The offset to flush up to (non-inclusive)
- */
- private[log] def flush(offset: Long): Unit = {
- val currentRecoveryPoint = recoveryPoint
- if (currentRecoveryPoint <= offset) {
- val segmentsToFlush = segments.values(currentRecoveryPoint, offset)
- segmentsToFlush.forEach(_.flush())
- // If there are any new segments, we need to flush the parent directory for crash consistency.
- if (segmentsToFlush.stream().anyMatch(_.baseOffset >= currentRecoveryPoint)) {
- // The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here.
-        // Since the directory is to be deleted anyway, we just swallow NoSuchFileException and let it go.
- Utils.flushDirIfExists(dir.toPath)
- }
- }
- }
-
- /**
-   * The time at which this log was last known to have been fully flushed to disk
- */
- private[log] def lastFlushTime: Long = lastFlushedTime.get
-
- /**
- * The offset metadata of the next message that will be appended to the log
- */
- private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
-
- /**
- * The offset of the next message that will be appended to the log
- */
- private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
-
- /**
-   * Update the end offset of the log, and lower the recoveryPoint to the new end offset if it currently exceeds it.
- *
- * @param endOffset the new end offset of the log
- */
- private[log] def updateLogEndOffset(endOffset: Long): Unit = {
- nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
- if (recoveryPoint > endOffset) {
- updateRecoveryPoint(endOffset)
- }
- }
-
- /**
- * Close file handlers used by log but don't write to disk.
- * This is called if the log directory is offline.
- */
- private[log] def closeHandlers(): Unit = {
- segments.closeHandlers()
- isMemoryMappedBufferClosed = true
- }
-
- /**
- * Closes the segments of the log.
- */
- private[log] def close(): Unit = {
- maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
- checkIfMemoryMappedBufferClosed()
- segments.close()
- }
- }
-
- /**
- * Completely delete this log directory with no delay.
- */
- private[log] def deleteEmptyDir(): Unit = {
- maybeHandleIOException(s"Error while deleting dir for $topicPartition in dir ${dir.getParent}") {
- if (segments.nonEmpty) {
- throw new IllegalStateException(s"Can not delete directory when ${segments.numberOfSegments} segments are still present")
- }
- if (!isMemoryMappedBufferClosed) {
- throw new IllegalStateException(s"Can not delete directory when memory mapped buffer for log of $topicPartition is still open.")
- }
- Utils.delete(dir)
- }
- }
-
- /**
- * Completely delete all segments with no delay.
- * @return the deleted segments
- */
- private[log] def deleteAllSegments(): util.List[LogSegment] = {
- maybeHandleIOException(s"Error while deleting all segments for $topicPartition in dir ${dir.getParent}") {
- val deletableSegments = new util.ArrayList(segments.values)
- removeAndDeleteSegments(segments.values.asScala, asyncDelete = false, LogDeletion(this))
- isMemoryMappedBufferClosed = true
- deletableSegments
- }
- }
-
- /**
- * This method deletes the given log segments by doing the following for each of them:
- * - It removes the segment from the segment map so that it will no longer be used for reads.
- * - It renames the index and log files by appending .deleted to the respective file name
- * - It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
- *
- * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
- * physically deleting a file while it is being read.
- *
- * This method does not convert IOException to KafkaStorageException, the immediate caller
- * is expected to catch and handle IOException.
- *
- * @param segmentsToDelete The log segments to schedule for deletion
- * @param asyncDelete Whether the segment files should be deleted asynchronously
- * @param reason The reason for the segment deletion
- */
- private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment],
- asyncDelete: Boolean,
- reason: SegmentDeletionReason): Unit = {
- if (segmentsToDelete.nonEmpty) {
-      // Most callers hold an iterator into the `segments` collection, and `removeAndDeleteSegment` mutates it by
-      // removing the deleted segment, so we force materialization of the iterator here to keep the results of the
-      // iteration valid and deterministic. We also pass only the materialized view of the iterator to the logic
-      // that actually deletes the segments.
- val toDelete = segmentsToDelete.toList
- reason.logReason(toDelete)
- toDelete.foreach { segment =>
- segments.remove(segment.baseOffset)
- }
- JLocalLog.deleteSegmentFiles(toDelete.asJava, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
- }
- }
-
- /**
- * This method deletes the given segment and creates a new segment with the given new base offset. It ensures an
- * active segment exists in the log at all times during this process.
- *
- * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
- * physically deleting a file while it is being read.
- *
- * This method does not convert IOException to KafkaStorageException, the immediate caller
- * is expected to catch and handle IOException.
- *
- * @param newOffset The base offset of the new segment
- * @param segmentToDelete The old active segment to schedule for deletion
- * @param asyncDelete Whether the segment files should be deleted asynchronously
- * @param reason The reason for the segment deletion
- */
- private[log] def createAndDeleteSegment(newOffset: Long,
- segmentToDelete: LogSegment,
- asyncDelete: Boolean,
- reason: SegmentDeletionReason): LogSegment = {
- if (newOffset == segmentToDelete.baseOffset)
- segmentToDelete.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX)
-
- val newSegment = LogSegment.open(dir,
- newOffset,
- config,
- time,
- config.initFileSize,
- config.preallocate)
- segments.add(newSegment)
-
- reason.logReason(List(segmentToDelete))
- if (newOffset != segmentToDelete.baseOffset)
- segments.remove(segmentToDelete.baseOffset)
- JLocalLog.deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
-
- newSegment
- }
-
- /**
- * Given a message offset, find its corresponding offset metadata in the log.
- * If the message offset is out of range, throw an OffsetOutOfRangeException
- */
- private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
- val fetchDataInfo = read(offset,
- maxLength = 1,
- minOneMessage = false,
- maxOffsetMetadata = nextOffsetMetadata,
- includeAbortedTxns = false)
- fetchDataInfo.fetchOffsetMetadata
- }
-
- /**
- * Read messages from the log.
- *
- * @param startOffset The offset to begin reading at
- * @param maxLength The maximum number of bytes to read
- * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
- * @param maxOffsetMetadata The metadata of the maximum offset to be fetched
- * @param includeAbortedTxns If true, aborted transactions are included
- * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset
- * @return The fetch data information including fetch starting offset metadata and messages read.
- */
- def read(startOffset: Long,
- maxLength: Int,
- minOneMessage: Boolean,
- maxOffsetMetadata: LogOffsetMetadata,
- includeAbortedTxns: Boolean): FetchDataInfo = {
- maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
- trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
- s"total length ${segments.sizeInBytes} bytes")
-
- val endOffsetMetadata = nextOffsetMetadata
- val endOffset = endOffsetMetadata.messageOffset
- var segmentOpt = segments.floorSegment(startOffset)
-
- // return error on attempt to read beyond the log end offset
- if (startOffset > endOffset || !segmentOpt.isPresent)
- throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
- s"but we only have log segments upto $endOffset.")
-
- if (startOffset == maxOffsetMetadata.messageOffset)
- emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
- else if (startOffset > maxOffsetMetadata.messageOffset)
- emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
- else {
-        // Do the read on the segment with a base offset less than the target offset, but if that segment does not
-        // contain any messages with an offset greater than the target, continue reading from successive segments
-        // until we get some messages or reach the end of the log
- var fetchDataInfo: FetchDataInfo = null
- while (fetchDataInfo == null && segmentOpt.isPresent) {
- val segment = segmentOpt.get
- val baseOffset = segment.baseOffset
-
- // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty.
- // 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit.
- // 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so
- // return maxPosition as empty to avoid reading beyond the max-offset
- val maxPositionOpt: Optional[java.lang.Long] =
- if (segment.baseOffset < maxOffsetMetadata.segmentBaseOffset)
- Optional.of(segment.size)
- else if (segment.baseOffset == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
- Optional.of(maxOffsetMetadata.relativePositionInSegment)
- else
- Optional.empty()
-
- fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage)
- if (fetchDataInfo != null) {
- if (includeAbortedTxns)
- fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
- } else segmentOpt = segments.higherSegment(baseOffset)
- }
-
- if (fetchDataInfo != null) fetchDataInfo
- else {
-          // We are beyond the end of the last segment and no data was fetched although the start offset is in range;
-          // this can happen when all messages with an offset larger than the start offset have been deleted.
-          // In this case, return an empty record set with the log end offset metadata
- new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
- }
- }
- }
- }
-
- private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
- segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
- updateLogEndOffset(lastOffset + 1)
- }
-
- private def addAbortedTransactions(startOffset: Long, segment: LogSegment,
- fetchInfo: FetchDataInfo): FetchDataInfo = {
- val fetchSize = fetchInfo.records.sizeInBytes
- val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
- fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
- val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).orElse(
- segments.higherSegment(segment.baseOffset).toScala.map(s => s.baseOffset).getOrElse(logEndOffset))
-
- val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
- def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
- collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)
-
- new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
- fetchInfo.records,
- fetchInfo.firstEntryIncomplete,
- Optional.of(abortedTransactions.toList.asJava))
- }
-
- private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
- startingSegment: LogSegment,
- accumulator: Seq[AbortedTxn] => Unit): Unit = {
- val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
- var segmentEntryOpt = Option(startingSegment)
- while (segmentEntryOpt.isDefined) {
- val segment = segmentEntryOpt.get
- val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
- accumulator(searchResult.abortedTransactions.asScala)
- if (searchResult.isComplete)
- return
- segmentEntryOpt = nextOption(higherSegments)
- }
- }
-
- private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
- val segmentEntry = segments.floorSegment(baseOffset)
- val allAbortedTxns = ListBuffer.empty[AbortedTxn]
- def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
- segmentEntry.ifPresent(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator))
- allAbortedTxns.toList
- }
-
- /**
- * Roll the log over to a new active segment starting with the current logEndOffset.
- * This will trim the index to the exact size of the number of entries it currently contains.
- *
- * @param expectedNextOffset The expected next offset after the segment is rolled
- *
- * @return The newly rolled segment
- */
- private[log] def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
- maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
- val start = time.hiResClockMs()
- checkIfMemoryMappedBufferClosed()
- val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
- val logFile = LogFileUtils.logFile(dir, newOffset, "")
- val activeSegment = segments.activeSegment
- if (segments.contains(newOffset)) {
- // segment with the same base offset already exists and loaded
- if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
- // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
-          // active segment of size zero because one of the indexes is "full" (due to _maxEntries == 0).
- warn(s"Trying to roll a new log segment with start offset $newOffset " +
- s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
- s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
- s" size of offset index: ${activeSegment.offsetIndex.entries}.")
- val newSegment = createAndDeleteSegment(newOffset, activeSegment, asyncDelete = true, LogRoll(this))
- updateLogEndOffset(nextOffsetMetadata.messageOffset)
- info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
- return newSegment
- } else {
- throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
- s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
- s"segment is ${segments.get(newOffset)}.")
- }
- } else if (segments.nonEmpty && newOffset < activeSegment.baseOffset) {
- throw new KafkaException(
- s"Trying to roll a new log segment for topic partition $topicPartition with " +
- s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
- } else {
- val offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset)
- val timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset)
- val txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset)
-
- for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
- warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
- Files.delete(file.toPath)
- }
-
- segments.lastSegment.ifPresent(_.onBecomeInactiveSegment())
- }
-
- val newSegment = LogSegment.open(dir,
- newOffset,
- config,
- time,
- config.initFileSize,
- config.preallocate)
- segments.add(newSegment)
-
- // We need to update the segment base offset and append position data of the metadata when log rolls.
- // The next offset should not change.
- updateLogEndOffset(nextOffsetMetadata.messageOffset)
-
- info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
-
- newSegment
- }
- }
-
- /**
- * Delete all data in the local log and start at the new offset.
- *
- * @param newOffset The new offset to start the log with
- * @return the list of segments that were scheduled for deletion
- */
- private[log] def truncateFullyAndStartAt(newOffset: Long): Iterable[LogSegment] = {
- maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
- debug(s"Truncate and start at offset $newOffset")
- checkIfMemoryMappedBufferClosed()
- val segmentsToDelete = new util.ArrayList(segments.values).asScala
-
- if (segmentsToDelete.nonEmpty) {
- removeAndDeleteSegments(segmentsToDelete.dropRight(1), asyncDelete = true, LogTruncation(this))
-        // Use createAndDeleteSegment() to create the new segment first and then delete the old last segment, so that
-        // an active segment exists throughout the deletion process
- createAndDeleteSegment(newOffset, segmentsToDelete.last, asyncDelete = true, LogTruncation(this))
- }
-
- updateLogEndOffset(newOffset)
-
- segmentsToDelete
- }
- }
-
- /**
- * Truncate this log so that it ends with the greatest offset < targetOffset.
- *
- * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
- * @return the list of segments that were scheduled for deletion
- */
- private[log] def truncateTo(targetOffset: Long): Iterable[LogSegment] = {
- val deletableSegments = segments.filter(segment => segment.baseOffset > targetOffset).asScala
- removeAndDeleteSegments(deletableSegments, asyncDelete = true, LogTruncation(this))
- segments.activeSegment.truncateTo(targetOffset)
- updateLogEndOffset(targetOffset)
- deletableSegments
- }
-}
-
-/**
- * Helper functions for logs
- */
-object LocalLog extends Logging {
-
- /** a directory that is scheduled to be deleted */
- private[log] val DeleteDirSuffix = LogFileUtils.DELETE_DIR_SUFFIX
-
-  /** a directory that is used for a future partition */
- private[log] val FutureDirSuffix = "-future"
-
-  /** a directory that is used for a stray partition */
- private[log] val StrayDirSuffix = "-stray"
-
- private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
- private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
- private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix")
-
- private[log] val UnknownOffset = -1L
-
- /**
- * Return a directory name to rename the log directory to for async deletion.
- * The name will be in the following format: "topic-partitionId.uniqueId-delete".
- * If the topic name is too long, it will be truncated to prevent the total name
- * from exceeding 255 characters.
- */
- private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
- logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix)
- }
-
- /**
- * Return a directory name to rename the log directory to for stray partition deletion.
- * The name will be in the following format: "topic-partitionId.uniqueId-stray".
- * If the topic name is too long, it will be truncated to prevent the total name
- * from exceeding 255 characters.
- */
- private[log] def logStrayDirName(topicPartition: TopicPartition): String = {
- logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix)
- }
-
- /**
- * Return a future directory name for the given topic partition. The name will be in the following
- * format: topic-partition.uniqueId-future where topic, partition and uniqueId are variables.
- */
- private[log] def logFutureDirName(topicPartition: TopicPartition): String = {
- logDirNameWithSuffix(topicPartition, FutureDirSuffix)
- }
-
- /**
- * Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
- * If the topic name is too long, it will be truncated to prevent the total name
- * from exceeding 255 characters.
- */
- private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = {
- val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
- val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix"
- val prefixLength = Math.min(topicPartition.topic().length, 255 - fullSuffix.length)
- s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix"
- }
-
- private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = {
- val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
- s"${logDirName(topicPartition)}.$uniqueId$suffix"
- }
-
- /**
- * Return a directory name for the given topic partition. The name will be in the following
- * format: topic-partition where topic, partition are variables.
- */
- private[log] def logDirName(topicPartition: TopicPartition): String = {
- s"${topicPartition.topic}-${topicPartition.partition}"
- }
-
- /**
- * Parse the topic and partition out of the directory name of a log
- */
- private[log] def parseTopicPartitionName(dir: File): TopicPartition = {
- if (dir == null)
- throw new KafkaException("dir should not be null")
-
- def exception(dir: File): KafkaException = {
- new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " +
- "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
- "Kafka's log directories (and children) should only contain Kafka topic data.")
- }
-
- val dirName = dir.getName
- if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
- throw exception(dir)
- if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches ||
- dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches ||
- dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches)
- throw exception(dir)
-
- val name: String =
- if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix))
- dirName.substring(0, dirName.lastIndexOf('.'))
- else dirName
-
- val index = name.lastIndexOf('-')
- val topic = name.substring(0, index)
- val partitionString = name.substring(index + 1)
- if (topic.isEmpty || partitionString.isEmpty)
- throw exception(dir)
-
- val partition =
- try partitionString.toInt
- catch { case _: NumberFormatException => throw exception(dir) }
-
- new TopicPartition(topic, partition)
- }
-
- private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
- includeAbortedTxns: Boolean): FetchDataInfo = {
- val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] =
- if (includeAbortedTxns) Optional.of(Collections.emptyList())
- else Optional.empty()
- new FetchDataInfo(fetchOffsetMetadata,
- MemoryRecords.EMPTY,
- false,
- abortedTransactions)
- }
-
- /**
- * Wraps the value of iterator.next() in an option.
- * Note: this facility is a part of the Iterator class starting from scala v2.13.
- *
- * @param iterator
- * @tparam T the type of object held within the iterator
- * @return Some(iterator.next) if a next element exists, None otherwise.
- */
- private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = {
- if (iterator.hasNext)
- Some(iterator.next())
- else
- None
- }
-}
-
-trait SegmentDeletionReason {
- def logReason(toDelete: List[LogSegment]): Unit
-}
-
-case class LogTruncation(log: LocalLog) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
- log.info(s"Deleting segments as part of log truncation: ${toDelete.mkString(",")}")
- }
-}
-
-case class LogRoll(log: LocalLog) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
- log.info(s"Deleting segments as part of log roll: ${toDelete.mkString(",")}")
- }
-}
-
-case class LogDeletion(log: LocalLog) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
- log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}")
- }
-}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 4747b5b0ee9..f4ef718f94f 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -18,7 +18,6 @@
package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
-import kafka.log.LocalLog.nextOption
import kafka.log.remote.RemoteLogManager
import kafka.utils._
import org.apache.kafka.common.errors._
@@ -41,7 +40,7 @@ import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard, LocalLog => JLocalLog, UnifiedLog => JUnifiedLog}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import java.io.{File, IOException}
@@ -1246,7 +1245,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
- localLog.collectAbortedTransactions(logStartOffset, startOffset, upperBoundOffset)
+ localLog.collectAbortedTransactions(logStartOffset, startOffset, upperBoundOffset).asScala.toList
}
/**
@@ -1559,7 +1558,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val newLocalLogStartOffset = localLog.segments.higherSegment(segmentsToDelete.last.baseOffset()).get.baseOffset()
incrementStartOffset(newLocalLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
// remove the segments for lookups
- localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
+ localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason)
}
deleteProducerSnapshots(deletable.toList.asJava, asyncDelete = true)
}
@@ -1713,7 +1712,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* @return The newly rolled segment
*/
def roll(expectedNextOffset: Option[Long] = None): LogSegment = lock synchronized {
- val newSegment = localLog.roll(expectedNextOffset)
+ val nextOffset : JLong = expectedNextOffset match {
+ case Some(offset) => offset
+ case None => 0L
+ }
+ val newSegment = localLog.roll(nextOffset)
// Take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
// offset align with the new segment offset since this ensures we can recover the segment by beginning
// with the corresponding snapshot file and scanning the segment data. Because the segment base offset
@@ -1848,7 +1851,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
truncateFullyAndStartAt(targetOffset)
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
- deleteProducerSnapshots(deletedSegments.toList.asJava, asyncDelete = true)
+ deleteProducerSnapshots(deletedSegments, asyncDelete = true)
leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
@@ -1962,7 +1965,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private[log] def addSegment(segment: LogSegment): LogSegment = localLog.segments.add(segment)
private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
- JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
+ LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun)
}
private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
@@ -1971,7 +1974,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
result.newSegments.asScala.toList
}
- private[log] def deleteProducerSnapshots(segments: util.List[LogSegment], asyncDelete: Boolean): Unit = {
+ private[log] def deleteProducerSnapshots(segments: util.Collection[LogSegment], asyncDelete: Boolean): Unit = {
JUnifiedLog.deleteProducerSnapshots(segments, producerStateManager, asyncDelete, scheduler, config, logDirFailureChannel, parentDir, topicPartition)
}
}
@@ -1991,14 +1994,9 @@ object UnifiedLog extends Logging {
val DeleteDirSuffix: String = LogFileUtils.DELETE_DIR_SUFFIX
- val StrayDirSuffix: String = LocalLog.StrayDirSuffix
+ val StrayDirSuffix: String = LogFileUtils.STRAY_DIR_SUFFIX
- val FutureDirSuffix: String = LocalLog.FutureDirSuffix
-
- private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern
- private[log] val FutureDirPattern = LocalLog.FutureDirPattern
-
- val UnknownOffset: Long = LocalLog.UnknownOffset
+ val UnknownOffset: Long = LocalLog.UNKNOWN_OFFSET
def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean,
config: LogConfig,
@@ -2139,7 +2137,7 @@ object UnifiedLog extends Logging {
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String,
isRecoveredSwapFile: Boolean = false): Iterable[LogSegment] = {
- JLocalLog.replaceSegments(existingSegments,
+ LocalLog.replaceSegments(existingSegments,
newSegments.asJava,
oldSegments.asJava,
dir,
@@ -2159,11 +2157,11 @@ object UnifiedLog extends Logging {
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String): SplitSegmentResult = {
- JLocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
+ LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
}
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
- JLocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
+ LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
}
// Visible for benchmarking
@@ -2194,6 +2192,21 @@ object UnifiedLog extends Logging {
if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionBytes else config.retentionSize
}
+ /**
+ * Wraps the value of iterator.next() in an option.
+ * Note: this facility is a part of the Iterator class starting from scala v2.13.
+ *
+ * @param iterator the iterator
+ * @tparam T the type of object held within the iterator
+ * @return Some(iterator.next) if a next element exists, None otherwise.
+ */
+ private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = {
+ if (iterator.hasNext)
+ Some(iterator.next())
+ else
+ None
+ }
+
}
object LogMetricNames {
@@ -2208,9 +2221,9 @@ object LogMetricNames {
}
case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
+ override def logReason(toDelete: util.List[LogSegment]): Unit = {
val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabledAndRemoteCopyEnabled)
- toDelete.foreach { segment =>
+ toDelete.forEach { segment =>
if (segment.largestRecordTimestamp.isPresent)
if (remoteLogEnabledAndRemoteCopyEnabled)
log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the largest " +
@@ -2231,9 +2244,9 @@ case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabl
}
case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
+ override def logReason(toDelete: util.List[LogSegment]): Unit = {
var size = log.size
- toDelete.foreach { segment =>
+ toDelete.forEach { segment =>
size -= segment.size
if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabledAndRemoteCopyEnabled)} breach. " +
s"Local log size after deletion will be $size.")
@@ -2244,10 +2257,10 @@ case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEna
}
case class StartOffsetBreach(log: UnifiedLog, remoteLogEnabled: Boolean) extends SegmentDeletionReason {
- override def logReason(toDelete: List[LogSegment]): Unit = {
+ override def logReason(toDelete: util.List[LogSegment]): Unit = {
if (remoteLogEnabled)
- log.info(s"Deleting segments due to local log start offset ${log.localLogStartOffset()} breach: ${toDelete.mkString(",")}")
+ log.info(s"Deleting segments due to local log start offset ${log.localLogStartOffset()} breach: ${toDelete.asScala.mkString(",")}")
else
- log.info(s"Deleting segments due to log start offset ${log.logStartOffset} breach: ${toDelete.mkString(",")}")
+ log.info(s"Deleting segments due to log start offset ${log.logStartOffset} breach: ${toDelete.asScala.mkString(",")}")
}
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 6d512c23853..179e098c347 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 1ab842e4888..befc4d5fa12 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -63,7 +63,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index 775c86d0208..0b840ddde62 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{MemoryRecords, Record, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog => JLocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, LogTruncation, SegmentDeletionReason}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -98,10 +98,10 @@ class LocalLogTest {
private def appendRecords(records: Iterable[SimpleRecord],
log: LocalLog = log,
initialOffset: Long = 0L): Unit = {
- log.append(lastOffset = initialOffset + records.size - 1,
- largestTimestamp = records.head.timestamp,
- shallowOffsetOfMaxTimestamp = initialOffset,
- records = MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*))
+ log.append(initialOffset + records.size - 1,
+ records.head.timestamp,
+ initialOffset,
+ MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*))
}
private def readRecords(log: LocalLog = log,
@@ -112,16 +112,16 @@ class LocalLogTest {
includeAbortedTxns: Boolean = false): FetchDataInfo = {
log.read(startOffset,
maxLength,
- minOneMessage = minOneMessage,
+ minOneMessage,
maxOffsetMetadata,
- includeAbortedTxns = includeAbortedTxns)
+ includeAbortedTxns)
}
@Test
def testLogDeleteSegmentsSuccess(): Unit = {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
- log.roll()
+ log.roll(0L)
assertEquals(2, log.segments.numberOfSegments)
assertFalse(logDir.listFiles.isEmpty)
val segmentsBeforeDelete = new util.ArrayList(log.segments.values)
@@ -135,7 +135,7 @@ class LocalLogTest {
@Test
def testRollEmptyActiveSegment(): Unit = {
val oldActiveSegment = log.segments.activeSegment
- log.roll()
+ log.roll(0L)
assertEquals(1, log.segments.numberOfSegments)
assertNotEquals(oldActiveSegment, log.segments.activeSegment)
assertFalse(logDir.listFiles.isEmpty)
@@ -146,7 +146,7 @@ class LocalLogTest {
def testLogDeleteDirSuccessWhenEmptyAndFailureWhenNonEmpty(): Unit ={
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
- log.roll()
+ log.roll(0L)
assertEquals(2, log.segments.numberOfSegments)
assertFalse(logDir.listFiles.isEmpty)
@@ -172,7 +172,7 @@ class LocalLogTest {
def testLogDirRenameToNewDir(): Unit = {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
- log.roll()
+ log.roll(0L)
assertEquals(2, log.segments.numberOfSegments)
val newLogDir = TestUtils.randomPartitionLogDir(tmpDir)
assertTrue(log.renameDir(newLogDir.getName))
@@ -198,7 +198,7 @@ class LocalLogTest {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
mockTime.sleep(1)
- val newSegment = log.roll()
+ val newSegment = log.roll(0L)
log.flush(newSegment.baseOffset)
log.markFlushed(newSegment.baseOffset)
assertEquals(1L, log.recoveryPoint)
@@ -263,29 +263,29 @@ class LocalLogTest {
for (offset <- 0 to 8) {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record), initialOffset = offset)
- log.roll()
+ log.roll(0L)
}
assertEquals(10L, log.segments.numberOfSegments)
class TestDeletionReason extends SegmentDeletionReason {
- private var _deletedSegments: Iterable[LogSegment] = List[LogSegment]()
+ private var _deletedSegments: util.Collection[LogSegment] = new util.ArrayList()
- override def logReason(toDelete: List[LogSegment]): Unit = {
- _deletedSegments = List[LogSegment]() ++ toDelete
+ override def logReason(toDelete: util.List[LogSegment]): Unit = {
+ _deletedSegments = new util.ArrayList(toDelete)
}
- def deletedSegments: Iterable[LogSegment] = _deletedSegments
+ def deletedSegments: util.Collection[LogSegment] = _deletedSegments
}
val reason = new TestDeletionReason()
- val toDelete = log.segments.values.asScala.toVector
- log.removeAndDeleteSegments(toDelete, asyncDelete = asyncDelete, reason)
+ val toDelete = new util.ArrayList(log.segments.values)
+ log.removeAndDeleteSegments(toDelete, asyncDelete, reason)
if (asyncDelete) {
mockTime.sleep(log.config.fileDeleteDelayMs + 1)
}
assertTrue(log.segments.isEmpty)
assertEquals(toDelete, reason.deletedSegments)
- toDelete.foreach(segment => assertTrue(segment.deleted()))
+ toDelete.forEach(segment => assertTrue(segment.deleted()))
}
@Test
@@ -302,13 +302,13 @@ class LocalLogTest {
for (offset <- 0 to 8) {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record), initialOffset = offset)
- log.roll()
+ log.roll(0L)
}
assertEquals(10L, log.segments.numberOfSegments)
val toDelete = log.segments.values
- JLocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
+ LocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
if (asyncDelete) {
toDelete.forEach {
segment =>
@@ -336,7 +336,7 @@ class LocalLogTest {
appendRecords(List(record))
val newOffset = log.segments.activeSegment.baseOffset + 1
val oldActiveSegment = log.segments.activeSegment
- val newActiveSegment = log.createAndDeleteSegment(newOffset, log.segments.activeSegment, asyncDelete = true, LogTruncation(log))
+ val newActiveSegment = log.createAndDeleteSegment(newOffset, log.segments.activeSegment, true, new LogTruncation(log.logger))
assertEquals(1, log.segments.numberOfSegments)
assertEquals(newActiveSegment, log.segments.activeSegment)
assertNotEquals(oldActiveSegment, log.segments.activeSegment)
@@ -354,7 +354,7 @@ class LocalLogTest {
for (offset <- 0 to 7) {
appendRecords(List(record), initialOffset = offset)
if (offset % 2 != 0)
- log.roll()
+ log.roll(0L)
}
for (offset <- 8 to 12) {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
@@ -362,7 +362,7 @@ class LocalLogTest {
}
assertEquals(5, log.segments.numberOfSegments)
assertNotEquals(10L, log.segments.activeSegment.baseOffset)
- val expected = log.segments.values.asScala.toVector
+ val expected = new util.ArrayList(log.segments.values)
val deleted = log.truncateFullyAndStartAt(10L)
assertEquals(expected, deleted)
assertEquals(1, log.segments.numberOfSegments)
@@ -379,7 +379,7 @@ class LocalLogTest {
for (offset <- 0 to 4) {
appendRecords(List(record), initialOffset = offset)
if (offset % 2 != 0)
- log.roll()
+ log.roll(0L)
}
assertEquals(3, log.segments.numberOfSegments)
@@ -416,15 +416,15 @@ class LocalLogTest {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record), initialOffset = offset)
if (offset % 3 == 2)
- log.roll()
+ log.roll(0L)
}
assertEquals(5, log.segments.numberOfSegments)
assertEquals(12L, log.logEndOffset)
- val expected = log.segments.values(9L, log.logEndOffset + 1).asScala.toVector
+ val expected = new util.ArrayList(log.segments.values(9L, log.logEndOffset + 1))
// Truncate to an offset before the base offset of the active segment
val deleted = log.truncateTo(7L)
- assertEquals(expected, deleted.toVector)
+ assertEquals(expected, deleted)
assertEquals(3, log.segments.numberOfSegments)
assertEquals(6L, log.segments.activeSegment.baseOffset)
assertEquals(0L, log.recoveryPoint)
@@ -444,7 +444,7 @@ class LocalLogTest {
for (i <- 0 until 5) {
val keyValues = Seq(KeyValue(i.toString, i.toString))
appendRecords(kvsToRecords(keyValues), initialOffset = i)
- log.roll()
+ log.roll(0L)
}
def nonActiveBaseOffsetsFrom(startOffset: Long): Seq[Long] = {
@@ -506,7 +506,7 @@ class LocalLogTest {
assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir),
() => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
// also test the "-delete" marker case
- val deleteMarkerDir = new File(logDir, topic + partition + "." + LocalLog.DeleteDirSuffix)
+ val deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX)
assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
() => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
}
@@ -535,7 +535,7 @@ class LocalLogTest {
() => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
// also test the "-delete" marker case
- val deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + LocalLog.DeleteDirSuffix)
+ val deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + LogFileUtils.DELETE_DIR_SUFFIX)
assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
() => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
}
@@ -549,7 +549,7 @@ class LocalLogTest {
() => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
// also test the "-delete" marker case
- val deleteMarkerDir = new File(logDir, topic + partition + "." + LocalLog.DeleteDirSuffix)
+ val deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX)
assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
() => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
}
@@ -569,14 +569,14 @@ class LocalLogTest {
val name1 = LocalLog.logDeleteDirName(new TopicPartition("foo", 3))
assertTrue(name1.length <= 255)
assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches())
- assertTrue(LocalLog.DeleteDirPattern.matcher(name1).matches())
- assertFalse(LocalLog.FutureDirPattern.matcher(name1).matches())
+ assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name1).matches())
+ assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name1).matches())
val name2 = LocalLog.logDeleteDirName(
new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5))
assertEquals(255, name2.length)
assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
- assertTrue(LocalLog.DeleteDirPattern.matcher(name2).matches())
- assertFalse(LocalLog.FutureDirPattern.matcher(name2).matches())
+ assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name2).matches())
+ assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name2).matches())
}
@Test
@@ -598,7 +598,7 @@ class LocalLogTest {
assertEquals(1, log.segments.numberOfSegments, "Log begins with a single empty segment.")
// roll active segment with the same base offset of size zero should recreate the segment
- log.roll(Some(0L))
+ log.roll(0L)
assertEquals(1, log.segments.numberOfSegments, "Expect 1 segment after roll() empty segment with base offset.")
// should be able to append records to active segment
@@ -614,7 +614,7 @@ class LocalLogTest {
assertEquals(keyValues1 ++ keyValues2, recordsToKvs(readResult.records.records.asScala))
// roll so that active segment is empty
- log.roll()
+ log.roll(0L)
assertEquals(2L, log.segments.activeSegment.baseOffset, "Expect base offset of active segment to be LEO")
assertEquals(2, log.segments.numberOfSegments, "Expect two segments.")
assertEquals(2L, log.logEndOffset)
@@ -626,7 +626,7 @@ class LocalLogTest {
// roll active segment with the same base offset of size zero should recreate the segment
{
- val newSegment = log.roll()
+ val newSegment = log.roll(0L)
assertEquals(0L, newSegment.baseOffset)
assertEquals(1, log.segments.numberOfSegments)
assertEquals(0L, log.logEndOffset)
@@ -635,7 +635,7 @@ class LocalLogTest {
appendRecords(List(KeyValue("k1", "v1").toRecord()))
{
- val newSegment = log.roll()
+ val newSegment = log.roll(0L)
assertEquals(1L, newSegment.baseOffset)
assertEquals(2, log.segments.numberOfSegments)
assertEquals(1L, log.logEndOffset)
@@ -644,7 +644,7 @@ class LocalLogTest {
appendRecords(List(KeyValue("k2", "v2").toRecord()), initialOffset = 1L)
{
- val newSegment = log.roll(Some(1L))
+ val newSegment = log.roll(1L)
assertEquals(2L, newSegment.baseOffset)
assertEquals(3, log.segments.numberOfSegments)
assertEquals(2L, log.logEndOffset)
@@ -661,7 +661,7 @@ class LocalLogTest {
assertEquals(3, log.logEndOffset, "Expect two records in the log")
// roll to create an empty active segment
- log.roll()
+ log.roll(0L)
assertEquals(3L, log.segments.activeSegment.baseOffset)
// intentionally setup the logEndOffset to introduce an error later
@@ -669,7 +669,7 @@ class LocalLogTest {
// expect an error because of attempt to roll to a new offset (1L) that's lower than the
// base offset (3L) of the active segment
- assertThrows(classOf[KafkaException], () => log.roll())
+ assertThrows(classOf[KafkaException], () => log.roll(0L))
}
@Test
@@ -679,7 +679,7 @@ class LocalLogTest {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
mockTime.sleep(1)
- val newSegment = log.roll()
+ val newSegment = log.roll(0L)
// simulate the directory is renamed concurrently
doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir
@@ -701,14 +701,14 @@ class LocalLogTest {
time,
config.initFileSize,
config.preallocate))
- new LocalLog(_dir = dir,
- config = config,
- segments = segments,
- recoveryPoint = recoveryPoint,
- nextOffsetMetadata = nextOffsetMetadata,
- scheduler = scheduler,
- time = time,
- topicPartition = topicPartition,
- logDirFailureChannel = logDirFailureChannel)
+ new LocalLog(dir,
+ config,
+ segments,
+ recoveryPoint,
+ nextOffsetMetadata,
+ scheduler,
+ time,
+ topicPartition,
+ logDirFailureChannel)
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 68abd08f22f..5b3cc00732d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 9086626706c..81600b0f201 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index b72c1c555db..e7e99852c53 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0642dfebd19..ddc9cac8d81 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.server
import com.yammer.metrics.core.{Gauge, Meter, Timer}
import kafka.cluster.PartitionTest.MockPartitionListener
import kafka.cluster.Partition
-import kafka.log.{LocalLog, LogManager, LogManagerTest, UnifiedLog}
+import kafka.log.{LogManager, LogManagerTest, UnifiedLog}
import kafka.log.remote.RemoteLogManager
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
@@ -70,7 +70,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, PartitionMetadataFile}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index fa67126d275..3aaa58641bc 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -19,11 +19,11 @@ package kafka.utils
import java.util.Properties
import java.util.concurrent.atomic._
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
-import kafka.log.{LocalLog, UnifiedLog}
+import kafka.log.UnifiedLog
import kafka.utils.TestUtils.retry
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 0dc824d1f43..b5a3c9bd96e 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -163,7 +163,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
Given that, this bug pattern doesn't make sense for Scala code. -->
-
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
index a5436f448db..d2cd71addad 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
@@ -16,10 +16,16 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.Scheduler;
@@ -29,25 +35,774 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.function.Supplier;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.require;
import static org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETE_DIR_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.FUTURE_DIR_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.STRAY_DIR_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.SWAP_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.isLogFile;
+/**
+ * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
+ */
public class LocalLog {
private static final Logger LOG = LoggerFactory.getLogger(LocalLog.class);
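+ // Directory name patterns for partitions that are scheduled for deletion, are future replicas, or are stray,
+ // e.g. "mytopic-0.0123456789abcdef0123456789abcdef-delete" (see logDeleteDirName/logFutureDirName/logStrayDirName below).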
+ public static final Pattern DELETE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.DELETE_DIR_SUFFIX);
+ public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.FUTURE_DIR_SUFFIX);
+ public static final Pattern STRAY_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.STRAY_DIR_SUFFIX);
+ public static final long UNKNOWN_OFFSET = -1L;
+
+ // Last time the log was flushed
+ private final AtomicLong lastFlushedTime;
+ private final String logIdent;
+ private final LogSegments segments;
+ private final Scheduler scheduler;
+ private final Time time;
+ private final TopicPartition topicPartition;
+ private final LogDirFailureChannel logDirFailureChannel;
+ private final Logger logger;
+
+ private volatile LogOffsetMetadata nextOffsetMetadata;
+ // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+ // After memory mapped buffer is closed, no disk IO operation should be performed for this log.
+ private volatile boolean isMemoryMappedBufferClosed = false;
+ // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+ private volatile String parentDir;
+ private volatile LogConfig config;
+ private volatile long recoveryPoint;
+ private File dir;
+
+ /**
+ * @param dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param segments The non-empty log segments recovered from disk
+ * @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
+ * @param nextOffsetMetadata The offset where the next message could be appended
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+ */
+ public LocalLog(File dir,
+ LogConfig config,
+ LogSegments segments,
+ long recoveryPoint,
+ LogOffsetMetadata nextOffsetMetadata,
+ Scheduler scheduler,
+ Time time,
+ TopicPartition topicPartition,
+ LogDirFailureChannel logDirFailureChannel) {
+ this.dir = dir;
+ this.config = config;
+ this.segments = segments;
+ this.recoveryPoint = recoveryPoint;
+ this.nextOffsetMetadata = nextOffsetMetadata;
+ this.scheduler = scheduler;
+ this.time = time;
+ this.topicPartition = topicPartition;
+ this.logDirFailureChannel = logDirFailureChannel;
+ this.logIdent = "[LocalLog partition=" + topicPartition + ", dir=" + dir + "] ";
+ this.logger = new LogContext(logIdent).logger(LocalLog.class);
+ // Last time the log was flushed
+ this.lastFlushedTime = new AtomicLong(time.milliseconds());
+ this.parentDir = dir.getParent();
+ }
+
+ public File dir() {
+ return dir;
+ }
+
+ public Logger logger() {
+ return logger;
+ }
+
+ public LogConfig config() {
+ return config;
+ }
+
+ public LogSegments segments() {
+ return segments;
+ }
+
+ public Scheduler scheduler() {
+ return scheduler;
+ }
+
+ public LogOffsetMetadata nextOffsetMetadata() {
+ return nextOffsetMetadata;
+ }
+
+ public TopicPartition topicPartition() {
+ return topicPartition;
+ }
+
+ public LogDirFailureChannel logDirFailureChannel() {
+ return logDirFailureChannel;
+ }
+
+ public long recoveryPoint() {
+ return recoveryPoint;
+ }
+
+ public Time time() {
+ return time;
+ }
+
+ public String name() {
+ return dir.getName();
+ }
+
+ public String parentDir() {
+ return parentDir;
+ }
+
+ public File parentDirFile() {
+ return new File(parentDir);
+ }
+
+ public boolean isFuture() {
+ return dir.getName().endsWith(LogFileUtils.FUTURE_DIR_SUFFIX);
+ }
+
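+ // Convenience wrapper around the static maybeHandleIOException(...) helper further below: any IOException thrown
+ // by the action marks this log's parent dir offline via logDirFailureChannel (see that method's javadoc).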
+ private <T> T maybeHandleIOException(Supplier<String> errorMsgSupplier, StorageAction<T, IOException> function) {
+ return maybeHandleIOException(logDirFailureChannel, parentDir, errorMsgSupplier, function);
+ }
+
+ /**
+ * Rename the directory of the log
+ * @param name the new dir name
+ * @throws KafkaStorageException if rename fails
+ */
+ public boolean renameDir(String name) {
+ return maybeHandleIOException(
+ () -> "Error while renaming dir for " + topicPartition + " in log dir " + dir.getParent(),
+ () -> {
+ File renamedDir = new File(dir.getParent(), name);
+ Utils.atomicMoveWithFallback(dir.toPath(), renamedDir.toPath());
+ if (!renamedDir.equals(dir)) {
+ dir = renamedDir;
+ parentDir = renamedDir.getParent();
+ segments.updateParentDir(renamedDir);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ );
+ }
+
+ /**
+ * Update the existing configuration to the new provided configuration.
+ * @param newConfig the new configuration to be updated to
+ */
+ public void updateConfig(LogConfig newConfig) {
+ LogConfig oldConfig = config;
+ config = newConfig;
+ RecordVersion oldRecordVersion = oldConfig.recordVersion();
+ RecordVersion newRecordVersion = newConfig.recordVersion();
+ if (newRecordVersion.precedes(oldRecordVersion)) {
+ logger.warn("Record format version has been downgraded from {} to {}.", oldRecordVersion, newRecordVersion);
+ }
+ }
+
+ public void checkIfMemoryMappedBufferClosed() {
+ if (isMemoryMappedBufferClosed) {
+ throw new KafkaStorageException("The memory mapped buffer for log of " + topicPartition + " is already closed");
+ }
+ }
+
+ public void updateRecoveryPoint(long newRecoveryPoint) {
+ recoveryPoint = newRecoveryPoint;
+ }
+
+ /**
+ * Update recoveryPoint to provided offset and mark the log as flushed, if the offset is greater
+ * than the existing recoveryPoint.
+ *
+ * @param offset the offset to be updated
+ */
+ public void markFlushed(long offset) {
+ checkIfMemoryMappedBufferClosed();
+ if (offset > recoveryPoint) {
+ updateRecoveryPoint(offset);
+ lastFlushedTime.set(time.milliseconds());
+ }
+ }
+
+ /**
+ * The number of messages appended to the log since the last flush
+ */
+ public long unflushedMessages() {
+ return logEndOffset() - recoveryPoint;
+ }
+
+ /**
+ * Flush local log segments for all offsets up to offset-1.
+ * Does not update the recovery point.
+ *
+ * @param offset The offset to flush up to (non-inclusive)
+ */
+ public void flush(long offset) throws IOException {
+ long currentRecoveryPoint = recoveryPoint;
+ if (currentRecoveryPoint <= offset) {
+ Collection<LogSegment> segmentsToFlush = segments.values(currentRecoveryPoint, offset);
+ for (LogSegment logSegment : segmentsToFlush) {
+ logSegment.flush();
+ }
+ // If there are any new segments, we need to flush the parent directory for crash consistency.
+ if (segmentsToFlush.stream().anyMatch(s -> s.baseOffset() >= currentRecoveryPoint)) {
+ // The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here.
+ // Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go.
+ Utils.flushDirIfExists(dir.toPath());
+ }
+ }
+ }
+
+ /**
+ * The time this log is last known to have been fully flushed to disk
+ */
+ public long lastFlushTime() {
+ return lastFlushedTime.get();
+ }
+
+ /**
+ * The offset metadata of the next message that will be appended to the log
+ */
+ public LogOffsetMetadata logEndOffsetMetadata() {
+ return nextOffsetMetadata;
+ }
+
+ /**
+ * The offset of the next message that will be appended to the log
+ */
+ public long logEndOffset() {
+ return nextOffsetMetadata.messageOffset;
+ }
+
+ /**
+ * Update end offset of the log, and update the recoveryPoint.
+ *
+ * @param endOffset the new end offset of the log
+ */
+ public void updateLogEndOffset(long endOffset) {
+ nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment().baseOffset(), segments.activeSegment().size());
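+ // After a truncation the new end offset can be below the recovery point; pull the recovery point back so it never exceeds the LEO.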
+ if (recoveryPoint > endOffset) {
+ updateRecoveryPoint(endOffset);
+ }
+ }
+
+ /**
+ * Close file handlers used by log but don't write to disk.
+ * This is called if the log directory is offline.
+ */
+ public void closeHandlers() {
+ segments.closeHandlers();
+ isMemoryMappedBufferClosed = true;
+ }
+
+ /**
+ * Closes the segments of the log.
+ */
+ public void close() {
+ maybeHandleIOException(
+ () -> "Error while renaming dir for " + topicPartition + " in dir " + dir.getParent(),
+ () -> {
+ checkIfMemoryMappedBufferClosed();
+ segments.close();
+ return null;
+ }
+ );
+ }
+
+ /**
+ * Completely delete this log directory with no delay.
+ */
+ public void deleteEmptyDir() {
+ maybeHandleIOException(
+ () -> "Error while deleting dir for " + topicPartition + " in dir " + dir.getParent(),
+ () -> {
+ if (!segments.isEmpty()) {
+ throw new IllegalStateException("Can not delete directory when " + segments.numberOfSegments() + " segments are still present");
+ }
+ if (!isMemoryMappedBufferClosed) {
+ throw new IllegalStateException("Can not delete directory when memory mapped buffer for log of " + topicPartition + " is still open.");
+ }
+ Utils.delete(dir);
+ return null;
+ }
+ );
+ }
+
+ /**
+ * Completely delete all segments with no delay.
+ * @return the deleted segments
+ */
+ public List<LogSegment> deleteAllSegments() {
+ return maybeHandleIOException(
+ () -> "Error while deleting all segments for " + topicPartition + " in dir " + dir.getParent(),
+ () -> {
+ List<LogSegment> deletableSegments = new ArrayList<>(segments.values());
+ removeAndDeleteSegments(
+ segments.values(),
+ false,
+ toDelete -> logger.info("Deleting segments as the log has been deleted: {}", toDelete.stream()
+ .map(LogSegment::toString)
+ .collect(Collectors.joining(", "))));
+ isMemoryMappedBufferClosed = true;
+ return deletableSegments;
+ }
+ );
+ }
+
+ /**
+ * This method deletes the given log segments by doing the following for each of them:
+ *
+ * - It removes the segment from the segment map so that it will no longer be used for reads.
+ *
+ * - It renames the index and log files by appending .deleted to the respective file name
+ *
+ * - It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
+ *
+ * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
+ * physically deleting a file while it is being read.
+ * This method does not convert IOException to KafkaStorageException, the immediate caller
+ * is expected to catch and handle IOException.
+ *
+ * @param segmentsToDelete The log segments to schedule for deletion
+ * @param asyncDelete Whether the segment files should be deleted asynchronously
+ * @param reason The reason for the segment deletion
+ */
+ public void removeAndDeleteSegments(Collection<LogSegment> segmentsToDelete,
+ boolean asyncDelete,
+ SegmentDeletionReason reason) throws IOException {
+ if (!segmentsToDelete.isEmpty()) {
+ // Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
+ // removing the deleted segment, we should force materialization of the iterator here, so that results of the
+ // iteration remain valid and deterministic. We should also pass only the materialized view of the
+ // iterator to the logic that actually deletes the segments.
+ List<LogSegment> toDelete = new ArrayList<>(segmentsToDelete);
+ reason.logReason(toDelete);
+ toDelete.forEach(segment -> segments.remove(segment.baseOffset()));
+ deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
+ }
+ }
+
+ /**
+ * This method deletes the given segment and creates a new segment with the given new base offset. It ensures an
+ * active segment exists in the log at all times during this process.
+ *
+ * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
+ * physically deleting a file while it is being read.
+ *
+ * This method does not convert IOException to KafkaStorageException, the immediate caller
+ * is expected to catch and handle IOException.
+ *
+ * @param newOffset The base offset of the new segment
+ * @param segmentToDelete The old active segment to schedule for deletion
+ * @param asyncDelete Whether the segment files should be deleted asynchronously
+ * @param reason The reason for the segment deletion
+ */
+ public LogSegment createAndDeleteSegment(long newOffset,
+ LogSegment segmentToDelete,
+ boolean asyncDelete,
+ SegmentDeletionReason reason) throws IOException {
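+ // When the new segment reuses the old base offset, rename the old segment's files to *.deleted first so the
+ // replacement segment can be created under the same file names.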
+ if (newOffset == segmentToDelete.baseOffset()) {
+ segmentToDelete.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX);
+ }
+ LogSegment newSegment = LogSegment.open(dir,
+ newOffset,
+ config,
+ time,
+ config.initFileSize(),
+ config.preallocate);
+ segments.add(newSegment);
+
+ reason.logReason(singletonList(segmentToDelete));
+ if (newOffset != segmentToDelete.baseOffset()) {
+ segments.remove(segmentToDelete.baseOffset());
+ }
+ deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
+ return newSegment;
+ }
+
+ /**
+ * Given a message offset, find its corresponding offset metadata in the log.
+ * If the message offset is out of range, throw an OffsetOutOfRangeException
+ */
+ public LogOffsetMetadata convertToOffsetMetadataOrThrow(long offset) throws IOException {
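+ // A single-byte read is enough to locate the offset's segment and file position; only the returned fetch offset metadata is used.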
+ FetchDataInfo fetchDataInfo = read(offset, 1, false, nextOffsetMetadata, false);
+ return fetchDataInfo.fetchOffsetMetadata;
+ }
+
+ /**
+ * Read messages from the log.
+ *
+ * @param startOffset The offset to begin reading at
+ * @param maxLength The maximum number of bytes to read
+ * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
+ * @param maxOffsetMetadata The metadata of the maximum offset to be fetched
+ * @param includeAbortedTxns If true, aborted transactions are included
+ * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset
+ * @return The fetch data information including fetch starting offset metadata and messages read.
+ */
+ public FetchDataInfo read(long startOffset,
+ int maxLength,
+ boolean minOneMessage,
+ LogOffsetMetadata maxOffsetMetadata,
+ boolean includeAbortedTxns) throws IOException {
+ return maybeHandleIOException(
+ () -> "Exception while reading from " + topicPartition + " in dir " + dir.getParent(),
+ () -> {
+ logger.trace("Reading maximum $maxLength bytes at offset {} from log with total length {} bytes",
+ startOffset, segments.sizeInBytes());
+
+ LogOffsetMetadata endOffsetMetadata = nextOffsetMetadata;
+ long endOffset = endOffsetMetadata.messageOffset;
+ Optional<LogSegment> segmentOpt = segments.floorSegment(startOffset);
+ // return error on attempt to read beyond the log end offset
+ if (startOffset > endOffset || segmentOpt.isEmpty()) {
+ throw new OffsetOutOfRangeException("Received request for offset " + startOffset + " for partition " + topicPartition + ", " +
+ "but we only have log segments upto " + endOffset + ".");
+ }
+ if (startOffset == maxOffsetMetadata.messageOffset) return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns);
+ if (startOffset > maxOffsetMetadata.messageOffset) return emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns);
+
+ // Do the read on the segment with a base offset less than the target offset
+ // but if that segment doesn't contain any messages with an offset greater than that
+ // continue to read from successive segments until we get some messages or we reach the end of the log
+ FetchDataInfo fetchDataInfo = null;
+ while (fetchDataInfo == null && segmentOpt.isPresent()) {
+ LogSegment segment = segmentOpt.get();
+ long baseOffset = segment.baseOffset();
+
+ // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty.
+ // 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit.
+ // 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so
+ // return maxPosition as empty to avoid reading beyond the max-offset
+ Optional<Long> maxPositionOpt;
+ if (segment.baseOffset() < maxOffsetMetadata.segmentBaseOffset)
+ maxPositionOpt = Optional.of((long) segment.size());
+ else if (segment.baseOffset() == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
+ maxPositionOpt = Optional.of((long) maxOffsetMetadata.relativePositionInSegment);
+ else
+ maxPositionOpt = Optional.empty();
+
+ fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage);
+ if (fetchDataInfo != null) {
+ if (includeAbortedTxns) {
+ fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo);
+ }
+ } else {
+ segmentOpt = segments.higherSegment(baseOffset);
+ }
+ }
+ if (fetchDataInfo != null) {
+ return fetchDataInfo;
+ } else {
+ // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
+ // this can happen when all messages with offset larger than start offsets have been deleted.
+ // In this case, we will return the empty set with log end offset metadata
+ return new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY);
+ }
+ }
+ );
+ }
+
+ public void append(long lastOffset, long largestTimestamp, long shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException {
+ segments.activeSegment().append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records);
+ updateLogEndOffset(lastOffset + 1);
+ }
+
+ FetchDataInfo addAbortedTransactions(long startOffset, LogSegment segment, FetchDataInfo fetchInfo) throws IOException {
+ int fetchSize = fetchInfo.records.sizeInBytes();
+ OffsetPosition startOffsetPosition = new OffsetPosition(
+ fetchInfo.fetchOffsetMetadata.messageOffset,
+ fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
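+ // Search the transaction index up to the first offset past the fetched byte range; if no such bound exists,
+ // fall back to the next segment's base offset, or the log end offset for the last segment.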
+ long upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+ .orElse(segments.higherSegment(segment.baseOffset())
+ .map(LogSegment::baseOffset).orElse(logEndOffset()));
+
+ List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+ Consumer<List<AbortedTxn>> accumulator = abortedTxns -> {
+ for (AbortedTxn abortedTxn : abortedTxns)
+ abortedTransactions.add(abortedTxn.asAbortedTransaction());
+ };
+ collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator);
+ return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+ fetchInfo.records,
+ fetchInfo.firstEntryIncomplete,
+ Optional.of(abortedTransactions));
+ }
+
+ private void collectAbortedTransactions(long startOffset, long upperBoundOffset,
+ LogSegment startingSegment,
+ Consumer<List<AbortedTxn>> accumulator) {
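+ // Walk from the starting segment through successively higher segments, feeding each segment's aborted
+ // transactions to the accumulator until the index search reports completion or we run out of segments.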
+ Iterator<LogSegment> higherSegments = segments.higherSegments(startingSegment.baseOffset()).iterator();
+ Optional<LogSegment> segmentEntryOpt = Optional.of(startingSegment);
+ while (segmentEntryOpt.isPresent()) {
+ LogSegment segment = segmentEntryOpt.get();
+ TxnIndexSearchResult searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset);
+ accumulator.accept(searchResult.abortedTransactions);
+ if (searchResult.isComplete) return;
+ segmentEntryOpt = nextItem(higherSegments);
+ }
+ }
+
+ public List<AbortedTxn> collectAbortedTransactions(long logStartOffset, long baseOffset, long upperBoundOffset) {
+ Optional<LogSegment> segmentEntry = segments.floorSegment(baseOffset);
+ List<AbortedTxn> allAbortedTxns = new ArrayList<>();
+ segmentEntry.ifPresent(logSegment -> collectAbortedTransactions(logStartOffset, upperBoundOffset, logSegment, allAbortedTxns::addAll));
+ return allAbortedTxns;
+ }
+
+ /**
+ * Roll the log over to a new active segment starting with the current logEndOffset.
+ * This will trim the index to the exact size of the number of entries it currently contains.
+ *
+ * @param expectedNextOffset The expected next offset after the segment is rolled
+ *
+ * @return The newly rolled segment
+ */
+ public LogSegment roll(Long expectedNextOffset) {
+ return maybeHandleIOException(
+ () -> "Error while rolling log segment for " + topicPartition + " in dir " + dir.getParent(),
+ () -> {
+ long start = time.hiResClockMs();
+ checkIfMemoryMappedBufferClosed();
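+ // Roll at whichever is larger: the caller-requested offset or the current log end offset, so the new segment never starts behind the LEO.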
+ long newOffset = Math.max(expectedNextOffset, logEndOffset());
+ File logFile = LogFileUtils.logFile(dir, newOffset, "");
+ LogSegment activeSegment = segments.activeSegment();
+ if (segments.contains(newOffset)) {
+ // segment with the same base offset already exists and loaded
+ if (activeSegment.baseOffset() == newOffset && activeSegment.size() == 0) {
+ // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
+ // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0).
+ logger.warn("Trying to roll a new log segment with start offset {}=max(provided offset = {}, LEO = {}) " +
+ "while it already exists and is active with size 0. Size of time index: {}, size of offset index: {}.",
+ newOffset, expectedNextOffset, logEndOffset(), activeSegment.timeIndex().entries(), activeSegment.offsetIndex().entries());
+ LogSegment newSegment = createAndDeleteSegment(
+ newOffset,
+ activeSegment,
+ true,
+ toDelete -> logger.info("Deleting segments as part of log roll: {}", toDelete.stream()
+ .map(LogSegment::toString)
+ .collect(Collectors.joining(", "))));
+ updateLogEndOffset(nextOffsetMetadata.messageOffset);
+ logger.info("Rolled new log segment at offset {} in {} ms.", newOffset, time.hiResClockMs() - start);
+ return newSegment;
+ } else {
+ throw new KafkaException("Trying to roll a new log segment for topic partition " + topicPartition + " with start offset " + newOffset +
+ " =max(provided offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") while it already exists. Existing " +
+ "segment is " + segments.get(newOffset) + ".");
+ }
+ } else if (!segments.isEmpty() && newOffset < activeSegment.baseOffset()) {
+ throw new KafkaException(
+ "Trying to roll a new log segment for topic partition " + topicPartition + " with " +
+ "start offset " + newOffset + " =max(provided offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") lower than start offset of the active segment " + activeSegment);
+ } else {
+ File offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset);
+ File timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset);
+ File txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset);
+ for (File file : Arrays.asList(logFile, offsetIdxFile, timeIdxFile, txnIdxFile)) {
+ if (file.exists()) {
+ logger.warn("Newly rolled segment file {} already exists; deleting it first", file.getAbsolutePath());
+ Files.delete(file.toPath());
+ }
+ }
+ if (segments.lastSegment().isPresent()) {
+ segments.lastSegment().get().onBecomeInactiveSegment();
+ }
+ }
+ LogSegment newSegment = LogSegment.open(dir,
+ newOffset,
+ config,
+ time,
+ config.initFileSize(),
+ config.preallocate);
+ segments.add(newSegment);
+
+ // We need to update the segment base offset and append position data of the metadata when log rolls.
+ // The next offset should not change.
+ updateLogEndOffset(nextOffsetMetadata.messageOffset);
+ logger.info("Rolled new log segment at offset {} in {} ms.", newOffset, time.hiResClockMs() - start);
+ return newSegment;
+ }
+ );
+ }
+
+ /**
+ * Delete all data in the local log and start at the new offset.
+ *
+ * @param newOffset The new offset to start the log with
+ * @return the list of segments that were scheduled for deletion
+ */
+ public List<LogSegment> truncateFullyAndStartAt(long newOffset) {
+ return maybeHandleIOException(
+ () -> "Error while truncating the entire log for " + topicPartition + " in dir " + dir.getParent(),
+ () -> {
+ logger.debug("Truncate and start at offset {}", newOffset);
+ checkIfMemoryMappedBufferClosed();
+ List<LogSegment> segmentsToDelete = new ArrayList<>(segments.values());
+
+ if (!segmentsToDelete.isEmpty()) {
+ removeAndDeleteSegments(segmentsToDelete.subList(0, segmentsToDelete.size() - 1), true, new LogTruncation(logger));
+ // Use createAndDeleteSegment() to create new segment first and then delete the old last segment to prevent missing
+ // active segment during the deletion process
+ createAndDeleteSegment(newOffset, segmentsToDelete.get(segmentsToDelete.size() - 1), true, new LogTruncation(logger));
+ }
+ updateLogEndOffset(newOffset);
+ return segmentsToDelete;
+ }
+ );
+ }
+
+ /**
+ * Truncate this log so that it ends with the greatest offset < targetOffset.
+ *
+ * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
+ * @return the list of segments that were scheduled for deletion
+ */
+ public Collection<LogSegment> truncateTo(long targetOffset) throws IOException {
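+ // Remove every segment whose base offset is above the target, then trim the remaining active segment down to the target offset.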
+ Collection<LogSegment> deletableSegments = segments.filter(segment -> segment.baseOffset() > targetOffset);
+ removeAndDeleteSegments(deletableSegments, true, new LogTruncation(logger));
+ segments.activeSegment().truncateTo(targetOffset);
+ updateLogEndOffset(targetOffset);
+ return deletableSegments;
+ }
+
+ /**
+ * Return a directory name to rename the log directory to for async deletion.
+ * The name will be in the following format: "topic-partitionId.uniqueId-delete".
+ * If the topic name is too long, it will be truncated to prevent the total name
+ * from exceeding 255 characters.
+ */
+ public static String logDeleteDirName(TopicPartition topicPartition) {
+ return logDirNameWithSuffixCappedLength(topicPartition, LogFileUtils.DELETE_DIR_SUFFIX);
+ }
+
+ /**
+ * Return a directory name to rename the log directory to for stray partition deletion.
+ * The name will be in the following format: "topic-partitionId.uniqueId-stray".
+ * If the topic name is too long, it will be truncated to prevent the total name
+ * from exceeding 255 characters.
+ */
+ public static String logStrayDirName(TopicPartition topicPartition) {
+ return logDirNameWithSuffixCappedLength(topicPartition, LogFileUtils.STRAY_DIR_SUFFIX);
+ }
+
+ /**
+ * Return a future directory name for the given topic partition. The name will be in the following
+ * format: topic-partition.uniqueId-future where topic, partition and uniqueId are variables.
+ */
+ public static String logFutureDirName(TopicPartition topicPartition) {
+ return logDirNameWithSuffix(topicPartition, LogFileUtils.FUTURE_DIR_SUFFIX);
+ }
+
+ /**
+ * Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
+ * If the topic name is too long, it will be truncated to prevent the total name
+ * from exceeding 255 characters.
+ */
+ private static String logDirNameWithSuffixCappedLength(TopicPartition topicPartition, String suffix) {
+ String uniqueId = UUID.randomUUID().toString().replaceAll("-", "");
+ String fullSuffix = "-" + topicPartition.partition() + "." + uniqueId + suffix;
+ int prefixLength = Math.min(topicPartition.topic().length(), 255 - fullSuffix.length());
+ return topicPartition.topic().substring(0, prefixLength) + fullSuffix;
+ }
+
+ private static String logDirNameWithSuffix(TopicPartition topicPartition, String suffix) {
+ String uniqueId = UUID.randomUUID().toString().replaceAll("-", "");
+ return logDirName(topicPartition) + "." + uniqueId + suffix;
+ }
+
+ /**
+ * Return a directory name for the given topic partition. The name will be in the following
+ * format: topic-partition where topic, partition are variables.
+ */
+ public static String logDirName(TopicPartition topicPartition) {
+ return topicPartition.topic() + "-" + topicPartition.partition();
+ }
+
+ private static KafkaException exception(File dir) throws IOException {
+ return new KafkaException("Found directory " + dir.getCanonicalPath() + ", '" + dir.getName() + "' is not in the form of " +
+ "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
+ "Kafka's log directories (and children) should only contain Kafka topic data.");
+ }
+
+ /**
+ * Parse the topic and partition out of the directory name of a log
+ */
+ public static TopicPartition parseTopicPartitionName(File dir) throws IOException {
+ if (dir == null) {
+ throw new KafkaException("dir should not be null");
+ }
+ String dirName = dir.getName();
+ if (dirName.isEmpty() || !dirName.contains("-")) {
+ throw exception(dir);
+ }
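+ // Directories carrying a -delete/-future/-stray suffix must also match the full "topic-partition.uniqueId-suffix" form; anything else is malformed.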
+ if (dirName.endsWith(DELETE_DIR_SUFFIX) && !DELETE_DIR_PATTERN.matcher(dirName).matches() ||
+ dirName.endsWith(FUTURE_DIR_SUFFIX) && !FUTURE_DIR_PATTERN.matcher(dirName).matches() ||
+ dirName.endsWith(STRAY_DIR_SUFFIX) && !STRAY_DIR_PATTERN.matcher(dirName).matches()) {
+ throw exception(dir);
+ }
+ String name = (dirName.endsWith(DELETE_DIR_SUFFIX) || dirName.endsWith(FUTURE_DIR_SUFFIX) || dirName.endsWith(STRAY_DIR_SUFFIX))
+ ? dirName.substring(0, dirName.lastIndexOf('.'))
+ : dirName;
+
+ int index = name.lastIndexOf('-');
+ String topic = name.substring(0, index);
+ String partitionString = name.substring(index + 1);
+ if (topic.isEmpty() || partitionString.isEmpty()) {
+ throw exception(dir);
+ }
+ try {
+ return new TopicPartition(topic, Integer.parseInt(partitionString));
+ } catch (NumberFormatException nfe) {
+ throw exception(dir);
+ }
+ }
+
+ /**
+ * Wraps the value of iterator.next() in an Optional instance.
+ *
+ * @param iterator given iterator to iterate over
+ * @return an Optional wrapping iterator.next() if a next element exists, Optional#empty otherwise
+ * @param <T> the type of object held within the iterator
+ */
+ public static <T> Optional<T> nextItem(Iterator<T> iterator) {
+ return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty();
+ }
+
+ private static FetchDataInfo emptyFetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, boolean includeAbortedTxns) {
+ Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions = includeAbortedTxns
+ ? Optional.of(Collections.emptyList())
+ : Optional.empty();
+ return new FetchDataInfo(fetchOffsetMetadata, MemoryRecords.EMPTY, false, abortedTransactions);
+ }
+
/**
* Invokes the provided function and handles any IOException raised by the function by marking the
* provided directory offline.
@@ -171,19 +926,16 @@ public class LocalLog {
try {
int position = 0;
FileRecords sourceRecords = segment.log();
-
while (position < sourceRecords.sizeInBytes()) {
FileLogInputStream.FileChannelRecordBatch firstBatch = sourceRecords.batchesFrom(position).iterator().next();
LogSegment newSegment = createNewCleanedSegment(dir, config, firstBatch.baseOffset());
newSegments.add(newSegment);
-
int bytesAppended = newSegment.appendFromFile(sourceRecords, position);
- if (bytesAppended == 0)
+ if (bytesAppended == 0) {
throw new IllegalStateException("Failed to append records from position " + position + " in " + segment);
-
+ }
position += bytesAppended;
}
-
// prepare new segments
int totalSizeOfNewSegments = 0;
for (LogSegment splitSegment : newSegments) {
@@ -198,7 +950,7 @@ public class LocalLog {
}
// replace old segment with new ones
LOG.info("{}Replacing overflowed segment $segment with split segments {}", logPrefix, newSegments);
- List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, Collections.singletonList(segment),
+ List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, singletonList(segment),
dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix, false);
return new SplitSegmentResult(deletedSegments, newSegments);
} catch (Exception e) {
@@ -269,7 +1021,7 @@ public class LocalLog {
.sorted(Comparator.comparingLong(LogSegment::baseOffset))
.collect(Collectors.toList());
- // need to do this in two phases to be crash safe AND do the delete asynchronously
+ // need to do this in two phases to be crash safe AND do the deletion asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
List<LogSegment> reversedSegmentsList = new ArrayList<>(sortedNewSegments);
Collections.reverse(reversedSegmentsList);
@@ -290,7 +1042,7 @@ public class LocalLog {
existingSegments.remove(segment.baseOffset());
}
deleteSegmentFiles(
- Collections.singletonList(segment),
+ singletonList(segment),
true,
dir,
topicPartition,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
index fd873f103ee..dd62b90f7f6 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
@@ -60,6 +60,12 @@ public final class LogFileUtils {
/** Suffix of a directory that is scheduled to be deleted */
public static final String DELETE_DIR_SUFFIX = "-delete";
+ /** Suffix of a directory that is used for future partition */
+ public static final String FUTURE_DIR_SUFFIX = "-future";
+
+ /** Suffix of a directory that is used for stray partition */
+ public static final String STRAY_DIR_SUFFIX = "-stray";
+
private LogFileUtils() {
}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java
new file mode 100644
index 00000000000..1a819676b69
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
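+/**
+ * {@link SegmentDeletionReason} used when segments are removed because the log was truncated.
+ */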
+public class LogTruncation implements SegmentDeletionReason {
+
+ private final Logger logger;
+
+ public LogTruncation(Logger logger) {
+ this.logger = logger;
+ }
+
+ @Override
+ public void logReason(List<LogSegment> toDelete) {
+ logger.info("Deleting segments as part of log truncation: {}", toDelete.stream()
+ .map(LogSegment::toString)
+ .collect(Collectors.joining(", ")));
+ }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java
new file mode 100644
index 00000000000..464ade51ee8
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.List;
+
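+/**
+ * The reason a set of log segments is being deleted; implementations log a human-readable explanation
+ * for the deletion (see e.g. {@link LogTruncation}).
+ */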
+public interface SegmentDeletionReason {
+ void logReason(List<LogSegment> toDelete);
+}