From 8cf969e00a86523ba4997fd7b8c17eed605e6ec0 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Fri, 28 Feb 2025 14:22:27 +0000 Subject: [PATCH] KAFKA-18734: Implemented share partition metrics (KIP-1103) (#19045) The PR implements the SharePartitionMetrics as defined in KIP-1103, with one change. The metric `FetchLockRatio` is defined as `Meter` in the KIP but is implemented as `Histogram`. This was discussed on the KIP-1103 discussion thread, where we thought that `FetchLockRatio` is pre-aggregated, but while implementing it we found that the rate from `Meter` can go above 100, as `Meter` defines rate per time period. Hence it makes more sense to implement metric `FetchLockRatio` as `Histogram`. Reviewers: Andrew Schofield --- .../kafka/server/share/SharePartition.java | 119 ++++++++-- .../server/share/SharePartitionTest.java | 206 ++++++++++++++++-- .../share/metrics/SharePartitionMetrics.java | 166 ++++++++++++++ 3 files changed, 459 insertions(+), 32 deletions(-) create mode 100644 server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 9c358cf4c1e..301d2678782 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -42,6 +42,7 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchKey; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; +import org.apache.kafka.server.share.metrics.SharePartitionMetrics; import org.apache.kafka.server.share.persister.GroupTopicPartitionData; import org.apache.kafka.server.share.persister.PartitionAllData; import org.apache.kafka.server.share.persister.PartitionErrorData; @@ -84,6 +85,7 @@ import static 
kafka.server.share.ShareFetchUtils.offsetForTimestamp; * consumers. The class maintains the state of the records that have been fetched from the leader * and are in-flight. */ +@SuppressWarnings("ClassDataAbstractionCoupling") public class SharePartition { private static final Logger log = LoggerFactory.getLogger(SharePartition.class); @@ -280,6 +282,11 @@ public class SharePartition { */ private final long loadStartTimeMs; + /** + * The share partition metrics is used to track the broker-side metrics for the share partition. + */ + private final SharePartitionMetrics sharePartitionMetrics; + /** * The share partition start offset specifies the partition start offset from which the records * are cached in the cachedState of the sharePartition. @@ -313,6 +320,21 @@ public class SharePartition { */ private SharePartitionState partitionState; + /** + * The fetch lock acquired time is used to track the time when the lock for share partition is acquired. + */ + private long fetchLockAcquiredTimeMs; + + /** + * The fetch lock released time is used to track the time when the lock for share partition is released. + */ + private long fetchLockReleasedTimeMs; + + /** + * The fetch lock idle duration is used to track the time for which the fetch lock is idle. + */ + private long fetchLockIdleDurationMs; + /** * The replica manager is used to check to see if any delayed share fetch request can be completed because of data * availability due to acquisition lock timeout. 
@@ -334,9 +356,12 @@ public class SharePartition { SharePartitionListener listener ) { this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs, - timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener); + timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener, + new SharePartitionMetrics(groupId, topicIdPartition.topic(), topicIdPartition.partition())); } + // Visible for testing + @SuppressWarnings("ParameterNumber") SharePartition( String groupId, TopicIdPartition topicIdPartition, @@ -350,7 +375,8 @@ public class SharePartition { ReplicaManager replicaManager, GroupConfigManager groupConfigManager, SharePartitionState sharePartitionState, - SharePartitionListener listener + SharePartitionListener listener, + SharePartitionMetrics sharePartitionMetrics ) { this.groupId = groupId; this.topicIdPartition = topicIdPartition; @@ -371,6 +397,8 @@ public class SharePartition { this.groupConfigManager = groupConfigManager; this.fetchOffsetMetadata = new OffsetMetadata(); this.listener = listener; + this.sharePartitionMetrics = sharePartitionMetrics; + this.registerGaugeMetrics(); } /** @@ -476,6 +504,7 @@ public class SharePartition { InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(), stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null); cachedState.put(stateBatch.firstOffset(), inFlightBatch); + sharePartitionMetrics.recordInFlightBatchMessageCount(stateBatch.lastOffset() - stateBatch.firstOffset() + 1); } // Update the endOffset of the partition. 
if (!cachedState.isEmpty()) { @@ -1265,19 +1294,7 @@ public class SharePartition { if (nextFetchOffset() != endOffset() + 1) { return true; } - - lock.readLock().lock(); - long numRecords; - try { - if (cachedState.isEmpty()) { - numRecords = 0; - } else { - numRecords = this.endOffset - this.startOffset + 1; - } - } finally { - lock.readLock().unlock(); - } - return numRecords < maxInFlightMessages; + return numInFlightRecords() < maxInFlightMessages; } /** @@ -1291,13 +1308,30 @@ public class SharePartition { if (stateNotActive()) { return false; } - return fetchLock.compareAndSet(false, true); + boolean acquired = fetchLock.compareAndSet(false, true); + if (acquired) { + long currentTime = time.hiResClockMs(); + fetchLockAcquiredTimeMs = currentTime; + fetchLockIdleDurationMs = fetchLockReleasedTimeMs != 0 ? currentTime - fetchLockReleasedTimeMs : 0; + } + return acquired; } /** * Release the fetch lock once the records are fetched from the leader. */ void releaseFetchLock() { + // Register the metric for the duration the fetch lock was held. Do not register the metric + // if the fetch lock was not acquired. + if (fetchLock.get()) { + long currentTime = time.hiResClockMs(); + long acquiredDurationMs = currentTime - fetchLockAcquiredTimeMs; + // Update the metric for the fetch lock time. + sharePartitionMetrics.recordFetchLockTimeMs(acquiredDurationMs); + // Update fetch lock ratio metric. + recordFetchLockRatioMetric(acquiredDurationMs); + fetchLockReleasedTimeMs = currentTime; + } fetchLock.set(false); } @@ -1326,6 +1360,56 @@ public class SharePartition { return leaderEpoch; } + /** + * Records the fetch lock ratio metric. The metric is the ratio of the time duration the fetch + * lock was acquired to the total time since the last lock acquisition. The total time is calculated + * by adding the duration of the fetch lock idle time to the time the fetch lock was acquired. + * + * @param acquiredDurationMs The time duration the fetch lock was acquired. 
+ */ + // Visible for testing + void recordFetchLockRatioMetric(long acquiredDurationMs) { + if (fetchLockIdleDurationMs < 0) { + // This is just a safe check to avoid negative time for fetch lock idle duration. This + // should not happen in any scenarios. If it does then just return from the method and + // no metric update is an indicator of the issue. + return; + } + + // Update the total fetch lock acquired time. + double fetchLockToTotalTime; + if (acquiredDurationMs + fetchLockIdleDurationMs == 0) { + // If the total time is 0 then the ratio is 1 i.e. the fetch lock was acquired for the complete time. + fetchLockToTotalTime = 1.0; + } else if (acquiredDurationMs == 0) { + // If the acquired duration is 0 then the ratio is the calculated by the idle duration. + fetchLockToTotalTime = 1.0 / fetchLockIdleDurationMs; + } else { + fetchLockToTotalTime = acquiredDurationMs * (1.0 / (acquiredDurationMs + fetchLockIdleDurationMs)); + } + sharePartitionMetrics.recordFetchLockRatio((int) (fetchLockToTotalTime * 100)); + } + + private void registerGaugeMetrics() { + sharePartitionMetrics.registerInFlightMessageCount(this::numInFlightRecords); + sharePartitionMetrics.registerInFlightBatchCount(this.cachedState::size); + } + + private long numInFlightRecords() { + lock.readLock().lock(); + long numRecords; + try { + if (cachedState.isEmpty()) { + numRecords = 0; + } else { + numRecords = this.endOffset - this.startOffset + 1; + } + } finally { + lock.readLock().unlock(); + } + return numRecords; + } + private boolean stateNotActive() { return partitionState() != SharePartitionState.ACTIVE; } @@ -1478,6 +1562,8 @@ public class SharePartition { RecordState.ACQUIRED, 1, timerTask)); + // Update the in-flight batch message count metrics for the share partition. 
+ sharePartitionMetrics.recordInFlightBatchMessageCount(acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1); }); return result; } finally { @@ -2501,6 +2587,7 @@ public class SharePartition { */ @Override public void run() { + sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1); releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset); } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index fec9b346cbb..a2c6e7ed30b 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -48,8 +48,10 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; +import org.apache.kafka.server.share.metrics.SharePartitionMetrics; import org.apache.kafka.server.share.persister.NoOpStatePersister; import org.apache.kafka.server.share.persister.PartitionFactory; import org.apache.kafka.server.share.persister.Persister; @@ -65,6 +67,8 @@ import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.test.TestUtils; +import com.yammer.metrics.core.Gauge; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -96,6 +100,8 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static 
org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SharePartitionTest { @@ -104,7 +110,6 @@ public class SharePartitionTest { private static final int MAX_DELIVERY_COUNT = 5; private static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(Uuid.randomUuid(), 0, "test-topic"); private static final String MEMBER_ID = "member-1"; - private static Timer mockTimer; private static final Time MOCK_TIME = new MockTime(); private static final short MAX_IN_FLIGHT_MESSAGES = 200; private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100; @@ -113,16 +118,21 @@ public class SharePartitionTest { private static final int DEFAULT_FETCH_OFFSET = 0; private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE; private static final byte ACKNOWLEDGE_TYPE_GAP_ID = 0; + private static Timer mockTimer; + private SharePartitionMetrics sharePartitionMetrics; @BeforeEach public void setUp() { + kafka.utils.TestUtils.clearYammerMetrics(); mockTimer = new SystemTimerReaper("share-group-lock-timeout-test-reaper", new SystemTimer("share-group-lock-test-timeout")); + sharePartitionMetrics = new SharePartitionMetrics(GROUP_ID, TOPIC_ID_PARTITION.topic(), TOPIC_ID_PARTITION.partition()); } @AfterEach public void tearDown() throws Exception { mockTimer.close(); + sharePartitionMetrics.close(); } @Test @@ -165,7 +175,7 @@ public class SharePartitionTest { } @Test - public void testMaybeInitialize() { + public void testMaybeInitialize() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -175,7 +185,10 @@ public class SharePartitionTest { new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, (short) 2), new PersisterStateBatch(11L, 15L, RecordState.ARCHIVED.id, (short) 
3))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); - SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); @@ -201,6 +214,15 @@ public class SharePartitionTest { assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState()); assertEquals(3, sharePartition.cachedState().get(11L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(11L).offsetState()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, + "In-flight batch count should be 2."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 11, + "In-flight message count should be 11."); + assertEquals(11, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(6, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test @@ -304,7 +326,8 @@ public class SharePartitionTest { } @Test - public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() { + public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() + throws InterruptedException { Persister persister = Mockito.mock(Persister.class); ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -332,7 +355,8 @@ public class SharePartitionTest { ReplicaManager replicaManager = 
Mockito.mock(ReplicaManager.class); - FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1), 15L, Optional.empty()); + FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset( + MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1), 15L, Optional.empty()); Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); @@ -340,6 +364,7 @@ public class SharePartitionTest { .withPersister(persister) .withGroupConfigManager(groupConfigManager) .withReplicaManager(replicaManager) + .withSharePartitionMetrics(sharePartitionMetrics) .build(); CompletableFuture result = sharePartition.maybeInitialize(); @@ -359,6 +384,11 @@ public class SharePartitionTest { assertEquals(15, sharePartition.startOffset()); assertEquals(15, sharePartition.endOffset()); assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 0, + "In-flight batch count should be 0."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 0, + "In-flight message count should be 0."); } @Test @@ -1063,8 +1093,11 @@ public class SharePartitionTest { } @Test - public void testAcquireSingleRecord() { - SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + public void testAcquireSingleRecord() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); MemoryRecords records = memoryRecords(1); List acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, records, 1); @@ -1078,11 +1111,20 @@ public class SharePartitionTest { assertEquals(MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId()); assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(0L).offsetState()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 1, + "In-flight message count should be 1."); + assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @Test - public void testAcquireMultipleRecords() { - SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + public void testAcquireMultipleRecords() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); MemoryRecords records = memoryRecords(5, 10); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 3L, 5); @@ -1096,6 +1138,12 @@ public class SharePartitionTest { assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId()); assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(10L).offsetState()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 5, + "In-flight message count should be 5."); + assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @Test @@ -1145,8 +1193,11 @@ public class SharePartitionTest { } 
@Test - public void testAcquireWithMultipleBatchesAndMaxFetchRecords() { - SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + public void testAcquireWithMultipleBatchesAndMaxFetchRecords() throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); // Create 3 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); @@ -1177,6 +1228,12 @@ public class SharePartitionTest { assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId()); assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(10L).offsetState()); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + "In-flight batch count should be 1."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 20, + "In-flight message count should be 20."); + assertEquals(20, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @Test @@ -1378,8 +1435,12 @@ public class SharePartitionTest { } @Test - public void testAcquireWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset() { - SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + public void testAcquireWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset() + throws InterruptedException { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); sharePartition.updateCacheAndOffsets(4L); // Create 2 batches of records. 
@@ -1407,6 +1468,15 @@ public class SharePartitionTest { assertEquals(2, sharePartition.cachedState().size()); assertTrue(sharePartition.cachedState().containsKey(4L)); assertTrue(sharePartition.cachedState().containsKey(10L)); + + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, + "In-flight batch count should be 2."); + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 13, + "In-flight message count should be 13."); + assertEquals(13, sharePartitionMetrics.inFlightBatchMessageCount().sum()); + assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count()); + assertEquals(6, sharePartitionMetrics.inFlightBatchMessageCount().min()); + assertEquals(7, sharePartitionMetrics.inFlightBatchMessageCount().max()); } @Test @@ -1507,15 +1577,82 @@ public class SharePartitionTest { Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); - SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build(); + Time time = mock(Time.class); + when(time.hiResClockMs()) + .thenReturn(100L) // for tracking loadTimeMs + .thenReturn(110L) // for time when lock is acquired + .thenReturn(120L) // for time when lock is released + .thenReturn(140L) // for subsequent lock acquire + .thenReturn(170L); // for subsequent lock release + SharePartition sharePartition = SharePartitionBuilder.builder() + .withReplicaManager(replicaManager) + .withTime(time) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); + sharePartition.maybeInitialize(); assertTrue(sharePartition.maybeAcquireFetchLock()); // Lock cannot be acquired again, as already acquired. assertFalse(sharePartition.maybeAcquireFetchLock()); // Release the lock. 
sharePartition.releaseFetchLock(); + + assertEquals(1, sharePartitionMetrics.fetchLockTimeMs().count()); + assertEquals(10, sharePartitionMetrics.fetchLockTimeMs().sum()); + assertEquals(1, sharePartitionMetrics.fetchLockRatio().count()); + // Since first request didn't have any lock idle wait time, the ratio should be 1. + assertEquals(100, sharePartitionMetrics.fetchLockRatio().mean()); + // Lock can be acquired again. assertTrue(sharePartition.maybeAcquireFetchLock()); + // Release lock to update metrics and verify. + sharePartition.releaseFetchLock(); + + assertEquals(2, sharePartitionMetrics.fetchLockTimeMs().count()); + assertEquals(40, sharePartitionMetrics.fetchLockTimeMs().sum()); + assertEquals(2, sharePartitionMetrics.fetchLockRatio().count()); + // Since the second request had 20ms of idle wait time, the ratio should be 0.6 and mean as 0.8. + assertEquals(80, sharePartitionMetrics.fetchLockRatio().mean()); + } + + @Test + public void testRecordFetchLockRatioMetric() { + Time time = mock(Time.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withTime(time) + .withSharePartitionMetrics(sharePartitionMetrics) + .build(); + + // Acquired time and last lock acquisition time is 0; + sharePartition.recordFetchLockRatioMetric(0); + assertEquals(1, sharePartitionMetrics.fetchLockRatio().count()); + assertEquals(100, sharePartitionMetrics.fetchLockRatio().mean()); + + when(time.hiResClockMs()) + .thenReturn(10L) // for time when lock is acquired + .thenReturn(80L) // for time when lock is released + .thenReturn(160L); // to update lock idle duration while acquiring lock again. + + assertTrue(sharePartition.maybeAcquireFetchLock()); + sharePartition.releaseFetchLock(); + // Acquired time is 70 but last lock acquisition time was still 0, as it's the first request + // when last acquisition time was recorded. The last acquisition time should be updated to 80. 
+ assertEquals(2, sharePartitionMetrics.fetchLockRatio().count()); + assertEquals(100, sharePartitionMetrics.fetchLockRatio().mean()); + + assertTrue(sharePartition.maybeAcquireFetchLock()); + // Update metric again with 0 as acquire time and 80 as idle duration ms. + sharePartition.recordFetchLockRatioMetric(0); + assertEquals(3, sharePartitionMetrics.fetchLockRatio().count()); + // Mean should be (100+100+1)/3 = 67, as when idle duration is 80, the ratio should be 1. + assertEquals(67, sharePartitionMetrics.fetchLockRatio().mean()); + + // Update metric again with 10 as acquire time and 80 as idle duration ms. + sharePartition.recordFetchLockRatioMetric(10); + assertEquals(4, sharePartitionMetrics.fetchLockRatio().count()); + // Mean should be (100+100+1+11)/4 = 53, as when idle time is 80 and acquire time 10, the ratio should be 11. + assertEquals(53, sharePartitionMetrics.fetchLockRatio().mean()); } @Test @@ -2762,6 +2899,7 @@ public class SharePartitionTest { SharePartition sharePartition = SharePartitionBuilder.builder() .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) .build(); fetchAcquiredRecords(sharePartition, memoryRecords(1), 1); @@ -2777,6 +2915,9 @@ public class SharePartitionTest { sharePartition.timer().size() == 0, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED); + + assertEquals(1, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count()); + assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0); } @Test @@ -2784,6 +2925,7 @@ public class SharePartitionTest { SharePartition sharePartition = SharePartitionBuilder.builder() .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) .build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); @@ -2799,6 +2941,9 @@ 
public class SharePartitionTest { && sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED); + + assertEquals(5, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count()); + assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0); } @Test @@ -2806,6 +2951,7 @@ public class SharePartitionTest { SharePartition sharePartition = SharePartitionBuilder.builder() .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withState(SharePartitionState.ACTIVE) + .withSharePartitionMetrics(sharePartitionMetrics) .build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5); @@ -2830,6 +2976,9 @@ public class SharePartitionTest { sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED); + + assertEquals(10, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count()); + assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0); } @Test @@ -6587,6 +6736,19 @@ public class SharePartitionTest { Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); } + private Number yammerMetricValue(String name) { + try { + Gauge gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() + .filter(e -> e.getKey().getMBeanName().contains(name)) + .findFirst() + .orElseThrow() + .getValue(); + return (Number) gauge.value(); + } catch (Exception e) { + return 0; + } + } + private static class SharePartitionBuilder { private int defaultAcquisitionLockTimeoutMs = 30000; @@ -6597,6 +6759,8 @@ public class SharePartitionTest { private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); private SharePartitionState 
state = SharePartitionState.EMPTY; + private Time time = MOCK_TIME; + private SharePartitionMetrics sharePartitionMetrics = Mockito.mock(SharePartitionMetrics.class); private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) { this.maxInflightMessages = maxInflightMessages; @@ -6633,14 +6797,24 @@ public class SharePartitionTest { return this; } + private SharePartitionBuilder withTime(Time time) { + this.time = time; + return this; + } + + private SharePartitionBuilder withSharePartitionMetrics(SharePartitionMetrics sharePartitionMetrics) { + this.sharePartitionMetrics = sharePartitionMetrics; + return this; + } + public static SharePartitionBuilder builder() { return new SharePartitionBuilder(); } public SharePartition build() { return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, maxInflightMessages, maxDeliveryCount, - defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager, - state, Mockito.mock(SharePartitionListener.class)); + defaultAcquisitionLockTimeoutMs, mockTimer, time, persister, replicaManager, groupConfigManager, + state, Mockito.mock(SharePartitionListener.class), sharePartitionMetrics); } } } diff --git a/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java b/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java new file mode 100644 index 00000000000..6ea94d46374 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share.metrics; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; + +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Meter; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * SharePartitionMetrics is used to track the broker-side metrics for the SharePartition. + */ +public class SharePartitionMetrics implements AutoCloseable { + + public static final String IN_FLIGHT_MESSAGE_COUNT = "InFlightMessageCount"; + public static final String IN_FLIGHT_BATCH_COUNT = "InFlightBatchCount"; + + private static final String ACQUISITION_LOCK_TIMEOUT_PER_SEC = "AcquisitionLockTimeoutPerSec"; + private static final String IN_FLIGHT_BATCH_MESSAGE_COUNT = "InFlightBatchMessageCount"; + private static final String FETCH_LOCK_TIME_MS = "FetchLockTimeMs"; + private static final String FETCH_LOCK_RATIO = "FetchLockRatio"; + + /** + * Metric for the rate of acquisition lock timeouts for records. + */ + private final Meter acquisitionLockTimeoutPerSec; + /** + * Metric for the number of in-flight messages for the batch. + */ + private final Histogram inFlightBatchMessageCount; + /** + * Metric for the time the fetch lock is held. + */ + private final Histogram fetchLockTimeMs; + /** + * Metric for the ratio of fetch lock time to the total time. 
+ */ + private final Histogram fetchLockRatio; + + private final Map tags; + private final KafkaMetricsGroup metricsGroup; + + public SharePartitionMetrics(String groupId, String topic, int partition) { + this.tags = Utils.mkMap( + Utils.mkEntry("group", Objects.requireNonNull(groupId)), + Utils.mkEntry("topic", Objects.requireNonNull(topic)), + Utils.mkEntry("partition", String.valueOf(partition)) + ); + this.metricsGroup = new KafkaMetricsGroup("kafka.server", "SharePartitionMetrics"); + + this.acquisitionLockTimeoutPerSec = metricsGroup.newMeter( + ACQUISITION_LOCK_TIMEOUT_PER_SEC, + "acquisition lock timeout", + TimeUnit.SECONDS, + this.tags); + + this.inFlightBatchMessageCount = metricsGroup.newHistogram( + IN_FLIGHT_BATCH_MESSAGE_COUNT, + true, + this.tags); + + this.fetchLockTimeMs = metricsGroup.newHistogram( + FETCH_LOCK_TIME_MS, + true, + this.tags); + + this.fetchLockRatio = metricsGroup.newHistogram( + FETCH_LOCK_RATIO, + true, + this.tags); + } + + /** + * Register a gauge for the in-flight message count. + * + * @param messageCountSupplier The supplier for the in-flight message count. + */ + public void registerInFlightMessageCount(Supplier messageCountSupplier) { + metricsGroup.newGauge( + IN_FLIGHT_MESSAGE_COUNT, + messageCountSupplier, + this.tags + ); + } + + /** + * Register a gauge for the in-flight batch count. + * + * @param batchCountSupplier The supplier for the in-flight batch count. 
+ */ + public void registerInFlightBatchCount(Supplier batchCountSupplier) { + metricsGroup.newGauge( + IN_FLIGHT_BATCH_COUNT, + batchCountSupplier, + this.tags + ); + } + + public void recordAcquisitionLockTimeoutPerSec(long count) { + acquisitionLockTimeoutPerSec.mark(count); + } + + public void recordInFlightBatchMessageCount(long count) { + inFlightBatchMessageCount.update(count); + } + + public void recordFetchLockTimeMs(long timeMs) { + fetchLockTimeMs.update(timeMs); + } + + public void recordFetchLockRatio(int value) { + fetchLockRatio.update(value); + } + + // Visible for testing + public Meter acquisitionLockTimeoutPerSec() { + return acquisitionLockTimeoutPerSec; + } + + // Visible for testing + public Histogram inFlightBatchMessageCount() { + return inFlightBatchMessageCount; + } + + // Visible for testing + public Histogram fetchLockTimeMs() { + return fetchLockTimeMs; + } + + // Visible for testing + public Histogram fetchLockRatio() { + return fetchLockRatio; + } + + @Override + public void close() throws Exception { + List.of(ACQUISITION_LOCK_TIMEOUT_PER_SEC, + IN_FLIGHT_MESSAGE_COUNT, + IN_FLIGHT_BATCH_COUNT, + IN_FLIGHT_BATCH_MESSAGE_COUNT, + FETCH_LOCK_TIME_MS, + FETCH_LOCK_RATIO + ).forEach(m -> metricsGroup.removeMetric(m, tags)); + } +}