MINOR: Fill missing parameter annotations for LogCleaner methods (#13839)

Reviewers: Josep Prat <josep.prat@aiven.io>
---------

Co-authored-by: Deqi Hu <deqi.hu@shopee.com>
hudeqi 2023-06-21 21:54:32 +08:00 committed by GitHub
parent 16bb8cbb8c
commit d5dafe22fe
2 changed files with 116 additions and 14 deletions


@ -91,6 +91,7 @@ import scala.util.control.ControlThrowable
* @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
* @param logDirs The directories where offset checkpoints reside
* @param logs The pool of logs
* @param logDirFailureChannel The channel used to add offline log dirs that may be encountered when cleaning the log
* @param time A way to control the passage of time
*/
class LogCleaner(initialConfig: CleanerConfig,
@ -149,7 +150,7 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
/**
* Start the background cleaning
* Start the background cleaner threads
*/
def startup(): Unit = {
info("Starting the log cleaner")
@ -161,7 +162,7 @@ class LogCleaner(initialConfig: CleanerConfig,
}
/**
* Stop the background cleaning
* Stop the background cleaner threads
*/
def shutdown(): Unit = {
info("Shutting down the log cleaner.")
@ -173,14 +174,25 @@ class LogCleaner(initialConfig: CleanerConfig,
}
}
/**
* Remove metrics
*/
def removeMetrics(): Unit = {
LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
}
/**
* @return A set of configs that is reconfigurable in LogCleaner
*/
override def reconfigurableConfigs: Set[String] = {
LogCleaner.ReconfigurableConfigs
}
/**
* Validate that the new number of cleaner threads is reasonable
*
* @param newConfig A submitted new KafkaConfig instance that contains new cleaner config
*/
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val numThreads = LogCleaner.cleanerConfig(newConfig).numThreads
val currentThreads = config.numThreads
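The hunk above shows only the first two lines of validateReconfiguration. As a rough illustration of what a "reasonable" thread-count check can look like, here is a minimal standalone Scala sketch; the exact bounds (at least 1, and within half to double the current value) are an assumption, not a quote of the broker code.

import org.apache.kafka.common.config.ConfigException

// Sketch only: the bounds below are assumed, not copied from the broker implementation.
def validateCleanerThreadCount(newThreads: Int, currentThreads: Int): Unit = {
  if (newThreads < 1)
    throw new ConfigException(s"Log cleaner threads should be at least 1, got $newThreads")
  if (newThreads < currentThreads / 2)
    throw new ConfigException(s"Log cleaner threads cannot be reduced below half the current value $currentThreads")
  if (newThreads > currentThreads * 2)
    throw new ConfigException(s"Log cleaner threads cannot be increased above double the current value $currentThreads")
}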
@ -194,11 +206,14 @@ class LogCleaner(initialConfig: CleanerConfig,
}
/**
* Reconfigure log clean config. The will:
* 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
* 2. stop current log cleaners and create new ones.
* That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
*/
* Reconfigure the log cleaner config. This will:
* 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
* 2. stop current log cleaners and create new ones.
* That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
*
* @param oldConfig the old log cleaner config
* @param newConfig the new log cleaner config reconfigured
*/
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
config = LogCleaner.cleanerConfig(newConfig)
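The two documented steps (adjust the I/O throttle rate if it changed, then recreate the cleaner threads) can be summarized in a small self-contained sketch; the field and callback names below are illustrative stand-ins, not the actual LogCleaner members.

// Minimal sketch of the documented reconfigure flow; names here are assumptions.
class CleanerReconfigureSketch(var ioMaxBytesPerSecond: Double,
                               startCleaners: () => Unit,
                               stopCleaners: () => Unit) {
  def reconfigure(newIoMaxBytesPerSecond: Double): Unit = {
    // 1. Update the desired throttle rate only if log.cleaner.io.max.bytes.per.second changed.
    if (newIoMaxBytesPerSecond != ioMaxBytesPerSecond)
      ioMaxBytesPerSecond = newIoMaxBytesPerSecond
    // 2. Stop the current cleaner threads and start new ones, so any failed cleaners
    //    are replaced and every thread picks up the new configuration.
    stopCleaners()
    startCleaners()
  }
}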
@ -215,6 +230,8 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
* the partition is aborted.
*
* @param topicPartition The topic and partition to abort cleaning
*/
def abortCleaning(topicPartition: TopicPartition): Unit = {
cleanerManager.abortCleaning(topicPartition)
@ -222,20 +239,28 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Update checkpoint file to remove partitions if necessary.
*
* @param dataDir The data dir to be updated if necessary
* @param partitionToRemove The topicPartition to be removed, default none
*/
def updateCheckpoints(dataDir: File, partitionToRemove: Option[TopicPartition] = None): Unit = {
cleanerManager.updateCheckpoints(dataDir, partitionToRemove = partitionToRemove)
}
/**
* alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir
* Alter the checkpoint directory for the `topicPartition`, to remove the data in `sourceLogDir`, and add the data in `destLogDir`
* This typically happens when a disk rebalance completes and the future log replaces the previous one
*
* @param topicPartition The topic and partition whose checkpoint should be altered
* @param sourceLogDir The source log dir from which the checkpoint is removed
* @param destLogDir The destination log dir to which the checkpoint is added
*/
def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
cleanerManager.alterCheckpointDir(topicPartition, sourceLogDir, destLogDir)
}
/**
* Stop cleaning logs in the provided directory
* Stop cleaning logs in the provided directory when handling log dir failure
*
* @param dir the absolute path of the log dir
*/
@ -244,7 +269,11 @@ class LogCleaner(initialConfig: CleanerConfig,
}
/**
* Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset
* Truncate cleaner offset checkpoint for the given partition if its checkpoint offset is larger than the given offset
*
* @param dataDir The data dir to be truncated if necessary
* @param topicPartition The topic and partition to truncate checkpoint offset
* @param offset The given offset to be compared
*/
def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long): Unit = {
cleanerManager.maybeTruncateCheckpoint(dataDir, topicPartition, offset)
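Taken together, updateCheckpoints, alterCheckpointDir and maybeTruncateCheckpoint manipulate a per-data-dir mapping from partition to last cleaned offset. The following toy model illustrates those semantics only; it is not the broker's checkpoint-file implementation.

import java.io.File
import org.apache.kafka.common.TopicPartition
import scala.collection.mutable

// Toy model of cleaner checkpoints: one partition -> last-cleaned-offset map per log dir.
class CheckpointSketch {
  private val checkpoints = mutable.Map.empty[File, mutable.Map[TopicPartition, Long]]

  private def forDir(dir: File) = checkpoints.getOrElseUpdate(dir, mutable.Map.empty)

  // Drop a partition's entry from the given data dir, if requested.
  def updateCheckpoints(dir: File, partitionToRemove: Option[TopicPartition]): Unit =
    partitionToRemove.foreach(forDir(dir).remove)

  // Move a partition's checkpoint entry from the source dir to the destination dir.
  def alterCheckpointDir(tp: TopicPartition, sourceDir: File, destDir: File): Unit =
    forDir(sourceDir).remove(tp).foreach(offset => forDir(destDir).put(tp, offset))

  // Lower the checkpointed offset if it is larger than the given offset.
  def maybeTruncateCheckpoint(dir: File, tp: TopicPartition, offset: Long): Unit =
    forDir(dir).get(tp).filter(_ > offset).foreach(_ => forDir(dir).put(tp, offset))
}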
@ -253,14 +282,18 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
* This call blocks until the cleaning of the partition is aborted and paused.
*
* @param topicPartition The topic and partition to abort and pause cleaning
*/
def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
cleanerManager.abortAndPauseCleaning(topicPartition)
}
/**
* Resume the cleaning of paused partitions.
*/
* Resume the cleaning of paused partitions.
*
* @param topicPartitions The collection of topicPartitions to be resumed cleaning
*/
def resumeCleaning(topicPartitions: Iterable[TopicPartition]): Unit = {
cleanerManager.resumeCleaning(topicPartitions)
}
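abortAndPauseCleaning and resumeCleaning are typically used as a bracket around operations that mutate a log, such as truncation. A hypothetical caller-side helper, assuming only the two method signatures shown above:

import org.apache.kafka.common.TopicPartition

// Hypothetical usage pattern; PausableCleaner is a stand-in for the LogCleaner interface.
trait PausableCleaner {
  def abortAndPauseCleaning(tp: TopicPartition): Unit
  def resumeCleaning(tps: Iterable[TopicPartition]): Unit
}

def withCleaningPaused(cleaner: PausableCleaner, partitions: Seq[TopicPartition])(op: => Unit): Unit = {
  partitions.foreach(cleaner.abortAndPauseCleaning) // blocks until each partition is aborted and paused
  try op                                            // e.g. truncate or replace the log
  finally cleaner.resumeCleaning(partitions)        // resume even if the operation throws
}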
@ -289,6 +322,7 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* To prevent a race between retention and compaction,
* retention threads need to make this call to obtain:
*
* @return A list of log partitions that retention threads can safely work on
*/
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
@ -327,6 +361,11 @@ class LogCleaner(initialConfig: CleanerConfig,
@volatile var lastStats: CleanerStats = new CleanerStats()
@volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()
/**
* Check if the cleaning for a partition is aborted. If so, throw an exception.
*
* @param topicPartition The topic and partition to check
*/
private def checkDone(topicPartition: TopicPartition): Unit = {
if (!isRunning)
throw new ThreadShutdownException
@ -347,6 +386,7 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Cleans a log if there is a dirty log available
*
* @return whether a log was cleaned
*/
private def tryCleanFilthiestLog(): Boolean = {
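The boolean return value implies that the cleaner thread's work loop backs off only when nothing was cleaned. A sketch of that loop shape, with the function parameters standing in for the thread's real members and backoff configuration:

import java.util.concurrent.TimeUnit

// Sketch of the work loop implied by tryCleanFilthiestLog's boolean result; all names are assumptions.
def runCleanerLoop(tryCleanOnce: () => Boolean, isRunning: () => Boolean, backOffMs: Long): Unit = {
  while (isRunning()) {
    val cleanedSomething = tryCleanOnce()
    if (!cleanedSomething)
      TimeUnit.MILLISECONDS.sleep(backOffMs) // no dirty log available; wait before checking again
  }
}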
@ -417,6 +457,12 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Log out statistics on a single run of the cleaner.
*
* @param id The cleaner thread id
* @param name The cleaned log name
* @param from The first dirty offset at which cleaning began
* @param to The offset at which cleaning ended (the first offset that was not cleaned)
* @param stats The statistics for this round of cleaning
*/
def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats): Unit = {
this.lastStats = stats
@ -532,6 +578,14 @@ private[log] class Cleaner(val id: Int,
doClean(cleanable, time.milliseconds())
}
/**
* Clean the given log
*
* @param cleanable The log to be cleaned
* @param currentTime The current timestamp for doing cleaning
*
* @return The first offset not cleaned and the statistics for this round of cleaning
*/
private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
info("Beginning cleaning of log %s".format(cleanable.log.name))
@ -667,6 +721,8 @@ private[log] class Cleaner(val id: Int,
* @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
* @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param transactionMetadata The state of ongoing transactions which is carried between the cleaning of the grouped segments
* @param lastRecordsOfActiveProducers The active producers and their last data offsets
* @param stats Collector for cleaning statistics
* @param currentTime The time at which the clean was initiated
*/
@ -778,6 +834,11 @@ private[log] class Cleaner(val id: Int,
* 1. A compacted topic using compression may contain a message set slightly larger than max.message.bytes
* 2. max.message.bytes of a topic could have been reduced after writing larger messages
* In these cases, grow the buffer to hold the next batch.
*
* @param sourceRecords The dirty log segment records to process
* @param position The current position in the read buffer to read from
* @param maxLogMessageSize The maximum record size in bytes for the topic
* @param memoryRecords The memory records in the read buffer
*/
private def growBuffersOrFail(sourceRecords: FileRecords,
position: Int,
@ -803,6 +864,14 @@ private[log] class Cleaner(val id: Int,
growBuffers(maxSize)
}
/**
* Check if a batch should be discarded based on the cleaned transaction state
*
* @param batch The batch of records to check
* @param transactionMetadata The transaction state maintained while cleaning
*
* @return Whether the batch can be discarded
*/
private def shouldDiscardBatch(batch: RecordBatch,
transactionMetadata: CleanedTransactionMetadata): Boolean = {
if (batch.isControlBatch)
@ -811,6 +880,18 @@ private[log] class Cleaner(val id: Int,
transactionMetadata.onBatchRead(batch)
}
/**
* Check if a record should be retained
*
* @param map The offset map (key => offset) to use for cleaning segments
* @param retainDeletesForLegacyRecords Should tombstones (lower than version 2) and markers be retained while cleaning this segment
* @param batch The batch of records that the record belongs to
* @param record The record to check
* @param stats The collector for cleaning statistics
* @param currentTime The current time, used to compare against the batch's delete horizon when judging a non-legacy record
*
* @return Whether the record can be retained
*/
private def shouldRetainRecord(map: OffsetMap,
retainDeletesForLegacyRecords: Boolean,
batch: RecordBatch,
@ -847,6 +928,8 @@ private[log] class Cleaner(val id: Int,
/**
* Double the I/O buffer capacity
*
* @param maxLogMessageSize The maximum allowed record size in bytes
*/
def growBuffers(maxLogMessageSize: Int): Unit = {
val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
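The buffer growth the comment describes is a doubling capped at the larger of the topic's max message size and the configured I/O buffer limit. A standalone sketch of that policy (field names are illustrative, not the actual Cleaner fields):

import java.nio.ByteBuffer

// Standalone sketch of "double the I/O buffers, capped at a maximum".
class BufferGrower(initialSize: Int, maxIoBufferSize: Int) {
  var readBuffer: ByteBuffer = ByteBuffer.allocate(initialSize)
  var writeBuffer: ByteBuffer = ByteBuffer.allocate(initialSize)

  def growBuffers(maxLogMessageSize: Int): Unit = {
    val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
    if (readBuffer.capacity >= maxBufferSize)
      throw new IllegalStateException(s"Buffer is already at the maximum size of $maxBufferSize bytes")
    val newSize = math.min(readBuffer.capacity * 2, maxBufferSize)
    readBuffer = ByteBuffer.allocate(newSize)
    writeBuffer = ByteBuffer.allocate(newSize)
  }
}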
@ -876,6 +959,7 @@ private[log] class Cleaner(val id: Int,
* @param segments The log segments to group
* @param maxSize the maximum size in bytes for the total of all log data in a group
* @param maxIndexSize the maximum size in bytes for the total of all index data in a group
* @param firstUncleanableOffset The upper (exclusive) offset to clean to
*
* @return A list of grouped segments
*/
@ -915,7 +999,8 @@ private[log] class Cleaner(val id: Int,
* the base offset of the next segment in the list.
* If the next segment doesn't exist, the first uncleanable offset will be used.
*
* @param segs - remaining segments to group.
* @param segs Remaining segments to group.
* @param firstUncleanableOffset The upper (exclusive) offset to clean to
* @return The estimated last offset for the first segment in segs
*/
private def lastOffsetForFirstSegment(segs: List[LogSegment], firstUncleanableOffset: Long): Long = {
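The estimate described above (the next segment's base offset, falling back to the first uncleanable offset) reduces to a small function. A sketch over a simplified segment type, since the real method operates on LogSegment; the "- 1" assumes the estimate is the offset just before the next segment's base offset.

// Sketch only; SegmentInfo stands in for LogSegment.
final case class SegmentInfo(baseOffset: Long)

def lastOffsetForFirstSegmentSketch(segs: List[SegmentInfo], firstUncleanableOffset: Long): Long =
  segs match {
    case _ :: next :: _ => next.baseOffset - 1        // a following segment exists: its base offset bounds the first one
    case _              => firstUncleanableOffset - 1 // last dirty segment: bounded by the first uncleanable offset
  }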
@ -972,8 +1057,13 @@ private[log] class Cleaner(val id: Int,
/**
* Add the messages in the given segment to the offset map
*
* @param topicPartition The topic and partition of the log segment to build the offset map for
* @param segment The segment to index
* @param map The map in which to store the key=>offset mapping
* @param startOffset The offset at which dirty messages begin
* @param nextSegmentStartOffset The base offset of the next segment, used when building the current segment
* @param maxLogMessageSize The maximum allowed record size in bytes
* @param transactionMetadata The state of ongoing transactions for the offset range being built
* @param stats Collector for cleaning statistics
*
* @return If the map was filled whilst loading from this segment
@ -1152,6 +1242,11 @@ private[log] class CleanedTransactionMetadata {
// Output cleaned index to write retained aborted transactions
var cleanedIndex: Option[TransactionIndex] = None
/**
* Update the cleaned transaction state with the newly found aborted transactions that have just been traversed.
*
* @param abortedTransactions The newly found aborted transactions to add
*/
def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
this.abortedTransactions ++= abortedTransactions
}
@ -1159,6 +1254,10 @@ private[log] class CleanedTransactionMetadata {
/**
* Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
* Return true if the control batch can be discarded.
*
* @param controlBatch The control batch that has just been traversed
*
* @return True if the control batch can be discarded
*/
def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
consumeAbortedTxnsUpTo(controlBatch.lastOffset)
@ -1200,6 +1299,10 @@ private[log] class CleanedTransactionMetadata {
/**
* Update the transactional state for the incoming non-control batch. If the batch is part of
* an aborted transaction, return true to indicate that it is safe to discard.
*
* @param batch The batch to read when updating the transactional state
*
* @return Whether the batch is part of an aborted transaction or not
*/
def onBatchRead(batch: RecordBatch): Boolean = {
consumeAbortedTxnsUpTo(batch.lastOffset)
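onControlBatchRead and onBatchRead together implement the bookkeeping that lets the cleaner drop data from aborted transactions. A simplified, self-contained sketch of that idea follows; it is not the actual CleanedTransactionMetadata internals.

import scala.collection.mutable

// Simplified sketch: batches whose producerId belongs to an aborted transaction covering the
// current offset can be discarded; reading the abort marker closes the transaction again.
final case class AbortedTxnInfo(producerId: Long, firstOffset: Long)

class AbortedTxnTracker(abortedSortedByFirstOffset: List[AbortedTxnInfo]) {
  private var pending = abortedSortedByFirstOffset
  private val ongoingAbortedProducers = mutable.Set.empty[Long]

  /** Called per data batch; returns true if the batch is part of an ongoing aborted transaction. */
  def onBatchRead(producerId: Long, lastOffset: Long): Boolean = {
    // Activate every aborted transaction that starts at or before this batch's last offset.
    while (pending.nonEmpty && pending.head.firstOffset <= lastOffset) {
      ongoingAbortedProducers += pending.head.producerId
      pending = pending.tail
    }
    ongoingAbortedProducers.contains(producerId)
  }

  /** Called when the abort control marker for `producerId` is read; the transaction is finished. */
  def onAbortMarkerRead(producerId: Long): Unit = ongoingAbortedProducers -= producerId
}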


@ -140,7 +140,6 @@ class LogSegment private[log] (val log: FileRecords,
* @param largestTimestamp The largest timestamp in the message set.
* @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
* @param records The log entries to append.
* @return the physical position in the file of the appended records
* @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
*/
@nonthreadsafe