diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 2298df9b85e..731904d56a7 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -24,11 +24,13 @@ import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.purgatory.DelayedOperation; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy; import org.apache.kafka.server.share.fetch.ShareFetch; +import org.apache.kafka.server.share.metrics.ShareGroupMetrics; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; @@ -37,7 +39,6 @@ import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -62,25 +63,43 @@ public class DelayedShareFetch extends DelayedOperation { private final ReplicaManager replicaManager; private final BiConsumer exceptionHandler; private final PartitionMaxBytesStrategy partitionMaxBytesStrategy; + private final ShareGroupMetrics shareGroupMetrics; + private final Time time; // The topic partitions that need to be completed for the share fetch request are given by sharePartitions. // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important. 
private final LinkedHashMap sharePartitions; + // Tracks the start time to acquire any share partition for a fetch request. + private long acquireStartTimeMs; private LinkedHashMap partitionsAcquired; private LinkedHashMap partitionsAlreadyFetched; /** - * This function constructs an instance of delayed share fetch operation for completing share fetch requests instantaneously or with delay. - * @param shareFetch - The share fetch parameters of the share fetch request. - * @param replicaManager - The replica manager instance used to read from log/complete the request. - * @param exceptionHandler - The handler to complete share fetch requests with exception. - * @param sharePartitions - The share partitions referenced in the share fetch request. + * This function constructs an instance of delayed share fetch operation for completing share fetch + * requests instantaneously or with delay. + * + * @param shareFetch The share fetch parameters of the share fetch request. + * @param replicaManager The replica manager instance used to read from log/complete the request. + * @param exceptionHandler The handler to complete share fetch requests with exception. + * @param sharePartitions The share partitions referenced in the share fetch request. + * @param shareGroupMetrics The share group metrics to record the metrics. + * @param time The system time. 
*/ public DelayedShareFetch( ShareFetch shareFetch, ReplicaManager replicaManager, BiConsumer exceptionHandler, - LinkedHashMap sharePartitions) { - this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)); + LinkedHashMap sharePartitions, + ShareGroupMetrics shareGroupMetrics, + Time time + ) { + this(shareFetch, + replicaManager, + exceptionHandler, + sharePartitions, + PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM), + shareGroupMetrics, + time + ); } DelayedShareFetch( @@ -88,7 +107,10 @@ public class DelayedShareFetch extends DelayedOperation { ReplicaManager replicaManager, BiConsumer exceptionHandler, LinkedHashMap sharePartitions, - PartitionMaxBytesStrategy partitionMaxBytesStrategy) { + PartitionMaxBytesStrategy partitionMaxBytesStrategy, + ShareGroupMetrics shareGroupMetrics, + Time time + ) { super(shareFetch.fetchParams().maxWaitMs, Optional.empty()); this.shareFetch = shareFetch; this.replicaManager = replicaManager; @@ -97,6 +119,9 @@ public class DelayedShareFetch extends DelayedOperation { this.exceptionHandler = exceptionHandler; this.sharePartitions = sharePartitions; this.partitionMaxBytesStrategy = partitionMaxBytesStrategy; + this.shareGroupMetrics = shareGroupMetrics; + this.time = time; + this.acquireStartTimeMs = time.hiResClockMs(); } @Override @@ -120,16 +145,28 @@ public class DelayedShareFetch extends DelayedOperation { try { LinkedHashMap topicPartitionData; // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. - if (partitionsAcquired.isEmpty()) + if (partitionsAcquired.isEmpty()) { topicPartitionData = acquirablePartitions(); - // tryComplete invoked forceComplete, so we can use the data from tryComplete. 
- else + // The TopicPartitionsAcquireTimeMs metric signifies the contention when acquiring the locks + // for the share partition, hence if no partitions are yet acquired by tryComplete, + // we record the metric here. The metric is recorded without checking whether any partition + // was acquired just now, so that the upper bound of the request timeout is still recorded + // for the metric. + updateAcquireElapsedTimeMetric(); + } else { + // tryComplete invoked forceComplete, so we can use the data from tryComplete. topicPartitionData = partitionsAcquired; + } if (topicPartitionData.isEmpty()) { // No locks for share partitions could be acquired, so we complete the request with an empty response. - shareFetch.maybeComplete(Collections.emptyMap()); + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0); + shareFetch.maybeComplete(Map.of()); return; + } else { + // Update the metric to record the ratio of acquired to requested partitions, as a percentage. + double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.partitionMaxBytes().size(); + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); } log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); @@ -183,6 +220,8 @@ public class DelayedShareFetch extends DelayedOperation { try { if (!topicPartitionData.isEmpty()) { + // Update the metric to record the time taken to acquire the locks for the share partitions. + updateAcquireElapsedTimeMetric(); // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for // those topic partitions. 
@@ -417,6 +456,20 @@ public class DelayedShareFetch extends DelayedOperation { shareFetch.maybeCompleteWithException(topicIdPartitions, throwable); } + /** + * Records the time taken to acquire the share partition locks in the acquire time metric and + * resets acquireStartTimeMs to the current time, so that if the partitions are re-acquired the + * metric records only the time taken by that retry. The partitions can be re-acquired if the + * fetch request is not completed because of the minBytes or some other condition. + */ + private void updateAcquireElapsedTimeMetric() { + long currentTimeMs = time.hiResClockMs(); + shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(shareFetch.groupId(), currentTimeMs - acquireStartTimeMs); + // Reset the acquireStartTimeMs to the current time. If the fetch request is not completed + // and the partitions are re-acquired, the metric should record the time since the last acquire. + acquireStartTimeMs = currentTimeMs; + } + // Visible for testing. LinkedHashMap combineLogReadResponse(LinkedHashMap topicPartitionData, LinkedHashMap existingFetchedData) { diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 04a6e526ec2..f080b08e8c8 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -659,7 +659,7 @@ public class SharePartitionManager implements AutoCloseable { // Add the share fetch to the delayed share fetch purgatory to process the fetch request. // The request will be added irrespective of whether the share partition is initialized or not. // Once the share partition is initialized, the delayed share fetch will be completed. 
- addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, fencedSharePartitionHandler(), sharePartitions), delayedShareFetchWatchKeys); + addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time), delayedShareFetchWatchKeys); } private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) { diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 363ae26b6c3..96cc155b731 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.purgatory.DelayedOperationKey; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; import org.apache.kafka.server.share.SharePartitionKey; @@ -36,9 +37,11 @@ import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; import org.apache.kafka.server.share.fetch.ShareFetch; +import org.apache.kafka.server.share.metrics.ShareGroupMetrics; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; +import org.apache.kafka.server.util.MockTime; import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; @@ -74,6 +77,7 @@ import static 
kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDel import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -100,6 +104,7 @@ public class DelayedShareFetchTest { @BeforeEach public void setUp() { + kafka.utils.TestUtils.clearYammerMetrics(); mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper", new SystemTimer("DelayedShareFetchTestTimer")); } @@ -133,9 +138,12 @@ public class DelayedShareFetchTest { when(sp0.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false); + + ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(new MockTime()); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) + .withShareGroupMetrics(shareGroupMetrics) .build()); // Since there is no partition that can be acquired, tryComplete should return false. @@ -143,6 +151,10 @@ public class DelayedShareFetchTest { assertFalse(delayedShareFetch.isCompleted()); Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); + // Metrics shall not be recorded as no partition is acquired. 
+ assertNull(shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId)); + assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId)); + delayedShareFetch.lock().unlock(); } @@ -190,12 +202,17 @@ public class DelayedShareFetchTest { PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0)); + Time time = mock(Time.class); + when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L); + ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .withExceptionHandler(exceptionHandler) .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy) + .withShareGroupMetrics(shareGroupMetrics) + .withTime(time) .build()); assertFalse(delayedShareFetch.isCompleted()); @@ -204,6 +221,12 @@ public class DelayedShareFetchTest { assertFalse(delayedShareFetch.isCompleted()); Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); + // Although the request is not completed, sp0 was acquired and hence the metric should be recorded. + assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); + assertEquals(10, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); + // Since the request is not completed, the fetch ratio should be null. 
+ assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId)); + delayedShareFetch.lock().unlock(); Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); } @@ -297,11 +320,16 @@ public class DelayedShareFetchTest { PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0)); + Time time = mock(Time.class); + when(time.hiResClockMs()).thenReturn(120L).thenReturn(140L); + ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy) + .withShareGroupMetrics(shareGroupMetrics) + .withTime(time) .build()); assertFalse(delayedShareFetch.isCompleted()); @@ -310,6 +338,11 @@ public class DelayedShareFetchTest { assertTrue(delayedShareFetch.isCompleted()); Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); + assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); + assertEquals(20, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); + assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); + assertEquals(50, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum()); + delayedShareFetch.lock().unlock(); } @@ -338,10 +371,16 @@ public class DelayedShareFetchTest { when(sp0.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false); + + Time time = mock(Time.class); + when(time.hiResClockMs()).thenReturn(90L).thenReturn(140L); + ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) + 
.withShareGroupMetrics(shareGroupMetrics) + .withTime(time) .build()); assertFalse(delayedShareFetch.isCompleted()); delayedShareFetch.forceComplete(); @@ -353,6 +392,12 @@ public class DelayedShareFetchTest { assertTrue(delayedShareFetch.isCompleted()); Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); + // As the request is completed by onComplete then both metrics shall be recorded. + assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); + assertEquals(50, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); + assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); + assertEquals(0, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum()); + delayedShareFetch.lock().unlock(); } @@ -387,11 +432,16 @@ public class DelayedShareFetchTest { PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0)); + Time time = mock(Time.class); + when(time.hiResClockMs()).thenReturn(10L).thenReturn(140L); + ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy) + .withShareGroupMetrics(shareGroupMetrics) + .withTime(time) .build()); assertFalse(delayedShareFetch.isCompleted()); delayedShareFetch.forceComplete(); @@ -405,6 +455,11 @@ public class DelayedShareFetchTest { assertTrue(shareFetch.isCompleted()); Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); + assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); + assertEquals(130, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); + assertEquals(1, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); + assertEquals(50, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum()); + delayedShareFetch.lock().unlock(); } @@ -428,10 +483,12 @@ public class DelayedShareFetchTest { when(sp0.maybeAcquireFetchLock()).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(false); + ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(new MockTime()); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) + .withShareGroupMetrics(shareGroupMetrics) .build()); assertFalse(delayedShareFetch.isCompleted()); @@ -452,6 +509,10 @@ public class DelayedShareFetchTest { Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); + // Assert both metrics shall be recorded only once. 
+ assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); + assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); + delayedShareFetch.lock().unlock(); } @@ -639,12 +700,17 @@ public class DelayedShareFetchTest { PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0)); BiConsumer exceptionHandler = mockExceptionHandler(); + Time time = mock(Time.class); + when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L).thenReturn(170L); + ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .withExceptionHandler(exceptionHandler) .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy) + .withShareGroupMetrics(shareGroupMetrics) + .withTime(time) .build()); // Try complete should return false as the share partition has errored out. 
@@ -669,6 +735,13 @@ public class DelayedShareFetchTest { any(), any(), any(ReplicaQuota.class), anyBoolean()); Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); + assertEquals(2, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); + assertEquals(70, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); + assertEquals(10, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).min()); + assertEquals(60, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).max()); + assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); + assertEquals(0, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum()); + delayedShareFetch.lock().unlock(); Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); } @@ -1078,11 +1151,13 @@ public class DelayedShareFetchTest { } static class DelayedShareFetchBuilder { - ShareFetch shareFetch = mock(ShareFetch.class); + private ShareFetch shareFetch = mock(ShareFetch.class); private ReplicaManager replicaManager = mock(ReplicaManager.class); private BiConsumer exceptionHandler = mockExceptionHandler(); private LinkedHashMap sharePartitions = mock(LinkedHashMap.class); private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); + private Time time = new MockTime(); + private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class); DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) { this.shareFetch = shareFetch; @@ -1109,6 +1184,16 @@ public class DelayedShareFetchTest { return this; } + private DelayedShareFetchBuilder withShareGroupMetrics(ShareGroupMetrics shareGroupMetrics) { + this.shareGroupMetrics = shareGroupMetrics; + return this; + } + + private DelayedShareFetchBuilder withTime(Time time) { + this.time = time; + return this; + } + public static DelayedShareFetchBuilder builder() { return new DelayedShareFetchBuilder(); } @@ -1119,7 
+1204,9 @@ public class DelayedShareFetchTest { replicaManager, exceptionHandler, sharePartitions, - partitionMaxBytesStrategy); + partitionMaxBytesStrategy, + shareGroupMetrics, + time); } } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 0f777586e12..6454f303202 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -2259,7 +2259,7 @@ public class SharePartitionManagerTest { mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); Time time = mock(Time.class); - when(time.hiResClockMs()).thenReturn(100L).thenReturn(200L); + when(time.hiResClockMs()).thenReturn(100L); ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); sharePartitionManager = SharePartitionManagerBuilder.builder() .withPartitionCacheMap(partitionCacheMap) @@ -2292,9 +2292,9 @@ public class SharePartitionManagerTest { pendingInitializationFuture2.complete(null); // Verify the partition load time metrics for both partitions. 
assertEquals(2, shareGroupMetrics.partitionLoadTimeMs().count()); - assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().min()); - assertEquals(160.0, shareGroupMetrics.partitionLoadTimeMs().max()); - assertEquals(250.0, shareGroupMetrics.partitionLoadTimeMs().sum()); + assertEquals(60.0, shareGroupMetrics.partitionLoadTimeMs().min()); + assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().max()); + assertEquals(150.0, shareGroupMetrics.partitionLoadTimeMs().sum()); shareGroupMetrics.close(); } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a631e1c0b03..6a27776babc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -63,6 +63,7 @@ import org.apache.kafka.server.network.BrokerEndPoint import org.apache.kafka.server.purgatory.DelayedOperationPurgatory import org.apache.kafka.server.share.SharePartitionKey import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch} +import org.apache.kafka.server.share.metrics.ShareGroupMetrics import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler} @@ -6101,7 +6102,9 @@ class ReplicaManagerTest { shareFetch, rm, mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), - sharePartitions)) + sharePartitions, + mock(classOf[ShareGroupMetrics]), + time)) val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new util.ArrayList[DelayedShareFetchKey] partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) => delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId, topicIdPartition.partition))) diff --git 
a/server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java b/server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java index 146ab4239aa..d7f167c4ae7 100644 --- a/server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java +++ b/server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java @@ -26,6 +26,7 @@ import com.yammer.metrics.core.Meter; import java.util.Arrays; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -33,16 +34,32 @@ import java.util.stream.Collectors; * ShareGroupMetrics is used to track the broker-side metrics for the ShareGroup. */ public class ShareGroupMetrics implements AutoCloseable { - // Rate of records acknowledged per acknowledgement type. + private static final String RECORD_ACKNOWLEDGEMENTS_PER_SEC = "RecordAcknowledgementsPerSec"; - // The time in milliseconds to load the share partitions. private static final String PARTITION_LOAD_TIME_MS = "PartitionLoadTimeMs"; + private static final String TOPIC_PARTITIONS_FETCH_RATIO = "RequestTopicPartitionsFetchRatio"; + private static final String TOPIC_PARTITIONS_ACQUIRE_TIME_MS = "TopicPartitionsAcquireTimeMs"; private static final String ACK_TYPE_TAG = "ackType"; + /** + * Metric for the rate of records acknowledged per acknowledgement type. + */ + private final Map recordAcknowledgementMeterMap; + /** + * Metric for the time taken to load the share partitions. + */ + private final Histogram partitionLoadTimeMs; + /** + * Metric for the ratio of topic partitions fetched to the total number of topic partitions requested, per group. + */ + private final Map topicPartitionsFetchRatio; + /** + * Metric for the time taken to acquire topic partitions for a group. 
+ */ + private final Map topicPartitionsAcquireTimeMs; + private final KafkaMetricsGroup metricsGroup; private final Time time; - private final Map recordAcknowledgementMeterMap; - private final Histogram partitionLoadTimeMs; public ShareGroupMetrics(Time time) { this.time = time; @@ -58,7 +75,9 @@ public class ShareGroupMetrics implements AutoCloseable { ) ) ); - partitionLoadTimeMs = metricsGroup.newHistogram(PARTITION_LOAD_TIME_MS); + this.partitionLoadTimeMs = metricsGroup.newHistogram(PARTITION_LOAD_TIME_MS); + this.topicPartitionsFetchRatio = new ConcurrentHashMap<>(); + this.topicPartitionsAcquireTimeMs = new ConcurrentHashMap<>(); } public void recordAcknowledgement(byte ackType) { @@ -76,19 +95,45 @@ public class ShareGroupMetrics implements AutoCloseable { partitionLoadTimeMs.update(time.hiResClockMs() - start); } + public void recordTopicPartitionsFetchRatio(String groupId, long value) { + topicPartitionsFetchRatio.computeIfAbsent(groupId, + k -> metricsGroup.newHistogram(TOPIC_PARTITIONS_FETCH_RATIO, true, Map.of("group", groupId))); + topicPartitionsFetchRatio.get(groupId).update(value); + } + + public void recordTopicPartitionsAcquireTimeMs(String groupId, long timeMs) { + topicPartitionsAcquireTimeMs.computeIfAbsent(groupId, + k -> metricsGroup.newHistogram(TOPIC_PARTITIONS_ACQUIRE_TIME_MS, true, Map.of("group", groupId))); + topicPartitionsAcquireTimeMs.get(groupId).update(timeMs); + } + + // Visible for testing public Meter recordAcknowledgementMeter(byte ackType) { return recordAcknowledgementMeterMap.get(ackType); } + // Visible for testing public Histogram partitionLoadTimeMs() { return partitionLoadTimeMs; } + // Visible for testing + public Histogram topicPartitionsFetchRatio(String groupId) { + return topicPartitionsFetchRatio.get(groupId); + } + + // Visible for testing + public Histogram topicPartitionsAcquireTimeMs(String groupId) { + return topicPartitionsAcquireTimeMs.get(groupId); + } + @Override public void close() throws Exception { 
Arrays.stream(AcknowledgeType.values()).forEach( m -> metricsGroup.removeMetric(RECORD_ACKNOWLEDGEMENTS_PER_SEC, Map.of(ACK_TYPE_TAG, m.toString()))); metricsGroup.removeMetric(PARTITION_LOAD_TIME_MS); + topicPartitionsFetchRatio.forEach((k, v) -> metricsGroup.removeMetric(TOPIC_PARTITIONS_FETCH_RATIO, Map.of("group", k))); + topicPartitionsAcquireTimeMs.forEach((k, v) -> metricsGroup.removeMetric(TOPIC_PARTITIONS_ACQUIRE_TIME_MS, Map.of("group", k))); } private static String capitalize(String string) {