mirror of https://github.com/apache/kafka.git
KAFKA-19732, KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20591)
When nested Timeline collections are created and discarded while loading
a coordinator partition, references to them accumulate in the current
snapshot. Allow the GC to reclaim them by starting a new snapshot and
discarding previous snapshots every 16,384 records.
Small intervals degrade loading times for non-transactional offset
commit workloads while large intervals degrade loading times for
transactional workloads. A default of 16,384 was chosen as a compromise.
Cherry pick of d067c6c040.
Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
e76213e182
commit
b14efd8750
|
@ -34,20 +34,38 @@ import java.util.concurrent.CompletableFuture
|
|||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object CoordinatorLoaderImpl {
  /**
   * Default for the `commitIntervalOffsets` constructor parameter of `CoordinatorLoaderImpl`:
   * the number of offsets that may be loaded before the last committed offset is updated again.
   *
   * The value is a trade-off between two kinds of workloads:
   *  - Smaller values commit more often. This costs loading time when the workload is simple and
   *    does not create collections that need to participate in `CoordinatorPlayback` snapshotting.
   *  - Larger values commit less often. This lets more temporary data accumulate before the next
   *    commit when the workload creates many temporary collections that need to be snapshotted.
   *
   * 16,384 was chosen as a compromise between the performance of these two workloads.
   *
   * When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
   * the relative change in performance.
   */
  val DEFAULT_COMMIT_INTERVAL_OFFSETS: Long = 16384L
}
|
||||
|
||||
/**
|
||||
* Coordinator loader which reads records from a partition and replays them
|
||||
* to a group coordinator.
|
||||
*
|
||||
* @param replicaManager The replica manager.
|
||||
* @param deserializer The deserializer to use.
|
||||
* @param loadBufferSize The load buffer size.
|
||||
* @param replicaManager The replica manager.
|
||||
* @param deserializer The deserializer to use.
|
||||
* @param loadBufferSize The load buffer size.
|
||||
* @param commitIntervalOffsets The interval between updating the last committed offset during loading, in offsets.
|
||||
* @tparam T The record type.
|
||||
*/
|
||||
class CoordinatorLoaderImpl[T](
|
||||
time: Time,
|
||||
replicaManager: ReplicaManager,
|
||||
deserializer: Deserializer[T],
|
||||
loadBufferSize: Int
|
||||
loadBufferSize: Int,
|
||||
commitIntervalOffsets: Long = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
) extends CoordinatorLoader[T] with Logging {
|
||||
private val isRunning = new AtomicBoolean(true)
|
||||
private val scheduler = new KafkaScheduler(1)
|
||||
|
@ -99,7 +117,7 @@ class CoordinatorLoaderImpl[T](
|
|||
// the log end offset but the log is empty. This could happen with compacted topics.
|
||||
var readAtLeastOneRecord = true
|
||||
|
||||
var previousHighWatermark = -1L
|
||||
var lastCommittedOffset = -1L
|
||||
var numRecords = 0L
|
||||
var numBytes = 0L
|
||||
while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
|
||||
|
@ -208,10 +226,14 @@ class CoordinatorLoaderImpl[T](
|
|||
if (currentOffset >= currentHighWatermark) {
|
||||
coordinator.updateLastWrittenOffset(currentOffset)
|
||||
|
||||
if (currentHighWatermark > previousHighWatermark) {
|
||||
if (currentHighWatermark > lastCommittedOffset) {
|
||||
coordinator.updateLastCommittedOffset(currentHighWatermark)
|
||||
previousHighWatermark = currentHighWatermark
|
||||
lastCommittedOffset = currentHighWatermark
|
||||
}
|
||||
} else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
|
||||
coordinator.updateLastWrittenOffset(currentOffset)
|
||||
coordinator.updateLastCommittedOffset(currentOffset)
|
||||
lastCommittedOffset = currentOffset
|
||||
}
|
||||
}
|
||||
numBytes = numBytes + memoryRecords.sizeInBytes()
|
||||
|
|
|
@ -620,7 +620,8 @@ class BrokerServer(
|
|||
time,
|
||||
replicaManager,
|
||||
serde,
|
||||
config.groupCoordinatorConfig.offsetsLoadBufferSize
|
||||
config.groupCoordinatorConfig.offsetsLoadBufferSize,
|
||||
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)
|
||||
val writer = new CoordinatorPartitionWriter(
|
||||
replicaManager
|
||||
|
@ -650,7 +651,8 @@ class BrokerServer(
|
|||
time,
|
||||
replicaManager,
|
||||
serde,
|
||||
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
|
||||
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
|
||||
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)
|
||||
val writer = new CoordinatorPartitionWriter(
|
||||
replicaManager
|
||||
|
|
|
@ -62,7 +62,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(None)
|
||||
|
||||
|
@ -82,7 +83,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
loader.close()
|
||||
|
||||
|
@ -103,7 +105,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -186,7 +189,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -229,7 +233,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -265,7 +270,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -303,7 +309,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -331,7 +338,8 @@ class CoordinatorLoaderImplTest {
|
|||
time,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
val startTimeMs = time.milliseconds()
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
|
@ -378,7 +386,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -441,7 +450,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -467,7 +477,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -531,7 +542,8 @@ class CoordinatorLoaderImplTest {
|
|||
time = Time.SYSTEM,
|
||||
replicaManager = replicaManager,
|
||||
deserializer = serde,
|
||||
loadBufferSize = 1000
|
||||
loadBufferSize = 1000,
|
||||
commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
|
||||
)) { loader =>
|
||||
when(replicaManager.getLog(tp)).thenReturn(Some(log))
|
||||
when(log.logStartOffset).thenReturn(0L)
|
||||
|
@ -559,6 +571,79 @@ class CoordinatorLoaderImplTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
def testUpdateLastWrittenOffsetCommitInterval(): Unit = {
  // Loads four batches (7 records, offsets 0..6) with a commit interval of 2 offsets and a
  // high watermark of 7, then checks at which offsets the loader reports "last written" and
  // "last committed" to the coordinator.
  val tp = new TopicPartition("foo", 0)
  val replicaManager = mock(classOf[ReplicaManager])
  val serde = new StringKeyValueDeserializer
  val log = mock(classOf[UnifiedLog])
  val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])

  // Each entry is (base offset of the batch, key/value pairs stored in the batch).
  val batches: Seq[(Long, Seq[(String, String)])] = Seq(
    0L -> Seq("k1" -> "v1", "k2" -> "v2"),
    2L -> Seq("k3" -> "v3", "k4" -> "v4", "k5" -> "v5"),
    5L -> Seq("k6" -> "v6"),
    6L -> Seq("k7" -> "v7")
  )

  // Each entry is (offset, expected number of update calls made with that offset).
  val expectedUpdates: Seq[(Long, Int)] = Seq(0L -> 0, 2L -> 1, 5L -> 1, 6L -> 0, 7L -> 1)

  Using.resource(new CoordinatorLoaderImpl[(String, String)](
    time = Time.SYSTEM,
    replicaManager = replicaManager,
    deserializer = serde,
    loadBufferSize = 1000,
    commitIntervalOffsets = 2L
  )) { loader =>
    when(replicaManager.getLog(tp)).thenReturn(Some(log))
    when(log.logStartOffset).thenReturn(0L)
    when(log.highWatermark).thenReturn(7L)
    when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))

    // Stub one log read per batch, keyed by the batch's base offset.
    batches.foreach { case (baseOffset, keyValues) =>
      val simpleRecords = keyValues.map { case (key, value) =>
        new SimpleRecord(key.getBytes, value.getBytes)
      }
      val readResult = logReadResult(startOffset = baseOffset, records = simpleRecords)
      when(log.read(baseOffset, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult)
    }

    assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))

    // Every record must have been replayed exactly once, at its own offset.
    batches.flatMap(_._2).zipWithIndex.foreach { case (keyValue, index) =>
      verify(coordinator).replay(
        index.toLong,
        RecordBatch.NO_PRODUCER_ID,
        RecordBatch.NO_PRODUCER_EPOCH,
        keyValue
      )
    }

    // Updates are expected at 2, 5 and 7 only; none at 0 or 6.
    expectedUpdates.foreach { case (offset, count) =>
      verify(coordinator, times(count)).updateLastWrittenOffset(offset)
    }
    expectedUpdates.foreach { case (offset, count) =>
      verify(coordinator, times(count)).updateLastCommittedOffset(offset)
    }
  }
}
|
||||
|
||||
private def logReadResult(
|
||||
startOffset: Long,
|
||||
producerId: Long = RecordBatch.NO_PRODUCER_ID,
|
||||
|
|
Loading…
Reference in New Issue