KAFKA-18878: Added share session cache and delayed share fetch metrics (KIP-1103) (#19059)

This PR implements the ShareSessionCache and DelayedShareFetchMetrics metric
groups defined in KIP-1103.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-03-03 16:44:34 +00:00 committed by GitHub
parent ff94c44e70
commit a6c53d0c37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 162 additions and 29 deletions

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@ -36,6 +37,8 @@ import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import com.yammer.metrics.core.Meter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,6 +48,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -61,6 +65,8 @@ public class DelayedShareFetch extends DelayedOperation {
private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
private static final String EXPIRES_PER_SEC = "ExpiresPerSec";
private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
@ -70,6 +76,10 @@ public class DelayedShareFetch extends DelayedOperation {
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
/**
* Metric for the rate of expired delayed fetch requests.
*/
private final Meter expiredRequestMeter;
// Tracks the start time to acquire any share partition for a fetch request.
private long acquireStartTimeMs;
private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
@ -124,10 +134,14 @@ public class DelayedShareFetch extends DelayedOperation {
this.shareGroupMetrics = shareGroupMetrics;
this.time = time;
this.acquireStartTimeMs = time.hiResClockMs();
// Register metrics for DelayedShareFetch.
KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics");
this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS);
}
/**
 * Expiration callback: marks the {@code ExpiresPerSec} meter once per invocation.
 */
@Override
public void onExpiration() {
expiredRequestMeter.mark();
}
/**
@ -514,4 +528,9 @@ public class DelayedShareFetch extends DelayedOperation {
// Returns the lock of this delayed share fetch.
// NOTE(review): presumably package-private for tests (DelayedShareFetchTest calls lock().unlock()) - confirm.
Lock lock() {
return lock;
}
// Visible for testing. Exposes the meter marked by onExpiration() so that tests can read its count.
Meter expiredRequestMeter() {
return expiredRequestMeter;
}
}

View File

@ -154,6 +154,7 @@ public class DelayedShareFetchTest {
// Metrics shall not be recorded as no partition is acquired.
assertNull(shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId));
assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
assertEquals(0, delayedShareFetch.expiredRequestMeter().count());
delayedShareFetch.lock().unlock();
}
@ -1118,6 +1119,23 @@ public class DelayedShareFetchTest {
true);
}
/**
 * Verifies that running the delayed operation (the timeout path) completes the share fetch
 * and records exactly one additional expiration on the expired-request meter.
 */
@Test
public void testOnCompleteExecutionOnTimeout() {
    ShareFetch shareFetch = new ShareFetch(
        FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
        new CompletableFuture<>(), new LinkedHashMap<>(), BATCH_SIZE, MAX_FETCH_RECORDS,
        BROKER_TOPIC_STATS);
    DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
        .withShareFetchData(shareFetch)
        .build();
    assertFalse(delayedShareFetch.isCompleted());
    assertFalse(shareFetch.isCompleted());
    // The meter is registered by name in the shared Yammer registry, so its count may already
    // include expirations recorded through other DelayedShareFetch instances in this JVM.
    // Capture the baseline and assert on the delta so the test does not depend on test order.
    long expiredCountBeforeRun = delayedShareFetch.expiredRequestMeter().count();
    // Call run to execute onComplete and onExpiration.
    delayedShareFetch.run();
    assertTrue(shareFetch.isCompleted());
    assertEquals(expiredCountBeforeRun + 1, delayedShareFetch.expiredRequestMeter().count());
}
static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes);
LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class),

View File

@ -48,7 +48,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
@ -67,8 +66,6 @@ import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
import com.yammer.metrics.core.Gauge;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -91,6 +88,7 @@ import java.util.concurrent.TimeUnit;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -6736,19 +6734,6 @@ public class SharePartitionTest {
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
}
/**
 * Looks up the first metric in the default Yammer registry whose MBean name contains
 * {@code name} and returns its gauge value.
 *
 * @param name Substring to look for in the metric's MBean name.
 * @return The gauge value as a number, or 0 if the lookup or any cast fails.
 */
private Number yammerMetricValue(String name) {
    try {
        Gauge<?> matchedGauge = (Gauge<?>) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
            .filter(entry -> entry.getKey().getMBeanName().contains(name))
            .map(entry -> entry.getValue())
            .findFirst()
            .orElseThrow();
        Object currentValue = matchedGauge.value();
        return (Number) currentValue;
    } catch (Exception ignored) {
        // Best effort: a missing metric, a non-gauge metric or a non-numeric value all read as 0.
        return 0;
    }
}
private static class SharePartitionBuilder {
private int defaultAcquisitionLockTimeoutMs = 30000;

View File

@ -20,11 +20,15 @@ package org.apache.kafka.server.share.session;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.share.CachedSharePartition;
import com.yammer.metrics.core.Meter;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
/**
* Caches share sessions.
@ -37,6 +41,17 @@ import java.util.TreeMap;
* must never be acquired while an individual ShareSession lock is already held.
*/
public class ShareSessionCache {
// Visible for testing.
static final String SHARE_SESSIONS_COUNT = "ShareSessionsCount";
// Visible for testing.
static final String SHARE_PARTITIONS_COUNT = "SharePartitionsCount";
private static final String SHARE_SESSION_EVICTIONS_PER_SEC = "ShareSessionEvictionsPerSec";
/**
* Metric for the rate of eviction of share sessions.
*/
private final Meter evictionsMeter;
private final int maxEntries;
private final long evictionMs;
private long numPartitions = 0;
@ -47,9 +62,15 @@ public class ShareSessionCache {
// Maps last used times to sessions.
private final TreeMap<LastUsedKey, ShareSession> lastUsed = new TreeMap<>();
/**
 * Creates a share session cache and registers its metrics under the
 * {@code kafka.server} / {@code ShareSessionCache} metrics group.
 *
 * @param maxEntries The maximum number of entries, stored as-is in this cache.
 * @param evictionMs The eviction threshold in milliseconds, stored as-is in this cache.
 */
@SuppressWarnings("this-escape")
public ShareSessionCache(int maxEntries, long evictionMs) {
    this.maxEntries = maxEntries;
    this.evictionMs = evictionMs;
    // Register metrics for ShareSessionCache: one eviction-rate meter and two gauges
    // that read the live session count and partition count from this instance.
    KafkaMetricsGroup cacheMetrics = new KafkaMetricsGroup("kafka.server", "ShareSessionCache");
    this.evictionsMeter = cacheMetrics.newMeter(SHARE_SESSION_EVICTIONS_PER_SEC, "evictions", TimeUnit.SECONDS);
    cacheMetrics.newGauge(SHARE_SESSIONS_COUNT, this::size);
    cacheMetrics.newGauge(SHARE_PARTITIONS_COUNT, this::totalPartitions);
}
/**
@ -136,6 +157,7 @@ public class ShareSessionCache {
} else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) {
ShareSession session = lastUsedEntry.getValue();
remove(session);
evictionsMeter.mark();
return true;
}
return false;
@ -159,4 +181,9 @@ public class ShareSessionCache {
}
return null;
}
// Visible for testing. Exposes the meter that is marked when a session is evicted so that tests can read its count.
Meter evictionsMeter() {
return evictionsMeter;
}
}

View File

@ -23,8 +23,11 @@ import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestUtils;
import com.yammer.metrics.core.Gauge;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
@ -145,4 +148,32 @@ public class ShareFetchTestUtils {
List.of(acquiredRecords), (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1)
);
}
/**
 * Fetch the gauge value from the yammer metrics. The first metric in the default registry
 * whose MBean name contains {@code name} is used.
 *
 * @param name The name (or a substring of the MBean name) of the metric.
 * @return The gauge value as a number, or 0 if no metric matches, the match is not a gauge,
 *         or its value is not a number.
 */
public static Number yammerMetricValue(String name) {
    try {
        Gauge<?> matchedGauge = (Gauge<?>) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
            .filter(entry -> entry.getKey().getMBeanName().contains(name))
            .map(entry -> entry.getValue())
            .findFirst()
            .orElseThrow();
        Object currentValue = matchedGauge.value();
        return (Number) currentValue;
    } catch (Exception ignored) {
        // Best effort: any lookup or cast failure is reported as 0 rather than failing the caller.
        return 0;
    }
}
/**
 * Removes every metric currently registered in the default yammer metrics registry.
 */
public static void clearYammerMetrics() {
    KafkaYammerMetrics.defaultRegistry().allMetrics().keySet()
        .forEach(KafkaYammerMetrics.defaultRegistry()::removeMetric);
}
}

View File

@ -19,23 +19,31 @@ package org.apache.kafka.server.share.session;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.clearYammerMetrics;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareSessionCacheTest {
// Clears the default Yammer registry before every test so that the gauge and meter
// assertions in each test are not affected by metrics registered by earlier tests.
@BeforeEach
public void setUp() {
clearYammerMetrics();
}
@Test
public void testShareSessionCache() {
public void testShareSessionCache() throws InterruptedException {
ShareSessionCache cache = new ShareSessionCache(3, 100);
assertEquals(0, cache.size());
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10));
@ -43,56 +51,101 @@ public class ShareSessionCacheTest {
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40)));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5)));
assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2, key3)));
assertShareCacheContains(cache, List.of(key1, key2, key3));
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
"Share session count should be 3.");
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60,
"Share partition count should be 60.");
assertEquals(0, cache.evictionsMeter().count());
// Touch the sessions to update the last used time, so that the key-2 can be evicted.
cache.touch(cache.get(key1), 200);
ShareSessionKey key4 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 210, mockedSharePartitionMap(11));
assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key3, key4)));
assertShareCacheContains(cache, List.of(key1, key3, key4));
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
"Share session count should be 3.");
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 51,
"Share partition count should be 51.");
assertEquals(1, cache.evictionsMeter().count());
assertTrue(cache.evictionsMeter().meanRate() > 0);
cache.touch(cache.get(key1), 400);
cache.touch(cache.get(key3), 390);
cache.touch(cache.get(key4), 400);
ShareSessionKey key5 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 410, mockedSharePartitionMap(50));
assertNull(key5);
// No key should be evicted as all the sessions are touched to latest time.
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 410, mockedSharePartitionMap(50)));
}
/**
 * Verifies that the cache size and total partition count, together with the corresponding
 * Yammer gauges, follow session creation, explicit removal and shrinking of a session's
 * partition map, and that none of these steps is counted as an eviction.
 */
@Test
public void testResizeCachedSessions() {
public void testResizeCachedSessions() throws InterruptedException {
// Cache with maxEntries = 2 and evictionMs = 100.
ShareSessionCache cache = new ShareSessionCache(2, 100);
assertEquals(0, cache.size());
assertEquals(0, cache.totalPartitions());
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(2));
assertNotNull(key1);
assertShareCacheContains(cache, new ArrayList<>(List.of(key1)));
assertShareCacheContains(cache, List.of(key1));
ShareSession session1 = cache.get(key1);
assertEquals(2, session1.size());
assertEquals(2, cache.totalPartitions());
assertEquals(1, cache.size());
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
"Share session count should be 1.");
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 2,
"Share partition count should be 2.");
assertEquals(0, cache.evictionsMeter().count());
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(4));
assertNotNull(key2);
assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2)));
assertShareCacheContains(cache, List.of(key1, key2));
ShareSession session2 = cache.get(key2);
assertEquals(6, cache.totalPartitions());
assertEquals(2, cache.size());
cache.touch(session1, 200);
cache.touch(session2, 200);
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 2,
"Share session count should be 2.");
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 6,
"Share partition count should be 6.");
assertEquals(0, cache.evictionsMeter().count());
// The cache is full and both sessions were last used at 200, so at now = 200 neither is
// older than evictionMs; the new session is expected to be rejected (null key).
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 200, mockedSharePartitionMap(5));
assertNull(key3);
assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2)));
assertShareCacheContains(cache, List.of(key1, key2));
assertEquals(6, cache.totalPartitions());
assertEquals(2, cache.size());
// Explicit removal by key; the assertions below expect the evictions meter to stay at 0.
cache.remove(key1);
assertShareCacheContains(cache, new ArrayList<>(List.of(key2)));
assertShareCacheContains(cache, List.of(key2));
assertEquals(1, cache.size());
assertEquals(4, cache.totalPartitions());
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
"Share session count should be 1.");
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 4,
"Share partition count should be 4.");
assertEquals(0, cache.evictionsMeter().count());
// Drop one partition directly from the session's partition map, bypassing the cache.
Iterator<CachedSharePartition> iterator = session2.partitionMap().iterator();
iterator.next();
iterator.remove();
// Session size should get updated as it's backed by the partition map.
assertEquals(3, session2.size());
// Cached size should not get updated as it shall update on touch.
assertEquals(4, session2.cachedSize());
assertEquals(4, cache.totalPartitions());
// Touch the session to update the changes in cache and session's cached size.
cache.touch(session2, session2.lastUsedMs());
assertEquals(3, session2.cachedSize());
assertEquals(3, cache.totalPartitions());
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
"Share session count should be 1.");
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 3,
"Share partition count should be 3.");
assertEquals(0, cache.evictionsMeter().count());
}
private ImplicitLinkedHashCollection<CachedSharePartition> mockedSharePartitionMap(int size) {
@ -104,7 +157,7 @@ public class ShareSessionCacheTest {
}
private void assertShareCacheContains(ShareSessionCache cache,
ArrayList<ShareSessionKey> sessionKeys) {
List<ShareSessionKey> sessionKeys) {
int i = 0;
assertEquals(sessionKeys.size(), cache.size());
for (ShareSessionKey sessionKey : sessionKeys) {