mirror of https://github.com/apache/kafka.git
Backport of [KAFKA-18597](https://github.com/apache/kafka/pull/18627) to the 4.0 branch. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent 46e843da9f
commit 15ec053665
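The gist of KAFKA-18597: `maxOverCleanerThreads` truncated its maximum to an `Int` before each gauge applied its own scaling, so a fractional maximum such as a buffer utilization of 0.85 collapsed to 0 before the `* 100`, and `max-buffer-utilization-percent` reported 0 instead of 85 (the old test below even asserted that 0). The fix keeps the helper's result as a `Double` and moves the `.toInt` into each gauge, after scaling. A minimal standalone sketch of the two orderings (plain Scala; `utilizations` is a stand-in for `cleaners.map(_.lastStats.bufferUtilization)`):

object TruncationSketch extends App {
  // Stand-in for the per-thread buffer utilizations (fractions of 1).
  val utilizations = Seq(0.75d, 0.85d, 0.65d)

  // Old ordering: truncate the max to Int, then scale.
  // 0.85.toInt == 0, so the gauge reported 0.
  val oldGauge: Int = utilizations.maxOption.getOrElse(0.0d).toInt * 100
  println(oldGauge)  // 0

  // New ordering: scale the Double max, then truncate.
  val newGauge: Int = (utilizations.maxOption.getOrElse(0.0d) * 100).toInt
  println(newGauge)  // 85
}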
LogCleaner.scala:

@@ -117,14 +117,14 @@ class LogCleaner(initialConfig: CleanerConfig,
   /**
    * @param f to compute the result
-   * @return the max value (int value) or 0 if there is no cleaner
+   * @return the max value or 0 if there is no cleaner
    */
-  private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int =
-    cleaners.map(f).maxOption.getOrElse(0.0d).toInt
+  private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double =
+    cleaners.map(f).maxOption.getOrElse(0.0d)
 
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
   metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
-    () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
+    () => (maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
 
   /* a metric to track the recopy rate of each thread's last cleaning */
   metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
@@ -134,12 +134,12 @@ class LogCleaner(initialConfig: CleanerConfig,
   })
 
   /* a metric to track the maximum cleaning time for the last cleaning from each thread */
-  metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
+  metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
 
   // a metric to track delay between the time when a log is required to be compacted
   // as determined by max compaction lag and the time of last cleaner run.
   metricsGroup.newGauge(MaxCompactionDelayMetricsName,
-    () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
+    () => (maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
 
   metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)
 
@@ -523,10 +523,11 @@ object LogCleaner {
   }
 
-  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  // Visible for test.
+  private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
   private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
-  private val MaxCleanTimeMetricName = "max-clean-time-secs"
-  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private[log] val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private[log] val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
   private val DeadThreadCountMetricName = "DeadThreadCount"
   // package private for testing
   private[log] val MetricNames = Set(
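One behavior the new tests below lean on: `maxOption.getOrElse(0.0d)` is what lets every gauge report 0 when no cleaner threads are running, which is the first assertion in each new test. Unlike `max`, `maxOption` is total on empty collections. A one-line illustration (plain Scala):

// maxOption is None on an empty collection instead of throwing,
// so the gauges fall back to 0 when `cleaners` is empty.
val noThreads: Seq[Double] = Seq.empty
assert(noThreads.maxOption.getOrElse(0.0d) == 0.0d)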
LogCleanerTest.scala:

@@ -18,6 +18,7 @@
 package kafka.log
 
 import kafka.common._
+import kafka.log.LogCleaner.{MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -2048,12 +2049,24 @@ class LogCleanerTest extends Logging {
   }
 
   @Test
-  def testMaxOverCleanerThreads(): Unit = {
-    val logCleaner = new LogCleaner(new CleanerConfig(true),
+  def testMaxBufferUtilizationPercentMetric(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
       logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
       logs = new Pool[TopicPartition, UnifiedLog](),
       logDirFailureChannel = new LogDirFailureChannel(1),
-      time = time)
+      time = time
+    )
+
+    def assertMaxBufferUtilizationPercent(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
+        () => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
+      assertEquals(expected, gauge.value())
+    }
+
+    try {
+      // No CleanerThreads
+      assertMaxBufferUtilizationPercent(0)
 
       val cleaners = logCleaner.cleaners
 
@@ -2072,18 +2085,123 @@ class LogCleanerTest extends Logging {
       cleaner3.lastStats.bufferUtilization = 0.65
       cleaners += cleaner3
 
-      assertEquals(0, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
-
-      cleaners.clear()
-
-      cleaner1.lastStats.bufferUtilization = 5d
-      cleaners += cleaner1
-      cleaner2.lastStats.bufferUtilization = 6d
-      cleaners += cleaner2
-      cleaner3.lastStats.bufferUtilization = 7d
-      cleaners += cleaner3
-
-      assertEquals(7, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+      // expect the gauge value to reflect the maximum bufferUtilization
+      assertMaxBufferUtilizationPercent(85)
+
+      // Update bufferUtilization and verify the gauge value updates
+      cleaner1.lastStats.bufferUtilization = 0.9
+      assertMaxBufferUtilizationPercent(90)
+
+      // All CleanerThreads have the same bufferUtilization
+      cleaners.foreach(_.lastStats.bufferUtilization = 0.5)
+      assertMaxBufferUtilizationPercent(50)
+    } finally {
+      logCleaner.shutdown()
+    }
+  }
+
+  @Test
+  def testMaxCleanTimeMetric(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
+      logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time
+    )
+
+    def assertMaxCleanTime(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxCleanTimeMetricName,
+        () => logCleaner.maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
+      assertEquals(expected, gauge.value())
+    }
+
+    try {
+      // No CleanerThreads
+      assertMaxCleanTime(0)
+
+      val cleaners = logCleaner.cleaners
+
+      val cleaner1 = new logCleaner.CleanerThread(1)
+      cleaner1.lastStats = new CleanerStats(time)
+      cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 1_000L
+      cleaners += cleaner1
+
+      val cleaner2 = new logCleaner.CleanerThread(2)
+      cleaner2.lastStats = new CleanerStats(time)
+      cleaner2.lastStats.endTime = cleaner2.lastStats.startTime + 2_000L
+      cleaners += cleaner2
+
+      val cleaner3 = new logCleaner.CleanerThread(3)
+      cleaner3.lastStats = new CleanerStats(time)
+      cleaner3.lastStats.endTime = cleaner3.lastStats.startTime + 3_000L
+      cleaners += cleaner3
+
+      // expect the gauge value to reflect the maximum cleanTime
+      assertMaxCleanTime(3)
+
+      // Update cleanTime and verify the gauge value updates
+      cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 4_000L
+      assertMaxCleanTime(4)
+
+      // All CleanerThreads have the same cleanTime
+      cleaners.foreach(cleaner => cleaner.lastStats.endTime = cleaner.lastStats.startTime + 1_500L)
+      assertMaxCleanTime(1)
+    } finally {
+      logCleaner.shutdown()
+    }
+  }
+
+  @Test
+  def testMaxCompactionDelayMetrics(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
+      logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time
+    )
+
+    def assertMaxCompactionDelay(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxCompactionDelayMetricsName,
+        () => (logCleaner.maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
+      assertEquals(expected, gauge.value())
+    }
+
+    try {
+      // No CleanerThreads
+      assertMaxCompactionDelay(0)
+
+      val cleaners = logCleaner.cleaners
+
+      val cleaner1 = new logCleaner.CleanerThread(1)
+      cleaner1.lastStats = new CleanerStats(time)
+      cleaner1.lastPreCleanStats.maxCompactionDelayMs = 1_000L
+      cleaners += cleaner1
+
+      val cleaner2 = new logCleaner.CleanerThread(2)
+      cleaner2.lastStats = new CleanerStats(time)
+      cleaner2.lastPreCleanStats.maxCompactionDelayMs = 2_000L
+      cleaners += cleaner2
+
+      val cleaner3 = new logCleaner.CleanerThread(3)
+      cleaner3.lastStats = new CleanerStats(time)
+      cleaner3.lastPreCleanStats.maxCompactionDelayMs = 3_000L
+      cleaners += cleaner3
+
+      // expect the gauge value to reflect the maximum CompactionDelay
+      assertMaxCompactionDelay(3)
+
+      // Update CompactionDelay and verify the gauge value updates
+      cleaner1.lastPreCleanStats.maxCompactionDelayMs = 4_000L
+      assertMaxCompactionDelay(4)
+
+      // All CleanerThreads have the same CompactionDelay
+      cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs = 1_500L)
+      assertMaxCompactionDelay(1)
+    } finally {
+      logCleaner.shutdown()
+    }
   }
 
   private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
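A note on the expected values at the end of each test: the gauges floor rather than round, which is why the "all threads equal" cases assert 1 for 1_500L. A quick check of that arithmetic (a sketch; it assumes `elapsedSecs` is derived as `(endTime - startTime) / 1000.0`, which is not shown in this diff):

// Flooring, not rounding: 1.5 seconds reports as 1, and a
// 1_500 ms compaction delay reports as 1 second.
val elapsedSecs: Double = 1_500L / 1000.0  // 1.5
assert(elapsedSecs.toInt == 1)

val maxCompactionDelayMs = 1_500L
assert((maxCompactionDelayMs.toDouble / 1000).toInt == 1)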