KAFKA-15129: [1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown (#13924)

Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>

---------

Co-authored-by: Deqi Hu <deqi.hu@shopee.com>
This commit is contained in:
hudeqi 2023-07-03 22:14:30 +08:00 committed by GitHub
parent 0ae1d22879
commit 48eb8c90ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 8 deletions

View File

@ -179,6 +179,7 @@ class LogCleaner(initialConfig: CleanerConfig,
*/ */
def removeMetrics(): Unit = { def removeMetrics(): Unit = {
LogCleaner.MetricNames.foreach(metricsGroup.removeMetric) LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
cleanerManager.removeMetrics()
} }
/** /**

View File

@ -88,17 +88,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
/* for coordinating the pausing and the cleaning of a partition */ /* for coordinating the pausing and the cleaning of a partition */
private val pausedCleaningCond = lock.newCondition() private val pausedCleaningCond = lock.newCondition()
// Avoid adding legacy tags for a metric when initializing `LogCleanerManager`
GaugeMetricNameWithTag.clear()
/* gauges for tracking the number of partitions marked as uncleanable for each log directory */ /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
for (dir <- logDirs) { for (dir <- logDirs) {
metricsGroup.newGauge("uncleanable-partitions-count", val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
metricsGroup.newGauge(UncleanablePartitionsCountMetricName,
() => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) }, () => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
Map("logDirectory" -> dir.getAbsolutePath).asJava metricTag
) )
GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]())
.add(metricTag)
} }
/* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */ /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
for (dir <- logDirs) { for (dir <- logDirs) {
metricsGroup.newGauge("uncleanable-bytes", val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
metricsGroup.newGauge(UncleanableBytesMetricName,
() => inLock(lock) { () => inLock(lock) {
uncleanablePartitions.get(dir.getAbsolutePath) match { uncleanablePartitions.get(dir.getAbsolutePath) match {
case Some(partitions) => case Some(partitions) =>
@ -116,17 +122,19 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case None => 0 case None => 0
} }
}, },
Map("logDirectory" -> dir.getAbsolutePath).asJava metricTag
) )
GaugeMetricNameWithTag.computeIfAbsent(UncleanableBytesMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]())
.add(metricTag)
} }
/* a gauge for tracking the cleanable ratio of the dirtiest log */ /* a gauge for tracking the cleanable ratio of the dirtiest log */
@volatile private var dirtiestLogCleanableRatio = 0.0 @volatile private var dirtiestLogCleanableRatio = 0.0
metricsGroup.newGauge("max-dirty-percent", () => (100 * dirtiestLogCleanableRatio).toInt) metricsGroup.newGauge(MaxDirtyPercentMetricName, () => (100 * dirtiestLogCleanableRatio).toInt)
/* a gauge for tracking the time since the last log cleaner run, in milli seconds */ /* a gauge for tracking the time since the last log cleaner run, in milli seconds */
@volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds @volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds
metricsGroup.newGauge("time-since-last-run-ms", () => Time.SYSTEM.milliseconds - timeOfLastRun) metricsGroup.newGauge(TimeSinceLastRunMsMetricName, () => Time.SYSTEM.milliseconds - timeOfLastRun)
/** /**
* @return the position processed for all logs. * @return the position processed for all logs.
@ -538,6 +546,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
logDirsToRemove.foreach { uncleanablePartitions.remove } logDirsToRemove.foreach { uncleanablePartitions.remove }
} }
} }
def removeMetrics(): Unit = {
GaugeMetricNameNoTag.foreach(metricsGroup.removeMetric)
GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
metricNameAndTags._2.asScala.foreach(tag => metricsGroup.removeMetric(metricNameAndTags._1, tag))
})
}
} }
/** /**
@ -555,6 +570,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long,
} }
private[log] object LogCleanerManager extends Logging { private[log] object LogCleanerManager extends Logging {
private val UncleanablePartitionsCountMetricName = "uncleanable-partitions-count"
private val UncleanableBytesMetricName = "uncleanable-bytes"
private val MaxDirtyPercentMetricName = "max-dirty-percent"
private val TimeSinceLastRunMsMetricName = "time-since-last-run-ms"
private[log] val GaugeMetricNameWithTag = new java.util.HashMap[String, java.util.List[java.util.Map[String, String]]]()
private[log] val GaugeMetricNameNoTag = Set(
MaxDirtyPercentMetricName,
TimeSinceLastRunMsMetricName
)
def isCompactAndDelete(log: UnifiedLog): Boolean = { def isCompactAndDelete(log: UnifiedLog): Boolean = {
log.config.compact && log.config.delete log.config.compact && log.config.delete

View File

@ -30,6 +30,7 @@ import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyString} import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions} import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}
@ -71,7 +72,7 @@ class LogCleanerTest {
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try { try {
val logCleaner = new LogCleaner(new CleanerConfig(true), val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir()), logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](), logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1), logDirFailureChannel = new LogDirFailureChannel(1),
time = time) time = time)
@ -83,11 +84,27 @@ class LogCleanerTest {
val numMetricsRegistered = LogCleaner.MetricNames.size val numMetricsRegistered = LogCleaner.MetricNames.size
verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
// verify that each metric is removed // verify that each metric in `LogCleaner` is removed
LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
// verify that each metric in `LogCleanerManager` is removed
val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1)
LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
metricNameAndTags._2.asScala.foreach(tags => {
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricNameAndTags._1), any(), ArgumentMatchers.eq(tags))
})
})
LogCleanerManager.GaugeMetricNameNoTag.foreach(verify(mockLogCleanerManagerMetricsGroup).removeMetric(_))
LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
metricNameAndTags._2.asScala.foreach(tags => {
verify(mockLogCleanerManagerMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameAndTags._1), ArgumentMatchers.eq(tags))
})
})
// assert that we have verified all invocations on // assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup) verifyNoMoreInteractions(mockMetricsGroup)
verifyNoMoreInteractions(mockLogCleanerManagerMetricsGroup)
} finally { } finally {
mockMetricsGroupCtor.close() mockMetricsGroupCtor.close()
} }