diff --git a/build.gradle b/build.gradle
index 26ca93d0738..753a86cfc35 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3350,6 +3350,7 @@ project(':jmh-benchmarks') {
implementation project(':raft')
implementation project(':clients')
implementation project(':coordinator-common')
+ implementation project(':coordinator-common').sourceSets.test.output
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':metadata')
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index d2f87a3577f..4c11bc3acb4 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -52,9 +52,7 @@
-
-
-
+
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
index 6613ce25fc8..3a8b7434326 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
@@ -50,6 +50,20 @@ import java.util.function.Function;
*/
public class CoordinatorLoaderImpl implements CoordinatorLoader {
+ /**
+ * The interval between updating the last committed offset during loading, in offsets. Smaller
+ * values commit more often at the expense of loading times when the workload is simple and does
+ * not create collections that need to participate in {@link CoordinatorPlayback} snapshotting.
+ * Larger values commit less often and allow more temporary data to accumulate before the next
+ * commit when the workload creates many temporary collections that need to be snapshotted.
+ *
+ * The value of 16,384 was chosen as a trade-off between the performance of these two workloads.
+ *
+ * When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
+ * the relative change in performance.
+ */
+ public static final long DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384;
+
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoaderImpl.class);
private final Time time;
@@ -57,6 +71,7 @@ public class CoordinatorLoaderImpl implements CoordinatorLoader {
private final Function> partitionLogEndOffsetSupplier;
private final Deserializer deserializer;
private final int loadBufferSize;
+ private final long commitIntervalOffsets;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final KafkaScheduler scheduler = new KafkaScheduler(1);
@@ -66,13 +81,15 @@ public class CoordinatorLoaderImpl implements CoordinatorLoader {
Function> partitionLogSupplier,
Function> partitionLogEndOffsetSupplier,
Deserializer deserializer,
- int loadBufferSize
+ int loadBufferSize,
+ long commitIntervalOffsets
) {
this.time = time;
this.partitionLogSupplier = partitionLogSupplier;
this.partitionLogEndOffsetSupplier = partitionLogEndOffsetSupplier;
this.deserializer = deserializer;
this.loadBufferSize = loadBufferSize;
+ this.commitIntervalOffsets = commitIntervalOffsets;
this.scheduler.startup();
}
@@ -121,7 +138,7 @@ public class CoordinatorLoaderImpl implements CoordinatorLoader {
long currentOffset = log.logStartOffset();
LoadStats stats = new LoadStats();
- long previousHighWatermark = -1L;
+ long lastCommittedOffset = -1L;
while (shouldFetchNextBatch(currentOffset, logEndOffset(tp), stats.readAtLeastOneRecord)) {
FetchDataInfo fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true);
@@ -133,9 +150,9 @@ public class CoordinatorLoaderImpl implements CoordinatorLoader {
buffer = memoryRecords.buffer();
}
- ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, previousHighWatermark);
+ ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, lastCommittedOffset);
currentOffset = replayResult.nextOffset;
- previousHighWatermark = replayResult.highWatermark;
+ lastCommittedOffset = replayResult.lastCommittedOffset;
}
long endTimeMs = time.milliseconds();
@@ -207,7 +224,7 @@ public class CoordinatorLoaderImpl implements CoordinatorLoader {
CoordinatorPlayback coordinator,
LoadStats loadStats,
long currentOffset,
- long previousHighWatermark
+ long lastCommittedOffset
) {
for (MutableRecordBatch batch : memoryRecords.batches()) {
if (batch.isControlBatch()) {
@@ -286,14 +303,18 @@ public class CoordinatorLoaderImpl implements CoordinatorLoader {
if (currentOffset >= currentHighWatermark) {
coordinator.updateLastWrittenOffset(currentOffset);
- if (currentHighWatermark > previousHighWatermark) {
+ if (currentHighWatermark > lastCommittedOffset) {
coordinator.updateLastCommittedOffset(currentHighWatermark);
- previousHighWatermark = currentHighWatermark;
+ lastCommittedOffset = currentHighWatermark;
}
+ } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
+ coordinator.updateLastWrittenOffset(currentOffset);
+ coordinator.updateLastCommittedOffset(currentOffset);
+ lastCommittedOffset = currentOffset;
}
}
loadStats.numBytes += memoryRecords.sizeInBytes();
- return new ReplayResult(currentOffset, previousHighWatermark);
+ return new ReplayResult(currentOffset, lastCommittedOffset);
}
/**
@@ -326,5 +347,5 @@ public class CoordinatorLoaderImpl implements CoordinatorLoader {
}
}
- private record ReplayResult(long nextOffset, long highWatermark) { }
+ private record ReplayResult(long nextOffset, long lastCommittedOffset) { }
}
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
index 1550e444bf4..278373e6842 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
@@ -61,7 +61,7 @@ public class SnapshottableCoordinator, U> implemen
*/
private long lastCommittedOffset;
- SnapshottableCoordinator(
+ public SnapshottableCoordinator(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
S coordinator,
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 9f8ab68c66f..11cdab83cac 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -91,7 +91,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
}
@@ -110,7 +111,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
loader.close();
assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@@ -131,7 +133,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
@@ -217,7 +220,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
@@ -262,7 +266,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
@@ -298,7 +303,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
@@ -337,7 +343,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
@@ -365,7 +372,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
long startTimeMs = time.milliseconds();
when(log.logStartOffset()).thenReturn(0L);
@@ -412,7 +420,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L, 0L, 2L);
@@ -475,7 +484,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
@@ -501,7 +511,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(5L, 7L, 7L);
@@ -551,6 +562,79 @@ class CoordinatorLoaderImplTest {
}
}
+ @Test
+ void testUpdateLastWrittenOffsetCommitInterval() throws Exception {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
+ CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
+
+ try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
+ Time.SYSTEM,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ 1000,
+ 2L
+ )) {
+ when(log.logStartOffset()).thenReturn(0L);
+ when(log.highWatermark()).thenReturn(7L);
+
+ FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
+ new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes())
+ ));
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult1);
+
+ FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
+ new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+ new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+ new SimpleRecord("k5".getBytes(), "v5".getBytes())
+ ));
+
+ when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult2);
+
+ FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
+ new SimpleRecord("k6".getBytes(), "v6".getBytes())
+ ));
+
+ when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult3);
+
+ FetchDataInfo readResult4 = logReadResult(6, Arrays.asList(
+ new SimpleRecord("k7".getBytes(), "v7".getBytes())
+ ));
+
+ when(log.read(6L, 1000, FetchIsolation.LOG_END, true))
+ .thenReturn(readResult4);
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
+
+ verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+ verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+ verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+ verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+ verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
+ verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
+ verify(coordinator, times(0)).updateLastWrittenOffset(0L);
+ verify(coordinator, times(1)).updateLastWrittenOffset(2L);
+ verify(coordinator, times(1)).updateLastWrittenOffset(5L);
+ verify(coordinator, times(0)).updateLastWrittenOffset(6L);
+ verify(coordinator, times(1)).updateLastWrittenOffset(7L);
+ verify(coordinator, times(0)).updateLastCommittedOffset(0L);
+ verify(coordinator, times(1)).updateLastCommittedOffset(2L);
+ verify(coordinator, times(1)).updateLastCommittedOffset(5L);
+ verify(coordinator, times(0)).updateLastCommittedOffset(6L);
+ verify(coordinator, times(1)).updateLastCommittedOffset(7L);
+ }
+ }
+
@Test
void testPartitionGoesOfflineDuringLoad() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
@@ -565,7 +649,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
- 1000
+ 1000,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3ded033020b..47085169979 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -613,7 +613,8 @@ class BrokerServer(
tp => replicaManager.getLog(tp).toJava,
tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
serde,
- config.groupCoordinatorConfig.offsetsLoadBufferSize
+ config.groupCoordinatorConfig.offsetsLoadBufferSize,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)
val writer = new CoordinatorPartitionWriter(
replicaManager
@@ -644,7 +645,8 @@ class BrokerServer(
tp => replicaManager.getLog(tp).toJava,
tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
serde,
- config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
+ config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)
val writer = new CoordinatorPartitionWriter(
replicaManager
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
new file mode 100644
index 00000000000..0a6612ca8fd
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.coordinator;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoaderImpl;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
+import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
+import org.apache.kafka.coordinator.common.runtime.SnapshottableCoordinator;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import com.yammer.metrics.core.MetricsRegistry;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.BenchmarkParams;
+import org.openjdk.jmh.infra.IterationParams;
+import org.openjdk.jmh.runner.IterationType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class GroupCoordinatorShardLoadingBenchmark {
+
+ private static final String GROUP_ID = "test-group";
+
+ @Param({"1", "4", "16", "64", "256", "1024", "4096", "16384", "65536", "262144", "1048576"})
+ private long commitInterval;
+
+ @Param({"8192"})
+ private int batchCount;
+
+ @Param({"2048"})
+ private int batchSize;
+
+ private TopicPartition topicPartition;
+ private MockTime time;
+ private GroupCoordinatorConfig config;
+ private GroupCoordinatorRecordSerde serde;
+ private GroupCoordinatorShard coordinatorShard;
+ private SnapshottableCoordinator snapshottableCoordinator;
+ private UnifiedLog offsetCommitLog;
+ private UnifiedLog transactionalOffsetCommitLog;
+
+ static class OffsetCommitLog extends MockLog {
+ private final int batchCount;
+ private final SimpleRecord[] batch;
+
+ public OffsetCommitLog(TopicPartition tp, int batchSize, int batchCount) throws IOException {
+ super(tp);
+
+ this.batchCount = batchCount;
+
+ List batchRecords = new ArrayList<>();
+ for (int i = 0; i < batchSize; i++) {
+ String topic = "topic-" + i;
+ int partition = 0;
+
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
+ 0L,
+ OptionalInt.of(0),
+ OffsetAndMetadata.NO_METADATA,
+ 0L,
+ OptionalLong.empty(),
+ Uuid.randomUuid()
+ );
+
+ CoordinatorRecord coordinatorRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
+ GROUP_ID, topic, partition, offsetAndMetadata
+ );
+
+ byte[] keyBytes = new GroupCoordinatorRecordSerde().serializeKey(coordinatorRecord);
+ byte[] valueBytes = new GroupCoordinatorRecordSerde().serializeValue(coordinatorRecord);
+ SimpleRecord simpleRecord = new SimpleRecord(keyBytes, valueBytes);
+ batchRecords.add(simpleRecord);
+ }
+
+ this.batch = batchRecords.toArray(new SimpleRecord[0]);
+ }
+
+ @Override
+ public long logStartOffset() {
+ return 0L;
+ }
+
+ @Override
+ public long logEndOffset() {
+ if (batch == null) {
+ return 0L;
+ }
+
+ return (long) batchCount * (long) batch.length;
+ }
+
+ @Override
+ public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage) {
+ if (startOffset < 0 || startOffset >= logEndOffset()) {
+ return new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY);
+ }
+
+ MemoryRecords records = MemoryRecords.withRecords(
+ startOffset,
+ Compression.NONE,
+ batch
+ );
+ return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+ }
+ }
+
+ static class TransactionalOffsetCommitLog extends MockLog {
+ private final int batchCount;
+ private final SimpleRecord[] batch;
+ private final long producerId;
+ private final short producerEpoch;
+ private final int coordinatorEpoch;
+
+ public TransactionalOffsetCommitLog(TopicPartition tp, int batchSize, int batchCount) throws IOException {
+ super(tp);
+
+ this.batchCount = batchCount;
+ this.producerId = 1000L;
+ this.producerEpoch = 0;
+ this.coordinatorEpoch = 100;
+
+ List batchRecords = new ArrayList<>();
+ for (int i = 0; i < batchSize - 1; i++) {
+ String topic = "topic-" + i;
+ int partition = 0;
+
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
+ 0L,
+ OptionalInt.of(0),
+ OffsetAndMetadata.NO_METADATA,
+ 0L,
+ OptionalLong.empty(),
+ Uuid.randomUuid()
+ );
+
+ CoordinatorRecord coordinatorRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
+ GROUP_ID, topic, partition, offsetAndMetadata
+ );
+
+ byte[] keyBytes = new GroupCoordinatorRecordSerde().serializeKey(coordinatorRecord);
+ byte[] valueBytes = new GroupCoordinatorRecordSerde().serializeValue(coordinatorRecord);
+ SimpleRecord simpleRecord = new SimpleRecord(keyBytes, valueBytes);
+ batchRecords.add(simpleRecord);
+ }
+
+ this.batch = batchRecords.toArray(new SimpleRecord[0]);
+ }
+
+ @Override
+ public long logStartOffset() {
+ return 0L;
+ }
+
+ @Override
+ public long logEndOffset() {
+ if (batch == null) {
+ return 0L;
+ }
+
+ return (long) (batch.length + 1) * (long) batchCount;
+ }
+
+ @Override
+ public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage) {
+ if (startOffset < 0 || startOffset >= logEndOffset()) {
+ return new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY);
+ }
+
+ // Repeat the batch followed by a commit marker.
+ long patternLength = batch.length + 1;
+ if (startOffset % patternLength < batch.length) {
+ MemoryRecords records = MemoryRecords.withTransactionalRecords(
+ startOffset,
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ 0,
+ 0,
+ batch
+ );
+ return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+ } else {
+ MemoryRecords records = MemoryRecords.withEndTransactionMarker(
+ startOffset,
+ 0L,
+ 0,
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
+ );
+ return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+ }
+ }
+ }
+
+ @Setup(Level.Trial)
+ public void setup() throws Exception {
+ topicPartition = new TopicPartition("__consumer_offsets", 0);
+ time = new MockTime();
+ Map props = new HashMap<>();
+ config = GroupCoordinatorConfig.fromProps(props);
+ serde = new GroupCoordinatorRecordSerde();
+ }
+
+ @Setup(Level.Iteration)
+ public void setupIteration(BenchmarkParams benchmarkParams, IterationParams iterationParams) throws IOException {
+ // Reduce the data size for warmup iterations, since transactional offset commit loading
+ // takes longer than 20 seconds.
+ int iterationBatchCount = batchCount;
+ if (iterationParams.getType() == IterationType.WARMUP) {
+ iterationBatchCount = Math.min(iterationBatchCount, 1024);
+ }
+
+ offsetCommitLog = new OffsetCommitLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), batchSize, iterationBatchCount);
+ transactionalOffsetCommitLog = new TransactionalOffsetCommitLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), batchSize, iterationBatchCount);
+ }
+
+ @Setup(Level.Invocation)
+ public void setupInvocation() {
+ GroupConfigManager configManager = new GroupConfigManager(new HashMap<>());
+ LogContext logContext = new LogContext();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+
+ MetricsRegistry metricsRegistry = new MetricsRegistry();
+ Metrics metrics = new Metrics();
+ GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(metricsRegistry, metrics);
+
+ coordinatorShard = new GroupCoordinatorShard.Builder(config, configManager)
+ .withAuthorizerPlugin(Optional.empty())
+ .withLogContext(logContext)
+ .withSnapshotRegistry(snapshotRegistry)
+ .withTime(time)
+ .withTimer(new MockCoordinatorTimer<>(time))
+ .withExecutor(new MockCoordinatorExecutor<>())
+ .withCoordinatorMetrics(coordinatorMetrics)
+ .withTopicPartition(topicPartition)
+ .build();
+
+ snapshottableCoordinator = new SnapshottableCoordinator<>(
+ logContext,
+ snapshotRegistry,
+ coordinatorShard,
+ topicPartition
+ );
+ }
+
+ private CoordinatorLoader.LoadSummary loadRecords(UnifiedLog log) throws ExecutionException, InterruptedException {
+ Function> partitionLogSupplier = tp -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = tp -> Optional.of(log.logEndOffset());
+
+ CoordinatorLoaderImpl loader = new CoordinatorLoaderImpl<>(
+ time,
+ partitionLogSupplier,
+ partitionLogEndOffsetSupplier,
+ serde,
+ config.offsetsLoadBufferSize(),
+ commitInterval
+ );
+
+ return loader.load(topicPartition, snapshottableCoordinator).get();
+ }
+
+ @Benchmark
+ @Threads(1)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public CoordinatorLoader.LoadSummary loadOffsetCommitRecords() throws ExecutionException, InterruptedException {
+ return loadRecords(offsetCommitLog);
+ }
+
+ @Benchmark
+ @Threads(1)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public CoordinatorLoader.LoadSummary loadTransactionalOffsetCommitRecords() throws ExecutionException, InterruptedException {
+ return loadRecords(transactionalOffsetCommitLog);
+ }
+
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(GroupCoordinatorShardLoadingBenchmark.class.getSimpleName())
+ .forks(1)
+ .build();
+
+ new Runner(opt).run();
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java
new file mode 100644
index 00000000000..a7de83d5bcb
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.coordinator;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LocalLog;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogSegments;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class MockLog extends UnifiedLog {
+
+ public MockLog(TopicPartition tp) throws IOException {
+ super(
+ 0,
+ createMockLocalLog(tp),
+ mock(BrokerTopicStats.class),
+ Integer.MAX_VALUE,
+ mock(LeaderEpochFileCache.class),
+ mock(ProducerStateManager.class),
+ Optional.empty(),
+ false,
+ LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+ );
+ }
+
+ @Override
+ public abstract long logStartOffset();
+
+ @Override
+ public abstract long logEndOffset();
+
+ @Override
+ public long highWatermark() {
+ return logEndOffset();
+ }
+
+ @Override
+ public abstract FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage);
+
+ private static LocalLog createMockLocalLog(TopicPartition tp) {
+ LocalLog localLog = mock(LocalLog.class);
+ when(localLog.scheduler()).thenReturn(mock(Scheduler.class));
+ when(localLog.segments()).thenReturn(mock(LogSegments.class));
+ when(localLog.topicPartition()).thenReturn(tp);
+ return localLog;
+ }
+}