diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index fb5c05a4c75..80399f4e05c 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.purgatory.DelayedOperation; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; @@ -36,6 +37,8 @@ import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import com.yammer.metrics.core.Meter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -61,6 +65,8 @@ public class DelayedShareFetch extends DelayedOperation { private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class); + private static final String EXPIRES_PER_SEC = "ExpiresPerSec"; + private final ShareFetch shareFetch; private final ReplicaManager replicaManager; private final BiConsumer exceptionHandler; @@ -70,6 +76,10 @@ public class DelayedShareFetch extends DelayedOperation { // The topic partitions that need to be completed for the share fetch request are given by sharePartitions. // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important. private final LinkedHashMap sharePartitions; + /** + * Metric for the rate of expired delayed fetch requests. + */ + private final Meter expiredRequestMeter; // Tracks the start time to acquire any share partition for a fetch request. private long acquireStartTimeMs; private LinkedHashMap partitionsAcquired; @@ -124,10 +134,14 @@ public class DelayedShareFetch extends DelayedOperation { this.shareGroupMetrics = shareGroupMetrics; this.time = time; this.acquireStartTimeMs = time.hiResClockMs(); + // Register metrics for DelayedShareFetch. + KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics"); + this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS); } @Override public void onExpiration() { + expiredRequestMeter.mark(); } /** @@ -514,4 +528,9 @@ public class DelayedShareFetch extends DelayedOperation { Lock lock() { return lock; } + + // Visible for testing. + Meter expiredRequestMeter() { + return expiredRequestMeter; + } } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 7eb6584bed7..8669474a80c 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -154,6 +154,7 @@ public class DelayedShareFetchTest { // Metrics shall not be recorded as no partition is acquired. assertNull(shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId)); assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId)); + assertEquals(0, delayedShareFetch.expiredRequestMeter().count()); delayedShareFetch.lock().unlock(); } @@ -1118,6 +1119,23 @@ public class DelayedShareFetchTest { true); } + @Test + public void testOnCompleteExecutionOnTimeout() { + ShareFetch shareFetch = new ShareFetch( + FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + new CompletableFuture<>(), new LinkedHashMap<>(), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .build(); + assertFalse(delayedShareFetch.isCompleted()); + assertFalse(shareFetch.isCompleted()); + // Call run to execute onComplete and onExpiration. + delayedShareFetch.run(); + assertTrue(shareFetch.isCompleted()); + assertEquals(1, delayedShareFetch.expiredRequestMeter().count()); + } + static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes); LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index a2c6e7ed30b..7f2ec081885 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -48,7 +48,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; -import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; import org.apache.kafka.server.share.metrics.SharePartitionMetrics; @@ -67,8 +66,6 @@ import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.test.TestUtils; -import com.yammer.metrics.core.Gauge; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -91,6 +88,7 @@ import java.util.concurrent.TimeUnit; import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID; import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder; +import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue; import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -6736,19 +6734,6 @@ public class SharePartitionTest { Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); } - private Number yammerMetricValue(String name) { - try { - Gauge gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() - .filter(e -> e.getKey().getMBeanName().contains(name)) - .findFirst() - .orElseThrow() - .getValue(); - return (Number) gauge.value(); - } catch (Exception e) { - return 0; - } - } - private static class SharePartitionBuilder { private int defaultAcquisitionLockTimeoutMs = 30000; diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java index 28df334d0ac..dc870fc9c25 100644 --- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java +++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java @@ -20,11 +20,15 @@ package org.apache.kafka.server.share.session; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.requests.ShareRequestMetadata; import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.share.CachedSharePartition; +import com.yammer.metrics.core.Meter; + import java.util.HashMap; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; /** * Caches share sessions. @@ -37,6 +41,17 @@ import java.util.TreeMap; * must never be acquired while an individual ShareSession lock is already held. */ public class ShareSessionCache { + // Visible for testing. + static final String SHARE_SESSIONS_COUNT = "ShareSessionsCount"; + // Visible for testing. + static final String SHARE_PARTITIONS_COUNT = "SharePartitionsCount"; + private static final String SHARE_SESSION_EVICTIONS_PER_SEC = "ShareSessionEvictionsPerSec"; + + /** + * Metric for the rate of eviction of share sessions. + */ + private final Meter evictionsMeter; + private final int maxEntries; private final long evictionMs; private long numPartitions = 0; @@ -47,9 +62,15 @@ public class ShareSessionCache { // Maps last used times to sessions. private final TreeMap lastUsed = new TreeMap<>(); + @SuppressWarnings("this-escape") public ShareSessionCache(int maxEntries, long evictionMs) { this.maxEntries = maxEntries; this.evictionMs = evictionMs; + // Register metrics for ShareSessionCache. + KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "ShareSessionCache"); + metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size); + metricsGroup.newGauge(SHARE_PARTITIONS_COUNT, this::totalPartitions); + this.evictionsMeter = metricsGroup.newMeter(SHARE_SESSION_EVICTIONS_PER_SEC, "evictions", TimeUnit.SECONDS); } /** @@ -136,6 +157,7 @@ public class ShareSessionCache { } else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) { ShareSession session = lastUsedEntry.getValue(); remove(session); + evictionsMeter.mark(); return true; } return false; @@ -159,4 +181,9 @@ public class ShareSessionCache { } return null; } + + // Visible for testing. + Meter evictionsMeter() { + return evictionsMeter; + } } diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java index ef3aae2eee7..db7f15ef4c3 100644 --- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java +++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java @@ -23,8 +23,11 @@ import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.test.TestUtils; +import com.yammer.metrics.core.Gauge; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedHashMap; @@ -145,4 +148,32 @@ public class ShareFetchTestUtils { List.of(acquiredRecords), (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1) ); } + + /** + * Fetch the gauge value from the yammer metrics. + * + * @param name The name of the metric. + * @return The gauge value as a number. + */ + public static Number yammerMetricValue(String name) { + try { + Gauge gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() + .filter(e -> e.getKey().getMBeanName().contains(name)) + .findFirst() + .orElseThrow() + .getValue(); + return (Number) gauge.value(); + } catch (Exception e) { + return 0; + } + } + + /** + * Clear all the yammer metrics. + */ + public static void clearYammerMetrics() { + KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().forEach( + metricName -> KafkaYammerMetrics.defaultRegistry().removeMetric(metricName) + ); + } } diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java index d5d42c5a3ec..4de1ffa4975 100644 --- a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java @@ -19,23 +19,31 @@ package org.apache.kafka.server.share.session; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; import org.apache.kafka.server.share.CachedSharePartition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; +import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.clearYammerMetrics; +import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ShareSessionCacheTest { + @BeforeEach + public void setUp() { + clearYammerMetrics(); + } + @Test - public void testShareSessionCache() { + public void testShareSessionCache() throws InterruptedException { ShareSessionCache cache = new ShareSessionCache(3, 100); assertEquals(0, cache.size()); ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10)); @@ -43,56 +51,101 @@ public class ShareSessionCacheTest { ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30)); assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40))); assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5))); - assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2, key3))); + assertShareCacheContains(cache, List.of(key1, key2, key3)); + + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3, + "Share session count should be 3."); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60, + "Share partition count should be 60."); + assertEquals(0, cache.evictionsMeter().count()); + + // Touch the sessions to update the last used time, so that the key-2 can be evicted. cache.touch(cache.get(key1), 200); ShareSessionKey key4 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 210, mockedSharePartitionMap(11)); - assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key3, key4))); + assertShareCacheContains(cache, List.of(key1, key3, key4)); + + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3, + "Share session count should be 3."); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 51, + "Share partition count should be 51."); + assertEquals(1, cache.evictionsMeter().count()); + assertTrue(cache.evictionsMeter().meanRate() > 0); + cache.touch(cache.get(key1), 400); cache.touch(cache.get(key3), 390); cache.touch(cache.get(key4), 400); - ShareSessionKey key5 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 410, mockedSharePartitionMap(50)); - assertNull(key5); + // No key should be evicted as all the sessions are touched to latest time. + assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 410, mockedSharePartitionMap(50))); } @Test - public void testResizeCachedSessions() { + public void testResizeCachedSessions() throws InterruptedException { ShareSessionCache cache = new ShareSessionCache(2, 100); assertEquals(0, cache.size()); assertEquals(0, cache.totalPartitions()); ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(2)); assertNotNull(key1); - assertShareCacheContains(cache, new ArrayList<>(List.of(key1))); + assertShareCacheContains(cache, List.of(key1)); ShareSession session1 = cache.get(key1); assertEquals(2, session1.size()); assertEquals(2, cache.totalPartitions()); assertEquals(1, cache.size()); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1, + "Share session count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 2, + "Share partition count should be 2."); + assertEquals(0, cache.evictionsMeter().count()); + ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(4)); assertNotNull(key2); - assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2))); + assertShareCacheContains(cache, List.of(key1, key2)); ShareSession session2 = cache.get(key2); assertEquals(6, cache.totalPartitions()); assertEquals(2, cache.size()); cache.touch(session1, 200); cache.touch(session2, 200); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 2, + "Share session count should be 2."); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 6, + "Share partition count should be 6."); + assertEquals(0, cache.evictionsMeter().count()); + ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 200, mockedSharePartitionMap(5)); assertNull(key3); - assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2))); + assertShareCacheContains(cache, List.of(key1, key2)); assertEquals(6, cache.totalPartitions()); assertEquals(2, cache.size()); cache.remove(key1); - assertShareCacheContains(cache, new ArrayList<>(List.of(key2))); + assertShareCacheContains(cache, List.of(key2)); assertEquals(1, cache.size()); assertEquals(4, cache.totalPartitions()); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1, + "Share session count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 4, + "Share partition count should be 4."); + assertEquals(0, cache.evictionsMeter().count()); + Iterator iterator = session2.partitionMap().iterator(); iterator.next(); iterator.remove(); + // Session size should get updated as it's backed by the partition map. assertEquals(3, session2.size()); + // Cached size should not get updated as it shall update on touch. assertEquals(4, session2.cachedSize()); + assertEquals(4, cache.totalPartitions()); + // Touch the session to update the changes in cache and session's cached size. cache.touch(session2, session2.lastUsedMs()); + assertEquals(3, session2.cachedSize()); assertEquals(3, cache.totalPartitions()); + + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1, + "Share session count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 3, + "Share partition count should be 3."); + assertEquals(0, cache.evictionsMeter().count()); } private ImplicitLinkedHashCollection mockedSharePartitionMap(int size) { @@ -104,7 +157,7 @@ public class ShareSessionCacheTest { } private void assertShareCacheContains(ShareSessionCache cache, - ArrayList sessionKeys) { + List sessionKeys) { int i = 0; assertEquals(sessionKeys.size(), cache.size()); for (ShareSessionKey sessionKey : sessionKeys) {