KAFKA-19160; KAFKA-19164: Improve performance of fetching stable offsets (#19497)

When fetching stable offsets in the group coordinator, we iterate over
all requested partitions. For each partition, we iterate over the
group's ongoing transactions to check if there is a pending
transactional offset commit for that partition.

This can get slow when there are a large number of partitions and a
large number of pending transactions. Instead, maintain a list of
pending transactions per partition to speed up lookups.
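
To illustrate the idea, here is a minimal, hypothetical sketch using plain java.util collections and simplified names; the patch itself uses the coordinator's snapshot-aware TimelineHashMap/TimelineHashSet inside OffsetMetadataManager, as shown in the diff below.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class PendingTransactionIndexSketch {

    // Before: only the open producer ids were tracked per group. Answering "does this
    // partition have a pending transactional offset?" meant scanning every open
    // transaction and probing its pending offsets.
    private final Map<String, Set<Long>> openTransactionsByGroup = new HashMap<>();

    // After: producer ids are indexed by group id -> topic name -> partition id, so the
    // same question becomes a couple of constant-time map lookups.
    private final Map<String, Map<String, Map<Integer, Set<Long>>>> openTransactionsByGroupTopicAndPartition = new HashMap<>();

    // Old-style check: O(number of open transactions) for every requested partition.
    // The pendingPartitionsByProducerId parameter is a stand-in for the pending
    // transactional offsets store.
    boolean hasPendingOffsetOld(String groupId, String topic, int partition,
                                Map<Long, Set<String>> pendingPartitionsByProducerId) {
        Set<Long> producerIds = openTransactionsByGroup.get(groupId);
        if (producerIds == null) return false;
        for (long producerId : producerIds) {
            Set<String> pending = pendingPartitionsByProducerId.get(producerId);
            if (pending != null && pending.contains(topic + "-" + partition)) return true;
        }
        return false;
    }

    // New-style check: two nested map lookups, independent of the number of open transactions.
    boolean hasPendingOffsetNew(String groupId, String topic, int partition) {
        Map<String, Map<Integer, Set<Long>>> byTopic = openTransactionsByGroupTopicAndPartition.get(groupId);
        if (byTopic == null) return false;
        Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
        return byPartition != null && byPartition.containsKey(partition);
    }
}

With the nested index, a stable-offset fetch over P partitions costs O(P) map lookups instead of O(P * T) probes when T transactions are open.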

Reviewers: Shaan, Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
Sean Quah 2025-05-12 08:32:17 +01:00 committed by David Jacot
parent 1f856d437d
commit cf3c177936
2 changed files with 355 additions and 65 deletions

OffsetMetadataManager.java

@@ -65,6 +65,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
@@ -91,37 +92,37 @@ public class OffsetMetadataManager {
private GroupCoordinatorConfig config = null;
private GroupCoordinatorMetricsShard metrics = null;
Builder withLogContext(LogContext logContext) {
public Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
public Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
Builder withTime(Time time) {
public Builder withTime(Time time) {
this.time = time;
return this;
}
Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
public Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
this.groupMetadataManager = groupMetadataManager;
return this;
}
Builder withGroupCoordinatorConfig(GroupCoordinatorConfig config) {
public Builder withGroupCoordinatorConfig(GroupCoordinatorConfig config) {
this.config = config;
return this;
}
Builder withMetadataImage(MetadataImage metadataImage) {
public Builder withMetadataImage(MetadataImage metadataImage) {
this.metadataImage = metadataImage;
return this;
}
Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
public Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
this.metrics = metrics;
return this;
}
@@ -196,9 +197,167 @@ public class OffsetMetadataManager {
private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
/**
* The open transactions (producer ids) keyed by group.
* The open transactions (producer ids) by group id, topic name and partition id.
*/
private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
private final OpenTransactions openTransactions;
/**
* Tracks open transactions (producer ids) by group id, topic name and partition id.
* It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
*/
private class OpenTransactions {
/**
* The open transactions (producer ids) keyed by group id, topic name and partition id.
* Tracks whether partitions have any pending transactional offsets that have not been deleted.
*
* Values in each level of the map will never be empty collections.
*/
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
private OpenTransactions() {
this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
/**
* Adds a producer id to the open transactions for the given group and topic partition.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partition The partition.
* @param producerId The producer id.
* @return {@code true} if the partition did not already have a pending offset from the producer id.
*/
private boolean add(String groupId, String topic, int partition, long producerId) {
return openTransactionsByGroup
.computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
.computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
.computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
.add(producerId);
}
/**
* Clears all open transactions for the given group and topic partition.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partition The partition.
*/
private void clear(String groupId, String topic, int partition) {
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
openTransactionsByGroup.get(groupId);
if (openTransactionsByTopic == null) return;
TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
if (openTransactionsByPartition == null) return;
openTransactionsByPartition.remove(partition);
if (openTransactionsByPartition.isEmpty()) {
openTransactionsByTopic.remove(topic);
if (openTransactionsByTopic.isEmpty()) {
openTransactionsByGroup.remove(groupId);
}
}
}
/**
* Returns {@code true} if the given group has any pending transactional offsets.
*
* @param groupId The group id.
* @return {@code true} if the given group has any pending transactional offsets.
*/
private boolean contains(String groupId) {
return openTransactionsByGroup.containsKey(groupId);
}
/**
* Returns {@code true} if the given group has any pending transactional offsets for the given topic and partition.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partition The partition.
* @return {@code true} if the given group has any pending transactional offsets for the given topic and partition.
*/
private boolean contains(String groupId, String topic, int partition) {
TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
return openTransactions != null;
}
/**
* Performs the given action for each partition with a pending transactional offset for the given group.
*
* @param groupId The group id.
* @param action The action to be performed for each partition with a pending transactional offset.
*/
private void forEachTopicPartition(String groupId, BiConsumer<String, Integer> action) {
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
openTransactionsByGroup.get(groupId);
if (openTransactionsByTopic == null) return;
openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> {
openTransactionsByPartition.forEach((partition, producerIds) -> {
action.accept(topic, partition);
});
});
}
/**
* Performs the given action for each producer id with a pending transactional offset for the given group and topic partition.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partition The partition.
* @param action The action to be performed for each producer id with a pending transactional offset.
*/
private void forEach(String groupId, String topic, int partition, Consumer<Long> action) {
TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
if (openTransactions == null) return;
openTransactions.forEach(action);
}
/**
* Gets the set of producer ids with pending transactional offsets for the given group and topic partition.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partition The partition.
* @return The set of producer ids with pending transactional offsets for the given group and topic partition.
*/
private TimelineHashSet<Long> get(String groupId, String topic, int partition) {
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
openTransactionsByGroup.get(groupId);
if (openTransactionsByTopic == null) return null;
TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
if (openTransactionsByPartition == null) return null;
return openTransactionsByPartition.get(partition);
}
/**
* Removes a producer id from the open transactions for the given group and topic partition.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partition The partition.
* @param producerId The producer id.
* @return {@code true} if the group and topic partition had a pending transactional offset from the producer id.
*/
private boolean remove(String groupId, String topic, int partition, long producerId) {
TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
if (openTransactions == null) return false;
boolean removed = openTransactions.remove(producerId);
if (openTransactions.isEmpty()) {
// Re-use the cleanup in clear().
clear(groupId, topic, partition);
}
return removed;
}
}
private class Offsets {
/**
@@ -283,7 +442,7 @@ public class OffsetMetadataManager {
this.metrics = metrics;
this.offsets = new Offsets();
this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
this.openTransactions = new OpenTransactions();
}
/**
@@ -653,26 +812,12 @@ public class OffsetMetadataManager {
// Delete all the pending transactional offsets too. Here we only write a tombstone
// if the topic-partition was not in the main storage because we don't need to write
// two consecutive tombstones.
TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions != null) {
openTransactions.forEach(producerId -> {
Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
if (pendingOffsets != null) {
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets =
pendingOffsets.offsetsByGroup.get(groupId);
if (pendingGroupOffsets != null) {
pendingGroupOffsets.forEach((topic, offsetsByPartition) -> {
offsetsByPartition.keySet().forEach(partition -> {
if (!hasCommittedOffset(groupId, topic, partition)) {
records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
numDeletedOffsets.getAndIncrement();
}
});
});
}
}
});
}
openTransactions.forEachTopicPartition(groupId, (topic, partition) -> {
if (!hasCommittedOffset(groupId, topic, partition)) {
records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
numDeletedOffsets.getAndIncrement();
}
});
return numDeletedOffsets.get();
}
@@ -688,17 +833,7 @@ public class OffsetMetadataManager {
String topic,
int partition
) {
final TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions == null) return false;
for (Long producerId : openTransactions) {
Offsets offsets = pendingTransactionalOffsets.get(producerId);
if (offsets != null && offsets.get(groupId, topic, partition) != null) {
return true;
}
}
return false;
return openTransactions.contains(groupId, topic, partition);
}
/**
@@ -739,6 +874,11 @@ public class OffsetMetadataManager {
final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>(request.topics().size());
final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
failAllPartitions ? null : offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
// We inline the lookups from hasPendingTransactionalOffsets here, to avoid repeating string
// comparisons of group ids and topic names for every partition. They're only used when the
// client has requested stable offsets.
final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
requireStable ? openTransactions.openTransactionsByGroup.get(request.groupId(), lastCommittedOffset) : null;
request.topics().forEach(topic -> {
final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
@@ -747,12 +887,16 @@ public class OffsetMetadataManager {
final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = groupOffsets == null ?
null : groupOffsets.get(topic.name(), lastCommittedOffset);
final TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition =
(requireStable && openTransactionsByTopic != null) ? openTransactionsByTopic.get(topic.name(), lastCommittedOffset) : null;
topic.partitionIndexes().forEach(partitionIndex -> {
final OffsetAndMetadata offsetAndMetadata = topicOffsets == null ?
null : topicOffsets.get(partitionIndex, lastCommittedOffset);
if (requireStable && hasPendingTransactionalOffsets(request.groupId(), topic.name(), partitionIndex)) {
if (requireStable &&
openTransactionsByPartition != null &&
openTransactionsByPartition.containsKey(partitionIndex, lastCommittedOffset)) {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partitionIndex)
.setErrorCode(Errors.UNSTABLE_OFFSET_COMMIT.code())
@@ -805,11 +949,18 @@ public class OffsetMetadataManager {
final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>();
final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
// We inline the lookups from hasPendingTransactionalOffsets here, to avoid repeating string
// comparisons of group ids and topic names for every partition. They're only used when the
// client has requested stable offsets.
final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
requireStable ? openTransactions.openTransactionsByGroup.get(request.groupId(), lastCommittedOffset) : null;
if (groupOffsets != null) {
groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
final String topic = topicEntry.getKey();
final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = topicEntry.getValue();
final TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition =
(requireStable && openTransactionsByTopic != null) ? openTransactionsByTopic.get(topic, lastCommittedOffset) : null;
final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic);
@@ -819,7 +970,9 @@ public class OffsetMetadataManager {
final int partition = partitionEntry.getKey();
final OffsetAndMetadata offsetAndMetadata = partitionEntry.getValue();
if (requireStable && hasPendingTransactionalOffsets(request.groupId(), topic, partition)) {
if (requireStable &&
openTransactionsByPartition != null &&
openTransactionsByPartition.containsKey(partition, lastCommittedOffset)) {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNSTABLE_OFFSET_COMMIT.code())
@@ -888,8 +1041,8 @@ public class OffsetMetadataManager {
});
metrics.record(OFFSET_EXPIRED_SENSOR_NAME, expiredPartitions.size());
// We don't want to remove the group if there are ongoing transactions.
return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId);
// We don't want to remove the group if there are ongoing transactions with undeleted offsets.
return allOffsetsExpired.get() && !openTransactions.contains(groupId);
}
/**
@@ -1010,9 +1163,7 @@ public class OffsetMetadataManager {
partition,
OffsetAndMetadata.fromRecord(recordOffset, value)
);
openTransactionsByGroup
.computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
.add(producerId);
openTransactions.add(groupId, topic, partition, producerId);
}
} else {
if (offsets.remove(groupId, topic, partition) != null) {
@@ -1020,15 +1171,13 @@ public class OffsetMetadataManager {
}
// Remove all the pending offset commits related to the tombstone.
TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions != null) {
openTransactions.forEach(openProducerId -> {
Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
if (pendingOffsets != null) {
pendingOffsets.remove(groupId, topic, partition);
}
});
}
openTransactions.forEach(groupId, topic, partition, openProducerId -> {
Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
if (pendingOffsets != null) {
pendingOffsets.remove(groupId, topic, partition);
}
});
openTransactions.clear(groupId, topic, partition);
}
}
@@ -1039,6 +1188,7 @@ public class OffsetMetadataManager {
* @param result The result of the transaction.
* @throws RuntimeException if the transaction can not be completed.
*/
@SuppressWarnings("NPathComplexity")
public void replayEndTransactionMarker(
long producerId,
TransactionResult result
@@ -1051,14 +1201,12 @@ public class OffsetMetadataManager {
return;
}
pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions != null) {
openTransactions.remove(producerId);
if (openTransactions.isEmpty()) {
openTransactionsByGroup.remove(groupId);
}
}
pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
topicOffsets.forEach((topic, partitionOffsets) -> {
partitionOffsets.keySet().forEach(partitionId -> {
openTransactions.remove(groupId, topic, partitionId, producerId);
});
});
});
if (result == TransactionResult.COMMIT) {

TransactionalOffsetFetchBenchmark.java (new file)

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.coordinator;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupMetadataManager;
import org.apache.kafka.coordinator.group.OffsetMetadataManager;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class TransactionalOffsetFetchBenchmark {
private static final Time TIME = Time.SYSTEM;
@Param({"4000"})
private int partitionCount;
@Param({"4000"})
private int transactionCount;
private static final String GROUP_ID = "my-group-id";
private static final String TOPIC_NAME = "my-topic-name";
private OffsetMetadataManager offsetMetadataManager;
/** A list of partition indexes from 0 to partitionCount - 1. */
private List<Integer> partitionIndexes;
@Setup(Level.Trial)
public void setup() {
LogContext logContext = new LogContext();
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
delta.replay(new TopicRecord()
.setTopicId(Uuid.randomUuid())
.setName(TOPIC_NAME));
MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
Group group = mock(Group.class);
when(groupMetadataManager.group(anyString(), anyLong())).thenReturn(group);
offsetMetadataManager = new OffsetMetadataManager.Builder()
.withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
.withTime(TIME)
.withGroupMetadataManager(groupMetadataManager)
.withGroupCoordinatorConfig(mock(GroupCoordinatorConfig.class))
.withMetadataImage(image)
.withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
.build();
for (int i = 0; i < transactionCount; i++) {
snapshotRegistry.idempotentCreateSnapshot(i);
offsetMetadataManager.replay(
i,
3193600 + i,
new OffsetCommitKey()
.setGroup(GROUP_ID)
.setTopic(TOPIC_NAME)
.setPartition(i),
new OffsetCommitValue()
.setOffset(100)
);
}
partitionIndexes = new ArrayList<>();
for (int i = 0; i < partitionCount; i++) {
partitionIndexes.add(i);
}
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void run() {
offsetMetadataManager.fetchOffsets(
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(GROUP_ID)
.setTopics(List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName(TOPIC_NAME)
.setPartitionIndexes(partitionIndexes)
)),
Long.MAX_VALUE
);
}
}
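
As a usage note, the new benchmark should be runnable through the repository's JMH wrapper, for example ./jmh-benchmarks/jmh.sh TransactionalOffsetFetchBenchmark (assuming the standard jmh.sh script in the jmh-benchmarks module). With the default parameters of 4000 partitions and 4000 open transactions, it exercises exactly the per-partition lookup path that this change indexes.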