mirror of https://github.com/apache/kafka.git
MINOR: reduce allocations in log start and recovery checkpoints (#8467)
For brokers with replica counts > 4000, allocations from logsByDir become substantial. logsByDir is called often by LogManager.checkpointLogRecoveryOffsets and LogManager.checkpointLogStartOffsets. The approach used is similar to the one from the checkpointHighwatermarks change in https://github.com/apache/kafka/pull/6741. Are there better ways to structure out data structure to avoid creating logsByDir on demand for each checkpoint iteration? This micro-optimization will help as is, but if we can avoid doing this completely it'd be better. JMH benchmark results: ``` Before: Benchmark (numPartitions) (numTopics) Mode Cnt Score Error Units CheckpointBench.measureCheckpointLogStartOffsets 3 100 thrpt 15 2.233 ± 0.013 ops/ms CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate 3 100 thrpt 15 477.097 ± 49.731 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm 3 100 thrpt 15 246083.007 ± 33.052 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space 3 100 thrpt 15 475.683 ± 55.569 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm 3 100 thrpt 15 245474.040 ± 14968.328 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen 3 100 thrpt 15 0.001 ± 0.001 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm 3 100 thrpt 15 0.341 ± 0.268 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.count 3 100 thrpt 15 129.000 counts CheckpointBench.measureCheckpointLogStartOffsets:·gc.time 3 100 thrpt 15 52.000 ms CheckpointBench.measureCheckpointLogStartOffsets 3 1000 thrpt 15 0.572 ± 0.004 ops/ms CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate 3 1000 thrpt 15 1360.240 ± 150.539 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm 3 1000 thrpt 15 2750221.257 ± 891.024 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space 3 1000 thrpt 15 1362.908 ± 148.799 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm 3 1000 thrpt 15 2756395.092 ± 44671.843 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen 3 1000 thrpt 15 0.017 ± 0.008 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm 3 1000 thrpt 15 33.611 ± 14.401 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.count 3 1000 thrpt 15 273.000 counts CheckpointBench.measureCheckpointLogStartOffsets:·gc.time 3 1000 thrpt 15 186.000 ms CheckpointBench.measureCheckpointLogStartOffsets 3 2000 thrpt 15 0.266 ± 0.002 ops/ms CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate 3 2000 thrpt 15 1342.557 ± 171.260 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm 3 2000 thrpt 15 5877881.729 ± 3695.086 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space 3 2000 thrpt 15 1343.965 ± 186.069 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm 3 2000 thrpt 15 5877788.561 ± 168540.343 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen 3 2000 thrpt 15 0.081 ± 0.043 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm 3 2000 thrpt 15 351.277 ± 167.006 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.count 3 2000 thrpt 15 253.000 counts CheckpointBench.measureCheckpointLogStartOffsets:·gc.time 3 2000 thrpt 15 231.000 ms JMH benchmarks done After: CheckpointBench.measureCheckpointLogStartOffsets 3 100 thrpt 15 2.809 ± 0.129 ops/ms CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate 3 100 thrpt 15 211.248 ± 25.953 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm 3 100 thrpt 15 86533.838 ± 3763.989 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space 3 100 thrpt 15 211.512 ± 38.669 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm 3 100 thrpt 15 86228.552 ± 9590.781 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen 3 100 thrpt 15 ≈ 10⁻³ MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm 3 100 thrpt 15 0.140 ± 0.111 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.count 3 100 thrpt 15 57.000 counts CheckpointBench.measureCheckpointLogStartOffsets:·gc.time 3 100 thrpt 15 25.000 ms CheckpointBench.measureCheckpointLogStartOffsets 3 1000 thrpt 15 1.046 ± 0.030 ops/ms CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate 3 1000 thrpt 15 524.597 ± 74.793 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm 3 1000 thrpt 15 582898.889 ± 37552.262 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space 3 1000 thrpt 15 519.675 ± 89.754 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm 3 1000 thrpt 15 576371.150 ± 55972.955 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen 3 1000 thrpt 15 0.009 ± 0.005 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm 3 1000 thrpt 15 9.920 ± 5.375 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.count 3 1000 thrpt 15 111.000 counts CheckpointBench.measureCheckpointLogStartOffsets:·gc.time 3 1000 thrpt 15 56.000 ms CheckpointBench.measureCheckpointLogStartOffsets 3 2000 thrpt 15 0.617 ± 0.007 ops/ms CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate 3 2000 thrpt 15 573.061 ± 95.931 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm 3 2000 thrpt 15 1092098.004 ± 75140.633 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space 3 2000 thrpt 15 572.448 ± 97.960 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm 3 2000 thrpt 15 1091290.460 ± 85946.164 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen 3 2000 thrpt 15 0.010 ± 0.012 MB/sec CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm 3 2000 thrpt 15 19.990 ± 24.407 B/op CheckpointBench.measureCheckpointLogStartOffsets:·gc.count 3 2000 thrpt 15 109.000 counts CheckpointBench.measureCheckpointLogStartOffsets:·gc.time 3 2000 thrpt 15 67.000 ms JMH benchmarks done ``` For the 2000 topic, 3 partition case, we see a reduction in normalized allocations from 5877881B/op to 1284190.774B/op, a reduction of 78%. Some allocation profiles from a mid sized broker follow. I have seen worse, but these add up to around 3.8% on a broker that saw GC overhead in CPU time of around 30%. You could argue that this is relatively small, but it seems worthwhile for a low risk change.   Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
99b8b51f1e
commit
cfc34cace5
|
@ -1003,9 +1003,17 @@ class LogManager(logDirs: Seq[File],
|
|||
/**
|
||||
* Map of log dir to logs by topic and partitions in that dir
|
||||
*/
|
||||
private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
|
||||
(this.currentLogs.toList ++ this.futureLogs.toList).toMap
|
||||
.groupBy { case (_, log) => log.parentDir }
|
||||
def logsByDir: Map[String, Map[TopicPartition, Log]] = {
|
||||
// This code is called often by checkpoint processes and is written in a way that reduces
|
||||
// allocations and CPU with many topic partitions.
|
||||
// When changing this code please measure the changes with org.apache.kafka.jmh.server.CheckpointBench
|
||||
val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Log]]()
|
||||
def addToDir(tp: TopicPartition, log: Log): Unit = {
|
||||
byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
|
||||
}
|
||||
currentLogs.foreachEntry(addToDir)
|
||||
futureLogs.foreachEntry(addToDir)
|
||||
byDir
|
||||
}
|
||||
|
||||
// logDir should be an absolute path
|
||||
|
|
|
@ -74,6 +74,10 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
|
|||
def values: Iterable[V] = pool.values.asScala
|
||||
|
||||
def clear(): Unit = { pool.clear() }
|
||||
|
||||
def foreachEntry(f: (K, V) => Unit): Unit = {
|
||||
pool.forEach((k, v) => f(k, v))
|
||||
}
|
||||
|
||||
override def size: Int = pool.size
|
||||
|
||||
|
|
|
@ -66,8 +66,8 @@ import scala.Option;
|
|||
@Measurement(iterations = 5)
|
||||
@Fork(3)
|
||||
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||
@State(Scope.Benchmark)
|
||||
public class HighwatermarkCheckpointBench {
|
||||
@State(value = Scope.Benchmark)
|
||||
public class CheckpointBench {
|
||||
|
||||
@Param({"100", "1000", "2000"})
|
||||
public int numTopics;
|
||||
|
@ -172,4 +172,10 @@ public class HighwatermarkCheckpointBench {
|
|||
public void measureCheckpointHighWatermarks() {
|
||||
this.replicaManager.checkpointHighWatermarks();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public void measureCheckpointLogStartOffsets() {
|
||||
this.logManager.checkpointLogStartOffsets();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue