KAFKA-14486 Move LogCleanerManager to storage module (#19216)

Move LogCleanerManager and related classes to the storage module and rewrite
them in Java.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Jun Rao
<junrao@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
Dmitry Werner 2025-03-27 09:35:38 +05:00 committed by GitHub
parent eb88e78373
commit 84b8fec089
18 changed files with 1347 additions and 935 deletions
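
For callers in the core module, the visible effect of this move is that the cleaner state is now exposed through Java collections and java.util.Optional rather than kafka.utils.Pool and Scala Option. A minimal caller-side sketch in Scala, assuming only the signatures implied by the call sites changed in this diff (the directory, partition and failure-channel values are illustrative placeholders):

import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.storage.internals.log.{LogCleanerManager, LogDirFailureChannel, UnifiedLog}
import scala.jdk.CollectionConverters._

val logDirs = Seq(new java.io.File("/tmp/kafka-logs"))                  // placeholder log dir
val dataDir = logDirs.head
val tp = new TopicPartition("topic", 0)
val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)

val logs = new ConcurrentHashMap[TopicPartition, UnifiedLog]()          // was kafka.utils.Pool
val manager = new LogCleanerManager(logDirs.asJava, logs, logDirFailureChannel)
manager.updateCheckpoints(dataDir, Optional.empty(), Optional.of(tp))   // was partitionToRemove = Some(tp)
val lastCleaned = Option(manager.allCleanerCheckpoints.get(tp))         // Java Map: absent keys come back as null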

View File

@ -80,6 +80,7 @@
<subpackage name="storage.internals">
<allow pkg="kafka.server"/>
<allow pkg="kafka.log"/>
<allow pkg="kafka.utils"/>
<allow pkg="com.fasterxml.jackson" />
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.common" />

View File

@ -21,11 +21,11 @@ import java.io.{File, IOException}
import java.lang.{Long => JLong}
import java.nio._
import java.util
import java.util.Date
import java.util.{Date, Optional}
import java.util.concurrent.TimeUnit
import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils.{Logging, Pool}
import kafka.utils.Logging
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
@ -36,12 +36,13 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex, UnifiedLog}
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleanerManager, LogCleaningAbortedException, LogCleaningException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, LogToClean, OffsetMap, PreCleanStats, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex, UnifiedLog}
import org.apache.kafka.storage.internals.utils.Throttler
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{Iterable, Seq, Set, mutable}
import scala.jdk.OptionConverters.{RichOption, RichOptional}
import scala.util.control.ControlThrowable
/**
@ -93,13 +94,13 @@ import scala.util.control.ControlThrowable
*
* @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
* @param logDirs The directories where offset checkpoints reside
* @param logs The pool of logs
* @param logs The map of logs
* @param logDirFailureChannel The channel used to add offline log dirs that may be encountered when cleaning the log
* @param time A way to control the passage of time
*/
class LogCleaner(initialConfig: CleanerConfig,
val logDirs: Seq[File],
val logs: Pool[TopicPartition, UnifiedLog],
val logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
val logDirFailureChannel: LogDirFailureChannel,
time: Time = Time.SYSTEM) extends Logging with BrokerReconfigurable {
// Visible for test.
@ -109,7 +110,7 @@ class LogCleaner(initialConfig: CleanerConfig,
@volatile private var config = initialConfig
/* for managing the state of partitions being cleaned. package-private to allow access in tests */
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
private[log] val cleanerManager = new LogCleanerManager(logDirs.asJava, logs, logDirFailureChannel)
/* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
private[log] val throttler = new Throttler(config.maxIoBytesPerSecond, 300, "cleaner-io", "bytes", time)
@ -249,7 +250,7 @@ class LogCleaner(initialConfig: CleanerConfig,
* @param partitionToRemove The topicPartition to be removed, default none
*/
def updateCheckpoints(dataDir: File, partitionToRemove: Option[TopicPartition] = None): Unit = {
cleanerManager.updateCheckpoints(dataDir, partitionToRemove = partitionToRemove)
cleanerManager.updateCheckpoints(dataDir, Optional.empty(), partitionToRemove.toJava)
}
/**
@ -300,7 +301,7 @@ class LogCleaner(initialConfig: CleanerConfig,
* @param topicPartitions The collection of topicPartitions for which cleaning is to be resumed
*/
def resumeCleaning(topicPartitions: Iterable[TopicPartition]): Unit = {
cleanerManager.resumeCleaning(topicPartitions)
cleanerManager.resumeCleaning(topicPartitions.toList.asJava)
}
/**
@ -314,7 +315,7 @@ class LogCleaner(initialConfig: CleanerConfig,
* @return A boolean indicating whether the work has completed before the timeout
*/
def awaitCleaned(topicPartition: TopicPartition, offset: Long, maxWaitMs: Long = 60000L): Boolean = {
def isCleaned = cleanerManager.allCleanerCheckpoints.get(topicPartition).fold(false)(_ >= offset)
def isCleaned = Option(cleanerManager.allCleanerCheckpoints.get(topicPartition)).fold(false)(_ >= offset)
var remainingWaitMs = maxWaitMs
while (!isCleaned && remainingWaitMs > 0) {
val sleepTime = math.min(100, remainingWaitMs)
@ -331,7 +332,7 @@ class LogCleaner(initialConfig: CleanerConfig,
* @return A list of log partitions that retention threads can safely work on
*/
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
cleanerManager.pauseCleaningForNonCompactedPartitions()
cleanerManager.pauseCleaningForNonCompactedPartitions().asScala.map(entry => (entry.getKey, entry.getValue))
}
// Only for testing
@ -409,7 +410,7 @@ class LogCleaner(initialConfig: CleanerConfig,
@throws(classOf[LogCleaningException])
private def cleanFilthiestLog(): Boolean = {
val preCleanStats = new PreCleanStats()
val ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats)
val ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats).toScala
val cleaned = ltc match {
case None =>
false
@ -424,7 +425,7 @@ class LogCleaner(initialConfig: CleanerConfig,
case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
}
}
val deletable: Iterable[(TopicPartition, UnifiedLog)] = cleanerManager.deletableLogs()
val deletable = cleanerManager.deletableLogs().asScala
try {
deletable.foreach { case (_, log) =>
try {
@ -435,7 +436,7 @@ class LogCleaner(initialConfig: CleanerConfig,
}
}
} finally {
cleanerManager.doneDeleting(deletable.map(_._1))
cleanerManager.doneDeleting(deletable.keys.toList.asJava)
}
cleaned
@ -1150,25 +1151,6 @@ private[log] class Cleaner(val id: Int,
}
}
/**
* A simple struct for collecting pre-clean stats
*/
private class PreCleanStats {
var maxCompactionDelayMs = 0L
var delayedPartitions = 0
var cleanablePartitions = 0
def updateMaxCompactionDelay(delayMs: Long): Unit = {
maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs)
if (delayMs > 0) {
delayedPartitions += 1
}
}
def recordCleanablePartitions(numOfCleanables: Int): Unit = {
cleanablePartitions = numOfCleanables
}
}
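
The PreCleanStats struct removed here now comes from org.apache.kafka.storage.internals.log (note the updated import at the top of this file). A small sketch of its role, assuming the Java class keeps the methods of the removed Scala version:

import org.apache.kafka.storage.internals.log.PreCleanStats

val preCleanStats = new PreCleanStats()
preCleanStats.updateMaxCompactionDelay(5000L)   // a positive delay also counts the partition as delayed
preCleanStats.recordCleanablePartitions(3)
// In cleanFilthiestLog above, the instance is simply passed to
// cleanerManager.grabFilthiestCompactedLog(time, preCleanStats), which fills it in.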
/**
* A simple struct for collecting stats about log cleaning
*/
@ -1221,22 +1203,6 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
}
/**
* Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position,
* and whether it needs compaction immediately.
*/
private case class LogToClean(topicPartition: TopicPartition,
log: UnifiedLog,
firstDirtyOffset: Long,
uncleanableOffset: Long,
needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
val cleanBytes: Long = log.logSegments(-1, firstDirtyOffset).asScala.map(_.size.toLong).sum
val (firstUncleanableOffset, cleanableBytes) = LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
val totalBytes: Long = cleanBytes + cleanableBytes
val cleanableRatio: Double = cleanableBytes / totalBytes.toDouble
override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
}
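
LogToClean has likewise moved to the storage module; the ordering it defines, by cleanable ratio, is unchanged. A worked example of the ratio computed by the removed class above, with illustrative sizes:

val cleanBytes = 400L * 1024 * 1024                       // already-cleaned bytes below firstDirtyOffset
val cleanableBytes = 100L * 1024 * 1024                   // dirty bytes below the first uncleanable offset
val totalBytes = cleanBytes + cleanableBytes
val cleanableRatio = cleanableBytes / totalBytes.toDouble // 0.2
// With the default min.cleanable.dirty.ratio of 0.5 this log is not selected unless
// needCompactionNow is set, i.e. max.compaction.lag.ms was exceeded for some dirty segment.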
/**
* This is a helper class to facilitate tracking transaction state while cleaning the log. It maintains a set
* of the ongoing aborted and committed transactions as the cleaner is working its way through the log. This

View File

@ -1,686 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.lang.{Long => JLong}
import java.io.File
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
import org.apache.kafka.storage.internals.log.{LogCleaningAbortedException, LogDirFailureChannel, UnifiedLog}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util.Comparator
import scala.collection.{Iterable, Seq, mutable}
import scala.jdk.CollectionConverters._
private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState
private[log] case object LogCleaningAborted extends LogCleaningState
private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
private[log] class LogCleaningException(val log: UnifiedLog,
private val message: String,
private val cause: Throwable) extends KafkaException(message, cause)
/**
* This class manages the state of each partition being cleaned.
* LogCleaningState defines the cleaning states that a TopicPartition can be in.
* 1. None : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
* or LogCleaningPaused(1). Valid previous states are LogCleaningInProgress and LogCleaningPaused(1)
* 2. LogCleaningInProgress : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
* or become LogCleaningAborted. Valid previous state is None.
* 3. LogCleaningAborted : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
* Valid previous state is LogCleaningInProgress.
* 4-a. LogCleaningPaused(1) : The cleaning is paused once. No log cleaning can be done in this state.
* In this state, it can become None or LogCleaningPaused(2).
* Valid previous states are None, LogCleaningAborted or LogCleaningPaused(2).
* 4-b. LogCleaningPaused(i) : The cleaning is paused i times where i >= 2. No log cleaning can be done in this state.
* In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
* Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
*/
private[log] class LogCleanerManager(val logDirs: Seq[File],
val logs: Pool[TopicPartition, UnifiedLog],
val logDirFailureChannel: LogDirFailureChannel) extends Logging {
import LogCleanerManager._
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
protected override def loggerName: String = classOf[LogCleaner].getName
// package-private for testing
private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
/* the offset checkpoints holding the last cleaned point for each log */
@volatile private var checkpoints = logDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap
/* the set of logs currently being cleaned */
private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()
/* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
* for each log directory */
private val uncleanablePartitions = mutable.HashMap[String, mutable.Set[TopicPartition]]()
/* a global lock used to control all access to the in-progress set and the offset checkpoints */
private val lock = new ReentrantLock
/* for coordinating the pausing and the cleaning of a partition */
private val pausedCleaningCond = lock.newCondition()
// Visible for testing
private[log] val gaugeMetricNameWithTag = new java.util.HashMap[String, java.util.List[java.util.Map[String, String]]]()
/* gauges for tracking the number of partitions marked as uncleanable for each log directory */
for (dir <- logDirs) {
val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
metricsGroup.newGauge(UncleanablePartitionsCountMetricName,
() => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
metricTag
)
gaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, _ => new java.util.ArrayList[java.util.Map[String, String]]())
.add(metricTag)
}
/* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
for (dir <- logDirs) {
val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
metricsGroup.newGauge(UncleanableBytesMetricName,
() => inLock(lock) {
uncleanablePartitions.get(dir.getAbsolutePath) match {
case Some(partitions) =>
val lastClean = allCleanerCheckpoints
val now = Time.SYSTEM.milliseconds
partitions.iterator.map { tp =>
Option(logs.get(tp)).map {
log =>
val lastCleanOffset: Option[Long] = lastClean.get(tp)
val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
val (_, uncleanableBytes) = calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset)
uncleanableBytes
}.getOrElse(0L)
}.sum
case None => 0
}
},
metricTag
)
gaugeMetricNameWithTag.computeIfAbsent(UncleanableBytesMetricName, _ => new java.util.ArrayList[java.util.Map[String, String]]())
.add(metricTag)
}
/* a gauge for tracking the cleanable ratio of the dirtiest log */
@volatile private var dirtiestLogCleanableRatio = 0.0
metricsGroup.newGauge(MaxDirtyPercentMetricName, () => (100 * dirtiestLogCleanableRatio).toInt)
/* a gauge for tracking the time since the last log cleaner run, in milliseconds */
@volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds
metricsGroup.newGauge(TimeSinceLastRunMsMetricName, () => Time.SYSTEM.milliseconds - timeOfLastRun)
/**
* @return the last cleaned offset for each partition, read from the checkpoint files.
*/
def allCleanerCheckpoints: Map[TopicPartition, Long] = {
inLock(lock) {
checkpoints.values.flatMap(checkpoint => {
try {
checkpoint.read().asScala.map{ case (tp, offset) => tp -> Long2long(offset) }
} catch {
case e: KafkaStorageException =>
error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
Map.empty[TopicPartition, Long]
}
}).toMap
}
}
/**
* Package private for unit test. Get the cleaning state of the partition.
*/
private[log] def cleaningState(tp: TopicPartition): Option[LogCleaningState] = {
inLock(lock) {
inProgress.get(tp)
}
}
/**
* Package private for unit test. Set the cleaning state of the partition.
*/
private[log] def setCleaningState(tp: TopicPartition, state: LogCleaningState): Unit = {
inLock(lock) {
inProgress.put(tp, state)
}
}
/**
* Choose the log to clean next and add it to the in-progress set. We recompute this
* each time from the full set of logs to allow logs to be dynamically added to the pool of logs
* the log manager maintains.
*/
def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
inLock(lock) {
val now = time.milliseconds
this.timeOfLastRun = now
val lastClean = allCleanerCheckpoints
val dirtyLogs = logs.filter {
case (_, log) => log.config.compact
}.filterNot {
case (topicPartition, log) =>
inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
}.map {
case (topicPartition, log) => // create a LogToClean instance for each
try {
val lastCleanOffset = lastClean.get(topicPartition)
val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
// update checkpoint for logs with invalid checkpointed offsets
if (offsetsToClean.forceUpdateCheckpoint)
updateCheckpoints(log.parentDirFile, partitionToUpdateOrAdd = Option(topicPartition, offsetsToClean.firstDirtyOffset))
val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
LogToClean(topicPartition, log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset, compactionDelayMs > 0)
} catch {
case e: Throwable => throw new LogCleaningException(log,
s"Failed to calculate log cleaning stats for partition $topicPartition", e)
}
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
// and must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
val cleanableLogs = dirtyLogs.filter { ltc =>
(ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
if (cleanableLogs.isEmpty)
None
else {
preCleanStats.recordCleanablePartitions(cleanableLogs.size)
val filthiest = cleanableLogs.max
inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
Some(filthiest)
}
}
}
/**
* Pause log cleaning for logs that do not have compaction enabled
* and do not have other deletion or compaction in progress.
* This handles a potential race between retention and cleaner threads when users
* switch the topic configuration between compacted and non-compacted.
* @return retention logs for which log cleaning was successfully paused
*/
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
inLock(lock) {
val deletableLogs = logs.filter {
case (_, log) => !log.config.compact // pick non-compacted logs
}.filterNot {
case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
}
deletableLogs.foreach {
case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused(1))
}
deletableLogs
}
}
/**
* Find any logs that have compaction enabled. Mark them as being cleaned.
* Include logs without delete enabled, as they may have segments
* that precede the start offset.
*/
def deletableLogs(): Iterable[(TopicPartition, UnifiedLog)] = {
inLock(lock) {
val toClean = logs.filter { case (topicPartition, log) =>
!inProgress.contains(topicPartition) && log.config.compact &&
!isUncleanablePartition(log, topicPartition)
}
toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
toClean
}
}
/**
* Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
* the partition is aborted.
* This is implemented by first calling abortAndPauseCleaning and then resumeCleaning for the partition.
*/
def abortCleaning(topicPartition: TopicPartition): Unit = {
inLock(lock) {
abortAndPauseCleaning(topicPartition)
resumeCleaning(Seq(topicPartition))
}
}
/**
* Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
* This call blocks until the cleaning of the partition is aborted and paused.
* 1. If the partition is not in progress, mark it as paused.
* 2. Otherwise, first mark the state of the partition as aborted.
* 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
* throws a LogCleaningAbortedException to stop the cleaning task.
* 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
* 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
* 6. If the partition is already paused, a new call to this function
* will increase the paused count by one.
*/
def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
inLock(lock) {
inProgress.get(topicPartition) match {
case None =>
inProgress.put(topicPartition, LogCleaningPaused(1))
case Some(LogCleaningInProgress) =>
inProgress.put(topicPartition, LogCleaningAborted)
case Some(LogCleaningPaused(count)) =>
inProgress.put(topicPartition, LogCleaningPaused(count + 1))
case Some(s) =>
throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
}
while (!isCleaningInStatePaused(topicPartition))
pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
}
}
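
As an illustration of the lifecycle described above, a sketch using this class's (pre-move) Scala API; cleanerManager and tp are assumed to be in scope, as in the tests updated later in this commit:

cleanerManager.abortAndPauseCleaning(tp)   // no prior state: becomes LogCleaningPaused(1)
cleanerManager.abortAndPauseCleaning(tp)   // pauses nest: LogCleaningPaused(2)
cleanerManager.resumeCleaning(Seq(tp))     // back to LogCleaningPaused(1)
cleanerManager.resumeCleaning(Seq(tp))     // state removed, cleaning may be scheduled again
// If a clean had been in progress instead, the first call would mark the partition
// LogCleaningAborted, the cleaner thread would throw LogCleaningAbortedException, and
// doneCleaning would flip the state to LogCleaningPaused(1) before the call returned.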
/**
* Resume the cleaning of paused partitions.
* Each call of this function will undo one pause.
*/
def resumeCleaning(topicPartitions: Iterable[TopicPartition]): Unit = {
inLock(lock) {
topicPartitions.foreach {
topicPartition =>
inProgress.get(topicPartition) match {
case None =>
throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
case Some(state) =>
state match {
case LogCleaningPaused(count) if count == 1 =>
inProgress.remove(topicPartition)
case LogCleaningPaused(count) if count > 1 =>
inProgress.put(topicPartition, LogCleaningPaused(count - 1))
case s =>
throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
}
}
}
}
}
/**
* Check if the cleaning for a partition is in a particular state. The caller is expected to hold the lock while making the call.
*/
private def isCleaningInState(topicPartition: TopicPartition, expectedState: LogCleaningState): Boolean = {
inProgress.get(topicPartition) match {
case None => false
case Some(state) =>
if (state == expectedState)
true
else
false
}
}
/**
* Check if the cleaning for a partition is paused. The caller is expected to hold the lock while making the call.
*/
private def isCleaningInStatePaused(topicPartition: TopicPartition): Boolean = {
inProgress.get(topicPartition) match {
case None => false
case Some(state) =>
state match {
case _: LogCleaningPaused =>
true
case _ =>
false
}
}
}
/**
* Check if the cleaning for a partition is aborted. If so, throw an exception.
*/
def checkCleaningAborted(topicPartition: TopicPartition): Unit = {
inLock(lock) {
if (isCleaningInState(topicPartition, LogCleaningAborted))
throw new LogCleaningAbortedException()
}
}
/**
* Update checkpoint file, adding or removing partitions if necessary.
*
* @param dataDir The log directory whose checkpoint file is updated
* @param partitionToUpdateOrAdd The (TopicPartition, offset) entry to update or add; pass None when only removing
* @param partitionToRemove The TopicPartition to be removed
*/
def updateCheckpoints(dataDir: File,
partitionToUpdateOrAdd: Option[(TopicPartition, JLong)] = None,
partitionToRemove: Option[TopicPartition] = None): Unit = {
inLock(lock) {
val checkpoint = checkpoints(dataDir)
if (checkpoint != null) {
try {
val currentCheckpoint = checkpoint.read().asScala.filter { case (tp, _) => logs.keys.contains(tp) }.toMap
// remove the partition offset if any
var updatedCheckpoint = partitionToRemove match {
case Some(topicPartition) => currentCheckpoint - topicPartition
case None => currentCheckpoint
}
// update or add the partition offset if any
updatedCheckpoint = partitionToUpdateOrAdd match {
case Some(updatedOffset) => updatedCheckpoint + updatedOffset
case None => updatedCheckpoint
}
checkpoint.write(updatedCheckpoint.asJava)
} catch {
case e: KafkaStorageException =>
error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
}
}
}
}
/**
* Alter the checkpoint directory for the topicPartition: remove the entry from sourceLogDir and add it to destLogDir
*/
def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
inLock(lock) {
try {
checkpoints.get(sourceLogDir).flatMap(_.read().asScala.get(topicPartition)) match {
case Some(offset) =>
debug(s"Removing the partition offset data in checkpoint file for '$topicPartition' " +
s"from ${sourceLogDir.getAbsoluteFile} directory.")
updateCheckpoints(sourceLogDir, partitionToRemove = Option(topicPartition))
debug(s"Adding the partition offset data in checkpoint file for '$topicPartition' " +
s"to ${destLogDir.getAbsoluteFile} directory.")
updateCheckpoints(destLogDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
case None =>
}
} catch {
case e: KafkaStorageException =>
error(s"Failed to access checkpoint file in dir ${sourceLogDir.getAbsolutePath}", e)
}
val logUncleanablePartitions = uncleanablePartitions.getOrElse(sourceLogDir.toString, mutable.Set[TopicPartition]())
if (logUncleanablePartitions.contains(topicPartition)) {
logUncleanablePartitions.remove(topicPartition)
markPartitionUncleanable(destLogDir.toString, topicPartition)
}
}
}
/**
* Stop cleaning logs in the provided directory
*
* @param dir the absolute path of the log dir
*/
def handleLogDirFailure(dir: String): Unit = {
warn(s"Stopping cleaning logs in dir $dir")
inLock(lock) {
checkpoints = checkpoints.filter { case (k, _) => k.getAbsolutePath != dir }
}
}
/**
* Truncate the checkpointed offset for the given partition if its checkpointed offset is larger than the given offset
*/
def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: JLong): Unit = {
inLock(lock) {
if (logs.get(topicPartition).config.compact) {
val checkpoint = checkpoints(dataDir)
if (checkpoint != null) {
val existing = checkpoint.read()
if (existing.getOrDefault(topicPartition, 0L) > offset) {
existing.put(topicPartition, offset)
checkpoint.write(existing)
}
}
}
}
}
/**
* Save out the endOffset and remove the given log from the in-progress set, if not aborted.
*/
def doneCleaning(topicPartition: TopicPartition, dataDir: File, endOffset: Long): Unit = {
inLock(lock) {
inProgress.get(topicPartition) match {
case Some(LogCleaningInProgress) =>
updateCheckpoints(dataDir, partitionToUpdateOrAdd = Option(topicPartition, endOffset))
inProgress.remove(topicPartition)
case Some(LogCleaningAborted) =>
inProgress.put(topicPartition, LogCleaningPaused(1))
pausedCleaningCond.signalAll()
case None =>
throw new IllegalStateException(s"State for partition $topicPartition should exist.")
case s =>
throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
}
}
}
def doneDeleting(topicPartitions: Iterable[TopicPartition]): Unit = {
inLock(lock) {
topicPartitions.foreach {
topicPartition =>
inProgress.get(topicPartition) match {
case Some(LogCleaningInProgress) =>
inProgress.remove(topicPartition)
case Some(LogCleaningAborted) =>
inProgress.put(topicPartition, LogCleaningPaused(1))
pausedCleaningCond.signalAll()
case None =>
throw new IllegalStateException(s"State for partition $topicPartition should exist.")
case s =>
throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
}
}
}
}
/**
* Returns an immutable set of the uncleanable partitions for a given log directory.
* Only used for testing.
*/
private[log] def uncleanablePartitions(logDir: String): Set[TopicPartition] = {
var partitions: Set[TopicPartition] = Set()
inLock(lock) { partitions ++= uncleanablePartitions.getOrElse(logDir, partitions) }
partitions
}
def markPartitionUncleanable(logDir: String, partition: TopicPartition): Unit = {
inLock(lock) {
uncleanablePartitions.get(logDir) match {
case Some(partitions) =>
partitions.add(partition)
case None =>
uncleanablePartitions.put(logDir, mutable.Set(partition))
}
}
}
private def isUncleanablePartition(log: UnifiedLog, topicPartition: TopicPartition): Boolean = {
inLock(lock) {
uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
}
}
def maintainUncleanablePartitions(): Unit = {
// Remove deleted partitions from uncleanablePartitions
inLock(lock) {
// Remove deleted partitions
uncleanablePartitions.values.foreach { partitions =>
partitions.filterInPlace(logs.contains)
}
// Remove entries with empty partition set.
uncleanablePartitions.filterInPlace {
case (_, partitions) => partitions.nonEmpty
}
}
}
def removeMetrics(): Unit = {
GaugeMetricNameNoTag.foreach(metricsGroup.removeMetric)
gaugeMetricNameWithTag.asScala.foreach { metricNameAndTags =>
metricNameAndTags._2.asScala.foreach { tag =>
metricsGroup.removeMetric(metricNameAndTags._1, tag)
}
}
gaugeMetricNameWithTag.clear()
}
}
/**
* Helper class for the range of cleanable dirty offsets of a log and whether to update the checkpoint associated with
* the log
*
* @param firstDirtyOffset the lower (inclusive) offset to begin cleaning from
* @param firstUncleanableDirtyOffset the upper (exclusive) offset to clean to
* @param forceUpdateCheckpoint whether to update the checkpoint associated with this log. If true, the checkpoint should be
* reset to firstDirtyOffset
*/
private case class OffsetsToClean(firstDirtyOffset: Long,
firstUncleanableDirtyOffset: Long,
forceUpdateCheckpoint: Boolean = false) {
}
private[log] object LogCleanerManager extends Logging {
private val UncleanablePartitionsCountMetricName = "uncleanable-partitions-count"
private val UncleanableBytesMetricName = "uncleanable-bytes"
private val MaxDirtyPercentMetricName = "max-dirty-percent"
private val TimeSinceLastRunMsMetricName = "time-since-last-run-ms"
// Visible for testing
private[log] val GaugeMetricNameNoTag = Set(
MaxDirtyPercentMetricName,
TimeSinceLastRunMsMetricName
)
private def isCompactAndDelete(log: UnifiedLog): Boolean = {
log.config.compact && log.config.delete
}
/**
* Get the max delay between the time when the log is required to be compacted,
* as determined by maxCompactionLagMs, and the current time.
*/
private def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = {
val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
val firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).stream.filter(_ > 0)
val earliestDirtySegmentTimestamp = firstBatchTimestamps.min(Comparator.naturalOrder()).orElse(Long.MaxValue)
val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
val cleanUntilTime = now - maxCompactionLagMs
if (earliestDirtySegmentTimestamp < cleanUntilTime)
cleanUntilTime - earliestDirtySegmentTimestamp
else
0L
}
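
Put differently, the delay is how far past its max compaction lag the oldest dirty segment already is. A worked example of the formula above:

val now = 1000000L                          // current time, ms
val maxCompactionLagMs = 100000L            // max.compaction.lag.ms
val earliestDirtySegmentTimestamp = 850000L // first batch timestamp of the oldest dirty segment
val cleanUntilTime = now - maxCompactionLagMs                   // 900000
val delay =
  if (earliestDirtySegmentTimestamp < cleanUntilTime) cleanUntilTime - earliestDirtySegmentTimestamp
  else 0L                                                       // 50000 ms overdue here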
/**
* Returns the range of dirty offsets that can be cleaned.
*
* @param log the log
* @param lastCleanOffset the last checkpointed offset
* @param now the current time in milliseconds of the cleaning operation
* @return OffsetsToClean containing offsets for cleanable portion of log and whether the log checkpoint needs updating
*/
def cleanableOffsets(log: UnifiedLog, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
// If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid,
// reset to the log start offset and log the error.
val (firstDirtyOffset, forceUpdateCheckpoint) = {
val logStartOffset = log.logStartOffset
val checkpointDirtyOffset = lastCleanOffset.getOrElse(logStartOffset)
if (checkpointDirtyOffset < logStartOffset) {
// Don't bother with the warning if compact and delete are enabled.
if (!isCompactAndDelete(log))
warn(s"Resetting first dirty offset of ${log.name} to log start offset $logStartOffset " +
s"since the checkpointed offset $checkpointDirtyOffset is invalid.")
(logStartOffset, true)
} else if (checkpointDirtyOffset > log.logEndOffset) {
// The dirty offset has gotten ahead of the log end offset. This could happen if there was data
// corruption at the end of the log. We conservatively assume that the full log needs cleaning.
warn(s"The last checkpoint dirty offset for partition ${log.name} is $checkpointDirtyOffset, " +
s"which is larger than the log end offset ${log.logEndOffset}. Resetting to the log start offset $logStartOffset.")
(logStartOffset, true)
} else {
(checkpointDirtyOffset, false)
}
}
val minCompactionLagMs = math.max(log.config.compactionLagMs, 0L)
// Find the first segment that cannot be cleaned. We cannot clean past:
// 1. The active segment
// 2. The last stable offset (including the high watermark)
// 3. Any segments closer to the head of the log than the minimum compaction lag time
val firstUncleanableDirtyOffset: Long = Seq(
// we do not clean beyond the last stable offset
Some(log.lastStableOffset),
// the active segment is always uncleanable
Option(log.activeSegment.baseOffset),
// the first segment whose largest message timestamp is within a minimum time lag from now
if (minCompactionLagMs > 0) {
// dirty log segments
val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
dirtyNonActiveSegments.asScala.find { s =>
val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
s"is uncleanable=$isUncleanable")
isUncleanable
}.map(_.baseOffset)
} else None
).flatten.min
debug(s"Finding range of cleanable offsets for log=${log.name}. Last clean offset=$lastCleanOffset " +
s"now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset " +
s"activeSegment.baseOffset=${log.activeSegment.baseOffset}")
OffsetsToClean(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset), forceUpdateCheckpoint)
}
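
A worked example of how these constraints combine, with illustrative offsets:

// checkpointed last clean offset = 250, logStartOffset = 100, logEndOffset = 1000,
// lastStableOffset = 800, activeSegment.baseOffset = 900, and the first segment still
// within min.compaction.lag.ms has baseOffset 700
val firstDirtyOffset = 250L                                    // checkpoint is valid, so it is kept
val firstUncleanableDirtyOffset = Seq(800L, 900L, 700L).min    // = 700
// The cleaner may therefore compact the range [250, 700) on this pass.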
/**
* Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
* @return the biggest uncleanable offset and the total amount of cleanable bytes
*/
def calculateCleanableBytes(log: UnifiedLog, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).asScala.headOption.getOrElse(log.activeSegment)
val firstUncleanableOffset = firstUncleanableSegment.baseOffset
val cleanableBytes = log.logSegments(math.min(firstDirtyOffset, firstUncleanableOffset), firstUncleanableOffset).asScala.map(_.size.toLong).sum
(firstUncleanableOffset, cleanableBytes)
}
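
And a companion example for calculateCleanableBytes on the same illustrative log, assuming 512 KB segments at base offsets 100, 400 and 700 plus an active segment at 900:

// With firstDirtyOffset = 250 and uncleanableOffset = 700, the first uncleanable segment
// is the one at base offset 700, so the cleanable bytes are those of the segments
// covering [min(250, 700), 700), i.e. the segments at 100 and 400.
val cleanableBytes = 512L * 1024 + 512L * 1024   // 1 MB
// returned as (firstUncleanableOffset = 700, cleanableBytes)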
}

View File

@ -83,11 +83,11 @@ class LogManager(logDirs: Seq[File],
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
private val logCreationOrDeletionLock = new Object
private val currentLogs = new Pool[TopicPartition, UnifiedLog]()
private val currentLogs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
// Future logs are put in the directory with "-future" suffix. Future log is created when user wants to move replica
// from one log directory to another log directory on the same broker. The directory of the future log will be renamed
// to replace the current log of the partition after the future log catches up with the current log
private val futureLogs = new Pool[TopicPartition, UnifiedLog]()
private val futureLogs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
// Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
private val logsToBeDeleted = new LinkedBlockingQueue[(UnifiedLog, Long)]()
@ -230,8 +230,8 @@ class LogManager(logDirs: Seq[File],
if (cleaner != null)
cleaner.handleLogDirFailure(dir)
def removeOfflineLogs(logs: Pool[TopicPartition, UnifiedLog]): Iterable[TopicPartition] = {
val offlineTopicPartitions: Iterable[TopicPartition] = logs.collect {
def removeOfflineLogs(logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog]): Iterable[TopicPartition] = {
val offlineTopicPartitions: Iterable[TopicPartition] = logs.asScala.collect {
case (tp, log) if log.parentDir == dir => tp
}
offlineTopicPartitions.foreach { topicPartition => {
@ -1180,7 +1180,7 @@ class LogManager(logDirs: Seq[File],
}
private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[(UnifiedLog, Option[UnifiedLog])] = {
futureLogs.values.flatMap { futureLog =>
futureLogs.asScala.values.flatMap { futureLog =>
val topicId = futureLog.topicId.orElseThrow(() =>
new RuntimeException(s"The log dir $futureLog does not have a topic ID, " +
"which is not allowed when running in KRaft mode.")
@ -1386,7 +1386,7 @@ class LogManager(logDirs: Seq[File],
// prevent cleaner from working on same partitions when changing cleanup policy
cleaner.pauseCleaningForNonCompactedPartitions()
} else {
currentLogs.filter {
currentLogs.asScala.filter {
case (_, log) => !log.config.compact
}
}
@ -1418,10 +1418,10 @@ class LogManager(logDirs: Seq[File],
/**
* Get all the partition logs
*/
def allLogs: Iterable[UnifiedLog] = currentLogs.values ++ futureLogs.values
def allLogs: Iterable[UnifiedLog] = currentLogs.asScala.values ++ futureLogs.asScala.values
def logsByTopic(topic: String): Seq[UnifiedLog] = {
(currentLogs.toList ++ futureLogs.toList).collect {
(currentLogs.asScala.toList ++ futureLogs.asScala.toList).collect {
case (topicPartition, log) if topicPartition.topic == topic => log
}
}
@ -1437,8 +1437,8 @@ class LogManager(logDirs: Seq[File],
def addToDir(tp: TopicPartition, log: UnifiedLog): Unit = {
byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, UnifiedLog]()).put(tp, log)
}
currentLogs.foreachEntry(addToDir)
futureLogs.foreachEntry(addToDir)
currentLogs.asScala.foreachEntry(addToDir)
futureLogs.asScala.foreachEntry(addToDir)
byDir
}
@ -1466,7 +1466,7 @@ class LogManager(logDirs: Seq[File],
private def flushDirtyLogs(): Unit = {
debug("Checking for dirty logs to flush...")
for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
for ((topicPartition, log) <- currentLogs.asScala.toList ++ futureLogs.asScala.toList) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
@ -1480,7 +1480,7 @@ class LogManager(logDirs: Seq[File],
}
}
private def removeLogAndMetrics(logs: Pool[TopicPartition, UnifiedLog], tp: TopicPartition): Option[UnifiedLog] = {
private def removeLogAndMetrics(logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog], tp: TopicPartition): Option[UnifiedLog] = {
val removedLog = logs.remove(tp)
if (removedLog != null) {
removedLog.removeLogMetrics()

View File

@ -19,7 +19,7 @@ package kafka.log
import java.io.File
import java.nio.file.Files
import java.util.{Optional, Properties}
import kafka.utils.{Pool, TestUtils}
import kafka.utils.TestUtils
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
@ -93,7 +93,7 @@ abstract class AbstractLogCleanerIntegrationTest {
cleanerIoBufferSize: Option[Int] = None,
propertyOverrides: Properties = new Properties()): LogCleaner = {
val logMap = new Pool[TopicPartition, UnifiedLog]()
val logMap = new java.util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
for (partition <- partitions) {
val dir = new File(logDir, s"${partition.topic}-${partition.partition}")
Files.createDirectories(dir.toPath)

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.storage.internals.log.{LogCleanerManager, UnifiedLog}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@ -78,8 +78,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", uncleanableDirectory)
TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 2, "There should be 2 uncleanable partitions", 2000L)
val expectedTotalUncleanableBytes = LogCleanerManager.calculateCleanableBytes(log, 0, log.logSegments.asScala.last.baseOffset)._2 +
LogCleanerManager.calculateCleanableBytes(log2, 0, log2.logSegments.asScala.last.baseOffset)._2
val expectedTotalUncleanableBytes = LogCleanerManager.calculateCleanableBytes(log, 0, log.logSegments.asScala.last.baseOffset).getValue +
LogCleanerManager.calculateCleanableBytes(log2, 0, log2.logSegments.asScala.last.baseOffset).getValue
TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == expectedTotalUncleanableBytes,
s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 1000L)
@ -172,7 +172,7 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
cleaner.awaitCleaned(new TopicPartition("log", 0), firstBlockCleanableSegmentOffset)
val read1 = readFromLog(log)
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(new TopicPartition("log", 0))
assertTrue(lastCleaned >= firstBlockCleanableSegmentOffset,
s"log cleaner should have processed at least to offset $firstBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned")
@ -187,7 +187,7 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
assertEquals(appends1, read2, s"log should only contains zero keys now")
val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(new TopicPartition("log", 0))
val secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset
assertTrue(lastCleaned2 >= secondBlockCleanableSegmentOffset,
s"log cleaner should have processed at least to offset $secondBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned2")

View File

@ -92,7 +92,7 @@ class LogCleanerLagIntegrationTest extends AbstractLogCleanerIntegrationTest wit
val compactedSize = log.logSegments(0L, activeSegAtT0.baseOffset).asScala.map(_.size).sum
debug(s"after cleaning the compacted size up to active segment at T0: $compactedSize")
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0))
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(new TopicPartition("log", 0))
assertTrue(lastCleaned >= firstBlock1SegmentBaseOffset, s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned")
assertTrue(sizeUpToActiveSegmentAtT0 > compactedSize, s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize")
}

View File

@ -28,7 +28,8 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog}
import org.apache.kafka.storage.internals.log.LogCleaningState.{LOG_CLEANING_ABORTED, LOG_CLEANING_IN_PROGRESS}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogCleanerManager, LogCleaningException, LogCleaningState, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, PreCleanStats, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@ -37,6 +38,8 @@ import java.lang.{Long => JLong}
import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
/**
* Unit tests for the log cleaning logic
@ -46,7 +49,7 @@ class LogCleanerManagerTest extends Logging {
val tmpDir: File = TestUtils.tempDir()
val tmpDir2: File = TestUtils.tempDir()
val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
val logDir2: File = TestUtils.randomPartitionLogDir(tmpDir)
val logDir2: File = TestUtils.randomPartitionLogDir(tmpDir2)
val topicPartition = new TopicPartition("log", 0)
val topicPartition2 = new TopicPartition("log2", 0)
val logProps = new Properties()
@ -58,21 +61,21 @@ class LogCleanerManagerTest extends Logging {
val offset = 999
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
val cleanerCheckpoints: mutable.Map[TopicPartition, JLong] = mutable.Map[TopicPartition, JLong]()
class LogCleanerManagerMock(logDirs: Seq[File],
logs: Pool[TopicPartition, UnifiedLog],
class LogCleanerManagerMock(logDirs: util.List[File],
logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
override def allCleanerCheckpoints: Map[TopicPartition, Long] = {
cleanerCheckpoints.toMap
override def allCleanerCheckpoints: util.Map[TopicPartition, JLong] = {
cleanerCheckpoints.toMap.asJava
}
override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Option[(TopicPartition, JLong)] = None,
partitionToRemove: Option[TopicPartition] = None): Unit = {
override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Optional[util.Map.Entry[TopicPartition, JLong]],
partitionToRemove: Optional[TopicPartition]): Unit = {
assert(partitionToRemove.isEmpty, "partitionToRemove argument with value not yet handled")
val (tp, offset) = partitionToUpdateOrAdd.getOrElse(
throw new IllegalArgumentException("partitionToUpdateOrAdd==None argument not yet handled"))
cleanerCheckpoints.put(tp, offset)
val entry = partitionToUpdateOrAdd.orElseThrow(() =>
new IllegalArgumentException("partitionToUpdateOrAdd==None argument not yet handled"))
cleanerCheckpoints.put(entry.getKey, entry.getValue)
}
}
@ -83,8 +86,8 @@ class LogCleanerManagerTest extends Logging {
private def setupIncreasinglyFilthyLogs(partitions: Seq[TopicPartition],
startNumBatches: Int,
batchIncrement: Int): Pool[TopicPartition, UnifiedLog] = {
val logs = new Pool[TopicPartition, UnifiedLog]()
batchIncrement: Int): util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog] = {
val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
var numBatches = startNumBatches
for (tp <- partitions) {
@ -146,12 +149,12 @@ class LogCleanerManagerTest extends Logging {
batchesPerSegment = 2
)
val logsPool = new Pool[TopicPartition, UnifiedLog]()
val logsPool = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
logsPool.put(tp, log)
val cleanerManager = createCleanerManagerMock(logsPool)
cleanerCheckpoints.put(tp, 1)
val thrownException = assertThrows(classOf[LogCleaningException], () => cleanerManager.grabFilthiestCompactedLog(time).get)
val thrownException = assertThrows(classOf[LogCleaningException], () => cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get)
assertEquals(log, thrownException.log)
assertTrue(thrownException.getCause.isInstanceOf[IllegalStateException])
}
@ -168,7 +171,7 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager = createCleanerManagerMock(logs)
partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
assertEquals(tp2, filthiestLog.topicPartition)
assertEquals(tp2, filthiestLog.log.topicPartition)
}
@ -187,7 +190,7 @@ class LogCleanerManagerTest extends Logging {
cleanerManager.markPartitionUncleanable(logs.get(tp2).dir.getParent, tp2)
val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
assertEquals(tp1, filthiestLog.topicPartition)
assertEquals(tp1, filthiestLog.log.topicPartition)
}
@ -204,9 +207,9 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager = createCleanerManagerMock(logs)
partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS)
val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
assertEquals(tp1, filthiestLog.topicPartition)
assertEquals(tp1, filthiestLog.log.topicPartition)
}
@ -223,11 +226,11 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager = createCleanerManagerMock(logs)
partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS)
cleanerManager.markPartitionUncleanable(logs.get(tp1).dir.getParent, tp1)
val filthiestLog: Option[LogToClean] = cleanerManager.grabFilthiestCompactedLog(time)
assertEquals(None, filthiestLog)
val filthiestLog: Optional[LogToClean] = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
assertEquals(Optional.empty(), filthiestLog)
}
@Test
@ -237,7 +240,7 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp, 200)
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
assertEquals(0L, filthiestLog.firstDirtyOffset)
}
@ -251,7 +254,7 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp, 0L)
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
assertEquals(10L, filthiestLog.firstDirtyOffset)
}
@ -260,7 +263,7 @@ class LogCleanerManagerTest extends Logging {
val tp = new TopicPartition("foo", 0)
val log = createLog(segmentSize = 2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp)
val logs = new Pool[TopicPartition, UnifiedLog]()
val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
logs.put(tp, log)
appendRecords(log, numRecords = 3)
@ -275,8 +278,8 @@ class LogCleanerManagerTest extends Logging {
cleanerCheckpoints.put(tp, 0L)
// The active segment is uncleanable and hence not filthy from the POV of the CleanerManager.
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
assertEquals(None, filthiestLog)
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
assertEquals(Optional.empty(), filthiestLog)
}
@Test
@ -287,7 +290,7 @@ class LogCleanerManagerTest extends Logging {
val tp = new TopicPartition("foo", 0)
val logs = new Pool[TopicPartition, UnifiedLog]()
val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
val log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, topicPartition = tp)
logs.put(tp, log)
@ -301,8 +304,8 @@ class LogCleanerManagerTest extends Logging {
cleanerCheckpoints.put(tp, 3L)
// These segments are uncleanable and hence not filthy
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
assertEquals(None, filthiestLog)
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
assertEquals(Optional.empty(), filthiestLog)
}
/**
@ -375,12 +378,12 @@ class LogCleanerManagerTest extends Logging {
log.updateConfig(config)
// log cleanup inprogress, the log is not available for compaction
val cleanable = cleanerManager.grabFilthiestCompactedLog(time)
val cleanable = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).toScala
assertEquals(0, cleanable.size, "should have 0 logs ready to be compacted")
// log cleanup finished, and log can be picked up for compaction
cleanerManager.resumeCleaning(deletableLog.map(_._1))
val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time)
cleanerManager.resumeCleaning(deletableLog.asScala.map(_.getKey).toList.asJava)
val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).toScala
assertEquals(1, cleanable2.size, "should have 1 logs ready to be compacted")
// update cleanup policy to delete
@ -393,7 +396,7 @@ class LogCleanerManagerTest extends Logging {
assertEquals(0, deletableLog2.size, "should have 0 logs ready to be deleted")
// compaction done, should have 1 log eligible for log cleanup
cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition))
cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition).asJava)
val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
assertEquals(1, deletableLog3.size, "should have 1 logs ready to be deleted")
}
@ -405,11 +408,11 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.getOrElse(topicPartition, 0))
assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.getOrDefault(topicPartition, 0))
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
// expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
}
@Test
@ -419,12 +422,12 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// write some data into the cleaner-offset-checkpoint file
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
// updateCheckpoints should remove the topicPartition data in the logDir
cleanerManager.updateCheckpoints(logDir, partitionToRemove = Option(topicPartition))
assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
cleanerManager.updateCheckpoints(logDir, Optional.empty(), Optional.of(topicPartition))
assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
}
@Test
@ -434,15 +437,15 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// write some data into the cleaner-offset-checkpoint file in logDir and logDir2
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
cleanerManager.updateCheckpoints(logDir2, partitionToUpdateOrAdd = Option(topicPartition2, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition2))
cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
cleanerManager.updateCheckpoints(logDir2, Optional.of(util.Map.entry(topicPartition2, offset)), Optional.empty())
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2))
cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
// verify the partition data in logDir is gone, and data in logDir2 is still there
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition2))
assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2))
assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
}
@Test
@ -454,15 +457,15 @@ class LogCleanerManagerTest extends Logging {
val higherOffset = 1000L
// write some data into the cleaner-offset-checkpoint file in logDir
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
// we should not truncate the checkpoint data for checkpointed offset <= the given offset (higherOffset)
cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, higherOffset)
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
// we should truncate the checkpoint data for checkpointed offset > the given offset (lowerOffset)
cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, lowerOffset)
assertEquals(lowerOffset, cleanerManager.allCleanerCheckpoints(topicPartition))
assertEquals(lowerOffset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
}
@Test
@ -472,17 +475,17 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// write some data into the cleaner-offset-checkpoint file in logDir
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
cleanerManager.alterCheckpointDir(topicPartition, logDir, logDir2)
// verify we still can get the partition offset after alterCheckpointDir
// This data should locate in logDir2, not logDir
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
// force delete the logDir2 from checkpoints, so that the partition data should also be deleted
cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath)
assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
}
/**
@ -498,11 +501,11 @@ class LogCleanerManagerTest extends Logging {
val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
// Log truncation happens due to unclean leader election
cleanerManager.abortAndPauseCleaning(log.topicPartition)
cleanerManager.resumeCleaning(Seq(log.topicPartition))
cleanerManager.resumeCleaning(Seq(log.topicPartition).asJava)
// log cleanup finishes and pausedPartitions are resumed
cleanerManager.resumeCleaning(pausedPartitions.map(_._1))
cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toList.asJava)
assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
}
/**
@ -519,9 +522,9 @@ class LogCleanerManagerTest extends Logging {
// Broker processes StopReplicaRequest with delete=true
cleanerManager.abortCleaning(log.topicPartition)
// log cleanup finishes and pausedPartitions are resumed
cleanerManager.resumeCleaning(pausedPartitions.map(_._1))
cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toList.asJava)
assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
}
/**
@ -554,7 +557,7 @@ class LogCleanerManagerTest extends Logging {
log.updateHighWatermark(50)
val lastCleanOffset = Some(0L)
val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
assertEquals(log.highWatermark, log.lastStableOffset, "The high watermark equals the last stable offset as no transactions are in progress")
@ -576,7 +579,7 @@ class LogCleanerManagerTest extends Logging {
log.updateHighWatermark(log.logEndOffset)
val lastCleanOffset = Some(0L)
val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the active segment.")
@ -608,7 +611,7 @@ class LogCleanerManagerTest extends Logging {
log.updateHighWatermark(log.logEndOffset)
val lastCleanOffset = Some(0L)
val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
assertEquals(activeSegAtT0.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the second block of log entries.")
@ -635,7 +638,7 @@ class LogCleanerManagerTest extends Logging {
time.sleep(compactionLag + 1)
val lastCleanOffset = Some(0L)
val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with active segment.")
@ -647,7 +650,7 @@ class LogCleanerManagerTest extends Logging {
val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion)
var lastCleanOffset = Some(15L)
var lastCleanOffset = Optional.of(15L.asInstanceOf[JLong])
var cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
assertFalse(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset should not be reset if valid")
@ -655,7 +658,7 @@ class LogCleanerManagerTest extends Logging {
cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
assertTrue(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset needs to be reset if less than log start offset")
lastCleanOffset = Some(25L)
lastCleanOffset = Optional.of(25L)
cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
assertTrue(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset needs to be reset if greater than log end offset")
}
@ -682,7 +685,7 @@ class LogCleanerManagerTest extends Logging {
time.sleep(compactionLag + 1)
// although the compaction lag has been exceeded, the undecided data should not be cleaned
var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
assertEquals(0L, cleanableOffsets.firstDirtyOffset)
assertEquals(0L, cleanableOffsets.firstUncleanableDirtyOffset)
@ -693,14 +696,14 @@ class LogCleanerManagerTest extends Logging {
log.updateHighWatermark(4L)
// the first segment should now become cleanable immediately
cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
assertEquals(0L, cleanableOffsets.firstDirtyOffset)
assertEquals(3L, cleanableOffsets.firstUncleanableDirtyOffset)
time.sleep(compactionLag + 1)
// the second segment becomes cleanable after the compaction lag
cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
assertEquals(0L, cleanableOffsets.firstDirtyOffset)
assertEquals(4L, cleanableOffsets.firstUncleanableDirtyOffset)
}
@ -717,20 +720,20 @@ class LogCleanerManagerTest extends Logging {
assertThrows(classOf[IllegalStateException], () => cleanerManager.doneCleaning(topicPartition, log.dir, 1))
cleanerManager.setCleaningState(topicPartition, LogCleaningPaused(1))
cleanerManager.setCleaningState(topicPartition, LogCleaningState.logCleaningPaused(1))
assertThrows(classOf[IllegalStateException], () => cleanerManager.doneCleaning(topicPartition, log.dir, 1))
cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress)
cleanerManager.setCleaningState(topicPartition, LOG_CLEANING_IN_PROGRESS)
val endOffset = 1L
cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
assertEquals(Some(endOffset), cleanerManager.allCleanerCheckpoints.get(topicPartition))
assertTrue(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
assertEquals(Some(endOffset), Option(cleanerManager.allCleanerCheckpoints.get(topicPartition)))
cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
cleanerManager.setCleaningState(topicPartition, LOG_CLEANING_ABORTED)
cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
assertTrue(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
}
@Test
@ -740,18 +743,18 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager: LogCleanerManager = createCleanerManager(log)
val tp = new TopicPartition("log", 0)
assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(Seq(tp)))
assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(Seq(tp).asJava))
cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(Seq(tp)))
cleanerManager.setCleaningState(tp, LogCleaningState.logCleaningPaused(1))
assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(Seq(tp).asJava))
cleanerManager.setCleaningState(tp, LogCleaningInProgress)
cleanerManager.doneDeleting(Seq(tp))
cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS)
cleanerManager.doneDeleting(Seq(tp).asJava)
assertTrue(cleanerManager.cleaningState(tp).isEmpty)
cleanerManager.setCleaningState(tp, LogCleaningAborted)
cleanerManager.doneDeleting(Seq(tp))
assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED)
cleanerManager.doneDeleting(Seq(tp).asJava)
assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(tp).get)
}
/**
@ -766,8 +769,8 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp, 15L)
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
assertEquals(None, filthiestLog, "Log should not be selected for cleaning")
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
assertEquals(Optional.empty(), filthiestLog, "Log should not be selected for cleaning")
assertEquals(20L, cleanerCheckpoints(tp), "Unselected log should have checkpoint offset updated")
}
@ -788,19 +791,19 @@ class LogCleanerManagerTest extends Logging {
cleanerCheckpoints.put(tp0, 10L)
cleanerCheckpoints.put(tp1, 5L)
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be selected")
assertEquals(15L, cleanerCheckpoints(tp0), "Unselected log should have checkpoint offset updated")
}
private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {
val logs = new Pool[TopicPartition, UnifiedLog]()
val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
logs.put(topicPartition, log)
new LogCleanerManager(Seq(logDir, logDir2), logs, null)
new LogCleanerManager(Seq(logDir, logDir2).asJava, logs, null)
}
private def createCleanerManagerMock(pool: Pool[TopicPartition, UnifiedLog]): LogCleanerManagerMock = {
new LogCleanerManagerMock(Seq(logDir), pool, null)
private def createCleanerManagerMock(pool: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog]): LogCleanerManagerMock = {
new LogCleanerManagerMock(Seq(logDir).asJava, pool, null)
}
private def createLog(segmentSize: Int,

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, UnifiedLog}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleanerManager, LogConfig, UnifiedLog}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
@ -86,7 +86,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
// and make sure it's gone from the checkpoint file
cleaner.logs.remove(topicPartitions(0))
cleaner.updateCheckpoints(logDir, partitionToRemove = Option(topicPartitions(0)))
val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile), null).read()
val checkpoints = new OffsetCheckpointFile(new File(logDir, LogCleanerManager.OFFSET_CHECKPOINT_FILE), null).read()
// we expect partition 0 to be gone
assertFalse(checkpoints.containsKey(topicPartitions(0)))
}
@ -318,7 +318,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
// TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
val topicPartition = new TopicPartition(topic, partitionId)
cleaner.awaitCleaned(topicPartition, firstDirty)
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(topicPartition)
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(topicPartition)
assertTrue(lastCleaned >= firstDirty, s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned")
}
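For reference, a minimal sketch of reading the cleaner offset checkpoint through the relocated Java classes, mirroring the assertion in the test above. The log directory path is an assumption for illustration, and passing null for the LogDirFailureChannel follows the test; a real broker supplies one.

import java.io.File;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.apache.kafka.storage.internals.log.LogCleanerManager;

public class ReadCleanerCheckpointExample {
    public static void main(String[] args) throws Exception {
        File logDir = new File("/tmp/kafka-logs"); // assumed log directory, illustrative only
        File checkpointFile = new File(logDir, LogCleanerManager.OFFSET_CHECKPOINT_FILE);
        // Passing null for the LogDirFailureChannel mirrors the test above.
        Map<TopicPartition, Long> lastCleaned = new OffsetCheckpointFile(checkpointFile, null).read();
        lastCleaned.forEach((tp, offset) -> System.out.println(tp + " last cleaned up to offset " + offset));
    }
}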

View File

@ -19,7 +19,7 @@ package kafka.log
import kafka.log.LogCleaner.{MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
@ -78,7 +78,7 @@ class LogCleanerTest extends Logging {
try {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logs = new ConcurrentHashMap[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
val metricsToVerify = new java.util.HashMap[String, java.util.List[java.util.Map[String, String]]]()
@ -99,13 +99,13 @@ class LogCleanerTest extends Logging {
// verify that each metric in `LogCleanerManager` is removed
val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1)
LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
LogCleanerManager.GAUGE_METRIC_NAME_NO_TAG.forEach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
metricsToVerify.asScala.foreach { metricNameAndTags =>
metricNameAndTags._2.asScala.foreach { tags =>
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricNameAndTags._1), any(), ArgumentMatchers.eq(tags))
}
}
LogCleanerManager.GaugeMetricNameNoTag.foreach(verify(mockLogCleanerManagerMetricsGroup).removeMetric(_))
LogCleanerManager.GAUGE_METRIC_NAME_NO_TAG.forEach(verify(mockLogCleanerManagerMetricsGroup).removeMetric(_))
metricsToVerify.asScala.foreach { metricNameAndTags =>
metricNameAndTags._2.asScala.foreach { tags =>
verify(mockLogCleanerManagerMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameAndTags._1), ArgumentMatchers.eq(tags))
@ -124,7 +124,7 @@ class LogCleanerTest extends Logging {
def testMetricsActiveAfterReconfiguration(): Unit = {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
@ -285,7 +285,7 @@ class LogCleanerTest extends Logging {
log.roll()
// clean the log with only one message removed
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 2, log.activeSegment.baseOffset, false))
assertTrue(log.logSegments.iterator.next().log.channel.size < originalMaxFileSize,
"Cleaned segment file should be trimmed to its real size.")
@ -309,7 +309,7 @@ class LogCleanerTest extends Logging {
appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false))
assertEquals(List(2, 5, 7), lastOffsetsPerBatchInLog(log))
assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1), lastSequencesInLog(log))
assertEquals(List(2, 3, 1, 4), LogTestUtils.keysInLog(log))
@ -341,7 +341,7 @@ class LogCleanerTest extends Logging {
// do one more append and a round of cleaning to force another deletion from producer 1's batch
appendIdempotentAsLeader(log, pid4, producerEpoch)(Seq(2))
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false))
assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1, pid4 -> 0), lastSequencesInLog(log))
assertEquals(List(2, 5, 7, 8), lastOffsetsPerBatchInLog(log))
assertEquals(List(3, 1, 4, 2), LogTestUtils.keysInLog(log))
@ -485,7 +485,7 @@ class LogCleanerTest extends Logging {
val abortedTransactions = log.collectAbortedTransactions(log.logStartOffset, log.logEndOffset)
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false))
assertEquals(List(3, 2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 6, 7, 8, 9), offsetsInLog(log))
@ -525,7 +525,7 @@ class LogCleanerTest extends Logging {
log.appendAsLeader(abortMarker(pid1, producerEpoch), 0, AppendOrigin.COORDINATOR)
// we have only cleaned the records in the first segment
val dirtyOffset = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))._1
val dirtyOffset = cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false))._1
assertEquals(List(2, 3, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10), LogTestUtils.keysInLog(log))
log.roll()
@ -535,13 +535,12 @@ class LogCleanerTest extends Logging {
appendProducer1(Seq(12))
// finally only the keys from pid3 should remain
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, dirtyOffset, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false))
assertEquals(List(2, 3, 6, 7, 8, 9, 11, 12), LogTestUtils.keysInLog(log))
}
@Test
def testCommitMarkerRemoval(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
@ -559,7 +558,7 @@ class LogCleanerTest extends Logging {
log.roll()
// cannot remove the marker in this pass because there are still valid records
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
var dirtyOffset = cleaner.doClean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertEquals(List(1, 3, 2), LogTestUtils.keysInLog(log))
assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
@ -568,17 +567,17 @@ class LogCleanerTest extends Logging {
log.roll()
// the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
dirtyOffset = cleaner.doClean(new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// clean again with same timestamp to verify marker is not removed early
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
dirtyOffset = cleaner.doClean(new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// clean again with max timestamp to verify the marker is removed
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
dirtyOffset = cleaner.doClean(new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = Long.MaxValue)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
}
@ -589,7 +588,6 @@ class LogCleanerTest extends Logging {
*/
@Test
def testDeletedBatchesWithNoMessagesRead(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100)
val logProps = new Properties()
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 100: java.lang.Integer)
@ -607,19 +605,18 @@ class LogCleanerTest extends Logging {
log.appendAsLeader(commitMarker(producerId, producerEpoch), 0, AppendOrigin.COORDINATOR)
log.roll()
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
cleaner.doClean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(1, 3, 4), offsetsInLog(log))
// In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed.
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
runTwoPassClean(cleaner, new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4), offsetsInLog(log))
}
@Test
def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
@ -647,14 +644,14 @@ class LogCleanerTest extends Logging {
// first time through the records are removed
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
var dirtyOffset = cleaner.doClean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// the empty batch remains if cleaned again because it still holds the last sequence
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
dirtyOffset = cleaner.doClean(new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
@ -668,14 +665,14 @@ class LogCleanerTest extends Logging {
// Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
// The deleteHorizon for {Producer2: Commit} is still not set yet.
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
dirtyOffset = cleaner.doClean(new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
// Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
// In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed.
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
dirtyOffset = runTwoPassClean(cleaner, new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
@ -683,7 +680,6 @@ class LogCleanerTest extends Logging {
@Test
def testCleanEmptyControlBatch(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
@ -701,14 +697,14 @@ class LogCleanerTest extends Logging {
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
// In the first pass, the deleteHorizon for the commit marker is set. In the second pass, the commit marker is removed
// but the empty batch is retained for preserving the producer epoch.
var dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
var dirtyOffset = runTwoPassClean(cleaner, new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
// the empty control batch does not cause an exception when cleaned
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
dirtyOffset = cleaner.doClean(new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
@ -716,7 +712,6 @@ class LogCleanerTest extends Logging {
@Test
def testCommittedTransactionSpanningSegments(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 128: java.lang.Integer)
@ -732,14 +727,13 @@ class LogCleanerTest extends Logging {
log.roll()
// Both the record and the marker should remain after cleaning
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
runTwoPassClean(cleaner, new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(0, 1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@Test
def testAbortedTransactionSpanningSegments(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 128: java.lang.Integer)
@ -757,19 +751,18 @@ class LogCleanerTest extends Logging {
// Both the batch and the marker should remain after cleaning. The batch is retained
// because it is the last entry for this producerId. The marker is retained because
// there are still batches remaining from this transaction.
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
cleaner.doClean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
// The empty batch and the marker are still retained after a second cleaning.
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
cleaner.doClean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = Long.MaxValue)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@Test
def testAbortMarkerRemoval(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
@ -787,12 +780,12 @@ class LogCleanerTest extends Logging {
log.roll()
// Aborted records are removed, but the abort marker is still preserved.
val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
val dirtyOffset = cleaner.doClean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5), offsetsInLog(log))
// In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed.
runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
runTwoPassClean(cleaner, new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5), offsetsInLog(log))
}
@ -804,7 +797,6 @@ class LogCleanerTest extends Logging {
val producerEpoch = 0.toShort
val producerId = 1L
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
@ -826,19 +818,18 @@ class LogCleanerTest extends Logging {
// Both transactional batches will be cleaned. The last one will remain in the log
// as an empty batch in order to preserve the producer sequence number and epoch
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
cleaner.doClean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(1, 3, 4, 5), offsetsInLog(log))
assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
// In the first pass, the delete horizon for the first marker is set. In the second pass, the first marker is removed.
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
runTwoPassClean(cleaner, new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(3, 4, 5), offsetsInLog(log))
assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
}
@Test
def testAbortMarkerRetentionWithEmptyBatch(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
@ -863,14 +854,14 @@ class LogCleanerTest extends Logging {
assertAbortedTransactionIndexed()
// first time through the records are removed
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
var dirtyOffset = cleaner.doClean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertAbortedTransactionIndexed()
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
// the empty batch remains if cleaned again because it still holds the last sequence
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
dirtyOffset = runTwoPassClean(cleaner, new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertAbortedTransactionIndexed()
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
@ -880,14 +871,14 @@ class LogCleanerTest extends Logging {
appendProducer(Seq(1))
log.roll()
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
dirtyOffset = cleaner.doClean(new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)._1
assertAbortedTransactionIndexed()
assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not preserve the empty batch
// In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed.
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
dirtyOffset = runTwoPassClean(cleaner, new LogToClean(log, dirtyOffset, log.activeSegment.baseOffset, false), currentTime = largeTimestamp)
assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
assertEquals(List(3), lastOffsetsPerBatchInLog(log))
@ -1020,7 +1011,7 @@ class LogCleanerTest extends Logging {
while (log.numberOfSegments < 4)
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), 0)
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0, log.activeSegment.baseOffset, false))
val keys = LogTestUtils.keysInLog(log).toSet
assertTrue((0 until leo.toInt by 2).forall(!keys.contains(_)), "None of the keys we deleted should still exist.")
}
@ -1044,7 +1035,7 @@ class LogCleanerTest extends Logging {
val initialLogSize = log.size
val (endOffset, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
val (endOffset, stats) = cleaner.clean(new LogToClean(log, 2, log.activeSegment.baseOffset, false))
assertEquals(5, endOffset)
assertEquals(5, stats.messagesRead)
assertEquals(initialLogSize, stats.bytesRead)
@ -1070,7 +1061,7 @@ class LogCleanerTest extends Logging {
// roll the segment, so we can clean the messages already appended
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false))
assertEquals(List(1, 3, 4), lastOffsetsPerBatchInLog(log))
assertEquals(Map(1L -> 0, 2L -> 1, 3L -> 0), lastSequencesInLog(log))
assertEquals(List(0, 1), LogTestUtils.keysInLog(log))
@ -1093,7 +1084,7 @@ class LogCleanerTest extends Logging {
log.appendAsLeader(abortMarker(producerId, producerEpoch), 0, AppendOrigin.COORDINATOR)
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false))
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log))
assertEquals(Map(producerId -> 2), lastSequencesInLog(log))
assertEquals(List(), LogTestUtils.keysInLog(log))
@ -1102,7 +1093,7 @@ class LogCleanerTest extends Logging {
// Append a new entry from the producer and verify that the empty batch is cleaned up
appendProducer(Seq(1, 5))
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false))
assertEquals(List(3, 5), lastOffsetsPerBatchInLog(log))
assertEquals(Map(producerId -> 4), lastSequencesInLog(log))
@ -1149,7 +1140,7 @@ class LogCleanerTest extends Logging {
assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
// After cleaning, the marker should not be removed
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, false))
assertEquals(List(0, 1, 2, 3), lastOffsetsPerBatchInLog(log))
assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
}
@ -1172,16 +1163,16 @@ class LogCleanerTest extends Logging {
log.roll()
// clean the log with only one message removed
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 2, log.activeSegment.baseOffset, false))
assertEquals(List(1,0,1,0), LogTestUtils.keysInLog(log))
assertEquals(List(1,2,3,4), offsetsInLog(log))
// continue to make progress, even though we can only clean one message at a time
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 3, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 3, log.activeSegment.baseOffset, false))
assertEquals(List(0,1,0), LogTestUtils.keysInLog(log))
assertEquals(List(2,3,4), offsetsInLog(log))
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 4, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 4, log.activeSegment.baseOffset, false))
assertEquals(List(1,0), LogTestUtils.keysInLog(log))
assertEquals(List(3,4), offsetsInLog(log))
}
@ -1218,7 +1209,7 @@ class LogCleanerTest extends Logging {
assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
"Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.")
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset))
cleaner.clean(new LogToClean(log, 0, firstUncleanableOffset, false))
val distinctValuesBySegmentAfterClean = distinctValuesBySegment
@ -1241,7 +1232,7 @@ class LogCleanerTest extends Logging {
for (_ <- 0 until 6)
log.appendAsLeader(createRecords, 0)
val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
val logToClean = new LogToClean(log, log.activeSegment.baseOffset, log.activeSegment.baseOffset, false)
assertEquals(logToClean.totalBytes, log.size - log.activeSegment.size,
"Total bytes of LogToClean should equal size of all segments excluding the active segment")
@ -1261,7 +1252,7 @@ class LogCleanerTest extends Logging {
// segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
val segs = log.logSegments.asScala.toSeq
val logToClean = LogToClean(new TopicPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
val logToClean = new LogToClean(log, segs(2).baseOffset, segs(4).baseOffset, false)
val expectedCleanSize = segs.take(2).map(_.size).sum
val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
@ -1301,7 +1292,7 @@ class LogCleanerTest extends Logging {
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), 0)
val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
val (_, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
val (_, stats) = cleaner.clean(new LogToClean(log, 0, log.activeSegment.baseOffset, false))
assertEquals(0, unkeyedMessageCountInLog(log), "Log should only contain keyed messages after cleaning.")
assertEquals(expectedSizeAfterCleaning, log.size, "Log should only contain keyed messages after cleaning.")
@ -1472,7 +1463,7 @@ class LogCleanerTest extends Logging {
//segments will not group even if their size is very small.
assertEquals(totalSegments - notCleanableSegments, groups.size)
//do a clean that compacts the first 2 segments down to empty
cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
cleaner.clean(new LogToClean(log, 0, firstUncleanableOffset, false))
assertEquals(totalSegments, log.numberOfSegments)
assertEquals(0, log.logSegments.asScala.head.size)
@ -1482,7 +1473,7 @@ class LogCleanerTest extends Logging {
assertEquals(noneEmptySegment + 1, groups.size)
//trigger a clean and the 2 empty segments should be cleaned into 1
cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
cleaner.clean(new LogToClean(log, 0, firstUncleanableOffset, false))
assertEquals(totalSegments - 1, log.numberOfSegments)
}
@ -1883,7 +1874,7 @@ class LogCleanerTest extends Logging {
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0, log.activeSegment.baseOffset, false))
for (segment <- log.logSegments.asScala; batch <- segment.log.batches.asScala; record <- batch.asScala) {
assertTrue(record.hasMagic(batch.magic))
@ -1927,14 +1918,14 @@ class LogCleanerTest extends Logging {
key = "0".getBytes,
timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000), 0)
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 0, log.activeSegment.baseOffset, false))
// Append a tombstone with a small timestamp and roll out a new log segment.
log.appendAsLeader(TestUtils.singletonRecords(value = null,
key = "0".getBytes,
timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000), 0)
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 1, log.activeSegment.baseOffset, false))
assertEquals(1, log.logSegments.asScala.head.log.batches.iterator.next().lastOffset,
"The tombstone should be retained.")
// Append a message and roll out another log segment.
@ -1942,7 +1933,7 @@ class LogCleanerTest extends Logging {
key = "1".getBytes,
timestamp = time.milliseconds()), 0)
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
cleaner.clean(new LogToClean(log, 2, log.activeSegment.baseOffset, false))
assertEquals(1, log.logSegments.asScala.head.log.batches.iterator.next().lastOffset,
"The tombstone should be retained.")
}
@ -1967,7 +1958,7 @@ class LogCleanerTest extends Logging {
// active segment record
log.appendAsFollower(messageWithOffset(1015, 1015, 11L), Int.MaxValue)
val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
val (nextDirtyOffset, _) = cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, true))
assertEquals(log.activeSegment.baseOffset, nextDirtyOffset,
"Cleaning point should pass offset gap")
}
@ -1986,7 +1977,7 @@ class LogCleanerTest extends Logging {
// active segment record
log.appendAsFollower(messageWithOffset(1015, 1015, 30L), Int.MaxValue)
val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
val (nextDirtyOffset, _) = cleaner.clean(new LogToClean(log, 0L, log.activeSegment.baseOffset, true))
assertEquals(log.activeSegment.baseOffset, nextDirtyOffset,
"Cleaning point should pass offset gap in multiple segments")
}
@ -1996,7 +1987,7 @@ class LogCleanerTest extends Logging {
def testMaxCleanTimeSecs(): Unit = {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
@ -2020,7 +2011,7 @@ class LogCleanerTest extends Logging {
val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
logDirs = Array(TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time) {
// shutdown() and startup() are called in LogCleaner.reconfigure().
@ -2048,7 +2039,7 @@ class LogCleanerTest extends Logging {
val logCleaner = new LogCleaner(
new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time
)
@ -2100,7 +2091,7 @@ class LogCleanerTest extends Logging {
val logCleaner = new LogCleaner(
new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time
)
@ -2152,7 +2143,7 @@ class LogCleanerTest extends Logging {
val logCleaner = new LogCleaner(
new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time
)
@ -2171,28 +2162,28 @@ class LogCleanerTest extends Logging {
val cleaner1 = new logCleaner.CleanerThread(1)
cleaner1.lastStats = new CleanerStats(time)
cleaner1.lastPreCleanStats.maxCompactionDelayMs = 1_000L
cleaner1.lastPreCleanStats.maxCompactionDelayMs(1_000L)
cleaners += cleaner1
val cleaner2 = new logCleaner.CleanerThread(2)
cleaner2.lastStats = new CleanerStats(time)
cleaner2.lastPreCleanStats.maxCompactionDelayMs = 2_000L
cleaner2.lastPreCleanStats.maxCompactionDelayMs(2_000L)
cleaners += cleaner2
val cleaner3 = new logCleaner.CleanerThread(3)
cleaner3.lastStats = new CleanerStats(time)
cleaner3.lastPreCleanStats.maxCompactionDelayMs = 3_000L
cleaner3.lastPreCleanStats.maxCompactionDelayMs(3_000L)
cleaners += cleaner3
// expect the gauge value to reflect the maximum CompactionDelay
assertMaxCompactionDelay(3)
// Update CompactionDelay and verify the gauge value updates
cleaner1.lastPreCleanStats.maxCompactionDelayMs = 4_000L
cleaner1.lastPreCleanStats.maxCompactionDelayMs(4_000L)
assertMaxCompactionDelay(4)
// All CleanerThreads have the same CompactionDelay
cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs = 1_500L)
cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs(1_500L))
assertMaxCompactionDelay(1)
} finally {
logCleaner.shutdown()
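Throughout this file, the Scala case class apply LogToClean(topicPartition, log, firstDirtyOffset, uncleanableOffset, needCompactionNow) is replaced by the Java constructor, which derives the topic partition from the log itself. A minimal sketch of the new call shape, assuming a UnifiedLog instance (helper name is illustrative, not part of the commit):

import org.apache.kafka.storage.internals.log.LogToClean;
import org.apache.kafka.storage.internals.log.UnifiedLog;

public class LogToCleanExample {
    // Builds a LogToClean for a full pass over `log`, as the tests above do.
    static LogToClean fullClean(UnifiedLog log) {
        return new LogToClean(
            log,                              // the log to clean; its TopicPartition comes from the log
            0L,                               // first dirty offset
            log.activeSegment().baseOffset(), // first uncleanable offset: start of the active segment
            false                             // needCompactionNow
        );
    }
}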

View File

@ -43,7 +43,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import org.junit.jupiter.api.Assertions._
@ -1100,7 +1100,7 @@ class UnifiedLogTest {
// Clean segments, this should delete everything except the active segment since there only
// exists the key "a".
cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset))
cleaner.clean(new LogToClean(log, 0, log.logEndOffset, false))
log.deleteOldSegments()
// Sleep to breach the file delete delay and run scheduled file deletion tasks
mockTime.sleep(1)

View File

@ -279,15 +279,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="RV_RETURN_VALUE_IGNORED"/>
</Match>
<Match>
<!-- Suppress a warning about ignoring the return value of await.
This is done intentionally because we use other clues to determine
if the wait was cut short. -->
<Package name="kafka.log"/>
<Source name="LogCleanerManager.scala"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED,RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
</Match>
<Match>
<!-- Suppress some warnings about intentional switch statement fallthrough. -->
<Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
/**
* A utility class providing helper methods for working with {@link Lock} objects.
* This class simplifies the usage of locks by encapsulating common patterns,
* such as acquiring and releasing locks in a safe manner.
*/
public class LockUtils {
/**
* Executes the given {@link Supplier} within the context of the specified {@link Lock}.
* The lock is acquired before executing the supplier and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param <T> the type of the result returned by the supplier
* @param lock the lock to be acquired and released
* @param supplier the supplier to be executed within the lock context
* @return the result of the supplier
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
*/
public static <T> T inLock(Lock lock, Supplier<T> supplier) {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(supplier, "Supplier must not be null");
lock.lock();
try {
return supplier.get();
} finally {
lock.unlock();
}
}
}
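A minimal usage sketch for the new helper (not part of the commit; the class and field names below are illustrative):

import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.server.util.LockUtils;

public class InLockExample {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static int counter = 0;

    public static void main(String[] args) {
        // The supplier runs while LOCK is held; inLock's finally block releases it even if the supplier throws.
        int next = LockUtils.inLock(LOCK, () -> ++counter);
        System.out.println(next); // prints 1
    }
}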

View File

@ -0,0 +1,798 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.server.util.LockUtils.inLock;
/**
* This class manages the state (see {@link LogCleaningState}) of each partition being cleaned.
* <ul>
* <li>1. None : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
* or LogCleaningPaused(1). Valid previous states are LogCleaningInProgress and LogCleaningPaused(1).</li>
* <li>2. LogCleaningInProgress : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
* or become LogCleaningAborted. Valid previous state is None.</li>
* <li>3. LogCleaningAborted : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
* Valid previous state is LogCleaningInProgress.</li>
* <li>4-a. LogCleaningPaused(1) : The cleaning is paused once. No log cleaning can be done in this state.
* In this state, it can become None or LogCleaningPaused(2).
* Valid previous states are None, LogCleaningAborted, and LogCleaningPaused(2).</li>
* <li>4-b. LogCleaningPaused(i) : The cleaning is paused i times, where i >= 2. No log cleaning can be done in this state.
* In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
* Valid previous states are LogCleaningPaused(i-1) and LogCleaningPaused(i+1).</li>
* </ul>
*/
public class LogCleanerManager {
public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint";
private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner");
private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count";
private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes";
private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent";
private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms";
// Visible for testing
public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
// For compatibility, metrics are defined to be under `kafka.log.LogCleanerManager` class
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager");
/**
* The set of logs currently being cleaned.
*/
private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();
/**
* The set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
* for each log directory.
*/
private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();
/**
* A global lock used to control all access to the in-progress set and the offset checkpoints.
*/
private final Lock lock = new ReentrantLock();
/**
* For coordinating the pausing and the cleaning of a partition.
*/
private final Condition pausedCleaningCond = lock.newCondition();
private final Map<String, List<Map<String, String>>> gaugeMetricNameWithTag = new HashMap<>();
private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
/**
* The offset checkpoints holding the last cleaned point for each log.
*/
private volatile Map<File, OffsetCheckpointFile> checkpoints;
private volatile double dirtiestLogCleanableRatio;
private volatile long timeOfLastRun;
@SuppressWarnings({"this-escape"})
public LogCleanerManager(
List<File> logDirs,
ConcurrentMap<TopicPartition, UnifiedLog> logs,
LogDirFailureChannel logDirFailureChannel
) {
this.logs = logs;
checkpoints = logDirs.stream()
.collect(Collectors.toMap(
dir -> dir,
dir -> {
try {
return new OffsetCheckpointFile(new File(dir, OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
));
registerMetrics(logDirs);
}
private void registerMetrics(List<File> logDirs) {
// gauges for tracking the number of partitions marked as uncleanable for each log directory
for (File dir : logDirs) {
Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
metricsGroup.newGauge(
UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
() -> inLock(lock, () -> uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
metricTag
);
gaugeMetricNameWithTag
.computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, k -> new ArrayList<>())
.add(metricTag);
}
// gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory
for (File dir : logDirs) {
Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
metricsGroup.newGauge(
UNCLEANABLE_BYTES_METRIC_NAME,
() -> inLock(lock, () -> {
Set<TopicPartition> partitions = uncleanablePartitions.get(dir.getAbsolutePath());
if (partitions == null) {
return 0;
} else {
Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
long now = Time.SYSTEM.milliseconds();
return partitions.stream()
.mapToLong(tp -> {
UnifiedLog log = logs.get(tp);
if (log != null) {
Optional<Long> lastCleanOffset = Optional.ofNullable(lastClean.get(tp));
try {
OffsetsToClean offsetsToClean = cleanableOffsets(log, lastCleanOffset, now);
return calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
offsetsToClean.firstUncleanableDirtyOffset()).getValue();
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
return 0L;
}
}).sum();
}
}),
metricTag
);
gaugeMetricNameWithTag
.computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new ArrayList<>())
.add(metricTag);
}
// a gauge for tracking the cleanable ratio of the dirtiest log
dirtiestLogCleanableRatio = 0.0;
metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 * dirtiestLogCleanableRatio));
// a gauge for tracking the time since the last log cleaner run, in milliseconds
timeOfLastRun = Time.SYSTEM.milliseconds();
metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> Time.SYSTEM.milliseconds() - timeOfLastRun);
}
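/**
 * Exposes the registered per-directory gauge metric names and their tags (e.g. for metric removal).
 */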
public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
return gaugeMetricNameWithTag;
}
/**
* @return the last processed (cleaned) offset for each log, as recorded in the cleaner checkpoints.
*/
public Map<TopicPartition, Long> allCleanerCheckpoints() {
return inLock(lock, () -> checkpoints.values().stream()
.flatMap(checkpoint -> {
try {
return checkpoint.read().entrySet().stream();
} catch (KafkaStorageException e) {
LOG.error("Failed to access checkpoint file {} in dir {}",
checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
return Stream.empty();
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
/**
* Public for unit test. Get the cleaning state of the partition.
*/
public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
}
/**
* Public for unit test. Set the cleaning state of the partition.
*/
public void setCleaningState(TopicPartition tp, LogCleaningState state) {
inLock(lock, () -> inProgress.put(tp, state));
}
/**
* Choose the log to clean next and add it to the in-progress set. We recompute this
* each time from the full set of logs to allow logs to be dynamically added to the pool of logs
* the log manager maintains.
*/
public Optional<LogToClean> grabFilthiestCompactedLog(Time time, PreCleanStats preCleanStats) {
return inLock(lock, () -> {
long now = time.milliseconds();
timeOfLastRun = now;
Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
List<LogToClean> dirtyLogs = logs.entrySet().stream()
.filter(entry -> entry.getValue().config().compact &&
!inProgress.containsKey(entry.getKey()) &&
!isUncleanablePartition(entry.getValue(), entry.getKey())
)
.map(entry -> {
// create a LogToClean instance for each
TopicPartition topicPartition = entry.getKey();
UnifiedLog log = entry.getValue();
try {
Long lastCleanOffset = lastClean.get(topicPartition);
OffsetsToClean offsetsToClean = cleanableOffsets(log, Optional.ofNullable(lastCleanOffset), now);
// update checkpoint for logs with invalid checkpointed offsets
if (offsetsToClean.forceUpdateCheckpoint()) {
updateCheckpoints(log.parentDirFile(), Optional.of(Map.entry(topicPartition, offsetsToClean.firstDirtyOffset())), Optional.empty());
}
long compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset(), now);
preCleanStats.updateMaxCompactionDelay(compactionDelayMs);
return new LogToClean(log, offsetsToClean.firstDirtyOffset(),
offsetsToClean.firstUncleanableDirtyOffset(), compactionDelayMs > 0);
} catch (Throwable e) {
throw new LogCleaningException(log, "Failed to calculate log cleaning stats for partition " + topicPartition, e);
}
}
)
.filter(ltc -> ltc.totalBytes() > 0) // skip any empty logs
.toList();
dirtiestLogCleanableRatio = dirtyLogs.isEmpty()
? 0
: dirtyLogs.stream()
.mapToDouble(LogToClean::cleanableRatio)
.max()
.orElse(0.0);
// and must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
List<LogToClean> cleanableLogs = dirtyLogs.stream()
.filter(ltc -> (ltc.needCompactionNow() && ltc.cleanableBytes() > 0) || ltc.cleanableRatio() > ltc.log().config().minCleanableRatio)
.toList();
if (cleanableLogs.isEmpty()) {
return Optional.empty();
} else {
preCleanStats.recordCleanablePartitions(cleanableLogs.size());
LogToClean filthiest = cleanableLogs.stream()
.max(Comparator.comparingDouble(LogToClean::cleanableRatio))
.orElseThrow(() -> new IllegalStateException("No filthiest log found"));
inProgress.put(filthiest.topicPartition(), LogCleaningState.LOG_CLEANING_IN_PROGRESS);
return Optional.of(filthiest);
}
});
}
/**
* Pause log cleaning for logs that do not have compaction enabled
* and do not have other deletion or compaction in progress.
* This handles a potential race between the retention and cleaner threads when users
* switch a topic configuration between compacted and non-compacted.
*
* @return retention logs that have log cleaning successfully paused
*/
public List<Map.Entry<TopicPartition, UnifiedLog>> pauseCleaningForNonCompactedPartitions() {
return inLock(lock, () -> {
List<Map.Entry<TopicPartition, UnifiedLog>> deletableLogs = logs.entrySet().stream()
.filter(entry -> !entry.getValue().config().compact) // pick non-compacted logs
.filter(entry -> !inProgress.containsKey(entry.getKey())) // skip any logs already in-progress
.collect(Collectors.toList());
deletableLogs.forEach(entry -> inProgress.put(entry.getKey(), LogCleaningState.logCleaningPaused(1)));
return deletableLogs;
});
}
/**
* Find any logs that have compaction enabled. Mark them as being cleaned.
* Include logs without delete enabled, as they may have segments
* that precede the start offset.
*/
public Map<TopicPartition, UnifiedLog> deletableLogs() {
return inLock(lock, () -> {
Map<TopicPartition, UnifiedLog> toClean = logs.entrySet().stream()
.filter(entry -> {
TopicPartition topicPartition = entry.getKey();
UnifiedLog log = entry.getValue();
return !inProgress.containsKey(topicPartition) && log.config().compact &&
!isUncleanablePartition(log, topicPartition);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
toClean.forEach((partition, log) -> inProgress.put(partition, LogCleaningState.LOG_CLEANING_IN_PROGRESS));
return toClean;
});
}
/**
* Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
* the partition is aborted.
* This is implemented by first calling abortAndPauseCleaning() and then resumeCleaning() for the partition.
*/
public void abortCleaning(TopicPartition topicPartition) {
inLock(lock, () -> {
abortAndPauseCleaning(topicPartition);
resumeCleaning(List.of(topicPartition));
return null;
});
}
/**
* Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
* This call blocks until the cleaning of the partition is aborted and paused.
* <ol>
* <li>If the partition is not in progress, mark it as paused.</li>
* <li>Otherwise, first mark the state of the partition as aborted.</li>
* <li>The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
* throws a LogCleaningAbortedException to stop the cleaning task.</li>
* <li>When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.</li>
* <li>abortAndPauseCleaning() waits until the state of the partition is changed to paused.</li>
* <li>If the partition is already paused, a new call to this function
* will increase the paused count by one.</li>
* </ol>
*/
public void abortAndPauseCleaning(TopicPartition topicPartition) {
inLock(lock, () -> {
LogCleaningState state = inProgress.get(topicPartition);
if (state == null) {
inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(1));
} else if (state == LogCleaningState.LOG_CLEANING_IN_PROGRESS) {
inProgress.put(topicPartition, LogCleaningState.LOG_CLEANING_ABORTED);
} else if (state instanceof LogCleaningState.LogCleaningPaused logCleaningPaused) {
inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(logCleaningPaused.pausedCount() + 1));
} else {
throw new IllegalStateException("Compaction for partition " + topicPartition +
" cannot be aborted and paused since it is in " + state + " state.");
}
while (!isCleaningInStatePaused(topicPartition)) {
try {
pausedCleaningCond.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return null;
});
}
/**
* Resume the cleaning of paused partitions.
* Each call of this function will undo one pause.
*/
public void resumeCleaning(List<TopicPartition> topicPartitions) {
inLock(lock, () -> {
topicPartitions.forEach(topicPartition -> {
LogCleaningState state = inProgress.get(topicPartition);
if (state == null) {
throw new IllegalStateException("Compaction for partition " + topicPartition + " cannot be resumed since it is not paused.");
}
if (state instanceof LogCleaningState.LogCleaningPaused logCleaningPaused) {
if (logCleaningPaused.pausedCount() == 1) {
inProgress.remove(topicPartition);
} else if (logCleaningPaused.pausedCount() > 1) {
inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(logCleaningPaused.pausedCount() - 1));
}
} else {
throw new IllegalStateException("Compaction for partition " + topicPartition +
" cannot be resumed since it is in " + state + " state.");
}
});
return null;
});
}
/**
* Check if the cleaning for a partition is in a particular state. The caller is expected to hold the lock while making the call.
*/
private boolean isCleaningInState(TopicPartition topicPartition, LogCleaningState expectedState) {
LogCleaningState state = inProgress.get(topicPartition);
if (state == null) {
return false;
} else {
return state == expectedState;
}
}
/**
* Check if the cleaning for a partition is paused. The caller is expected to hold the lock while making the call.
*/
private boolean isCleaningInStatePaused(TopicPartition topicPartition) {
LogCleaningState state = inProgress.get(topicPartition);
if (state == null) {
return false;
} else {
return state instanceof LogCleaningState.LogCleaningPaused;
}
}
/**
* Check if the cleaning for a partition is aborted. If so, throw an exception.
*/
public void checkCleaningAborted(TopicPartition topicPartition) {
inLock(lock, () -> {
if (isCleaningInState(topicPartition, LogCleaningState.LOG_CLEANING_ABORTED)) {
throw new LogCleaningAbortedException();
}
return null;
});
}
/**
* Update checkpoint file, adding or removing partitions if necessary.
*
* @param dataDir The log directory whose checkpoint file is to be updated
* @param partitionToUpdateOrAdd The (TopicPartition, offset) entry to update or add; pass Optional.empty() when only removing
* @param partitionToRemove The TopicPartition whose entry is to be removed; pass Optional.empty() when only updating or adding
*/
public void updateCheckpoints(
File dataDir,
Optional<Map.Entry<TopicPartition, Long>> partitionToUpdateOrAdd,
Optional<TopicPartition> partitionToRemove
) {
inLock(lock, () -> {
OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
if (checkpoint != null) {
try {
Map<TopicPartition, Long> currentCheckpoint = checkpoint.read().entrySet().stream()
.filter(entry -> logs.containsKey(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<TopicPartition, Long> updatedCheckpoint = new HashMap<>(currentCheckpoint);
// Remove the partition offset if present
partitionToRemove.ifPresent(updatedCheckpoint::remove);
// Update or add the partition offset if present
partitionToUpdateOrAdd.ifPresent(entry -> updatedCheckpoint.put(entry.getKey(), entry.getValue()));
// Write back the updated checkpoint
checkpoint.write(updatedCheckpoint);
} catch (KafkaStorageException e) {
LOG.error("Failed to access checkpoint file {} in dir {}",
checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
}
}
return null;
});
}
/**
* Alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir.
*/
public void alterCheckpointDir(TopicPartition topicPartition, File sourceLogDir, File destLogDir) {
inLock(lock, () -> {
try {
Optional<Long> offsetOpt = Optional.ofNullable(checkpoints.get(sourceLogDir))
.flatMap(checkpoint -> Optional.ofNullable(checkpoint.read().get(topicPartition)));
offsetOpt.ifPresent(offset -> {
LOG.debug("Removing the partition offset data in checkpoint file for '{}' from {} directory.",
topicPartition, sourceLogDir.getAbsoluteFile());
updateCheckpoints(sourceLogDir, Optional.empty(), Optional.of(topicPartition));
LOG.debug("Adding the partition offset data in checkpoint file for '{}' to {} directory.",
topicPartition, destLogDir.getAbsoluteFile());
updateCheckpoints(destLogDir, Optional.of(Map.entry(topicPartition, offset)), Optional.empty());
});
} catch (KafkaStorageException e) {
LOG.error("Failed to access checkpoint file in dir {}", sourceLogDir.getAbsolutePath(), e);
}
Set<TopicPartition> logUncleanablePartitions = uncleanablePartitions.getOrDefault(sourceLogDir.toString(), Collections.emptySet());
if (logUncleanablePartitions.contains(topicPartition)) {
logUncleanablePartitions.remove(topicPartition);
markPartitionUncleanable(destLogDir.toString(), topicPartition);
}
return null;
});
}
/**
* Stop cleaning logs in the provided directory.
*
* @param dir the absolute path of the log dir
*/
public void handleLogDirFailure(String dir) {
LOG.warn("Stopping cleaning logs in dir {}", dir);
inLock(lock, () -> {
checkpoints = checkpoints.entrySet().stream()
.filter(entry -> !entry.getKey().getAbsolutePath().equals(dir))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return null;
});
}
/**
* Truncate the checkpointed offset for the given partition if its checkpointed offset is larger than the given offset.
*/
public void maybeTruncateCheckpoint(File dataDir, TopicPartition topicPartition, long offset) {
inLock(lock, () -> {
if (logs.get(topicPartition).config().compact) {
OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
if (checkpoint != null) {
Map<TopicPartition, Long> existing = checkpoint.read();
if (existing.getOrDefault(topicPartition, 0L) > offset) {
existing.put(topicPartition, offset);
checkpoint.write(existing);
}
}
}
return null;
});
}
/**
* Save out the endOffset and remove the given log from the in-progress set, if not aborted.
*/
public void doneCleaning(TopicPartition topicPartition, File dataDir, long endOffset) {
inLock(lock, () -> {
LogCleaningState state = inProgress.get(topicPartition);
if (state == null) {
throw new IllegalStateException("State for partition " + topicPartition + " should exist.");
} else if (state == LogCleaningState.LOG_CLEANING_IN_PROGRESS) {
updateCheckpoints(dataDir, Optional.of(Map.entry(topicPartition, endOffset)), Optional.empty());
inProgress.remove(topicPartition);
} else if (state == LogCleaningState.LOG_CLEANING_ABORTED) {
inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(1));
pausedCleaningCond.signalAll();
} else {
throw new IllegalStateException("In-progress partition " + topicPartition + " cannot be in " + state + " state.");
}
return null;
});
}
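/**
 * Remove the given partitions from the in-progress set once their deletion has completed.
 * Partitions whose cleaning was aborted are moved to LogCleaningPaused(1) and waiters are notified.
 */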
public void doneDeleting(List<TopicPartition> topicPartitions) {
inLock(lock, () -> {
topicPartitions.forEach(topicPartition -> {
LogCleaningState logCleaningState = inProgress.get(topicPartition);
if (logCleaningState == null) {
throw new IllegalStateException("State for partition " + topicPartition + " should exist.");
} else if (logCleaningState == LogCleaningState.LOG_CLEANING_IN_PROGRESS) {
inProgress.remove(topicPartition);
} else if (logCleaningState == LogCleaningState.LOG_CLEANING_ABORTED) {
inProgress.put(topicPartition, LogCleaningState.logCleaningPaused(1));
pausedCleaningCond.signalAll();
} else {
throw new IllegalStateException("In-progress partition " + topicPartition + " cannot be in " + logCleaningState + " state.");
}
});
return null;
});
}
/**
* Returns an immutable set of the uncleanable partitions for a given log directory.
* Only used for testing.
*/
public Set<TopicPartition> uncleanablePartitions(String logDir) {
return inLock(lock, () -> {
Set<TopicPartition> partitions = uncleanablePartitions.get(logDir);
return partitions != null ? Set.copyOf(partitions) : Set.of();
});
}
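/**
 * Mark a partition in the given log directory as uncleanable so that it is skipped by future cleanings.
 */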
public void markPartitionUncleanable(String logDir, TopicPartition partition) {
inLock(lock, () -> {
Set<TopicPartition> partitions = uncleanablePartitions.computeIfAbsent(logDir, dir -> new HashSet<>());
partitions.add(partition);
return null;
});
}
private boolean isUncleanablePartition(UnifiedLog log, TopicPartition topicPartition) {
return inLock(lock, () -> Optional.ofNullable(uncleanablePartitions.get(log.parentDir()))
.map(partitions -> partitions.contains(topicPartition))
.orElse(false)
);
}
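/**
 * Drop partitions that no longer exist from the uncleanable-partitions bookkeeping.
 */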
public void maintainUncleanablePartitions() {
// Remove deleted partitions from uncleanablePartitions
inLock(lock, () -> {
// Remove deleted partitions
uncleanablePartitions.values().forEach(partitions ->
partitions.removeIf(partition -> !logs.containsKey(partition)));
// Remove entries with empty partition set.
uncleanablePartitions.entrySet().removeIf(entry -> entry.getValue().isEmpty());
return null;
});
}
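/**
 * Remove all gauges registered by this manager.
 */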
public void removeMetrics() {
GAUGE_METRIC_NAME_NO_TAG.forEach(metricsGroup::removeMetric);
gaugeMetricNameWithTag.forEach((metricName, tags) ->
tags.forEach(tag -> metricsGroup.removeMetric(metricName, tag)));
gaugeMetricNameWithTag.clear();
}
private static boolean isCompactAndDelete(UnifiedLog log) {
return log.config().compact && log.config().delete;
}
/**
* Get the maximum delay between the time when a log is required to be compacted,
* as determined by maxCompactionLagMs, and the current time.
*/
private static long maxCompactionDelay(UnifiedLog log, long firstDirtyOffset, long now) {
List<LogSegment> dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset);
Stream<Long> firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).stream()
.filter(timestamp -> timestamp > 0);
long earliestDirtySegmentTimestamp = firstBatchTimestamps.min(Comparator.naturalOrder()).orElse(Long.MAX_VALUE);
long maxCompactionLagMs = Math.max(log.config().maxCompactionLagMs, 0L);
long cleanUntilTime = now - maxCompactionLagMs;
return earliestDirtySegmentTimestamp < cleanUntilTime ? cleanUntilTime - earliestDirtySegmentTimestamp : 0L;
}
/**
* Returns the range of dirty offsets that can be cleaned.
*
* @param log the log
* @param lastCleanOffset the last checkpointed offset
* @param now the current time in milliseconds of the cleaning operation
* @return OffsetsToClean containing offsets for cleanable portion of log and whether the log checkpoint needs updating
* @throws IOException if an I/O error occurs
*/
public static OffsetsToClean cleanableOffsets(UnifiedLog log, Optional<Long> lastCleanOffset, long now) throws IOException {
// If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid,
// reset to the log start offset and log the error.
long logStartOffset = log.logStartOffset();
long checkpointDirtyOffset = lastCleanOffset.orElse(logStartOffset);
long firstDirtyOffset;
boolean forceUpdateCheckpoint;
if (checkpointDirtyOffset < logStartOffset) {
// Don't bother with the warning if compact and delete are enabled.
if (!isCompactAndDelete(log))
LOG.warn("Resetting first dirty offset of {} to log start offset {} since the checkpointed offset {} is invalid.",
log.name(), logStartOffset, checkpointDirtyOffset);
firstDirtyOffset = logStartOffset;
forceUpdateCheckpoint = true;
} else if (checkpointDirtyOffset > log.logEndOffset()) {
// The dirty offset has gotten ahead of the log end offset. This could happen if there was data
// corruption at the end of the log. We conservatively assume that the full log needs cleaning.
LOG.warn("The last checkpoint dirty offset for partition {} is {}, " +
"which is larger than the log end offset {}. Resetting to the log start offset {}.",
log.name(), checkpointDirtyOffset, log.logEndOffset(), logStartOffset);
firstDirtyOffset = logStartOffset;
forceUpdateCheckpoint = true;
} else {
firstDirtyOffset = checkpointDirtyOffset;
forceUpdateCheckpoint = false;
}
long minCompactionLagMs = Math.max(log.config().compactionLagMs, 0L);
// Find the first segment that cannot be cleaned. We cannot clean past:
// 1. The active segment
// 2. The last stable offset (including the high watermark)
// 3. Any segments closer to the head of the log than the minimum compaction lag time
long firstUncleanableDirtyOffset = Stream.of(
// we do not clean beyond the last stable offset
Optional.of(log.lastStableOffset()),
// the active segment is always uncleanable
Optional.of(log.activeSegment().baseOffset()),
// the first segment whose largest message timestamp is within a minimum time lag from now
minCompactionLagMs > 0 ? findFirstUncleanableSegment(log, firstDirtyOffset, now, minCompactionLagMs) : Optional.<Long>empty()
)
.flatMap(Optional::stream)
.min(Long::compare)
.orElseThrow(() -> new IllegalStateException("No uncleanable offset found"));
LOG.debug("Finding range of cleanable offsets for log={}. Last clean offset={} " +
"now={} => firstDirtyOffset={} firstUncleanableOffset={} activeSegment.baseOffset={}",
log.name(), lastCleanOffset, now, firstDirtyOffset, firstUncleanableDirtyOffset, log.activeSegment().baseOffset());
return new OffsetsToClean(firstDirtyOffset, Math.max(firstDirtyOffset, firstUncleanableDirtyOffset), forceUpdateCheckpoint);
}
/**
* Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log.
*
* @return the first uncleanable offset (the base offset of the first uncleanable segment) and the total number of cleanable bytes
*/
public static Map.Entry<Long, Long> calculateCleanableBytes(UnifiedLog log, long firstDirtyOffset, long uncleanableOffset) {
List<LogSegment> nonActiveSegments = log.nonActiveLogSegmentsFrom(uncleanableOffset);
LogSegment firstUncleanableSegment = nonActiveSegments.isEmpty() ? log.activeSegment() : nonActiveSegments.get(0);
long firstUncleanableOffset = firstUncleanableSegment.baseOffset();
long cleanableBytes = log.logSegments(Math.min(firstDirtyOffset, firstUncleanableOffset), firstUncleanableOffset).stream()
.mapToLong(LogSegment::size)
.sum();
return Map.entry(firstUncleanableOffset, cleanableBytes);
}
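/**
 * Find the base offset of the first dirty, non-active segment whose largest timestamp is within
 * minCompactionLagMs of now, i.e. the first segment that is still too recent to be cleaned.
 */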
private static Optional<Long> findFirstUncleanableSegment(UnifiedLog log, long firstDirtyOffset, long now, long minCompactionLagMs) throws IOException {
List<LogSegment> dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset);
for (LogSegment segment : dirtyNonActiveSegments) {
boolean isUncleanable = segment.largestTimestamp() > now - minCompactionLagMs;
LOG.debug("Checking if log segment may be cleaned: log='{}' segment.baseOffset={} " +
"segment.largestTimestamp={}; now - compactionLag={}; is uncleanable={}",
log.name(), segment.baseOffset(), segment.largestTimestamp(), now - minCompactionLagMs, isUncleanable);
if (isUncleanable) {
return Optional.of(segment.baseOffset());
}
}
return Optional.empty();
}
/**
* Helper class for the range of cleanable dirty offsets of a log and whether to update the checkpoint associated with
* the log.
*
* @param firstDirtyOffset the lower (inclusive) offset to begin cleaning from
* @param firstUncleanableDirtyOffset the upper (exclusive) offset to clean to
* @param forceUpdateCheckpoint whether to update the checkpoint associated with this log; if true, the checkpoint should be
* reset to firstDirtyOffset
*/
public record OffsetsToClean(long firstDirtyOffset, long firstUncleanableDirtyOffset,
boolean forceUpdateCheckpoint) {
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.KafkaException;
/**
* An exception indicating a failure during log cleaning operations.
* This exception typically wraps the root cause of the cleaning failure and provides
* additional context about the partition and log being cleaned.
*/
public class LogCleaningException extends KafkaException {
public final UnifiedLog log;
public LogCleaningException(UnifiedLog log, String message, Throwable cause) {
super(message, cause);
this.log = log;
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import java.util.Objects;
/**
* LogCleaningState defines the cleaning states that a TopicPartition can be in.
*/
public sealed interface LogCleaningState {
LogCleaningInProgress LOG_CLEANING_IN_PROGRESS = new LogCleaningInProgress();
LogCleaningAborted LOG_CLEANING_ABORTED = new LogCleaningAborted();
static LogCleaningPaused logCleaningPaused(int pausedCount) {
return new LogCleaningPaused(pausedCount);
}
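/** Cleaning of the partition is currently running. */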
final class LogCleaningInProgress implements LogCleaningState {
private LogCleaningInProgress() {}
}
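/** An abort of the in-progress cleaning has been requested. */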
final class LogCleaningAborted implements LogCleaningState {
private LogCleaningAborted() {}
}
final class LogCleaningPaused implements LogCleaningState {
private final int pausedCount;
private LogCleaningPaused(int pausedCount) {
this.pausedCount = pausedCount;
}
public int pausedCount() {
return pausedCount;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LogCleaningPaused that = (LogCleaningPaused) o;
return pausedCount == that.pausedCount;
}
@Override
public int hashCode() {
return Objects.hashCode(pausedCount);
}
@Override
public String toString() {
return "LogCleaningPaused{" +
"pausedCount=" + pausedCount +
'}';
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.Objects;
/**
* Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position,
* and whether it needs compaction immediately.
*/
public final class LogToClean implements Comparable<LogToClean> {
private final TopicPartition topicPartition;
private final UnifiedLog log;
private final long firstDirtyOffset;
private final boolean needCompactionNow;
private final long cleanBytes;
private final long firstUncleanableOffset;
private final long cleanableBytes;
private final long totalBytes;
private final double cleanableRatio;
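/**
 * @param log the log to clean
 * @param firstDirtyOffset the first offset that still needs cleaning
 * @param uncleanableOffset the first offset that must not be cleaned
 * @param needCompactionNow whether the log has exceeded its maximum compaction lag and must be compacted now
 */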
public LogToClean(UnifiedLog log, long firstDirtyOffset, long uncleanableOffset, boolean needCompactionNow) {
this.log = log;
this.topicPartition = log.topicPartition();
this.firstDirtyOffset = firstDirtyOffset;
this.needCompactionNow = needCompactionNow;
this.cleanBytes = log.logSegments(-1, firstDirtyOffset).stream()
.mapToLong(LogSegment::size)
.sum();
Map.Entry<Long, Long> cleanableBytesResult = LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset);
this.firstUncleanableOffset = cleanableBytesResult.getKey();
this.cleanableBytes = cleanableBytesResult.getValue();
this.totalBytes = cleanBytes + cleanableBytes;
this.cleanableRatio = (double) cleanableBytes / totalBytes;
}
public TopicPartition topicPartition() {
return topicPartition;
}
public UnifiedLog log() {
return log;
}
public long firstDirtyOffset() {
return firstDirtyOffset;
}
boolean needCompactionNow() {
return needCompactionNow;
}
public long cleanBytes() {
return cleanBytes;
}
public long firstUncleanableOffset() {
return firstUncleanableOffset;
}
public long cleanableBytes() {
return cleanableBytes;
}
public long totalBytes() {
return totalBytes;
}
public double cleanableRatio() {
return cleanableRatio;
}
@Override
public int compareTo(LogToClean that) {
return Double.compare(this.cleanableRatio, that.cleanableRatio);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LogToClean that = (LogToClean) o;
return firstDirtyOffset == that.firstDirtyOffset &&
needCompactionNow == that.needCompactionNow &&
cleanBytes == that.cleanBytes &&
firstUncleanableOffset == that.firstUncleanableOffset &&
cleanableBytes == that.cleanableBytes &&
totalBytes == that.totalBytes &&
Double.compare(that.cleanableRatio, cleanableRatio) == 0 &&
topicPartition.equals(that.topicPartition) &&
log.equals(that.log);
}
@Override
public int hashCode() {
return Objects.hash(
topicPartition, log, firstDirtyOffset, needCompactionNow, cleanBytes,
firstUncleanableOffset, cleanableBytes, totalBytes, cleanableRatio
);
}
@Override
public String toString() {
return "LogToClean{" +
"topicPartition=" + topicPartition +
", log=" + log +
", firstDirtyOffset=" + firstDirtyOffset +
", needCompactionNow=" + needCompactionNow +
", cleanBytes=" + cleanBytes +
", firstUncleanableOffset=" + firstUncleanableOffset +
", cleanableBytes=" + cleanableBytes +
", totalBytes=" + totalBytes +
", cleanableRatio=" + cleanableRatio +
'}';
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
/**
* A simple struct for collecting pre-clean stats.
*/
public class PreCleanStats {
private long maxCompactionDelayMs = 0L;
private int delayedPartitions = 0;
private int cleanablePartitions = 0;
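/**
 * Record the compaction delay observed for a partition, tracking the maximum delay seen and
 * counting partitions whose compaction is overdue (delay > 0).
 */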
public void updateMaxCompactionDelay(long delayMs) {
maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs);
if (delayMs > 0) {
delayedPartitions++;
}
}
public void recordCleanablePartitions(int numOfCleanables) {
cleanablePartitions = numOfCleanables;
}
public int cleanablePartitions() {
return cleanablePartitions;
}
public int delayedPartitions() {
return delayedPartitions;
}
public long maxCompactionDelayMs() {
return maxCompactionDelayMs;
}
// for testing
public void maxCompactionDelayMs(long maxCompactionDelayMs) {
this.maxCompactionDelayMs = maxCompactionDelayMs;
}
}