From d067c6c04089a3d24e1f72e6cb1b10b0d85f76da Mon Sep 17 00:00:00 2001
From: Sean Quah
Date: Fri, 19 Sep 2025 08:44:07 +0100
Subject: [PATCH] KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20547)

When nested Timeline collections are created and discarded while loading
a coordinator partition, references to them accumulate in the current
snapshot. Allow the GC to reclaim them by starting a new snapshot and
discarding previous snapshots every 16,384 records.

Small intervals degrade loading times for non-transactional offset commit
workloads, while large intervals degrade loading times for transactional
workloads. A default of 16,384 was chosen as a compromise between the two.

Also add a benchmark for group coordinator loading.

Reviewers: David Jacot
---
 build.gradle                                  |   1 +
 checkstyle/import-control-jmh-benchmarks.xml  |   4 +-
 .../common/runtime/CoordinatorLoaderImpl.java |  39 +-
 .../runtime/SnapshottableCoordinator.java     |   2 +-
 .../runtime/CoordinatorLoaderImplTest.java    | 109 +++++-
 .../scala/kafka/server/BrokerServer.scala     |   6 +-
 ...GroupCoordinatorShardLoadingBenchmark.java | 355 ++++++++++++++++++
 .../apache/kafka/jmh/coordinator/MockLog.java |  74 ++++
 8 files changed, 563 insertions(+), 27 deletions(-)
 create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
 create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java

diff --git a/build.gradle b/build.gradle
index 26ca93d0738..753a86cfc35 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3350,6 +3350,7 @@ project(':jmh-benchmarks') {
     implementation project(':raft')
     implementation project(':clients')
     implementation project(':coordinator-common')
+    implementation project(':coordinator-common').sourceSets.test.output
     implementation project(':group-coordinator')
     implementation project(':group-coordinator:group-coordinator-api')
     implementation project(':metadata')
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index d2f87a3577f..4c11bc3acb4 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -52,9 +52,7 @@
-    <allow pkg="org.apache.kafka.coordinator.group"/>
-    <allow pkg="org.apache.kafka.coordinator.common.runtime"/>
-    <allow class="org.apache.kafka.coordinator.common.runtime.CoordinatorRecord"/>
+    <allow pkg="org.apache.kafka.coordinator"/>
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
index 6613ce25fc8..3a8b7434326 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
@@ -50,6 +50,20 @@ import java.util.function.Function;
  */
 public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
+    /**
+     * The interval, in offsets, between updates of the last committed offset during loading. Smaller
+     * values commit more often, at the expense of loading times when the workload is simple and does
+     * not create collections that need to participate in {@link CoordinatorPlayback} snapshotting.
+     * Larger values commit less often and allow more temporary data to accumulate before the next
+     * commit when the workload creates many temporary collections that need to be snapshotted.
+     *
+     * The value of 16,384 was chosen as a trade-off between the performance of these two workloads.
+     *
+     * When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
+     * the relative change in performance.
+     */
+    public static final long DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384;
+
     private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoaderImpl.class);
 
     private final Time time;
@@ -57,6 +71,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
     private final Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier;
     private final Deserializer<T> deserializer;
     private final int loadBufferSize;
+    private final long commitIntervalOffsets;
 
     private final AtomicBoolean isRunning = new AtomicBoolean(true);
     private final KafkaScheduler scheduler = new KafkaScheduler(1);
@@ -66,13 +81,15 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
         Deserializer<T> deserializer,
-        int loadBufferSize
+        int loadBufferSize,
+        long commitIntervalOffsets
     ) {
         this.time = time;
         this.partitionLogSupplier = partitionLogSupplier;
         this.partitionLogEndOffsetSupplier = partitionLogEndOffsetSupplier;
         this.deserializer = deserializer;
         this.loadBufferSize = loadBufferSize;
+        this.commitIntervalOffsets = commitIntervalOffsets;
         this.scheduler.startup();
     }
@@ -121,7 +138,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
                 long currentOffset = log.logStartOffset();
                 LoadStats stats = new LoadStats();
-                long previousHighWatermark = -1L;
+                long lastCommittedOffset = -1L;
 
                 while (shouldFetchNextBatch(currentOffset, logEndOffset(tp), stats.readAtLeastOneRecord)) {
                     FetchDataInfo fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true);
@@ -133,9 +150,9 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
                         buffer = memoryRecords.buffer();
                     }
 
-                    ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, previousHighWatermark);
+                    ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, lastCommittedOffset);
                     currentOffset = replayResult.nextOffset;
-                    previousHighWatermark = replayResult.highWatermark;
+                    lastCommittedOffset = replayResult.lastCommittedOffset;
                 }
 
                 long endTimeMs = time.milliseconds();
@@ -207,7 +224,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
         CoordinatorPlayback<T> coordinator,
         LoadStats loadStats,
         long currentOffset,
-        long previousHighWatermark
+        long lastCommittedOffset
     ) {
         for (MutableRecordBatch batch : memoryRecords.batches()) {
             if (batch.isControlBatch()) {
@@ -286,14 +303,18 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
 
             if (currentOffset >= currentHighWatermark) {
                 coordinator.updateLastWrittenOffset(currentOffset);
 
-                if (currentHighWatermark > previousHighWatermark) {
+                if (currentHighWatermark > lastCommittedOffset) {
                     coordinator.updateLastCommittedOffset(currentHighWatermark);
-                    previousHighWatermark = currentHighWatermark;
+                    lastCommittedOffset = currentHighWatermark;
                 }
+            } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
+                coordinator.updateLastWrittenOffset(currentOffset);
+                coordinator.updateLastCommittedOffset(currentOffset);
+                lastCommittedOffset = currentOffset;
             }
         }
         loadStats.numBytes += memoryRecords.sizeInBytes();
-        return new ReplayResult(currentOffset, previousHighWatermark);
+        return new ReplayResult(currentOffset, lastCommittedOffset);
     }
 
     /**
@@ -326,5 +347,5 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
         }
     }
 
-    private record ReplayResult(long nextOffset, long highWatermark) { }
+    private record ReplayResult(long nextOffset, long lastCommittedOffset) { }
 }
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
index 1550e444bf4..278373e6842 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
@@ -61,7 +61,7 @@ public class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements CoordinatorPlayback<U> {
      */
     private long lastCommittedOffset;
 
-    SnapshottableCoordinator(
+    public SnapshottableCoordinator(
         LogContext logContext,
         SnapshotRegistry snapshotRegistry,
         S coordinator,
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 9f8ab68c66f..11cdab83cac 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -91,7 +91,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
         }
@@ -110,7 +111,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             loader.close();
             assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@@ -131,7 +133,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
@@ -217,7 +220,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
@@ -262,7 +266,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
@@ -298,7 +303,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
@@ -337,7 +343,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
@@ -365,7 +372,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             long startTimeMs = time.milliseconds();
             when(log.logStartOffset()).thenReturn(0L);
@@ -412,7 +420,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L, 0L, 2L);
@@ -475,7 +484,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
@@ -501,7 +511,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(5L, 7L, 7L);
@@ -551,6 +562,79 @@ class CoordinatorLoaderImplTest {
         }
     }
 
+    @Test
+    void testUpdateLastWrittenOffsetCommitInterval() throws Exception {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
+        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000,
+            2L
+        )) {
+            when(log.logStartOffset()).thenReturn(0L);
+            when(log.highWatermark()).thenReturn(7L);
+
+            FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
+            ));
+
+            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+                .thenReturn(readResult1);
+
+            FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
+                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+                new SimpleRecord("k5".getBytes(), "v5".getBytes())
+            ));
+
+            when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
+                .thenReturn(readResult2);
+
+            FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
+                new SimpleRecord("k6".getBytes(), "v6".getBytes())
+            ));
+
+            when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
+                .thenReturn(readResult3);
+
+            FetchDataInfo readResult4 = logReadResult(6, Arrays.asList(
+                new SimpleRecord("k7".getBytes(), "v7".getBytes())
+            ));
+
+            when(log.read(6L, 1000, FetchIsolation.LOG_END, true))
+                .thenReturn(readResult4);
+
+            assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
+
+            verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+            verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+            verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+            verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+            verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
+            verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
+            verify(coordinator, times(0)).updateLastWrittenOffset(0L);
+            verify(coordinator, times(1)).updateLastWrittenOffset(2L);
+            verify(coordinator, times(1)).updateLastWrittenOffset(5L);
+            verify(coordinator, times(0)).updateLastWrittenOffset(6L);
+            verify(coordinator, times(1)).updateLastWrittenOffset(7L);
+            verify(coordinator, times(0)).updateLastCommittedOffset(0L);
+            verify(coordinator, times(1)).updateLastCommittedOffset(2L);
+            verify(coordinator, times(1)).updateLastCommittedOffset(5L);
+            verify(coordinator, times(0)).updateLastCommittedOffset(6L);
+            verify(coordinator, times(1)).updateLastCommittedOffset(7L);
+        }
+    }
+
     @Test
     void testPartitionGoesOfflineDuringLoad() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
@@ -565,7 +649,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3ded033020b..47085169979 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -613,7 +613,8 @@ class BrokerServer(
         tp => replicaManager.getLog(tp).toJava,
         tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
         serde,
-        config.groupCoordinatorConfig.offsetsLoadBufferSize
+        config.groupCoordinatorConfig.offsetsLoadBufferSize,
+        CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
       )
       val writer = new CoordinatorPartitionWriter(
         replicaManager
@@ -644,7 +645,8 @@ class BrokerServer(
         tp => replicaManager.getLog(tp).toJava,
         tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
         serde,
-        config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
+        config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
+        CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
       )
       val writer = new CoordinatorPartitionWriter(
         replicaManager
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
new file mode 100644
index 00000000000..0a6612ca8fd
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.coordinator;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoaderImpl;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
+import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
+import org.apache.kafka.coordinator.common.runtime.SnapshottableCoordinator;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import com.yammer.metrics.core.MetricsRegistry;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.BenchmarkParams;
+import org.openjdk.jmh.infra.IterationParams;
+import org.openjdk.jmh.runner.IterationType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class GroupCoordinatorShardLoadingBenchmark {
+
+    private static final String GROUP_ID = "test-group";
+
+    @Param({"1", "4", "16", "64", "256", "1024", "4096", "16384", "65536", "262144", "1048576"})
+    private long commitInterval;
+
+    @Param({"8192"})
+    private int batchCount;
+
+    @Param({"2048"})
+    private int batchSize;
+
+    private TopicPartition topicPartition;
+    private MockTime time;
+    private GroupCoordinatorConfig config;
+    private GroupCoordinatorRecordSerde serde;
+    private GroupCoordinatorShard coordinatorShard;
+    private SnapshottableCoordinator<GroupCoordinatorShard, CoordinatorRecord> snapshottableCoordinator;
+    private UnifiedLog offsetCommitLog;
+    private UnifiedLog transactionalOffsetCommitLog;
+
+    static class OffsetCommitLog extends MockLog {
+        private final int batchCount;
+        private final SimpleRecord[] batch;
+
+        public OffsetCommitLog(TopicPartition tp, int batchSize, int batchCount) throws IOException {
+            super(tp);
+
+            this.batchCount = batchCount;
+
+            List<SimpleRecord> batchRecords = new ArrayList<>();
+            for (int i = 0; i < batchSize; i++) {
+                String topic = "topic-" + i;
+                int partition = 0;
+
+                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
+                    0L,
+                    OptionalInt.of(0),
+                    OffsetAndMetadata.NO_METADATA,
+                    0L,
+                    OptionalLong.empty(),
+                    Uuid.randomUuid()
+                );
+
+                CoordinatorRecord coordinatorRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
+                    GROUP_ID, topic, partition, offsetAndMetadata
+                );
+
+                byte[] keyBytes = new GroupCoordinatorRecordSerde().serializeKey(coordinatorRecord);
+                byte[] valueBytes = new GroupCoordinatorRecordSerde().serializeValue(coordinatorRecord);
+                SimpleRecord simpleRecord = new SimpleRecord(keyBytes, valueBytes);
+                batchRecords.add(simpleRecord);
+            }
+
+            this.batch = batchRecords.toArray(new SimpleRecord[0]);
+        }
+
+        @Override
+        public long logStartOffset() {
+            return 0L;
+        }
+
+        @Override
+        public long logEndOffset() {
+            if (batch == null) {
+                return 0L;
+            }
+
+            return (long) batchCount * (long) batch.length;
+        }
+
+        @Override
+        public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage) {
+            if (startOffset < 0 || startOffset >= logEndOffset()) {
+                return new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY);
+            }
+
+            MemoryRecords records = MemoryRecords.withRecords(
+                startOffset,
+                Compression.NONE,
+                batch
+            );
+            return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+        }
+    }
+
+    static class TransactionalOffsetCommitLog extends MockLog {
+        private final int batchCount;
+        private final SimpleRecord[] batch;
+        private final long producerId;
+        private final short producerEpoch;
+        private final int coordinatorEpoch;
+
+        public TransactionalOffsetCommitLog(TopicPartition tp, int batchSize, int batchCount) throws IOException {
+            super(tp);
+
+            this.batchCount = batchCount;
+            this.producerId = 1000L;
+            this.producerEpoch = 0;
+            this.coordinatorEpoch = 100;
+
+            List<SimpleRecord> batchRecords = new ArrayList<>();
+            for (int i = 0; i < batchSize - 1; i++) {
+                String topic = "topic-" + i;
+                int partition = 0;
+
+                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
+                    0L,
+                    OptionalInt.of(0),
+                    OffsetAndMetadata.NO_METADATA,
+                    0L,
+                    OptionalLong.empty(),
+                    Uuid.randomUuid()
+                );
+
+                CoordinatorRecord coordinatorRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
+                    GROUP_ID, topic, partition, offsetAndMetadata
+                );
+
+                byte[] keyBytes = new GroupCoordinatorRecordSerde().serializeKey(coordinatorRecord);
+                byte[] valueBytes = new GroupCoordinatorRecordSerde().serializeValue(coordinatorRecord);
+                SimpleRecord simpleRecord = new SimpleRecord(keyBytes, valueBytes);
+                batchRecords.add(simpleRecord);
+            }
+
+            this.batch = batchRecords.toArray(new SimpleRecord[0]);
+        }
+
+        @Override
+        public long logStartOffset() {
+            return 0L;
+        }
+
+        @Override
+        public long logEndOffset() {
+            if (batch == null) {
+                return 0L;
+            }
+
+            return (long) (batch.length + 1) * (long) batchCount;
+        }
+
+        @Override
+        public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage) {
+            if (startOffset < 0 || startOffset >= logEndOffset()) {
+                return new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY);
+            }
+
+            // Repeat the batch followed by a commit marker.
+            long patternLength = batch.length + 1;
+            if (startOffset % patternLength < batch.length) {
+                MemoryRecords records = MemoryRecords.withTransactionalRecords(
+                    startOffset,
+                    Compression.NONE,
+                    producerId,
+                    producerEpoch,
+                    0,
+                    0,
+                    batch
+                );
+                return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+            } else {
+                MemoryRecords records = MemoryRecords.withEndTransactionMarker(
+                    startOffset,
+                    0L,
+                    0,
+                    producerId,
+                    producerEpoch,
+                    new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
+                );
+                return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+            }
+        }
+    }
+
+    @Setup(Level.Trial)
+    public void setup() throws Exception {
+        topicPartition = new TopicPartition("__consumer_offsets", 0);
+        time = new MockTime();
+        Map<String, Object> props = new HashMap<>();
+        config = GroupCoordinatorConfig.fromProps(props);
+        serde = new GroupCoordinatorRecordSerde();
+    }
+
+    @Setup(Level.Iteration)
+    public void setupIteration(BenchmarkParams benchmarkParams, IterationParams iterationParams) throws IOException {
+        // Reduce the data size for warmup iterations, since transactional offset commit loading
+        // takes longer than 20 seconds.
+        int iterationBatchCount = batchCount;
+        if (iterationParams.getType() == IterationType.WARMUP) {
+            iterationBatchCount = Math.min(iterationBatchCount, 1024);
+        }
+
+        offsetCommitLog = new OffsetCommitLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), batchSize, iterationBatchCount);
+        transactionalOffsetCommitLog = new TransactionalOffsetCommitLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), batchSize, iterationBatchCount);
+    }
+
+    @Setup(Level.Invocation)
+    public void setupInvocation() {
+        GroupConfigManager configManager = new GroupConfigManager(new HashMap<>());
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+
+        MetricsRegistry metricsRegistry = new MetricsRegistry();
+        Metrics metrics = new Metrics();
+        GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(metricsRegistry, metrics);
+
+        coordinatorShard = new GroupCoordinatorShard.Builder(config, configManager)
+            .withAuthorizerPlugin(Optional.empty())
+            .withLogContext(logContext)
+            .withSnapshotRegistry(snapshotRegistry)
+            .withTime(time)
+            .withTimer(new MockCoordinatorTimer<>(time))
+            .withExecutor(new MockCoordinatorExecutor<>())
+            .withCoordinatorMetrics(coordinatorMetrics)
+            .withTopicPartition(topicPartition)
+            .build();
+
+        snapshottableCoordinator = new SnapshottableCoordinator<>(
+            logContext,
+            snapshotRegistry,
+            coordinatorShard,
+            topicPartition
+        );
+    }
+
+    private CoordinatorLoader.LoadSummary loadRecords(UnifiedLog log) throws ExecutionException, InterruptedException {
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = tp -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = tp -> Optional.of(log.logEndOffset());
+
+        CoordinatorLoaderImpl<CoordinatorRecord> loader = new CoordinatorLoaderImpl<>(
+            time,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            config.offsetsLoadBufferSize(),
+            commitInterval
+        );
+
+        return loader.load(topicPartition, snapshottableCoordinator).get();
+    }
+
+    @Benchmark
+    @Threads(1)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    public CoordinatorLoader.LoadSummary loadOffsetCommitRecords() throws ExecutionException, InterruptedException {
+        return loadRecords(offsetCommitLog);
+    }
+
+    @Benchmark
+    @Threads(1)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    public CoordinatorLoader.LoadSummary loadTransactionalOffsetCommitRecords() throws ExecutionException, InterruptedException {
+        return loadRecords(transactionalOffsetCommitLog);
+    }
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+            .include(GroupCoordinatorShardLoadingBenchmark.class.getSimpleName())
+            .forks(1)
+            .build();
+
+        new Runner(opt).run();
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java
new file mode 100644
index 00000000000..a7de83d5bcb
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.coordinator;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LocalLog;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogSegments;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class MockLog extends UnifiedLog {
+
+    public MockLog(TopicPartition tp) throws IOException {
+        super(
+            0,
+            createMockLocalLog(tp),
+            mock(BrokerTopicStats.class),
+            Integer.MAX_VALUE,
+            mock(LeaderEpochFileCache.class),
+            mock(ProducerStateManager.class),
+            Optional.empty(),
+            false,
+            LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+        );
+    }
+
+    @Override
+    public abstract long logStartOffset();
+
+    @Override
+    public abstract long logEndOffset();
+
+    @Override
+    public long highWatermark() {
+        return logEndOffset();
+    }
+
+    @Override
+    public abstract FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage);
+
+    private static LocalLog createMockLocalLog(TopicPartition tp) {
+        LocalLog localLog = mock(LocalLog.class);
+        when(localLog.scheduler()).thenReturn(mock(Scheduler.class));
+        when(localLog.segments()).thenReturn(mock(LogSegments.class));
+        when(localLog.topicPartition()).thenReturn(tp);
+        return localLog;
+    }
+}
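
Note for reviewers (illustration, not part of the patch): the periodic commit in
processMemoryRecords() frees memory because SnapshottableCoordinator roughly maps
updateLastWrittenOffset() onto SnapshotRegistry.getOrCreateSnapshot() and
updateLastCommittedOffset() onto SnapshotRegistry.deleteSnapshotsUpTo(). A minimal,
self-contained sketch of that idea against the timeline API; the class name and the
loop are hypothetical stand-ins for the loader's replay path:

    package org.apache.kafka.jmh.coordinator;

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;

    // Hypothetical sketch: why committing every N offsets matters during loading.
    public class PeriodicSnapshotSketch {
        public static void main(String[] args) {
            SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
            registry.getOrCreateSnapshot(-1L); // loading starts from a clean snapshot
            long commitIntervalOffsets = 16_384;
            long lastCommittedOffset = -1L;
            for (long offset = 0; offset < 100_000; offset++) {
                // Replaying a record may create short-lived Timeline collections whose
                // discarded state stays referenced from the current snapshot.
                new TimelineHashMap<String, String>(registry, 0).put("k", "v");
                if (offset - lastCommittedOffset >= commitIntervalOffsets) {
                    registry.getOrCreateSnapshot(offset);  // cf. updateLastWrittenOffset()
                    registry.deleteSnapshotsUpTo(offset);  // cf. updateLastCommittedOffset()
                    lastCommittedOffset = offset;          // older snapshots are now GC-able
                }
            }
        }
    }

To evaluate a different interval, the new benchmark can be run with the usual
jmh-benchmarks wrapper, e.g. ./jmh-benchmarks/jmh.sh GroupCoordinatorShardLoadingBenchmark.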