diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index 70536abecc0..86d8748462b 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -34,20 +34,38 @@ import java.util.concurrent.CompletableFuture
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.jdk.CollectionConverters._

+object CoordinatorLoaderImpl {
+  /**
+   * The interval between updating the last committed offset during loading, in offsets. Smaller
+   * values commit more often at the expense of loading times when the workload is simple and does
+   * not create collections that need to participate in {@link CoordinatorPlayback} snapshotting.
+   * Larger values commit less often and allow more temporary data to accumulate before the next
+   * commit when the workload creates many temporary collections that need to be snapshotted.
+   *
+   * The value of 16,384 was chosen as a trade-off between the performance of these two workloads.
+   *
+   * When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
+   * the relative change in performance.
+   */
+  val DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384L
+}
+
 /**
  * Coordinator loader which reads records from a partition and replays them
  * to a group coordinator.
  *
- * @param replicaManager The replica manager.
- * @param deserializer The deserializer to use.
- * @param loadBufferSize The load buffer size.
+ * @param replicaManager        The replica manager.
+ * @param deserializer          The deserializer to use.
+ * @param loadBufferSize        The load buffer size.
+ * @param commitIntervalOffsets The interval between updating the last committed offset during loading, in offsets.
  * @tparam T The record type.
  */
 class CoordinatorLoaderImpl[T](
   time: Time,
   replicaManager: ReplicaManager,
   deserializer: Deserializer[T],
-  loadBufferSize: Int
+  loadBufferSize: Int,
+  commitIntervalOffsets: Long = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
 ) extends CoordinatorLoader[T] with Logging {
   private val isRunning = new AtomicBoolean(true)
   private val scheduler = new KafkaScheduler(1)
@@ -99,7 +117,7 @@ class CoordinatorLoaderImpl[T](
       // the log end offset but the log is empty. This could happen with compacted topics.
       var readAtLeastOneRecord = true

-      var previousHighWatermark = -1L
+      var lastCommittedOffset = -1L
       var numRecords = 0L
       var numBytes = 0L
       while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
@@ -213,10 +231,14 @@ class CoordinatorLoaderImpl[T](
           if (currentOffset >= currentHighWatermark) {
             coordinator.updateLastWrittenOffset(currentOffset)

-            if (currentHighWatermark > previousHighWatermark) {
+            if (currentHighWatermark > lastCommittedOffset) {
               coordinator.updateLastCommittedOffset(currentHighWatermark)
-              previousHighWatermark = currentHighWatermark
+              lastCommittedOffset = currentHighWatermark
             }
+          } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
+            coordinator.updateLastWrittenOffset(currentOffset)
+            coordinator.updateLastCommittedOffset(currentOffset)
+            lastCommittedOffset = currentOffset
           }
         }
         numBytes = numBytes + memoryRecords.sizeInBytes()
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 85612c58d4a..f4c43adb772 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -634,7 +634,8 @@ class BrokerServer(
         time,
         replicaManager,
         serde,
-        config.groupCoordinatorConfig.offsetsLoadBufferSize
+        config.groupCoordinatorConfig.offsetsLoadBufferSize,
+        CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
       )
       val writer = new CoordinatorPartitionWriter(
         replicaManager
@@ -673,7 +674,8 @@ class BrokerServer(
         time,
         replicaManager,
         serde,
-        config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
+        config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
+        CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
       )
       val writer = new CoordinatorPartitionWriter(
         replicaManager
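The core behavioral change is the new `else if` branch in `CoordinatorLoaderImpl.load`: while the loader is still below the high watermark, it now also advances the last written and last committed offsets every `commitIntervalOffsets` offsets, instead of committing only once replay catches up to the high watermark. The sketch below replays the same branch structure outside of Kafka to make the cadence visible; `Playback` and `replayBatches` are names invented for this sketch (stand-ins for `CoordinatorPlayback` and the batch loop), not part of the patch.

    // A minimal, self-contained sketch of the commit cadence introduced above.
    // `Playback` is a hypothetical stand-in for CoordinatorPlayback; only the
    // branch structure mirrors the patched loop in CoordinatorLoaderImpl.load.
    object CommitIntervalSketch {

      trait Playback {
        def updateLastWrittenOffset(offset: Long): Unit
        def updateLastCommittedOffset(offset: Long): Unit
      }

      // `batchNextOffsets` holds batch.nextOffset for each replayed batch, in order.
      def replayBatches(
        batchNextOffsets: Seq[Long],
        highWatermark: Long,
        commitIntervalOffsets: Long,
        playback: Playback
      ): Unit = {
        var lastCommittedOffset = -1L
        batchNextOffsets.foreach { currentOffset =>
          if (currentOffset >= highWatermark) {
            // Caught up to the high watermark: write and commit as before.
            playback.updateLastWrittenOffset(currentOffset)
            if (highWatermark > lastCommittedOffset) {
              playback.updateLastCommittedOffset(highWatermark)
              lastCommittedOffset = highWatermark
            }
          } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
            // New in this patch: commit periodically while still catching up, so
            // snapshots of temporary collections can be released during loading.
            playback.updateLastWrittenOffset(currentOffset)
            playback.updateLastCommittedOffset(currentOffset)
            lastCommittedOffset = currentOffset
          }
        }
      }

      def main(args: Array[String]): Unit = {
        var calls = Vector.empty[String]
        val playback = new Playback {
          def updateLastWrittenOffset(offset: Long): Unit = calls :+= s"written($offset)"
          def updateLastCommittedOffset(offset: Long): Unit = calls :+= s"committed($offset)"
        }
        // Batches end at offsets 2, 5, 6 and 7; high watermark 7, interval 2.
        replayBatches(Seq(2L, 5L, 6L, 7L), highWatermark = 7L, commitIntervalOffsets = 2L, playback)
        println(calls.mkString(" "))
      }
    }

With batches ending at offsets 2, 5, 6 and 7, a high watermark of 7 and an interval of 2, the sketch prints written(2) committed(2) written(5) committed(5) written(7) committed(7), which is exactly the sequence verified by the new test in the hunk below.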
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index dc4bbc830cd..585c18e2eca 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -63,7 +63,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(None)

@@ -83,7 +84,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       loader.close()

@@ -104,7 +106,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -207,7 +210,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -250,7 +254,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -290,7 +295,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -332,7 +338,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -364,7 +371,8 @@ class CoordinatorLoaderImplTest {
       time,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       val startTimeMs = time.milliseconds()
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
@@ -419,7 +427,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -494,7 +503,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -520,7 +530,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -596,7 +607,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -632,6 +644,79 @@ class CoordinatorLoaderImplTest {
     }
   }

+  @Test
+  def testUpdateLastWrittenOffsetCommitInterval(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000,
+      commitIntervalOffsets = 2L
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(log.highWatermark).thenReturn(7L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+      val readResult1 = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(0L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult1)
+
+      val readResult2 = logReadResult(startOffset = 2, records = Seq(
+        new SimpleRecord("k3".getBytes, "v3".getBytes),
+        new SimpleRecord("k4".getBytes, "v4".getBytes),
+        new SimpleRecord("k5".getBytes, "v5".getBytes)
+      ))
+
+      when(log.read(2L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult2)
+
+      val readResult3 = logReadResult(startOffset = 5, records = Seq(
+        new SimpleRecord("k6".getBytes, "v6".getBytes)
+      ))
+
+      when(log.read(5L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult3)
+
+      val readResult4 = logReadResult(startOffset = 6, records = Seq(
+        new SimpleRecord("k7".getBytes, "v7".getBytes)
+      ))
+
+      when(log.read(6L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult4)
+
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+      verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+      verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+      verify(coordinator, times(0)).updateLastWrittenOffset(0L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(2L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(5L)
+      verify(coordinator, times(0)).updateLastWrittenOffset(6L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(7L)
+      verify(coordinator, times(0)).updateLastCommittedOffset(0L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(2L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(5L)
+      verify(coordinator, times(0)).updateLastCommittedOffset(6L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(7L)
+    }
+  }
+
   private def logReadResult(
     startOffset: Long,
     producerId: Long = RecordBatch.NO_PRODUCER_ID,
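The verifications in the new test follow directly from `commitIntervalOffsets = 2L` and the fact that the interval is checked only at batch boundaries: offsets 2 and 5 commit because each is at least 2 ahead of the last commit, offset 6 does not (6 - 5 < 2), and offset 7 commits through the pre-existing high-watermark branch. Production wiring keeps the default via `CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS`, as in the `BrokerServer` hunks; a hypothetical caller tuning the interval would only change the last constructor argument. This wiring example is not part of the patch, and `replicaManager` and `serde` are assumed to exist as in the test setup above:

    // Hypothetical tuning example: same arguments as the test setup, with a
    // custom commit interval instead of the 2L used in the test.
    val loader = new CoordinatorLoaderImpl[(String, String)](
      time = Time.SYSTEM,
      replicaManager = replicaManager,
      deserializer = serde,
      loadBufferSize = 1000,
      commitIntervalOffsets = 1024L // commit every 1,024 offsets while loading
    )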