KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20547)

When nested Timeline collections are created and discarded while loading
a coordinator partition, references to them accumulate in the current
snapshot. Allow the GC to reclaim them by starting a new snapshot and
discarding previous snapshots every 16,384 records.

Small intervals degrade loading times for non-transactional offset commit
workloads, while large intervals degrade loading times for transactional
workloads. A default of 16,384 was chosen as a compromise between the two.

Also add a benchmark for group coordinator loading.
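
For illustration, a minimal sketch of the mechanism in Java follows (the class,
record list and offsets are illustrative placeholders; the real change is in
CoordinatorLoaderImpl.java below). Advancing the last written and committed
offsets every commitIntervalOffsets records starts a fresh snapshot and lets the
snapshot registry discard the older ones, so the GC can reclaim the nested
Timeline collections they reference.

import java.util.List;

import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.coordinator.common.runtime.CoordinatorPlayback;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;

class PeriodicCommitSketch {
    // Sketch only: replay pre-deserialized records and commit every
    // commitIntervalOffsets records (DEFAULT_COMMIT_INTERVAL_OFFSETS is 16,384).
    static void replay(
        CoordinatorPlayback<CoordinatorRecord> coordinator,
        List<CoordinatorRecord> records,
        long commitIntervalOffsets
    ) {
        long lastCommittedOffset = -1L;
        for (int i = 0; i < records.size(); i++) {
            long offset = i;
            coordinator.replay(offset, RecordBatch.NO_PRODUCER_ID,
                RecordBatch.NO_PRODUCER_EPOCH, records.get(i));
            if (offset - lastCommittedOffset >= commitIntervalOffsets) {
                // Committing here discards snapshots older than the committed offset.
                coordinator.updateLastWrittenOffset(offset);
                coordinator.updateLastCommittedOffset(offset);
                lastCommittedOffset = offset;
            }
        }
    }
}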

Reviewers: David Jacot <djacot@confluent.io>
Author: Sean Quah, 2025-09-19 08:44:07 +01:00 (committed by GitHub)
Commit: d067c6c040 (parent: b72db2b2c7)
8 changed files with 563 additions and 27 deletions

File: build.gradle

@ -3350,6 +3350,7 @@ project(':jmh-benchmarks') {
implementation project(':raft')
implementation project(':clients')
implementation project(':coordinator-common')
implementation project(':coordinator-common').sourceSets.test.output
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':metadata')

File: checkstyle import-control for jmh-benchmarks

@ -52,9 +52,7 @@
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage"/>
<allow pkg="org.apache.kafka.clients"/>
<allow class="org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage"/>
<allow class="org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage"/>
<allow class="org.apache.kafka.coordinator.common.runtime.HdrHistogram"/>
<allow pkg="org.apache.kafka.coordinator.common.runtime"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.metadata"/>

File: CoordinatorLoaderImpl.java

@ -50,6 +50,20 @@ import java.util.function.Function;
*/
public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
/**
* The interval between updating the last committed offset during loading, in offsets. Smaller
* values commit more often at the expense of loading times when the workload is simple and does
* not create collections that need to participate in {@link CoordinatorPlayback} snapshotting.
* Larger values commit less often and allow more temporary data to accumulate before the next
* commit when the workload creates many temporary collections that need to be snapshotted.
*
* The value of 16,384 was chosen as a trade-off between the performance of these two workloads.
*
* When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
* the relative change in performance.
*/
public static final long DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384;
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoaderImpl.class);
private final Time time;
@ -57,6 +71,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
private final Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier;
private final Deserializer<T> deserializer;
private final int loadBufferSize;
private final long commitIntervalOffsets;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final KafkaScheduler scheduler = new KafkaScheduler(1);
@ -66,13 +81,15 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
Deserializer<T> deserializer,
int loadBufferSize
int loadBufferSize,
long commitIntervalOffsets
) {
this.time = time;
this.partitionLogSupplier = partitionLogSupplier;
this.partitionLogEndOffsetSupplier = partitionLogEndOffsetSupplier;
this.deserializer = deserializer;
this.loadBufferSize = loadBufferSize;
this.commitIntervalOffsets = commitIntervalOffsets;
this.scheduler.startup();
}
@ -121,7 +138,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
long currentOffset = log.logStartOffset();
LoadStats stats = new LoadStats();
long previousHighWatermark = -1L;
long lastCommittedOffset = -1L;
while (shouldFetchNextBatch(currentOffset, logEndOffset(tp), stats.readAtLeastOneRecord)) {
FetchDataInfo fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true);
@ -133,9 +150,9 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
buffer = memoryRecords.buffer();
}
ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, previousHighWatermark);
ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, lastCommittedOffset);
currentOffset = replayResult.nextOffset;
previousHighWatermark = replayResult.highWatermark;
lastCommittedOffset = replayResult.lastCommittedOffset;
}
long endTimeMs = time.milliseconds();
@ -207,7 +224,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
CoordinatorPlayback<T> coordinator,
LoadStats loadStats,
long currentOffset,
long previousHighWatermark
long lastCommittedOffset
) {
for (MutableRecordBatch batch : memoryRecords.batches()) {
if (batch.isControlBatch()) {
@ -286,14 +303,18 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
if (currentOffset >= currentHighWatermark) {
coordinator.updateLastWrittenOffset(currentOffset);
if (currentHighWatermark > previousHighWatermark) {
if (currentHighWatermark > lastCommittedOffset) {
coordinator.updateLastCommittedOffset(currentHighWatermark);
previousHighWatermark = currentHighWatermark;
lastCommittedOffset = currentHighWatermark;
}
} else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
coordinator.updateLastWrittenOffset(currentOffset);
coordinator.updateLastCommittedOffset(currentOffset);
lastCommittedOffset = currentOffset;
}
}
loadStats.numBytes += memoryRecords.sizeInBytes();
return new ReplayResult(currentOffset, previousHighWatermark);
return new ReplayResult(currentOffset, lastCommittedOffset);
}
/**
@ -326,5 +347,5 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
}
}
private record ReplayResult(long nextOffset, long highWatermark) { }
private record ReplayResult(long nextOffset, long lastCommittedOffset) { }
}
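
For reference, wiring a loader with the new parameter looks roughly like the
following Java sketch (the log lookup map, serde choice and buffer size are
illustrative stand-ins for the broker wiring shown in BrokerServer.scala
further down):

import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoaderImpl;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.storage.internals.log.UnifiedLog;

class LoaderWiringSketch {
    // Sketch only: the map stands in for the ReplicaManager-backed suppliers
    // used by the broker.
    static CoordinatorLoaderImpl<CoordinatorRecord> buildLoader(Map<TopicPartition, UnifiedLog> logs) {
        return new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            tp -> Optional.ofNullable(logs.get(tp)),                               // partitionLogSupplier
            tp -> Optional.ofNullable(logs.get(tp)).map(UnifiedLog::logEndOffset), // partitionLogEndOffsetSupplier
            new GroupCoordinatorRecordSerde(),                                     // deserializer
            5 * 1024 * 1024,                                                       // loadBufferSize (placeholder)
            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS                  // new commit interval parameter
        );
    }
}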

File: SnapshottableCoordinator.java

@ -61,7 +61,7 @@ public class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implemen
*/
private long lastCommittedOffset;
SnapshottableCoordinator(
public SnapshottableCoordinator(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
S coordinator,

File: CoordinatorLoaderImplTest.java

@ -91,7 +91,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
}
@ -110,7 +111,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
loader.close();
assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@ -131,7 +133,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
@ -217,7 +220,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
@ -262,7 +266,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
@ -298,7 +303,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
@ -337,7 +343,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
@ -365,7 +372,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
long startTimeMs = time.milliseconds();
when(log.logStartOffset()).thenReturn(0L);
@ -412,7 +420,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L, 0L, 2L);
@ -475,7 +484,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
@ -501,7 +511,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(5L, 7L, 7L);
@ -551,6 +562,79 @@ class CoordinatorLoaderImplTest {
}
}
@Test
void testUpdateLastWrittenOffsetCommitInterval() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
UnifiedLog log = mock(UnifiedLog.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000,
2L
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(7L);
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
new SimpleRecord("k3".getBytes(), "v3".getBytes()),
new SimpleRecord("k4".getBytes(), "v4".getBytes()),
new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult2);
FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
new SimpleRecord("k6".getBytes(), "v6".getBytes())
));
when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult3);
FetchDataInfo readResult4 = logReadResult(6, Arrays.asList(
new SimpleRecord("k7".getBytes(), "v7".getBytes())
));
when(log.read(6L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult4);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
verify(coordinator, times(0)).updateLastWrittenOffset(0L);
verify(coordinator, times(1)).updateLastWrittenOffset(2L);
verify(coordinator, times(1)).updateLastWrittenOffset(5L);
verify(coordinator, times(0)).updateLastWrittenOffset(6L);
verify(coordinator, times(1)).updateLastWrittenOffset(7L);
verify(coordinator, times(0)).updateLastCommittedOffset(0L);
verify(coordinator, times(1)).updateLastCommittedOffset(2L);
verify(coordinator, times(1)).updateLastCommittedOffset(5L);
verify(coordinator, times(0)).updateLastCommittedOffset(6L);
verify(coordinator, times(1)).updateLastCommittedOffset(7L);
}
}
@Test
void testPartitionGoesOfflineDuringLoad() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
@ -565,7 +649,8 @@ class CoordinatorLoaderImplTest {
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
1000,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) {
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);

File: BrokerServer.scala

@ -613,7 +613,8 @@ class BrokerServer(
tp => replicaManager.getLog(tp).toJava,
tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
serde,
config.groupCoordinatorConfig.offsetsLoadBufferSize
config.groupCoordinatorConfig.offsetsLoadBufferSize,
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)
val writer = new CoordinatorPartitionWriter(
replicaManager
@ -644,7 +645,8 @@ class BrokerServer(
tp => replicaManager.getLog(tp).toJava,
tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
serde,
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)
val writer = new CoordinatorPartitionWriter(
replicaManager

File: GroupCoordinatorShardLoadingBenchmark.java (new)

@ -0,0 +1,355 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.coordinator;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoaderImpl;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
import org.apache.kafka.coordinator.common.runtime.SnapshottableCoordinator;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.timeline.SnapshotRegistry;
import com.yammer.metrics.core.MetricsRegistry;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.BenchmarkParams;
import org.openjdk.jmh.infra.IterationParams;
import org.openjdk.jmh.runner.IterationType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class GroupCoordinatorShardLoadingBenchmark {
private static final String GROUP_ID = "test-group";
@Param({"1", "4", "16", "64", "256", "1024", "4096", "16384", "65536", "262144", "1048576"})
private long commitInterval;
@Param({"8192"})
private int batchCount;
@Param({"2048"})
private int batchSize;
private TopicPartition topicPartition;
private MockTime time;
private GroupCoordinatorConfig config;
private GroupCoordinatorRecordSerde serde;
private GroupCoordinatorShard coordinatorShard;
private SnapshottableCoordinator<GroupCoordinatorShard, CoordinatorRecord> snapshottableCoordinator;
private UnifiedLog offsetCommitLog;
private UnifiedLog transactionalOffsetCommitLog;
static class OffsetCommitLog extends MockLog {
private final int batchCount;
private final SimpleRecord[] batch;
public OffsetCommitLog(TopicPartition tp, int batchSize, int batchCount) throws IOException {
super(tp);
this.batchCount = batchCount;
List<SimpleRecord> batchRecords = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
String topic = "topic-" + i;
int partition = 0;
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
0L,
OptionalInt.of(0),
OffsetAndMetadata.NO_METADATA,
0L,
OptionalLong.empty(),
Uuid.randomUuid()
);
CoordinatorRecord coordinatorRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
GROUP_ID, topic, partition, offsetAndMetadata
);
byte[] keyBytes = new GroupCoordinatorRecordSerde().serializeKey(coordinatorRecord);
byte[] valueBytes = new GroupCoordinatorRecordSerde().serializeValue(coordinatorRecord);
SimpleRecord simpleRecord = new SimpleRecord(keyBytes, valueBytes);
batchRecords.add(simpleRecord);
}
this.batch = batchRecords.toArray(new SimpleRecord[0]);
}
@Override
public long logStartOffset() {
return 0L;
}
@Override
public long logEndOffset() {
if (batch == null) {
return 0L;
}
return (long) batchCount * (long) batch.length;
}
@Override
public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage) {
if (startOffset < 0 || startOffset >= logEndOffset()) {
return new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY);
}
MemoryRecords records = MemoryRecords.withRecords(
startOffset,
Compression.NONE,
batch
);
return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
}
}
static class TransactionalOffsetCommitLog extends MockLog {
private final int batchCount;
private final SimpleRecord[] batch;
private final long producerId;
private final short producerEpoch;
private final int coordinatorEpoch;
public TransactionalOffsetCommitLog(TopicPartition tp, int batchSize, int batchCount) throws IOException {
super(tp);
this.batchCount = batchCount;
this.producerId = 1000L;
this.producerEpoch = 0;
this.coordinatorEpoch = 100;
List<SimpleRecord> batchRecords = new ArrayList<>();
for (int i = 0; i < batchSize - 1; i++) {
String topic = "topic-" + i;
int partition = 0;
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
0L,
OptionalInt.of(0),
OffsetAndMetadata.NO_METADATA,
0L,
OptionalLong.empty(),
Uuid.randomUuid()
);
CoordinatorRecord coordinatorRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
GROUP_ID, topic, partition, offsetAndMetadata
);
byte[] keyBytes = new GroupCoordinatorRecordSerde().serializeKey(coordinatorRecord);
byte[] valueBytes = new GroupCoordinatorRecordSerde().serializeValue(coordinatorRecord);
SimpleRecord simpleRecord = new SimpleRecord(keyBytes, valueBytes);
batchRecords.add(simpleRecord);
}
this.batch = batchRecords.toArray(new SimpleRecord[0]);
}
@Override
public long logStartOffset() {
return 0L;
}
@Override
public long logEndOffset() {
if (batch == null) {
return 0L;
}
return (long) (batch.length + 1) * (long) batchCount;
}
@Override
public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage) {
if (startOffset < 0 || startOffset >= logEndOffset()) {
return new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY);
}
// Repeat the batch followed by a commit marker.
long patternLength = batch.length + 1;
if (startOffset % patternLength < batch.length) {
MemoryRecords records = MemoryRecords.withTransactionalRecords(
startOffset,
Compression.NONE,
producerId,
producerEpoch,
0,
0,
batch
);
return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
} else {
MemoryRecords records = MemoryRecords.withEndTransactionMarker(
startOffset,
0L,
0,
producerId,
producerEpoch,
new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
);
return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
}
}
}
@Setup(Level.Trial)
public void setup() throws Exception {
topicPartition = new TopicPartition("__consumer_offsets", 0);
time = new MockTime();
Map<String, Object> props = new HashMap<>();
config = GroupCoordinatorConfig.fromProps(props);
serde = new GroupCoordinatorRecordSerde();
}
@Setup(Level.Iteration)
public void setupIteration(BenchmarkParams benchmarkParams, IterationParams iterationParams) throws IOException {
// Reduce the data size for warmup iterations, since transactional offset commit loading
// takes longer than 20 seconds.
int iterationBatchCount = batchCount;
if (iterationParams.getType() == IterationType.WARMUP) {
iterationBatchCount = Math.min(iterationBatchCount, 1024);
}
offsetCommitLog = new OffsetCommitLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), batchSize, iterationBatchCount);
transactionalOffsetCommitLog = new TransactionalOffsetCommitLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), batchSize, iterationBatchCount);
}
@Setup(Level.Invocation)
public void setupInvocation() {
GroupConfigManager configManager = new GroupConfigManager(new HashMap<>());
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
MetricsRegistry metricsRegistry = new MetricsRegistry();
Metrics metrics = new Metrics();
GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(metricsRegistry, metrics);
coordinatorShard = new GroupCoordinatorShard.Builder(config, configManager)
.withAuthorizerPlugin(Optional.empty())
.withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(new MockCoordinatorTimer<>(time))
.withExecutor(new MockCoordinatorExecutor<>())
.withCoordinatorMetrics(coordinatorMetrics)
.withTopicPartition(topicPartition)
.build();
snapshottableCoordinator = new SnapshottableCoordinator<>(
logContext,
snapshotRegistry,
coordinatorShard,
topicPartition
);
}
private CoordinatorLoader.LoadSummary loadRecords(UnifiedLog log) throws ExecutionException, InterruptedException {
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = tp -> Optional.of(log);
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = tp -> Optional.of(log.logEndOffset());
CoordinatorLoaderImpl<CoordinatorRecord> loader = new CoordinatorLoaderImpl<>(
time,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
config.offsetsLoadBufferSize(),
commitInterval
);
return loader.load(topicPartition, snapshottableCoordinator).get();
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public CoordinatorLoader.LoadSummary loadOffsetCommitRecords() throws ExecutionException, InterruptedException {
return loadRecords(offsetCommitLog);
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public CoordinatorLoader.LoadSummary loadTransactionalOffsetCommitRecords() throws ExecutionException, InterruptedException {
return loadRecords(transactionalOffsetCommitLog);
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(GroupCoordinatorShardLoadingBenchmark.class.getSimpleName())
.forks(1)
.build();
new Runner(opt).run();
}
}

File: MockLog.java (new)

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.coordinator;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LocalLog;
import org.apache.kafka.storage.internals.log.LogOffsetsListener;
import org.apache.kafka.storage.internals.log.LogSegments;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import java.io.IOException;
import java.util.Optional;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public abstract class MockLog extends UnifiedLog {
public MockLog(TopicPartition tp) throws IOException {
super(
0,
createMockLocalLog(tp),
mock(BrokerTopicStats.class),
Integer.MAX_VALUE,
mock(LeaderEpochFileCache.class),
mock(ProducerStateManager.class),
Optional.empty(),
false,
LogOffsetsListener.NO_OP_OFFSETS_LISTENER
);
}
@Override
public abstract long logStartOffset();
@Override
public abstract long logEndOffset();
@Override
public long highWatermark() {
return logEndOffset();
}
@Override
public abstract FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage);
private static LocalLog createMockLocalLog(TopicPartition tp) {
LocalLog localLog = mock(LocalLog.class);
when(localLog.scheduler()).thenReturn(mock(Scheduler.class));
when(localLog.segments()).thenReturn(mock(LogSegments.class));
when(localLog.topicPartition()).thenReturn(tp);
return localLog;
}
}