From 6e88c10ed5628ff31c0aa3096d16a1c78ef3127f Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 28 Oct 2024 13:41:46 +0100 Subject: [PATCH] KAFKA-14483 Move LocalLog to storage module (#17587) Reviewers: Chia-Ping Tsai --- checkstyle/suppressions.xml | 2 +- core/src/main/scala/kafka/log/LocalLog.scala | 720 ---------------- .../src/main/scala/kafka/log/UnifiedLog.scala | 63 +- .../kafka/cluster/PartitionLockTest.scala | 2 +- .../unit/kafka/cluster/PartitionTest.scala | 2 +- .../scala/unit/kafka/log/LocalLogTest.scala | 108 +-- .../kafka/log/LogCleanerManagerTest.scala | 2 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 2 +- .../scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../kafka/server/ReplicaManagerTest.scala | 4 +- .../unit/kafka/utils/SchedulerTest.scala | 4 +- gradle/spotbugs-exclude.xml | 1 - .../kafka/storage/internals/log/LocalLog.java | 768 +++++++++++++++++- .../storage/internals/log/LogFileUtils.java | 6 + .../storage/internals/log/LogTruncation.java | 38 + .../internals/log/SegmentDeletionReason.java | 23 + 16 files changed, 929 insertions(+), 818 deletions(-) delete mode 100644 core/src/main/scala/kafka/log/LocalLog.scala create mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java create mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 492422cd564..78ab9c097d8 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -344,7 +344,7 @@ + files="(LocalLog|LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/> diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala deleted file mode 100644 index 09aaed13d68..00000000000 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ /dev/null @@ -1,720 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.log - -import kafka.utils.Logging -import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException} -import org.apache.kafka.common.message.FetchResponseData -import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.server.util.Scheduler -import org.apache.kafka.storage.internals.log.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, OffsetPosition, LocalLog => JLocalLog} - -import java.io.File -import java.nio.file.Files -import java.util -import java.util.Collections.singletonList -import java.util.concurrent.atomic.AtomicLong -import java.util.regex.Pattern -import java.util.{Collections, Optional} -import scala.collection.mutable.ListBuffer -import scala.collection.Seq -import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters.RichOptional - -/** - * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset. - * New log segments are created according to a configurable policy that controls the size in bytes or time interval - * for a given segment. - * - * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class. - * - * @param _dir The directory in which log segments are created. - * @param config The log configuration settings - * @param segments The non-empty log segments recovered from disk - * @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk - * @param nextOffsetMetadata The offset where the next message could be appended - * @param scheduler The thread pool scheduler used for background actions - * @param time The time instance used for checking the clock - * @param topicPartition The topic partition associated with this log - * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure - */ -class LocalLog(@volatile private var _dir: File, - @volatile private[log] var config: LogConfig, - private[log] val segments: LogSegments, - @volatile private[log] var recoveryPoint: Long, - @volatile private var nextOffsetMetadata: LogOffsetMetadata, - private[log] val scheduler: Scheduler, - private[log] val time: Time, - private[log] val topicPartition: TopicPartition, - private[log] val logDirFailureChannel: LogDirFailureChannel) extends Logging { - - import kafka.log.LocalLog._ - - this.logIdent = s"[LocalLog partition=$topicPartition, dir=${dir.getParent}] " - - // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() - // After memory mapped buffer is closed, no disk IO operation should be performed for this log. 
- @volatile private[log] var isMemoryMappedBufferClosed = false - - // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks - @volatile private var _parentDir: String = dir.getParent - - // Last time the log was flushed - private val lastFlushedTime = new AtomicLong(time.milliseconds) - - private[log] def dir: File = _dir - - private[log] def name: String = dir.getName - - private[log] def parentDir: String = _parentDir - - private[log] def parentDirFile: File = new File(_parentDir) - - private[log] def isFuture: Boolean = dir.getName.endsWith(LocalLog.FutureDirSuffix) - - private def maybeHandleIOException[T](msg: => String)(fun: => T): T = { - JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun) - } - - /** - * Rename the directory of the log - * @param name the new dir name - * @throws KafkaStorageException if rename fails - */ - private[log] def renameDir(name: String): Boolean = { - maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") { - val renamedDir = new File(dir.getParent, name) - Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath) - if (renamedDir != dir) { - _dir = renamedDir - _parentDir = renamedDir.getParent - segments.updateParentDir(renamedDir) - true - } else { - false - } - } - } - - /** - * Update the existing configuration to the new provided configuration. - * @param newConfig the new configuration to be updated to - */ - private[log] def updateConfig(newConfig: LogConfig): Unit = { - val oldConfig = config - config = newConfig - val oldRecordVersion = oldConfig.recordVersion - val newRecordVersion = newConfig.recordVersion - if (newRecordVersion.precedes(oldRecordVersion)) - warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.") - } - - private[log] def checkIfMemoryMappedBufferClosed(): Unit = { - if (isMemoryMappedBufferClosed) - throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed") - } - - private[log] def updateRecoveryPoint(newRecoveryPoint: Long): Unit = { - recoveryPoint = newRecoveryPoint - } - - /** - * Update recoveryPoint to provided offset and mark the log as flushed, if the offset is greater - * than the existing recoveryPoint. - * - * @param offset the offset to be updated - */ - private[log] def markFlushed(offset: Long): Unit = { - checkIfMemoryMappedBufferClosed() - if (offset > recoveryPoint) { - updateRecoveryPoint(offset) - lastFlushedTime.set(time.milliseconds) - } - } - - /** - * The number of messages appended to the log since the last flush - */ - private[log] def unflushedMessages: Long = logEndOffset - recoveryPoint - - /** - * Flush local log segments for all offsets up to offset-1. - * Does not update the recovery point. - * - * @param offset The offset to flush up to (non-inclusive) - */ - private[log] def flush(offset: Long): Unit = { - val currentRecoveryPoint = recoveryPoint - if (currentRecoveryPoint <= offset) { - val segmentsToFlush = segments.values(currentRecoveryPoint, offset) - segmentsToFlush.forEach(_.flush()) - // If there are any new segments, we need to flush the parent directory for crash consistency. - if (segmentsToFlush.stream().anyMatch(_.baseOffset >= currentRecoveryPoint)) { - // The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here. 
- // Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go. - Utils.flushDirIfExists(dir.toPath) - } - } - } - - /** - * The time this log is last known to have been fully flushed to disk - */ - private[log] def lastFlushTime: Long = lastFlushedTime.get - - /** - * The offset metadata of the next message that will be appended to the log - */ - private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata - - /** - * The offset of the next message that will be appended to the log - */ - private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset - - /** - * Update end offset of the log, and update the recoveryPoint. - * - * @param endOffset the new end offset of the log - */ - private[log] def updateLogEndOffset(endOffset: Long): Unit = { - nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size) - if (recoveryPoint > endOffset) { - updateRecoveryPoint(endOffset) - } - } - - /** - * Close file handlers used by log but don't write to disk. - * This is called if the log directory is offline. - */ - private[log] def closeHandlers(): Unit = { - segments.closeHandlers() - isMemoryMappedBufferClosed = true - } - - /** - * Closes the segments of the log. - */ - private[log] def close(): Unit = { - maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") { - checkIfMemoryMappedBufferClosed() - segments.close() - } - } - - /** - * Completely delete this log directory with no delay. - */ - private[log] def deleteEmptyDir(): Unit = { - maybeHandleIOException(s"Error while deleting dir for $topicPartition in dir ${dir.getParent}") { - if (segments.nonEmpty) { - throw new IllegalStateException(s"Can not delete directory when ${segments.numberOfSegments} segments are still present") - } - if (!isMemoryMappedBufferClosed) { - throw new IllegalStateException(s"Can not delete directory when memory mapped buffer for log of $topicPartition is still open.") - } - Utils.delete(dir) - } - } - - /** - * Completely delete all segments with no delay. - * @return the deleted segments - */ - private[log] def deleteAllSegments(): util.List[LogSegment] = { - maybeHandleIOException(s"Error while deleting all segments for $topicPartition in dir ${dir.getParent}") { - val deletableSegments = new util.ArrayList(segments.values) - removeAndDeleteSegments(segments.values.asScala, asyncDelete = false, LogDeletion(this)) - isMemoryMappedBufferClosed = true - deletableSegments - } - } - - /** - * This method deletes the given log segments by doing the following for each of them: - * - It removes the segment from the segment map so that it will no longer be used for reads. - * - It renames the index and log files by appending .deleted to the respective file name - * - It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously - * - * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of - * physically deleting a file while it is being read. - * - * This method does not convert IOException to KafkaStorageException, the immediate caller - * is expected to catch and handle IOException. 
- * - * @param segmentsToDelete The log segments to schedule for deletion - * @param asyncDelete Whether the segment files should be deleted asynchronously - * @param reason The reason for the segment deletion - */ - private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment], - asyncDelete: Boolean, - reason: SegmentDeletionReason): Unit = { - if (segmentsToDelete.nonEmpty) { - // Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by - // removing the deleted segment, we should force materialization of the iterator here, so that results of the - // iteration remain valid and deterministic. We should also pass only the materialized view of the - // iterator to the logic that actually deletes the segments. - val toDelete = segmentsToDelete.toList - reason.logReason(toDelete) - toDelete.foreach { segment => - segments.remove(segment.baseOffset) - } - JLocalLog.deleteSegmentFiles(toDelete.asJava, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent) - } - } - - /** - * This method deletes the given segment and creates a new segment with the given new base offset. It ensures an - * active segment exists in the log at all times during this process. - * - * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of - * physically deleting a file while it is being read. - * - * This method does not convert IOException to KafkaStorageException, the immediate caller - * is expected to catch and handle IOException. - * - * @param newOffset The base offset of the new segment - * @param segmentToDelete The old active segment to schedule for deletion - * @param asyncDelete Whether the segment files should be deleted asynchronously - * @param reason The reason for the segment deletion - */ - private[log] def createAndDeleteSegment(newOffset: Long, - segmentToDelete: LogSegment, - asyncDelete: Boolean, - reason: SegmentDeletionReason): LogSegment = { - if (newOffset == segmentToDelete.baseOffset) - segmentToDelete.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX) - - val newSegment = LogSegment.open(dir, - newOffset, - config, - time, - config.initFileSize, - config.preallocate) - segments.add(newSegment) - - reason.logReason(List(segmentToDelete)) - if (newOffset != segmentToDelete.baseOffset) - segments.remove(segmentToDelete.baseOffset) - JLocalLog.deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent) - - newSegment - } - - /** - * Given a message offset, find its corresponding offset metadata in the log. - * If the message offset is out of range, throw an OffsetOutOfRangeException - */ - private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { - val fetchDataInfo = read(offset, - maxLength = 1, - minOneMessage = false, - maxOffsetMetadata = nextOffsetMetadata, - includeAbortedTxns = false) - fetchDataInfo.fetchOffsetMetadata - } - - /** - * Read messages from the log. 
- * - * @param startOffset The offset to begin reading at - * @param maxLength The maximum number of bytes to read - * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists) - * @param maxOffsetMetadata The metadata of the maximum offset to be fetched - * @param includeAbortedTxns If true, aborted transactions are included - * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset - * @return The fetch data information including fetch starting offset metadata and messages read. - */ - def read(startOffset: Long, - maxLength: Int, - minOneMessage: Boolean, - maxOffsetMetadata: LogOffsetMetadata, - includeAbortedTxns: Boolean): FetchDataInfo = { - maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") { - trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " + - s"total length ${segments.sizeInBytes} bytes") - - val endOffsetMetadata = nextOffsetMetadata - val endOffset = endOffsetMetadata.messageOffset - var segmentOpt = segments.floorSegment(startOffset) - - // return error on attempt to read beyond the log end offset - if (startOffset > endOffset || !segmentOpt.isPresent) - throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + - s"but we only have log segments upto $endOffset.") - - if (startOffset == maxOffsetMetadata.messageOffset) - emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) - else if (startOffset > maxOffsetMetadata.messageOffset) - emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns) - else { - // Do the read on the segment with a base offset less than the target offset - // but if that segment doesn't contain any messages with an offset greater than that - // continue to read from successive segments until we get some messages or we reach the end of the log - var fetchDataInfo: FetchDataInfo = null - while (fetchDataInfo == null && segmentOpt.isPresent) { - val segment = segmentOpt.get - val baseOffset = segment.baseOffset - - // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty. - // 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit. - // 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so - // return maxPosition as empty to avoid reading beyond the max-offset - val maxPositionOpt: Optional[java.lang.Long] = - if (segment.baseOffset < maxOffsetMetadata.segmentBaseOffset) - Optional.of(segment.size) - else if (segment.baseOffset == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly()) - Optional.of(maxOffsetMetadata.relativePositionInSegment) - else - Optional.empty() - - fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage) - if (fetchDataInfo != null) { - if (includeAbortedTxns) - fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo) - } else segmentOpt = segments.higherSegment(baseOffset) - } - - if (fetchDataInfo != null) fetchDataInfo - else { - // okay we are beyond the end of the last segment with no data fetched although the start offset is in range, - // this can happen when all messages with offset larger than start offsets have been deleted. 
- // In this case, we will return the empty set with log end offset metadata - new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) - } - } - } - } - - private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { - segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records) - updateLogEndOffset(lastOffset + 1) - } - - private def addAbortedTransactions(startOffset: Long, segment: LogSegment, - fetchInfo: FetchDataInfo): FetchDataInfo = { - val fetchSize = fetchInfo.records.sizeInBytes - val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, - fetchInfo.fetchOffsetMetadata.relativePositionInSegment) - val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).orElse( - segments.higherSegment(segment.baseOffset).toScala.map(s => s.baseOffset).getOrElse(logEndOffset)) - - val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction] - def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction) - collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator) - - new FetchDataInfo(fetchInfo.fetchOffsetMetadata, - fetchInfo.records, - fetchInfo.firstEntryIncomplete, - Optional.of(abortedTransactions.toList.asJava)) - } - - private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long, - startingSegment: LogSegment, - accumulator: Seq[AbortedTxn] => Unit): Unit = { - val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator - var segmentEntryOpt = Option(startingSegment) - while (segmentEntryOpt.isDefined) { - val segment = segmentEntryOpt.get - val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset) - accumulator(searchResult.abortedTransactions.asScala) - if (searchResult.isComplete) - return - segmentEntryOpt = nextOption(higherSegments) - } - } - - private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { - val segmentEntry = segments.floorSegment(baseOffset) - val allAbortedTxns = ListBuffer.empty[AbortedTxn] - def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns - segmentEntry.ifPresent(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator)) - allAbortedTxns.toList - } - - /** - * Roll the log over to a new active segment starting with the current logEndOffset. - * This will trim the index to the exact size of the number of entries it currently contains. 
- * - * @param expectedNextOffset The expected next offset after the segment is rolled - * - * @return The newly rolled segment - */ - private[log] def roll(expectedNextOffset: Option[Long] = None): LogSegment = { - maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") { - val start = time.hiResClockMs() - checkIfMemoryMappedBufferClosed() - val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset) - val logFile = LogFileUtils.logFile(dir, newOffset, "") - val activeSegment = segments.activeSegment - if (segments.contains(newOffset)) { - // segment with the same base offset already exists and loaded - if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) { - // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an - // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0). - warn(s"Trying to roll a new log segment with start offset $newOffset " + - s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " + - s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," + - s" size of offset index: ${activeSegment.offsetIndex.entries}.") - val newSegment = createAndDeleteSegment(newOffset, activeSegment, asyncDelete = true, LogRoll(this)) - updateLogEndOffset(nextOffsetMetadata.messageOffset) - info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.") - return newSegment - } else { - throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" + - s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " + - s"segment is ${segments.get(newOffset)}.") - } - } else if (segments.nonEmpty && newOffset < activeSegment.baseOffset) { - throw new KafkaException( - s"Trying to roll a new log segment for topic partition $topicPartition with " + - s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment") - } else { - val offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset) - val timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset) - val txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset) - - for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { - warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") - Files.delete(file.toPath) - } - - segments.lastSegment.ifPresent(_.onBecomeInactiveSegment()) - } - - val newSegment = LogSegment.open(dir, - newOffset, - config, - time, - config.initFileSize, - config.preallocate) - segments.add(newSegment) - - // We need to update the segment base offset and append position data of the metadata when log rolls. - // The next offset should not change. - updateLogEndOffset(nextOffsetMetadata.messageOffset) - - info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.") - - newSegment - } - } - - /** - * Delete all data in the local log and start at the new offset. 
- * - * @param newOffset The new offset to start the log with - * @return the list of segments that were scheduled for deletion - */ - private[log] def truncateFullyAndStartAt(newOffset: Long): Iterable[LogSegment] = { - maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") { - debug(s"Truncate and start at offset $newOffset") - checkIfMemoryMappedBufferClosed() - val segmentsToDelete = new util.ArrayList(segments.values).asScala - - if (segmentsToDelete.nonEmpty) { - removeAndDeleteSegments(segmentsToDelete.dropRight(1), asyncDelete = true, LogTruncation(this)) - // Use createAndDeleteSegment() to create new segment first and then delete the old last segment to prevent missing - // active segment during the deletion process - createAndDeleteSegment(newOffset, segmentsToDelete.last, asyncDelete = true, LogTruncation(this)) - } - - updateLogEndOffset(newOffset) - - segmentsToDelete - } - } - - /** - * Truncate this log so that it ends with the greatest offset < targetOffset. - * - * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete. - * @return the list of segments that were scheduled for deletion - */ - private[log] def truncateTo(targetOffset: Long): Iterable[LogSegment] = { - val deletableSegments = segments.filter(segment => segment.baseOffset > targetOffset).asScala - removeAndDeleteSegments(deletableSegments, asyncDelete = true, LogTruncation(this)) - segments.activeSegment.truncateTo(targetOffset) - updateLogEndOffset(targetOffset) - deletableSegments - } -} - -/** - * Helper functions for logs - */ -object LocalLog extends Logging { - - /** a directory that is scheduled to be deleted */ - private[log] val DeleteDirSuffix = LogFileUtils.DELETE_DIR_SUFFIX - - /** a directory that is used for future partition */ - private[log] val FutureDirSuffix = "-future" - - /** a directory that is used for stray partition */ - private[log] val StrayDirSuffix = "-stray" - - private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix") - private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix") - private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix") - - private[log] val UnknownOffset = -1L - - /** - * Return a directory name to rename the log directory to for async deletion. - * The name will be in the following format: "topic-partitionId.uniqueId-delete". - * If the topic name is too long, it will be truncated to prevent the total name - * from exceeding 255 characters. - */ - private[log] def logDeleteDirName(topicPartition: TopicPartition): String = { - logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix) - } - - /** - * Return a directory name to rename the log directory to for stray partition deletion. - * The name will be in the following format: "topic-partitionId.uniqueId-stray". - * If the topic name is too long, it will be truncated to prevent the total name - * from exceeding 255 characters. - */ - private[log] def logStrayDirName(topicPartition: TopicPartition): String = { - logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix) - } - - /** - * Return a future directory name for the given topic partition. The name will be in the following - * format: topic-partition.uniqueId-future where topic, partition and uniqueId are variables. 
- */ - private[log] def logFutureDirName(topicPartition: TopicPartition): String = { - logDirNameWithSuffix(topicPartition, FutureDirSuffix) - } - - /** - * Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}". - * If the topic name is too long, it will be truncated to prevent the total name - * from exceeding 255 characters. - */ - private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = { - val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") - val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix" - val prefixLength = Math.min(topicPartition.topic().length, 255 - fullSuffix.length) - s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix" - } - - private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = { - val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") - s"${logDirName(topicPartition)}.$uniqueId$suffix" - } - - /** - * Return a directory name for the given topic partition. The name will be in the following - * format: topic-partition where topic, partition are variables. - */ - private[log] def logDirName(topicPartition: TopicPartition): String = { - s"${topicPartition.topic}-${topicPartition.partition}" - } - - /** - * Parse the topic and partition out of the directory name of a log - */ - private[log] def parseTopicPartitionName(dir: File): TopicPartition = { - if (dir == null) - throw new KafkaException("dir should not be null") - - def exception(dir: File): KafkaException = { - new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " + - "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" + - "Kafka's log directories (and children) should only contain Kafka topic data.") - } - - val dirName = dir.getName - if (dirName == null || dirName.isEmpty || !dirName.contains('-')) - throw exception(dir) - if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches || - dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches || - dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches) - throw exception(dir) - - val name: String = - if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix)) - dirName.substring(0, dirName.lastIndexOf('.')) - else dirName - - val index = name.lastIndexOf('-') - val topic = name.substring(0, index) - val partitionString = name.substring(index + 1) - if (topic.isEmpty || partitionString.isEmpty) - throw exception(dir) - - val partition = - try partitionString.toInt - catch { case _: NumberFormatException => throw exception(dir) } - - new TopicPartition(topic, partition) - } - - private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, - includeAbortedTxns: Boolean): FetchDataInfo = { - val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] = - if (includeAbortedTxns) Optional.of(Collections.emptyList()) - else Optional.empty() - new FetchDataInfo(fetchOffsetMetadata, - MemoryRecords.EMPTY, - false, - abortedTransactions) - } - - /** - * Wraps the value of iterator.next() in an option. - * Note: this facility is a part of the Iterator class starting from scala v2.13. - * - * @param iterator - * @tparam T the type of object held within the iterator - * @return Some(iterator.next) if a next element exists, None otherwise. 
- */ - private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = { - if (iterator.hasNext) - Some(iterator.next()) - else - None - } -} - -trait SegmentDeletionReason { - def logReason(toDelete: List[LogSegment]): Unit -} - -case class LogTruncation(log: LocalLog) extends SegmentDeletionReason { - override def logReason(toDelete: List[LogSegment]): Unit = { - log.info(s"Deleting segments as part of log truncation: ${toDelete.mkString(",")}") - } -} - -case class LogRoll(log: LocalLog) extends SegmentDeletionReason { - override def logReason(toDelete: List[LogSegment]): Unit = { - log.info(s"Deleting segments as part of log roll: ${toDelete.mkString(",")}") - } -} - -case class LogDeletion(log: LocalLog) extends SegmentDeletionReason { - override def logReason(toDelete: List[LogSegment]): Unit = { - log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}") - } -} diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 4747b5b0ee9..f4ef718f94f 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -18,7 +18,6 @@ package kafka.log import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} -import kafka.log.LocalLog.nextOption import kafka.log.remote.RemoteLogManager import kafka.utils._ import org.apache.kafka.common.errors._ @@ -41,7 +40,7 @@ import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard, LocalLog => JLocalLog, UnifiedLog => JUnifiedLog} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog} import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import java.io.{File, IOException} @@ -1246,7 +1245,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { - localLog.collectAbortedTransactions(logStartOffset, startOffset, upperBoundOffset) + localLog.collectAbortedTransactions(logStartOffset, startOffset, upperBoundOffset).asScala.toList } /** @@ -1559,7 +1558,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, val newLocalLogStartOffset = localLog.segments.higherSegment(segmentsToDelete.last.baseOffset()).get.baseOffset() incrementStartOffset(newLocalLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion) // remove the segments for 
lookups - localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) + localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason) } deleteProducerSnapshots(deletable.toList.asJava, asyncDelete = true) } @@ -1713,7 +1712,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, * @return The newly rolled segment */ def roll(expectedNextOffset: Option[Long] = None): LogSegment = lock synchronized { - val newSegment = localLog.roll(expectedNextOffset) + val nextOffset : JLong = expectedNextOffset match { + case Some(offset) => offset + case None => 0L + } + val newSegment = localLog.roll(nextOffset) // Take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot // offset align with the new segment offset since this ensures we can recover the segment by beginning // with the corresponding snapshot file and scanning the segment data. Because the segment base offset @@ -1848,7 +1851,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, truncateFullyAndStartAt(targetOffset) } else { val deletedSegments = localLog.truncateTo(targetOffset) - deleteProducerSnapshots(deletedSegments.toList.asJava, asyncDelete = true) + deleteProducerSnapshots(deletedSegments, asyncDelete = true) leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset)) logStartOffset = math.min(targetOffset, logStartOffset) rebuildProducerState(targetOffset, producerStateManager) @@ -1962,7 +1965,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, private[log] def addSegment(segment: LogSegment): LogSegment = localLog.segments.add(segment) private def maybeHandleIOException[T](msg: => String)(fun: => T): T = { - JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun) + LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => msg, () => fun) } private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized { @@ -1971,7 +1974,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, result.newSegments.asScala.toList } - private[log] def deleteProducerSnapshots(segments: util.List[LogSegment], asyncDelete: Boolean): Unit = { + private[log] def deleteProducerSnapshots(segments: util.Collection[LogSegment], asyncDelete: Boolean): Unit = { JUnifiedLog.deleteProducerSnapshots(segments, producerStateManager, asyncDelete, scheduler, config, logDirFailureChannel, parentDir, topicPartition) } } @@ -1991,14 +1994,9 @@ object UnifiedLog extends Logging { val DeleteDirSuffix: String = LogFileUtils.DELETE_DIR_SUFFIX - val StrayDirSuffix: String = LocalLog.StrayDirSuffix + val StrayDirSuffix: String = LogFileUtils.STRAY_DIR_SUFFIX - val FutureDirSuffix: String = LocalLog.FutureDirSuffix - - private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern - private[log] val FutureDirPattern = LocalLog.FutureDirPattern - - val UnknownOffset: Long = LocalLog.UnknownOffset + val UnknownOffset: Long = LocalLog.UNKNOWN_OFFSET def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean, config: LogConfig, @@ -2139,7 +2137,7 @@ object UnifiedLog extends Logging { logDirFailureChannel: LogDirFailureChannel, logPrefix: String, isRecoveredSwapFile: Boolean = false): Iterable[LogSegment] = { - JLocalLog.replaceSegments(existingSegments, + LocalLog.replaceSegments(existingSegments, newSegments.asJava, oldSegments.asJava, dir, @@ -2159,11 +2157,11 @@ object UnifiedLog extends Logging { scheduler: Scheduler, logDirFailureChannel: LogDirFailureChannel, logPrefix: String): SplitSegmentResult = { 
- JLocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix) + LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix) } private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = { - JLocalLog.createNewCleanedSegment(dir, logConfig, baseOffset) + LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset) } // Visible for benchmarking @@ -2194,6 +2192,21 @@ object UnifiedLog extends Logging { if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionBytes else config.retentionSize } + /** + * Wraps the value of iterator.next() in an option. + * Note: this facility is a part of the Iterator class starting from scala v2.13. + * + * @param iterator the iterator + * @tparam T the type of object held within the iterator + * @return Some(iterator.next) if a next element exists, None otherwise. + */ + private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = { + if (iterator.hasNext) + Some(iterator.next()) + else + None + } + } object LogMetricNames { @@ -2208,9 +2221,9 @@ object LogMetricNames { } case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason { - override def logReason(toDelete: List[LogSegment]): Unit = { + override def logReason(toDelete: util.List[LogSegment]): Unit = { val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabledAndRemoteCopyEnabled) - toDelete.foreach { segment => + toDelete.forEach { segment => if (segment.largestRecordTimestamp.isPresent) if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the largest " + @@ -2231,9 +2244,9 @@ case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabl } case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason { - override def logReason(toDelete: List[LogSegment]): Unit = { + override def logReason(toDelete: util.List[LogSegment]): Unit = { var size = log.size - toDelete.foreach { segment => + toDelete.forEach { segment => size -= segment.size if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabledAndRemoteCopyEnabled)} breach. 
" + s"Local log size after deletion will be $size.") @@ -2244,10 +2257,10 @@ case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEna } case class StartOffsetBreach(log: UnifiedLog, remoteLogEnabled: Boolean) extends SegmentDeletionReason { - override def logReason(toDelete: List[LogSegment]): Unit = { + override def logReason(toDelete: util.List[LogSegment]): Unit = { if (remoteLogEnabled) - log.info(s"Deleting segments due to local log start offset ${log.localLogStartOffset()} breach: ${toDelete.mkString(",")}") + log.info(s"Deleting segments due to local log start offset ${log.localLogStartOffset()} breach: ${toDelete.asScala.mkString(",")}") else - log.info(s"Deleting segments due to log start offset ${log.logStartOffset} breach: ${toDelete.mkString(",")}") + log.info(s"Deleting segments due to log start offset ${log.logStartOffset} breach: ${toDelete.asScala.mkString(",")}") } } diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 6d512c23853..179e098c347 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 1ab842e4888..befc4d5fa12 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -63,7 +63,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource diff --git 
a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index 775c86d0208..0b840ddde62 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.record.{MemoryRecords, Record, SimpleRecord} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.util.{MockTime, Scheduler} -import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog => JLocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, LogTruncation, SegmentDeletionReason} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -98,10 +98,10 @@ class LocalLogTest { private def appendRecords(records: Iterable[SimpleRecord], log: LocalLog = log, initialOffset: Long = 0L): Unit = { - log.append(lastOffset = initialOffset + records.size - 1, - largestTimestamp = records.head.timestamp, - shallowOffsetOfMaxTimestamp = initialOffset, - records = MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*)) + log.append(initialOffset + records.size - 1, + records.head.timestamp, + initialOffset, + MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*)) } private def readRecords(log: LocalLog = log, @@ -112,16 +112,16 @@ class LocalLogTest { includeAbortedTxns: Boolean = false): FetchDataInfo = { log.read(startOffset, maxLength, - minOneMessage = minOneMessage, + minOneMessage, maxOffsetMetadata, - includeAbortedTxns = includeAbortedTxns) + includeAbortedTxns) } @Test def testLogDeleteSegmentsSuccess(): Unit = { val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) appendRecords(List(record)) - log.roll() + log.roll(0L) assertEquals(2, log.segments.numberOfSegments) assertFalse(logDir.listFiles.isEmpty) val segmentsBeforeDelete = new util.ArrayList(log.segments.values) @@ -135,7 +135,7 @@ class LocalLogTest { @Test def testRollEmptyActiveSegment(): Unit = { val oldActiveSegment = log.segments.activeSegment - log.roll() + log.roll(0L) assertEquals(1, log.segments.numberOfSegments) assertNotEquals(oldActiveSegment, log.segments.activeSegment) assertFalse(logDir.listFiles.isEmpty) @@ -146,7 +146,7 @@ class LocalLogTest { def testLogDeleteDirSuccessWhenEmptyAndFailureWhenNonEmpty(): Unit ={ val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) appendRecords(List(record)) - log.roll() + log.roll(0L) assertEquals(2, log.segments.numberOfSegments) assertFalse(logDir.listFiles.isEmpty) @@ -172,7 +172,7 @@ class LocalLogTest { def testLogDirRenameToNewDir(): Unit = { val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) appendRecords(List(record)) - log.roll() + log.roll(0L) assertEquals(2, log.segments.numberOfSegments) val newLogDir = TestUtils.randomPartitionLogDir(tmpDir) assertTrue(log.renameDir(newLogDir.getName)) @@ -198,7 +198,7 @@ class LocalLogTest { val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) appendRecords(List(record)) mockTime.sleep(1) - val newSegment = log.roll() + val newSegment = log.roll(0L) log.flush(newSegment.baseOffset) 
log.markFlushed(newSegment.baseOffset) assertEquals(1L, log.recoveryPoint) @@ -263,29 +263,29 @@ class LocalLogTest { for (offset <- 0 to 8) { val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) appendRecords(List(record), initialOffset = offset) - log.roll() + log.roll(0L) } assertEquals(10L, log.segments.numberOfSegments) class TestDeletionReason extends SegmentDeletionReason { - private var _deletedSegments: Iterable[LogSegment] = List[LogSegment]() + private var _deletedSegments: util.Collection[LogSegment] = new util.ArrayList() - override def logReason(toDelete: List[LogSegment]): Unit = { - _deletedSegments = List[LogSegment]() ++ toDelete + override def logReason(toDelete: util.List[LogSegment]): Unit = { + _deletedSegments = new util.ArrayList(toDelete) } - def deletedSegments: Iterable[LogSegment] = _deletedSegments + def deletedSegments: util.Collection[LogSegment] = _deletedSegments } val reason = new TestDeletionReason() - val toDelete = log.segments.values.asScala.toVector - log.removeAndDeleteSegments(toDelete, asyncDelete = asyncDelete, reason) + val toDelete = new util.ArrayList(log.segments.values) + log.removeAndDeleteSegments(toDelete, asyncDelete, reason) if (asyncDelete) { mockTime.sleep(log.config.fileDeleteDelayMs + 1) } assertTrue(log.segments.isEmpty) assertEquals(toDelete, reason.deletedSegments) - toDelete.foreach(segment => assertTrue(segment.deleted())) + toDelete.forEach(segment => assertTrue(segment.deleted())) } @Test @@ -302,13 +302,13 @@ class LocalLogTest { for (offset <- 0 to 8) { val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) appendRecords(List(record), initialOffset = offset) - log.roll() + log.roll(0L) } assertEquals(10L, log.segments.numberOfSegments) val toDelete = log.segments.values - JLocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "") + LocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "") if (asyncDelete) { toDelete.forEach { segment => @@ -336,7 +336,7 @@ class LocalLogTest { appendRecords(List(record)) val newOffset = log.segments.activeSegment.baseOffset + 1 val oldActiveSegment = log.segments.activeSegment - val newActiveSegment = log.createAndDeleteSegment(newOffset, log.segments.activeSegment, asyncDelete = true, LogTruncation(log)) + val newActiveSegment = log.createAndDeleteSegment(newOffset, log.segments.activeSegment, true, new LogTruncation(log.logger)) assertEquals(1, log.segments.numberOfSegments) assertEquals(newActiveSegment, log.segments.activeSegment) assertNotEquals(oldActiveSegment, log.segments.activeSegment) @@ -354,7 +354,7 @@ class LocalLogTest { for (offset <- 0 to 7) { appendRecords(List(record), initialOffset = offset) if (offset % 2 != 0) - log.roll() + log.roll(0L) } for (offset <- 8 to 12) { val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) @@ -362,7 +362,7 @@ class LocalLogTest { } assertEquals(5, log.segments.numberOfSegments) assertNotEquals(10L, log.segments.activeSegment.baseOffset) - val expected = log.segments.values.asScala.toVector + val expected = new util.ArrayList(log.segments.values) val deleted = log.truncateFullyAndStartAt(10L) assertEquals(expected, deleted) assertEquals(1, log.segments.numberOfSegments) @@ -379,7 +379,7 @@ class LocalLogTest { for (offset <- 0 to 4) { appendRecords(List(record), initialOffset = offset) if (offset % 2 != 0) - log.roll() + log.roll(0L) } 
assertEquals(3, log.segments.numberOfSegments) @@ -416,15 +416,15 @@ class LocalLogTest { val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) appendRecords(List(record), initialOffset = offset) if (offset % 3 == 2) - log.roll() + log.roll(0L) } assertEquals(5, log.segments.numberOfSegments) assertEquals(12L, log.logEndOffset) - val expected = log.segments.values(9L, log.logEndOffset + 1).asScala.toVector + val expected = new util.ArrayList(log.segments.values(9L, log.logEndOffset + 1)) // Truncate to an offset before the base offset of the active segment val deleted = log.truncateTo(7L) - assertEquals(expected, deleted.toVector) + assertEquals(expected, deleted) assertEquals(3, log.segments.numberOfSegments) assertEquals(6L, log.segments.activeSegment.baseOffset) assertEquals(0L, log.recoveryPoint) @@ -444,7 +444,7 @@ class LocalLogTest { for (i <- 0 until 5) { val keyValues = Seq(KeyValue(i.toString, i.toString)) appendRecords(kvsToRecords(keyValues), initialOffset = i) - log.roll() + log.roll(0L) } def nonActiveBaseOffsetsFrom(startOffset: Long): Seq[Long] = { @@ -506,7 +506,7 @@ class LocalLogTest { assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir), () => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath) // also test the "-delete" marker case - val deleteMarkerDir = new File(logDir, topic + partition + "." + LocalLog.DeleteDirSuffix) + val deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX) assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir), () => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) } @@ -535,7 +535,7 @@ class LocalLogTest { () => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath) // also test the "-delete" marker case - val deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + LocalLog.DeleteDirSuffix) + val deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + LogFileUtils.DELETE_DIR_SUFFIX) assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir), () => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) } @@ -549,7 +549,7 @@ class LocalLogTest { () => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath) // also test the "-delete" marker case - val deleteMarkerDir = new File(logDir, topic + partition + "." + LocalLog.DeleteDirSuffix) + val deleteMarkerDir = new File(logDir, topic + partition + "." 
+ LogFileUtils.DELETE_DIR_SUFFIX) assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir), () => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) } @@ -569,14 +569,14 @@ class LocalLogTest { val name1 = LocalLog.logDeleteDirName(new TopicPartition("foo", 3)) assertTrue(name1.length <= 255) assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches()) - assertTrue(LocalLog.DeleteDirPattern.matcher(name1).matches()) - assertFalse(LocalLog.FutureDirPattern.matcher(name1).matches()) + assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name1).matches()) + assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name1).matches()) val name2 = LocalLog.logDeleteDirName( new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5)) assertEquals(255, name2.length) assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches()) - assertTrue(LocalLog.DeleteDirPattern.matcher(name2).matches()) - assertFalse(LocalLog.FutureDirPattern.matcher(name2).matches()) + assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name2).matches()) + assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name2).matches()) } @Test @@ -598,7 +598,7 @@ class LocalLogTest { assertEquals(1, log.segments.numberOfSegments, "Log begins with a single empty segment.") // roll active segment with the same base offset of size zero should recreate the segment - log.roll(Some(0L)) + log.roll(0L) assertEquals(1, log.segments.numberOfSegments, "Expect 1 segment after roll() empty segment with base offset.") // should be able to append records to active segment @@ -614,7 +614,7 @@ class LocalLogTest { assertEquals(keyValues1 ++ keyValues2, recordsToKvs(readResult.records.records.asScala)) // roll so that active segment is empty - log.roll() + log.roll(0L) assertEquals(2L, log.segments.activeSegment.baseOffset, "Expect base offset of active segment to be LEO") assertEquals(2, log.segments.numberOfSegments, "Expect two segments.") assertEquals(2L, log.logEndOffset) @@ -626,7 +626,7 @@ class LocalLogTest { // roll active segment with the same base offset of size zero should recreate the segment { - val newSegment = log.roll() + val newSegment = log.roll(0L) assertEquals(0L, newSegment.baseOffset) assertEquals(1, log.segments.numberOfSegments) assertEquals(0L, log.logEndOffset) @@ -635,7 +635,7 @@ class LocalLogTest { appendRecords(List(KeyValue("k1", "v1").toRecord())) { - val newSegment = log.roll() + val newSegment = log.roll(0L) assertEquals(1L, newSegment.baseOffset) assertEquals(2, log.segments.numberOfSegments) assertEquals(1L, log.logEndOffset) @@ -644,7 +644,7 @@ class LocalLogTest { appendRecords(List(KeyValue("k2", "v2").toRecord()), initialOffset = 1L) { - val newSegment = log.roll(Some(1L)) + val newSegment = log.roll(1L) assertEquals(2L, newSegment.baseOffset) assertEquals(3, log.segments.numberOfSegments) assertEquals(2L, log.logEndOffset) @@ -661,7 +661,7 @@ class LocalLogTest { assertEquals(3, log.logEndOffset, "Expect two records in the log") // roll to create an empty active segment - log.roll() + log.roll(0L) assertEquals(3L, log.segments.activeSegment.baseOffset) // intentionally setup the logEndOffset to introduce an error later @@ -669,7 +669,7 @@ class LocalLogTest { // expect an error because of attempt to roll to a new offset (1L) that's lower than the // base offset (3L) of the active segment - assertThrows(classOf[KafkaException], () => log.roll()) + assertThrows(classOf[KafkaException], () => 
log.roll(0L)) } @Test @@ -679,7 +679,7 @@ class LocalLogTest { val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) appendRecords(List(record)) mockTime.sleep(1) - val newSegment = log.roll() + val newSegment = log.roll(0L) // simulate the directory is renamed concurrently doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir @@ -701,14 +701,14 @@ class LocalLogTest { time, config.initFileSize, config.preallocate)) - new LocalLog(_dir = dir, - config = config, - segments = segments, - recoveryPoint = recoveryPoint, - nextOffsetMetadata = nextOffsetMetadata, - scheduler = scheduler, - time = time, - topicPartition = topicPartition, - logDirFailureChannel = logDirFailureChannel) + new LocalLog(dir, + config, + segments, + recoveryPoint, + nextOffsetMetadata, + scheduler, + time, + topicPartition, + logDirFailureChannel) } } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 68abd08f22f..5b3cc00732d 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.MockTime -import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 9086626706c..81600b0f201 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.MockTime -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index b72c1c555db..e7e99852c53 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -31,7 +31,7 @@ 
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile} +import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile} import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 0642dfebd19..ddc9cac8d81 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -20,7 +20,7 @@ package kafka.server import com.yammer.metrics.core.{Gauge, Meter, Timer} import kafka.cluster.PartitionTest.MockPartitionListener import kafka.cluster.Partition -import kafka.log.{LocalLog, LogManager, LogManagerTest, UnifiedLog} +import kafka.log.{LogManager, LogManagerTest, UnifiedLog} import kafka.log.remote.RemoteLogManager import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics @@ -70,7 +70,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{MockScheduler, MockTime} import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, PartitionMetadataFile} -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index fa67126d275..3aaa58641bc 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -19,11 +19,11 @@ package kafka.utils import java.util.Properties import java.util.concurrent.atomic._ import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, 
TimeUnit} -import kafka.log.{LocalLog, UnifiedLog} +import kafka.log.UnifiedLog import kafka.utils.TestUtils.retry import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.{KafkaScheduler, MockTime} -import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout} diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 0dc824d1f43..b5a3c9bd96e 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -163,7 +163,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read Given that, this bug pattern doesn't make sense for Scala code. --> - diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java index a5436f448db..d2cd71addad 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java @@ -16,10 +16,16 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.record.FileLogInputStream; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.Scheduler; @@ -29,25 +35,774 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.require; import static org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX; import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETE_DIR_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.FUTURE_DIR_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.STRAY_DIR_SUFFIX; import static org.apache.kafka.storage.internals.log.LogFileUtils.SWAP_FILE_SUFFIX; import static org.apache.kafka.storage.internals.log.LogFileUtils.isLogFile; +/** + * An append-only log for storing messages locally. 
The log is a sequence of LogSegments, each with a base offset. + * New log segments are created according to a configurable policy that controls the size in bytes or time interval + * for a given segment. + * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class. + */ public class LocalLog { private static final Logger LOG = LoggerFactory.getLogger(LocalLog.class); + public static final Pattern DELETE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.DELETE_DIR_SUFFIX); + public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.FUTURE_DIR_SUFFIX); + public static final Pattern STRAY_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.STRAY_DIR_SUFFIX); + public static final long UNKNOWN_OFFSET = -1L; + + // Last time the log was flushed + private final AtomicLong lastFlushedTime; + private final String logIdent; + private final LogSegments segments; + private final Scheduler scheduler; + private final Time time; + private final TopicPartition topicPartition; + private final LogDirFailureChannel logDirFailureChannel; + private final Logger logger; + + private volatile LogOffsetMetadata nextOffsetMetadata; + // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() + // After memory mapped buffer is closed, no disk IO operation should be performed for this log. + private volatile boolean isMemoryMappedBufferClosed = false; + // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks + private volatile String parentDir; + private volatile LogConfig config; + private volatile long recoveryPoint; + private File dir; + + /** + * @param dir The directory in which log segments are created. + * @param config The log configuration settings + * @param segments The non-empty log segments recovered from disk + * @param recoveryPoint The offset at which to begin the next recovery i.e. 
the first offset which has not been flushed to disk + * @param nextOffsetMetadata The offset where the next message could be appended + * @param scheduler The thread pool scheduler used for background actions + * @param time The time instance used for checking the clock + * @param topicPartition The topic partition associated with this log + * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure + */ + public LocalLog(File dir, + LogConfig config, + LogSegments segments, + long recoveryPoint, + LogOffsetMetadata nextOffsetMetadata, + Scheduler scheduler, + Time time, + TopicPartition topicPartition, + LogDirFailureChannel logDirFailureChannel) { + this.dir = dir; + this.config = config; + this.segments = segments; + this.recoveryPoint = recoveryPoint; + this.nextOffsetMetadata = nextOffsetMetadata; + this.scheduler = scheduler; + this.time = time; + this.topicPartition = topicPartition; + this.logDirFailureChannel = logDirFailureChannel; + this.logIdent = "[LocalLog partition=" + topicPartition + ", dir=" + dir + "] "; + this.logger = new LogContext(logIdent).logger(LocalLog.class); + // Last time the log was flushed + this.lastFlushedTime = new AtomicLong(time.milliseconds()); + this.parentDir = dir.getParent(); + } + + public File dir() { + return dir; + } + + public Logger logger() { + return logger; + } + + public LogConfig config() { + return config; + } + + public LogSegments segments() { + return segments; + } + + public Scheduler scheduler() { + return scheduler; + } + + public LogOffsetMetadata nextOffsetMetadata() { + return nextOffsetMetadata; + } + + public TopicPartition topicPartition() { + return topicPartition; + } + + public LogDirFailureChannel logDirFailureChannel() { + return logDirFailureChannel; + } + + public long recoveryPoint() { + return recoveryPoint; + } + + public Time time() { + return time; + } + + public String name() { + return dir.getName(); + } + + public String parentDir() { + return parentDir; + } + + public File parentDirFile() { + return new File(parentDir); + } + + public boolean isFuture() { + return dir.getName().endsWith(LogFileUtils.FUTURE_DIR_SUFFIX); + } + + private T maybeHandleIOException(Supplier errorMsgSupplier, StorageAction function) { + return maybeHandleIOException(logDirFailureChannel, parentDir, errorMsgSupplier, function); + } + + /** + * Rename the directory of the log + * @param name the new dir name + * @throws KafkaStorageException if rename fails + */ + public boolean renameDir(String name) { + return maybeHandleIOException( + () -> "Error while renaming dir for " + topicPartition + " in log dir " + dir.getParent(), + () -> { + File renamedDir = new File(dir.getParent(), name); + Utils.atomicMoveWithFallback(dir.toPath(), renamedDir.toPath()); + if (!renamedDir.equals(dir)) { + dir = renamedDir; + parentDir = renamedDir.getParent(); + segments.updateParentDir(renamedDir); + return true; + } else { + return false; + } + } + ); + } + + /** + * Update the existing configuration to the new provided configuration. 
+ * @param newConfig the new configuration to be updated to + */ + public void updateConfig(LogConfig newConfig) { + LogConfig oldConfig = config; + config = newConfig; + RecordVersion oldRecordVersion = oldConfig.recordVersion(); + RecordVersion newRecordVersion = newConfig.recordVersion(); + if (newRecordVersion.precedes(oldRecordVersion)) { + logger.warn("Record format version has been downgraded from {} to {}.", oldRecordVersion, newRecordVersion); + } + } + + public void checkIfMemoryMappedBufferClosed() { + if (isMemoryMappedBufferClosed) { + throw new KafkaStorageException("The memory mapped buffer for log of " + topicPartition + " is already closed"); + } + } + + public void updateRecoveryPoint(long newRecoveryPoint) { + recoveryPoint = newRecoveryPoint; + } + + /** + * Update recoveryPoint to provided offset and mark the log as flushed, if the offset is greater + * than the existing recoveryPoint. + * + * @param offset the offset to be updated + */ + public void markFlushed(long offset) { + checkIfMemoryMappedBufferClosed(); + if (offset > recoveryPoint) { + updateRecoveryPoint(offset); + lastFlushedTime.set(time.milliseconds()); + } + } + + /** + * The number of messages appended to the log since the last flush + */ + public long unflushedMessages() { + return logEndOffset() - recoveryPoint; + } + + /** + * Flush local log segments for all offsets up to offset-1. + * Does not update the recovery point. + * + * @param offset The offset to flush up to (non-inclusive) + */ + public void flush(long offset) throws IOException { + long currentRecoveryPoint = recoveryPoint; + if (currentRecoveryPoint <= offset) { + Collection segmentsToFlush = segments.values(currentRecoveryPoint, offset); + for (LogSegment logSegment : segmentsToFlush) { + logSegment.flush(); + } + // If there are any new segments, we need to flush the parent directory for crash consistency. + if (segmentsToFlush.stream().anyMatch(s -> s.baseOffset() >= currentRecoveryPoint)) { + // The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here. + // Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go. + Utils.flushDirIfExists(dir.toPath()); + } + } + } + + /** + * The time this log is last known to have been fully flushed to disk + */ + public long lastFlushTime() { + return lastFlushedTime.get(); + } + + /** + * The offset metadata of the next message that will be appended to the log + */ + public LogOffsetMetadata logEndOffsetMetadata() { + return nextOffsetMetadata; + } + + /** + * The offset of the next message that will be appended to the log + */ + public long logEndOffset() { + return nextOffsetMetadata.messageOffset; + } + + /** + * Update end offset of the log, and update the recoveryPoint. + * + * @param endOffset the new end offset of the log + */ + public void updateLogEndOffset(long endOffset) { + nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment().baseOffset(), segments.activeSegment().size()); + if (recoveryPoint > endOffset) { + updateRecoveryPoint(endOffset); + } + } + + /** + * Close file handlers used by log but don't write to disk. + * This is called if the log directory is offline. + */ + public void closeHandlers() { + segments.closeHandlers(); + isMemoryMappedBufferClosed = true; + } + + /** + * Closes the segments of the log. 
+ */ + public void close() { + maybeHandleIOException( + () -> "Error while renaming dir for " + topicPartition + " in dir " + dir.getParent(), + () -> { + checkIfMemoryMappedBufferClosed(); + segments.close(); + return null; + } + ); + } + + /** + * Completely delete this log directory with no delay. + */ + public void deleteEmptyDir() { + maybeHandleIOException( + () -> "Error while deleting dir for " + topicPartition + " in dir " + dir.getParent(), + () -> { + if (!segments.isEmpty()) { + throw new IllegalStateException("Can not delete directory when " + segments.numberOfSegments() + " segments are still present"); + } + if (!isMemoryMappedBufferClosed) { + throw new IllegalStateException("Can not delete directory when memory mapped buffer for log of " + topicPartition + " is still open."); + } + Utils.delete(dir); + return null; + } + ); + } + + /** + * Completely delete all segments with no delay. + * @return the deleted segments + */ + public List deleteAllSegments() { + return maybeHandleIOException( + () -> "Error while deleting all segments for $topicPartition in dir ${dir.getParent}", + () -> { + List deletableSegments = new ArrayList<>(segments.values()); + removeAndDeleteSegments( + segments.values(), + false, + toDelete -> logger.info("Deleting segments as the log has been deleted: {}", toDelete.stream() + .map(LogSegment::toString) + .collect(Collectors.joining(", ")))); + isMemoryMappedBufferClosed = true; + return deletableSegments; + } + ); + } + + /** + * This method deletes the given log segments by doing the following for each of them: + *
<ul>
+ * <li>It removes the segment from the segment map so that it will no longer be used for reads.
+ * <li>It renames the index and log files by appending .deleted to the respective file name
+ * <li>It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
+ * </ul>
+ * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of + * physically deleting a file while it is being read. + * This method does not convert IOException to KafkaStorageException, the immediate caller + * is expected to catch and handle IOException. + * + * @param segmentsToDelete The log segments to schedule for deletion + * @param asyncDelete Whether the segment files should be deleted asynchronously + * @param reason The reason for the segment deletion + */ + public void removeAndDeleteSegments(Collection segmentsToDelete, + boolean asyncDelete, + SegmentDeletionReason reason) throws IOException { + if (!segmentsToDelete.isEmpty()) { + // Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by + // removing the deleted segment, we should force materialization of the iterator here, so that results of the + // iteration remain valid and deterministic. We should also pass only the materialized view of the + // iterator to the logic that actually deletes the segments. + List toDelete = new ArrayList<>(segmentsToDelete); + reason.logReason(toDelete); + toDelete.forEach(segment -> segments.remove(segment.baseOffset())); + deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent); + } + } + + /** + * This method deletes the given segment and creates a new segment with the given new base offset. It ensures an + * active segment exists in the log at all times during this process. + *
+ * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of + * physically deleting a file while it is being read. + *
+ * This method does not convert IOException to KafkaStorageException, the immediate caller + * is expected to catch and handle IOException. + * + * @param newOffset The base offset of the new segment + * @param segmentToDelete The old active segment to schedule for deletion + * @param asyncDelete Whether the segment files should be deleted asynchronously + * @param reason The reason for the segment deletion + */ + public LogSegment createAndDeleteSegment(long newOffset, + LogSegment segmentToDelete, + boolean asyncDelete, + SegmentDeletionReason reason) throws IOException { + if (newOffset == segmentToDelete.baseOffset()) { + segmentToDelete.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX); + } + LogSegment newSegment = LogSegment.open(dir, + newOffset, + config, + time, + config.initFileSize(), + config.preallocate); + segments.add(newSegment); + + reason.logReason(singletonList(segmentToDelete)); + if (newOffset != segmentToDelete.baseOffset()) { + segments.remove(segmentToDelete.baseOffset()); + } + deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent); + return newSegment; + } + + /** + * Given a message offset, find its corresponding offset metadata in the log. + * If the message offset is out of range, throw an OffsetOutOfRangeException + */ + public LogOffsetMetadata convertToOffsetMetadataOrThrow(long offset) throws IOException { + FetchDataInfo fetchDataInfo = read(offset, 1, false, nextOffsetMetadata, false); + return fetchDataInfo.fetchOffsetMetadata; + } + + /** + * Read messages from the log. + * + * @param startOffset The offset to begin reading at + * @param maxLength The maximum number of bytes to read + * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists) + * @param maxOffsetMetadata The metadata of the maximum offset to be fetched + * @param includeAbortedTxns If true, aborted transactions are included + * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset + * @return The fetch data information including fetch starting offset metadata and messages read. 
+ */ + public FetchDataInfo read(long startOffset, + int maxLength, + boolean minOneMessage, + LogOffsetMetadata maxOffsetMetadata, + boolean includeAbortedTxns) throws IOException { + return maybeHandleIOException( + () -> "Exception while reading from " + topicPartition + " in dir " + dir.getParent(), + () -> { + logger.trace("Reading maximum $maxLength bytes at offset {} from log with total length {} bytes", + startOffset, segments.sizeInBytes()); + + LogOffsetMetadata endOffsetMetadata = nextOffsetMetadata; + long endOffset = endOffsetMetadata.messageOffset; + Optional segmentOpt = segments.floorSegment(startOffset); + // return error on attempt to read beyond the log end offset + if (startOffset > endOffset || segmentOpt.isEmpty()) { + throw new OffsetOutOfRangeException("Received request for offset " + startOffset + " for partition " + topicPartition + ", " + + "but we only have log segments upto " + endOffset + "."); + } + if (startOffset == maxOffsetMetadata.messageOffset) return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns); + if (startOffset > maxOffsetMetadata.messageOffset) return emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns); + + // Do the read on the segment with a base offset less than the target offset + // but if that segment doesn't contain any messages with an offset greater than that + // continue to read from successive segments until we get some messages or we reach the end of the log + FetchDataInfo fetchDataInfo = null; + while (fetchDataInfo == null && segmentOpt.isPresent()) { + LogSegment segment = segmentOpt.get(); + long baseOffset = segment.baseOffset(); + + // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty. + // 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit. + // 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so + // return maxPosition as empty to avoid reading beyond the max-offset + Optional maxPositionOpt; + if (segment.baseOffset() < maxOffsetMetadata.segmentBaseOffset) + maxPositionOpt = Optional.of((long) segment.size()); + else if (segment.baseOffset() == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly()) + maxPositionOpt = Optional.of((long) maxOffsetMetadata.relativePositionInSegment); + else + maxPositionOpt = Optional.empty(); + + fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage); + if (fetchDataInfo != null) { + if (includeAbortedTxns) { + fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo); + } + } else { + segmentOpt = segments.higherSegment(baseOffset); + } + } + if (fetchDataInfo != null) { + return fetchDataInfo; + } else { + // okay we are beyond the end of the last segment with no data fetched although the start offset is in range, + // this can happen when all messages with offset larger than start offsets have been deleted. 
+ // In this case, we will return the empty set with log end offset metadata + return new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY); + } + } + ); + } + + public void append(long lastOffset, long largestTimestamp, long shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException { + segments.activeSegment().append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records); + updateLogEndOffset(lastOffset + 1); + } + + FetchDataInfo addAbortedTransactions(long startOffset, LogSegment segment, FetchDataInfo fetchInfo) throws IOException { + int fetchSize = fetchInfo.records.sizeInBytes(); + OffsetPosition startOffsetPosition = new OffsetPosition( + fetchInfo.fetchOffsetMetadata.messageOffset, + fetchInfo.fetchOffsetMetadata.relativePositionInSegment); + long upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize) + .orElse(segments.higherSegment(segment.baseOffset()) + .map(LogSegment::baseOffset).orElse(logEndOffset())); + + List abortedTransactions = new ArrayList<>(); + Consumer> accumulator = abortedTxns -> { + for (AbortedTxn abortedTxn : abortedTxns) + abortedTransactions.add(abortedTxn.asAbortedTransaction()); + }; + collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator); + return new FetchDataInfo(fetchInfo.fetchOffsetMetadata, + fetchInfo.records, + fetchInfo.firstEntryIncomplete, + Optional.of(abortedTransactions)); + } + + private void collectAbortedTransactions(long startOffset, long upperBoundOffset, + LogSegment startingSegment, + Consumer> accumulator) { + Iterator higherSegments = segments.higherSegments(startingSegment.baseOffset()).iterator(); + Optional segmentEntryOpt = Optional.of(startingSegment); + while (segmentEntryOpt.isPresent()) { + LogSegment segment = segmentEntryOpt.get(); + TxnIndexSearchResult searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset); + accumulator.accept(searchResult.abortedTransactions); + if (searchResult.isComplete) return; + segmentEntryOpt = nextItem(higherSegments); + } + } + + public List collectAbortedTransactions(long logStartOffset, long baseOffset, long upperBoundOffset) { + Optional segmentEntry = segments.floorSegment(baseOffset); + List allAbortedTxns = new ArrayList<>(); + segmentEntry.ifPresent(logSegment -> collectAbortedTransactions(logStartOffset, upperBoundOffset, logSegment, allAbortedTxns::addAll)); + return allAbortedTxns; + } + + /** + * Roll the log over to a new active segment starting with the current logEndOffset. + * This will trim the index to the exact size of the number of entries it currently contains. 
+ * + * @param expectedNextOffset The expected next offset after the segment is rolled + * + * @return The newly rolled segment + */ + public LogSegment roll(Long expectedNextOffset) { + return maybeHandleIOException( + () -> "Error while rolling log segment for " + topicPartition + " in dir " + dir.getParent(), + () -> { + long start = time.hiResClockMs(); + checkIfMemoryMappedBufferClosed(); + long newOffset = Math.max(expectedNextOffset, logEndOffset()); + File logFile = LogFileUtils.logFile(dir, newOffset, ""); + LogSegment activeSegment = segments.activeSegment(); + if (segments.contains(newOffset)) { + // segment with the same base offset already exists and loaded + if (activeSegment.baseOffset() == newOffset && activeSegment.size() == 0) { + // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an + // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0). + logger.warn("Trying to roll a new log segment with start offset {}=max(provided offset = {}, LEO = {}) " + + "while it already exists and is active with size 0. Size of time index: {}, size of offset index: {}.", + newOffset, expectedNextOffset, logEndOffset(), activeSegment.timeIndex().entries(), activeSegment.offsetIndex().entries()); + LogSegment newSegment = createAndDeleteSegment( + newOffset, + activeSegment, + true, + toDelete -> logger.info("Deleting segments as part of log roll: {}", toDelete.stream() + .map(LogSegment::toString) + .collect(Collectors.joining(", ")))); + updateLogEndOffset(nextOffsetMetadata.messageOffset); + logger.info("Rolled new log segment at offset {} in {} ms.", newOffset, time.hiResClockMs() - start); + return newSegment; + } else { + throw new KafkaException("Trying to roll a new log segment for topic partition " + topicPartition + " with start offset " + newOffset + + " =max(provided offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") while it already exists. Existing " + + "segment is " + segments.get(newOffset) + "."); + } + } else if (!segments.isEmpty() && newOffset < activeSegment.baseOffset()) { + throw new KafkaException( + "Trying to roll a new log segment for topic partition " + topicPartition + " with " + + "start offset " + newOffset + " =max(provided offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") lower than start offset of the active segment " + activeSegment); + } else { + File offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset); + File timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset); + File txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset); + for (File file : Arrays.asList(logFile, offsetIdxFile, timeIdxFile, txnIdxFile)) { + if (file.exists()) { + logger.warn("Newly rolled segment file {} already exists; deleting it first", file.getAbsolutePath()); + Files.delete(file.toPath()); + } + } + if (segments.lastSegment().isPresent()) { + segments.lastSegment().get().onBecomeInactiveSegment(); + } + } + LogSegment newSegment = LogSegment.open(dir, + newOffset, + config, + time, + config.initFileSize(), + config.preallocate); + segments.add(newSegment); + + // We need to update the segment base offset and append position data of the metadata when log rolls. + // The next offset should not change. 
+ updateLogEndOffset(nextOffsetMetadata.messageOffset); + logger.info("Rolled new log segment at offset {} in {} ms.", newOffset, time.hiResClockMs() - start); + return newSegment; + } + ); + } + + /** + * Delete all data in the local log and start at the new offset. + * + * @param newOffset The new offset to start the log with + * @return the list of segments that were scheduled for deletion + */ + public List truncateFullyAndStartAt(long newOffset) { + return maybeHandleIOException( + () -> "Error while truncating the entire log for " + topicPartition + " in dir " + dir.getParent(), + () -> { + logger.debug("Truncate and start at offset {}", newOffset); + checkIfMemoryMappedBufferClosed(); + List segmentsToDelete = new ArrayList<>(segments.values()); + + if (!segmentsToDelete.isEmpty()) { + removeAndDeleteSegments(segmentsToDelete.subList(0, segmentsToDelete.size() - 1), true, new LogTruncation(logger)); + // Use createAndDeleteSegment() to create new segment first and then delete the old last segment to prevent missing + // active segment during the deletion process + createAndDeleteSegment(newOffset, segmentsToDelete.get(segmentsToDelete.size() - 1), true, new LogTruncation(logger)); + } + updateLogEndOffset(newOffset); + return segmentsToDelete; + } + ); + } + + /** + * Truncate this log so that it ends with the greatest offset < targetOffset. + * + * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete. + * @return the list of segments that were scheduled for deletion + */ + public Collection truncateTo(long targetOffset) throws IOException { + Collection deletableSegments = segments.filter(segment -> segment.baseOffset() > targetOffset); + removeAndDeleteSegments(deletableSegments, true, new LogTruncation(logger)); + segments.activeSegment().truncateTo(targetOffset); + updateLogEndOffset(targetOffset); + return deletableSegments; + } + + /** + * Return a directory name to rename the log directory to for async deletion. + * The name will be in the following format: "topic-partitionId.uniqueId-delete". + * If the topic name is too long, it will be truncated to prevent the total name + * from exceeding 255 characters. + */ + public static String logDeleteDirName(TopicPartition topicPartition) { + return logDirNameWithSuffixCappedLength(topicPartition, LogFileUtils.DELETE_DIR_SUFFIX); + } + + /** + * Return a directory name to rename the log directory to for stray partition deletion. + * The name will be in the following format: "topic-partitionId.uniqueId-stray". + * If the topic name is too long, it will be truncated to prevent the total name + * from exceeding 255 characters. + */ + public static String logStrayDirName(TopicPartition topicPartition) { + return logDirNameWithSuffixCappedLength(topicPartition, LogFileUtils.STRAY_DIR_SUFFIX); + } + + /** + * Return a future directory name for the given topic partition. The name will be in the following + * format: topic-partition.uniqueId-future where topic, partition and uniqueId are variables. + */ + public static String logFutureDirName(TopicPartition topicPartition) { + return logDirNameWithSuffix(topicPartition, LogFileUtils.FUTURE_DIR_SUFFIX); + } + + /** + * Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}". + * If the topic name is too long, it will be truncated to prevent the total name + * from exceeding 255 characters. 
+ */ + private static String logDirNameWithSuffixCappedLength(TopicPartition topicPartition, String suffix) { + String uniqueId = UUID.randomUUID().toString().replaceAll("-", ""); + String fullSuffix = "-" + topicPartition.partition() + "." + uniqueId + suffix; + int prefixLength = Math.min(topicPartition.topic().length(), 255 - fullSuffix.length()); + return topicPartition.topic().substring(0, prefixLength) + fullSuffix; + } + + private static String logDirNameWithSuffix(TopicPartition topicPartition, String suffix) { + String uniqueId = UUID.randomUUID().toString().replaceAll("-", ""); + return logDirName(topicPartition) + "." + uniqueId + suffix; + } + + /** + * Return a directory name for the given topic partition. The name will be in the following + * format: topic-partition where topic, partition are variables. + */ + public static String logDirName(TopicPartition topicPartition) { + return topicPartition.topic() + "-" + topicPartition.partition(); + } + + private static KafkaException exception(File dir) throws IOException { + return new KafkaException("Found directory " + dir.getCanonicalPath() + ", '" + dir.getName() + "' is not in the form of " + + "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" + + "Kafka's log directories (and children) should only contain Kafka topic data."); + } + + /** + * Parse the topic and partition out of the directory name of a log + */ + public static TopicPartition parseTopicPartitionName(File dir) throws IOException { + if (dir == null) { + throw new KafkaException("dir should not be null"); + } + String dirName = dir.getName(); + if (dirName.isEmpty() || !dirName.contains("-")) { + throw exception(dir); + } + if (dirName.endsWith(DELETE_DIR_SUFFIX) && !DELETE_DIR_PATTERN.matcher(dirName).matches() || + dirName.endsWith(FUTURE_DIR_SUFFIX) && !FUTURE_DIR_PATTERN.matcher(dirName).matches() || + dirName.endsWith(STRAY_DIR_SUFFIX) && !STRAY_DIR_PATTERN.matcher(dirName).matches()) { + throw exception(dir); + } + String name = (dirName.endsWith(DELETE_DIR_SUFFIX) || dirName.endsWith(FUTURE_DIR_SUFFIX) || dirName.endsWith(STRAY_DIR_SUFFIX)) + ? dirName.substring(0, dirName.lastIndexOf('.')) + : dirName; + + int index = name.lastIndexOf('-'); + String topic = name.substring(0, index); + String partitionString = name.substring(index + 1); + if (topic.isEmpty() || partitionString.isEmpty()) { + throw exception(dir); + } + try { + return new TopicPartition(topic, Integer.parseInt(partitionString)); + } catch (NumberFormatException nfe) { + throw exception(dir); + } + } + + /** + * Wraps the value of iterator.next() in an Optional instance. + * + * @param iterator given iterator to iterate over + * @return if a next element exists, Optional#empty otherwise. + * @param the type of object held within the iterator + */ + public static Optional nextItem(Iterator iterator) { + return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty(); + } + + private static FetchDataInfo emptyFetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, boolean includeAbortedTxns) { + Optional> abortedTransactions = includeAbortedTxns + ? Optional.of(Collections.emptyList()) + : Optional.empty(); + return new FetchDataInfo(fetchOffsetMetadata, MemoryRecords.EMPTY, false, abortedTransactions); + } + /** * Invokes the provided function and handles any IOException raised by the function by marking the * provided directory offline. 
@@ -171,19 +926,16 @@ public class LocalLog { try { int position = 0; FileRecords sourceRecords = segment.log(); - while (position < sourceRecords.sizeInBytes()) { FileLogInputStream.FileChannelRecordBatch firstBatch = sourceRecords.batchesFrom(position).iterator().next(); LogSegment newSegment = createNewCleanedSegment(dir, config, firstBatch.baseOffset()); newSegments.add(newSegment); - int bytesAppended = newSegment.appendFromFile(sourceRecords, position); - if (bytesAppended == 0) + if (bytesAppended == 0) { throw new IllegalStateException("Failed to append records from position " + position + " in " + segment); - + } position += bytesAppended; } - // prepare new segments int totalSizeOfNewSegments = 0; for (LogSegment splitSegment : newSegments) { @@ -198,7 +950,7 @@ public class LocalLog { } // replace old segment with new ones LOG.info("{}Replacing overflowed segment $segment with split segments {}", logPrefix, newSegments); - List deletedSegments = replaceSegments(existingSegments, newSegments, Collections.singletonList(segment), + List deletedSegments = replaceSegments(existingSegments, newSegments, singletonList(segment), dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix, false); return new SplitSegmentResult(deletedSegments, newSegments); } catch (Exception e) { @@ -269,7 +1021,7 @@ public class LocalLog { .sorted(Comparator.comparingLong(LogSegment::baseOffset)) .collect(Collectors.toList()); - // need to do this in two phases to be crash safe AND do the delete asynchronously + // need to do this in two phases to be crash safe AND do the deletion asynchronously // if we crash in the middle of this we complete the swap in loadSegments() List reversedSegmentsList = new ArrayList<>(sortedNewSegments); Collections.reverse(reversedSegmentsList); @@ -290,7 +1042,7 @@ public class LocalLog { existingSegments.remove(segment.baseOffset()); } deleteSegmentFiles( - Collections.singletonList(segment), + singletonList(segment), true, dir, topicPartition, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java index fd873f103ee..dd62b90f7f6 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java @@ -60,6 +60,12 @@ public final class LogFileUtils { /** Suffix of a directory that is scheduled to be deleted */ public static final String DELETE_DIR_SUFFIX = "-delete"; + /** Suffix of a directory that is used for future partition */ + public static final String FUTURE_DIR_SUFFIX = "-future"; + + /** Suffix of a directory that is used for stray partition */ + public static final String STRAY_DIR_SUFFIX = "-stray"; + private LogFileUtils() { } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java new file mode 100644 index 00000000000..1a819676b69 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.slf4j.Logger; + +import java.util.List; +import java.util.stream.Collectors; + +public class LogTruncation implements SegmentDeletionReason { + + private final Logger logger; + + public LogTruncation(Logger logger) { + this.logger = logger; + } + + @Override + public void logReason(List toDelete) { + logger.info("Deleting segments as part of log truncation: {}", toDelete.stream() + .map(LogSegment::toString) + .collect(Collectors.joining(", "))); + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java new file mode 100644 index 00000000000..464ade51ee8 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.List; + +public interface SegmentDeletionReason { + void logReason(List toDelete); +}
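Note: a minimal, hypothetical sketch of how the relocated Java API introduced by this patch can be exercised. SegmentDeletionReason, LogTruncation, LocalLog.logDeleteDirName, LocalLog.logFutureDirName and LocalLog.parseTopicPartitionName come from the diff above; the class names RetentionDeletion and LocalLogNamingExample, the topic "payments" and the path /tmp/kafka-logs are made up for illustration, and the List<LogSegment> generic is assumed since the rendered diff drops angle brackets.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.storage.internals.log.LocalLog;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.SegmentDeletionReason;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

public class LocalLogNamingExample {

    private static final Logger LOG = LoggerFactory.getLogger(LocalLogNamingExample.class);

    // A custom deletion reason, analogous to LogTruncation above: callers hand it to
    // LocalLog.removeAndDeleteSegments() so the resulting log line records why the
    // segments were removed.
    static class RetentionDeletion implements SegmentDeletionReason {
        @Override
        public void logReason(List<LogSegment> toDelete) {
            LOG.info("Deleting segments due to retention: {}", toDelete.stream()
                    .map(LogSegment::toString)
                    .collect(Collectors.joining(", ")));
        }
    }

    public static void main(String[] args) throws IOException {
        TopicPartition tp = new TopicPartition("payments", 0);

        // Directory-naming helpers now live on the Java LocalLog.
        String deleteDir = LocalLog.logDeleteDirName(tp); // e.g. "payments-0.<uniqueId>-delete"
        String futureDir = LocalLog.logFutureDirName(tp); // e.g. "payments-0.<uniqueId>-future"
        LOG.info("delete dir name: {}, future dir name: {}", deleteDir, futureDir);

        // And the reverse mapping: recover the topic-partition from a log directory name.
        TopicPartition parsed = LocalLog.parseTopicPartitionName(new File("/tmp/kafka-logs", deleteDir));
        LOG.info("parsed back: {}", parsed); // payments-0
    }
}

Passing a SegmentDeletionReason into removeAndDeleteSegments() keeps the "why" of a deletion in the broker log, mirroring what LogTruncation does for truncation.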
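Note: the test updates earlier in this patch replace log.roll() and log.roll(Some(0L)) with log.roll(0L) because the Java LocalLog.roll(Long expectedNextOffset) computes the new base offset as max(expectedNextOffset, logEndOffset()). A hedged sketch follows; RollHelper and rollAtLogEnd are hypothetical names, not part of the patch.

import org.apache.kafka.storage.internals.log.LocalLog;
import org.apache.kafka.storage.internals.log.LogSegment;

final class RollHelper {

    private RollHelper() { }

    // Equivalent of the old parameterless Scala log.roll(): with the Java API the new base
    // offset is max(expectedNextOffset, logEndOffset()), so passing 0L always rolls at the
    // current log end offset.
    static LogSegment rollAtLogEnd(LocalLog log) {
        return log.roll(0L);
    }
}

So 0L is simply the idiomatic way to say "roll at the current log end offset", which is what the parameterless Scala roll() did.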