KAFKA-19732, KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20590)

When nested Timeline collections are created and discarded while loading
a coordinator partition, references to them accumulate in the current
snapshot. Allow the GC to reclaim them by starting a new snapshot and
discarding previous snapshots every 16,384 records.
    
Small intervals degrade loading times for non-transactional offset
commit workloads while large intervals degrade loading times for
transactional workloads. A default of 16,384 was chosen as a compromise.
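
In effect, each commit during loading starts a fresh snapshot and lets all
older snapshots be discarded, so Timeline collections that were created and
dropped in between become unreachable. A minimal sketch of the idea, not the
exact loader code (replayNextBatch is a hypothetical helper; the two
coordinator callbacks are the ones used in the change below):

    var lastCommittedOffset = -1L
    while (currentOffset < logEndOffset) {
      // Hypothetical helper: replays one batch and returns the next offset.
      currentOffset = replayNextBatch(currentOffset)
      if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
        coordinator.updateLastWrittenOffset(currentOffset)   // starts a new snapshot
        coordinator.updateLastCommittedOffset(currentOffset) // discards older snapshots
        lastCommittedOffset = currentOffset
      }
    }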
    
Cherry pick of d067c6c040.

Reviewers: David Jacot <djacot@confluent.io>
Author: Sean Quah, 2025-09-26 10:01:05 +01:00, committed by GitHub
parent 7ba7f5eb05
commit 8248d1d2bb
3 changed files with 130 additions and 21 deletions

CoordinatorLoaderImpl.scala

@@ -34,20 +34,38 @@ import java.util.concurrent.CompletableFuture
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.jdk.CollectionConverters._
+object CoordinatorLoaderImpl {
+  /**
+   * The interval between updating the last committed offset during loading, in offsets. Smaller
+   * values commit more often at the expense of loading times when the workload is simple and does
+   * not create collections that need to participate in {@link CoordinatorPlayback} snapshotting.
+   * Larger values commit less often and allow more temporary data to accumulate before the next
+   * commit when the workload creates many temporary collections that need to be snapshotted.
+   *
+   * The value of 16,384 was chosen as a trade-off between the performance of these two workloads.
+   *
+   * When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
+   * the relative change in performance.
+   */
+  val DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384L
+}
 /**
  * Coordinator loader which reads records from a partition and replays them
  * to a group coordinator.
  *
- * @param replicaManager The replica manager.
- * @param deserializer The deserializer to use.
- * @param loadBufferSize The load buffer size.
+ * @param replicaManager        The replica manager.
+ * @param deserializer          The deserializer to use.
+ * @param loadBufferSize        The load buffer size.
+ * @param commitIntervalOffsets The interval between updating the last committed offset during loading, in offsets.
  * @tparam T The record type.
  */
 class CoordinatorLoaderImpl[T](
   time: Time,
   replicaManager: ReplicaManager,
   deserializer: Deserializer[T],
-  loadBufferSize: Int
+  loadBufferSize: Int,
+  commitIntervalOffsets: Long = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
 ) extends CoordinatorLoader[T] with Logging {
   private val isRunning = new AtomicBoolean(true)
   private val scheduler = new KafkaScheduler(1)
@@ -99,7 +117,7 @@ class CoordinatorLoaderImpl[T](
     // the log end offset but the log is empty. This could happen with compacted topics.
     var readAtLeastOneRecord = true
-    var previousHighWatermark = -1L
+    var lastCommittedOffset = -1L
     var numRecords = 0L
     var numBytes = 0L
     while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
@@ -213,10 +231,14 @@ class CoordinatorLoaderImpl[T](
       if (currentOffset >= currentHighWatermark) {
         coordinator.updateLastWrittenOffset(currentOffset)
-        if (currentHighWatermark > previousHighWatermark) {
+        if (currentHighWatermark > lastCommittedOffset) {
           coordinator.updateLastCommittedOffset(currentHighWatermark)
-          previousHighWatermark = currentHighWatermark
+          lastCommittedOffset = currentHighWatermark
         }
+      } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
+        coordinator.updateLastWrittenOffset(currentOffset)
+        coordinator.updateLastCommittedOffset(currentOffset)
+        lastCommittedOffset = currentOffset
       }
     }
     numBytes = numBytes + memoryRecords.sizeInBytes()

BrokerServer.scala

@@ -634,7 +634,8 @@ class BrokerServer(
         time,
         replicaManager,
         serde,
-        config.groupCoordinatorConfig.offsetsLoadBufferSize
+        config.groupCoordinatorConfig.offsetsLoadBufferSize,
+        CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
       )
       val writer = new CoordinatorPartitionWriter(
         replicaManager
@@ -673,7 +674,8 @@ class BrokerServer(
         time,
         replicaManager,
         serde,
-        config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
+        config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
+        CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
       )
       val writer = new CoordinatorPartitionWriter(
         replicaManager

CoordinatorLoaderImplTest.scala

@@ -63,7 +63,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(None)
@@ -83,7 +84,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       loader.close()
@@ -104,7 +106,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -207,7 +210,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -250,7 +254,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -290,7 +295,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -332,7 +338,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -364,7 +371,8 @@ class CoordinatorLoaderImplTest {
       time,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       val startTimeMs = time.milliseconds()
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
@@ -419,7 +427,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -494,7 +503,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -520,7 +530,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -596,7 +607,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -632,6 +644,79 @@ class CoordinatorLoaderImplTest {
     }
   }

+  @Test
+  def testUpdateLastWrittenOffsetCommitInterval(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000,
+      commitIntervalOffsets = 2L
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(log.highWatermark).thenReturn(7L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+      val readResult1 = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(0L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult1)
+
+      val readResult2 = logReadResult(startOffset = 2, records = Seq(
+        new SimpleRecord("k3".getBytes, "v3".getBytes),
+        new SimpleRecord("k4".getBytes, "v4".getBytes),
+        new SimpleRecord("k5".getBytes, "v5".getBytes)
+      ))
+
+      when(log.read(2L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult2)
+
+      val readResult3 = logReadResult(startOffset = 5, records = Seq(
+        new SimpleRecord("k6".getBytes, "v6".getBytes)
+      ))
+
+      when(log.read(5L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult3)
+
+      val readResult4 = logReadResult(startOffset = 6, records = Seq(
+        new SimpleRecord("k7".getBytes, "v7".getBytes)
+      ))
+
+      when(log.read(6L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult4)
+
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+      verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+      verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+      verify(coordinator, times(0)).updateLastWrittenOffset(0L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(2L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(5L)
+      verify(coordinator, times(0)).updateLastWrittenOffset(6L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(7L)
+      verify(coordinator, times(0)).updateLastCommittedOffset(0L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(2L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(5L)
+      verify(coordinator, times(0)).updateLastCommittedOffset(6L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(7L)
+    }
+  }
+
   private def logReadResult(
     startOffset: Long,
     producerId: Long = RecordBatch.NO_PRODUCER_ID,