KAFKA-18734: Implemented share partition metrics (KIP-1103) (#19045)

The PR implements the SharePartitionMetrics as defined in KIP-1103, with
one change. The metric `FetchLockRatio` is defined as `Meter` in KIP but
is implemented as `HIstogram`. There was a discussion about same on
KIP-1103 discussion where we thought that `FetchLockRatio` is
pre-aggregated but while implemeting the rate from `Meter` can go above
100 as `Meter` defines rate per time period. Hence it makes more sense
to implement metric `FetchLockRatio` as `Histogram`.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-02-28 14:22:27 +00:00 committed by GitHub
parent 8b605bd362
commit 8cf969e00a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 459 additions and 32 deletions

View File

@ -42,6 +42,7 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData; import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.PartitionAllData; import org.apache.kafka.server.share.persister.PartitionAllData;
import org.apache.kafka.server.share.persister.PartitionErrorData; import org.apache.kafka.server.share.persister.PartitionErrorData;
@ -84,6 +85,7 @@ import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
* consumers. The class maintains the state of the records that have been fetched from the leader * consumers. The class maintains the state of the records that have been fetched from the leader
* and are in-flight. * and are in-flight.
*/ */
@SuppressWarnings("ClassDataAbstractionCoupling")
public class SharePartition { public class SharePartition {
private static final Logger log = LoggerFactory.getLogger(SharePartition.class); private static final Logger log = LoggerFactory.getLogger(SharePartition.class);
@ -280,6 +282,11 @@ public class SharePartition {
*/ */
private final long loadStartTimeMs; private final long loadStartTimeMs;
/**
* The share partition metrics is used to track the broker-side metrics for the share partition.
*/
private final SharePartitionMetrics sharePartitionMetrics;
/** /**
* The share partition start offset specifies the partition start offset from which the records * The share partition start offset specifies the partition start offset from which the records
* are cached in the cachedState of the sharePartition. * are cached in the cachedState of the sharePartition.
@ -313,6 +320,21 @@ public class SharePartition {
*/ */
private SharePartitionState partitionState; private SharePartitionState partitionState;
/**
* The fetch lock acquired time is used to track the time when the lock for share partition is acquired.
*/
private long fetchLockAcquiredTimeMs;
/**
* The fetch lock released time is used to track the time when the lock for share partition is released.
*/
private long fetchLockReleasedTimeMs;
/**
* The fetch lock idle duration is used to track the time for which the fetch lock is idle.
*/
private long fetchLockIdleDurationMs;
/** /**
* The replica manager is used to check to see if any delayed share fetch request can be completed because of data * The replica manager is used to check to see if any delayed share fetch request can be completed because of data
* availability due to acquisition lock timeout. * availability due to acquisition lock timeout.
@ -334,9 +356,12 @@ public class SharePartition {
SharePartitionListener listener SharePartitionListener listener
) { ) {
this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs, this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs,
timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener); timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener,
new SharePartitionMetrics(groupId, topicIdPartition.topic(), topicIdPartition.partition()));
} }
// Visible for testing
@SuppressWarnings("ParameterNumber")
SharePartition( SharePartition(
String groupId, String groupId,
TopicIdPartition topicIdPartition, TopicIdPartition topicIdPartition,
@ -350,7 +375,8 @@ public class SharePartition {
ReplicaManager replicaManager, ReplicaManager replicaManager,
GroupConfigManager groupConfigManager, GroupConfigManager groupConfigManager,
SharePartitionState sharePartitionState, SharePartitionState sharePartitionState,
SharePartitionListener listener SharePartitionListener listener,
SharePartitionMetrics sharePartitionMetrics
) { ) {
this.groupId = groupId; this.groupId = groupId;
this.topicIdPartition = topicIdPartition; this.topicIdPartition = topicIdPartition;
@ -371,6 +397,8 @@ public class SharePartition {
this.groupConfigManager = groupConfigManager; this.groupConfigManager = groupConfigManager;
this.fetchOffsetMetadata = new OffsetMetadata(); this.fetchOffsetMetadata = new OffsetMetadata();
this.listener = listener; this.listener = listener;
this.sharePartitionMetrics = sharePartitionMetrics;
this.registerGaugeMetrics();
} }
/** /**
@ -476,6 +504,7 @@ public class SharePartition {
InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(), InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null); stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null);
cachedState.put(stateBatch.firstOffset(), inFlightBatch); cachedState.put(stateBatch.firstOffset(), inFlightBatch);
sharePartitionMetrics.recordInFlightBatchMessageCount(stateBatch.lastOffset() - stateBatch.firstOffset() + 1);
} }
// Update the endOffset of the partition. // Update the endOffset of the partition.
if (!cachedState.isEmpty()) { if (!cachedState.isEmpty()) {
@ -1265,19 +1294,7 @@ public class SharePartition {
if (nextFetchOffset() != endOffset() + 1) { if (nextFetchOffset() != endOffset() + 1) {
return true; return true;
} }
return numInFlightRecords() < maxInFlightMessages;
lock.readLock().lock();
long numRecords;
try {
if (cachedState.isEmpty()) {
numRecords = 0;
} else {
numRecords = this.endOffset - this.startOffset + 1;
}
} finally {
lock.readLock().unlock();
}
return numRecords < maxInFlightMessages;
} }
/** /**
@ -1291,13 +1308,30 @@ public class SharePartition {
if (stateNotActive()) { if (stateNotActive()) {
return false; return false;
} }
return fetchLock.compareAndSet(false, true); boolean acquired = fetchLock.compareAndSet(false, true);
if (acquired) {
long currentTime = time.hiResClockMs();
fetchLockAcquiredTimeMs = currentTime;
fetchLockIdleDurationMs = fetchLockReleasedTimeMs != 0 ? currentTime - fetchLockReleasedTimeMs : 0;
}
return acquired;
} }
/** /**
* Release the fetch lock once the records are fetched from the leader. * Release the fetch lock once the records are fetched from the leader.
*/ */
void releaseFetchLock() { void releaseFetchLock() {
// Register the metric for the duration the fetch lock was held. Do not register the metric
// if the fetch lock was not acquired.
if (fetchLock.get()) {
long currentTime = time.hiResClockMs();
long acquiredDurationMs = currentTime - fetchLockAcquiredTimeMs;
// Update the metric for the fetch lock time.
sharePartitionMetrics.recordFetchLockTimeMs(acquiredDurationMs);
// Update fetch lock ratio metric.
recordFetchLockRatioMetric(acquiredDurationMs);
fetchLockReleasedTimeMs = currentTime;
}
fetchLock.set(false); fetchLock.set(false);
} }
@ -1326,6 +1360,56 @@ public class SharePartition {
return leaderEpoch; return leaderEpoch;
} }
/**
* Records the fetch lock ratio metric. The metric is the ratio of the time duration the fetch
* lock was acquired to the total time since the last lock acquisition. The total time is calculated
* by adding the duration of the fetch lock idle time to the time the fetch lock was acquired.
*
* @param acquiredDurationMs The time duration the fetch lock was acquired.
*/
// Visible for testing
void recordFetchLockRatioMetric(long acquiredDurationMs) {
if (fetchLockIdleDurationMs < 0) {
// This is just a safe check to avoid negative time for fetch lock idle duration. This
// should not happen in any scenarios. If it does then just return from the method and
// no metric update is an indicator of the issue.
return;
}
// Update the total fetch lock acquired time.
double fetchLockToTotalTime;
if (acquiredDurationMs + fetchLockIdleDurationMs == 0) {
// If the total time is 0 then the ratio is 1 i.e. the fetch lock was acquired for the complete time.
fetchLockToTotalTime = 1.0;
} else if (acquiredDurationMs == 0) {
// If the acquired duration is 0 then the ratio is the calculated by the idle duration.
fetchLockToTotalTime = 1.0 / fetchLockIdleDurationMs;
} else {
fetchLockToTotalTime = acquiredDurationMs * (1.0 / (acquiredDurationMs + fetchLockIdleDurationMs));
}
sharePartitionMetrics.recordFetchLockRatio((int) (fetchLockToTotalTime * 100));
}
private void registerGaugeMetrics() {
sharePartitionMetrics.registerInFlightMessageCount(this::numInFlightRecords);
sharePartitionMetrics.registerInFlightBatchCount(this.cachedState::size);
}
private long numInFlightRecords() {
lock.readLock().lock();
long numRecords;
try {
if (cachedState.isEmpty()) {
numRecords = 0;
} else {
numRecords = this.endOffset - this.startOffset + 1;
}
} finally {
lock.readLock().unlock();
}
return numRecords;
}
private boolean stateNotActive() { private boolean stateNotActive() {
return partitionState() != SharePartitionState.ACTIVE; return partitionState() != SharePartitionState.ACTIVE;
} }
@ -1478,6 +1562,8 @@ public class SharePartition {
RecordState.ACQUIRED, RecordState.ACQUIRED,
1, 1,
timerTask)); timerTask));
// Update the in-flight batch message count metrics for the share partition.
sharePartitionMetrics.recordInFlightBatchMessageCount(acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
}); });
return result; return result;
} finally { } finally {
@ -2501,6 +2587,7 @@ public class SharePartition {
*/ */
@Override @Override
public void run() { public void run() {
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1);
releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset); releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset);
} }
} }

View File

@ -48,8 +48,10 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
import org.apache.kafka.server.share.persister.NoOpStatePersister; import org.apache.kafka.server.share.persister.NoOpStatePersister;
import org.apache.kafka.server.share.persister.PartitionFactory; import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.Persister; import org.apache.kafka.server.share.persister.Persister;
@ -65,6 +67,8 @@ import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import com.yammer.metrics.core.Gauge;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -96,6 +100,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SharePartitionTest { public class SharePartitionTest {
@ -104,7 +110,6 @@ public class SharePartitionTest {
private static final int MAX_DELIVERY_COUNT = 5; private static final int MAX_DELIVERY_COUNT = 5;
private static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(Uuid.randomUuid(), 0, "test-topic"); private static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(Uuid.randomUuid(), 0, "test-topic");
private static final String MEMBER_ID = "member-1"; private static final String MEMBER_ID = "member-1";
private static Timer mockTimer;
private static final Time MOCK_TIME = new MockTime(); private static final Time MOCK_TIME = new MockTime();
private static final short MAX_IN_FLIGHT_MESSAGES = 200; private static final short MAX_IN_FLIGHT_MESSAGES = 200;
private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100; private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100;
@ -113,16 +118,21 @@ public class SharePartitionTest {
private static final int DEFAULT_FETCH_OFFSET = 0; private static final int DEFAULT_FETCH_OFFSET = 0;
private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE; private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE;
private static final byte ACKNOWLEDGE_TYPE_GAP_ID = 0; private static final byte ACKNOWLEDGE_TYPE_GAP_ID = 0;
private static Timer mockTimer;
private SharePartitionMetrics sharePartitionMetrics;
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
kafka.utils.TestUtils.clearYammerMetrics();
mockTimer = new SystemTimerReaper("share-group-lock-timeout-test-reaper", mockTimer = new SystemTimerReaper("share-group-lock-timeout-test-reaper",
new SystemTimer("share-group-lock-test-timeout")); new SystemTimer("share-group-lock-test-timeout"));
sharePartitionMetrics = new SharePartitionMetrics(GROUP_ID, TOPIC_ID_PARTITION.topic(), TOPIC_ID_PARTITION.partition());
} }
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
mockTimer.close(); mockTimer.close();
sharePartitionMetrics.close();
} }
@Test @Test
@ -165,7 +175,7 @@ public class SharePartitionTest {
} }
@Test @Test
public void testMaybeInitialize() { public void testMaybeInitialize() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class); Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
@ -175,7 +185,10 @@ public class SharePartitionTest {
new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, (short) 2), new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(11L, 15L, RecordState.ARCHIVED.id, (short) 3))))))); new PersisterStateBatch(11L, 15L, RecordState.ARCHIVED.id, (short) 3)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize(); CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone()); assertTrue(result.isDone());
@ -201,6 +214,15 @@ public class SharePartitionTest {
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState());
assertEquals(3, sharePartition.cachedState().get(11L).batchDeliveryCount()); assertEquals(3, sharePartition.cachedState().get(11L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(11L).offsetState()); assertNull(sharePartition.cachedState().get(11L).offsetState());
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2,
"In-flight batch count should be 2.");
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 11,
"In-flight message count should be 11.");
assertEquals(11, sharePartitionMetrics.inFlightBatchMessageCount().sum());
assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count());
assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().min());
assertEquals(6, sharePartitionMetrics.inFlightBatchMessageCount().max());
} }
@Test @Test
@ -304,7 +326,8 @@ public class SharePartitionTest {
} }
@Test @Test
public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() { public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration()
throws InterruptedException {
Persister persister = Mockito.mock(Persister.class); Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
@ -332,7 +355,8 @@ public class SharePartitionTest {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1), 15L, Optional.empty()); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(
MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1), 15L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
@ -340,6 +364,7 @@ public class SharePartitionTest {
.withPersister(persister) .withPersister(persister)
.withGroupConfigManager(groupConfigManager) .withGroupConfigManager(groupConfigManager)
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withSharePartitionMetrics(sharePartitionMetrics)
.build(); .build();
CompletableFuture<Void> result = sharePartition.maybeInitialize(); CompletableFuture<Void> result = sharePartition.maybeInitialize();
@ -359,6 +384,11 @@ public class SharePartitionTest {
assertEquals(15, sharePartition.startOffset()); assertEquals(15, sharePartition.startOffset());
assertEquals(15, sharePartition.endOffset()); assertEquals(15, sharePartition.endOffset());
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 0,
"In-flight batch count should be 0.");
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 0,
"In-flight message count should be 0.");
} }
@Test @Test
@ -1063,8 +1093,11 @@ public class SharePartitionTest {
} }
@Test @Test
public void testAcquireSingleRecord() { public void testAcquireSingleRecord() throws InterruptedException {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
MemoryRecords records = memoryRecords(1); MemoryRecords records = memoryRecords(1);
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 1); List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 1);
@ -1078,11 +1111,20 @@ public class SharePartitionTest {
assertEquals(MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId()); assertEquals(MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId());
assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount()); assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(0L).offsetState()); assertNull(sharePartition.cachedState().get(0L).offsetState());
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
"In-flight batch count should be 1.");
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 1,
"In-flight message count should be 1.");
assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().sum());
} }
@Test @Test
public void testAcquireMultipleRecords() { public void testAcquireMultipleRecords() throws InterruptedException {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
MemoryRecords records = memoryRecords(5, 10); MemoryRecords records = memoryRecords(5, 10);
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 3L, 5); List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 3L, 5);
@ -1096,6 +1138,12 @@ public class SharePartitionTest {
assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId()); assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId());
assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(10L).offsetState()); assertNull(sharePartition.cachedState().get(10L).offsetState());
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
"In-flight batch count should be 1.");
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 5,
"In-flight message count should be 5.");
assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().sum());
} }
@Test @Test
@ -1145,8 +1193,11 @@ public class SharePartitionTest {
} }
@Test @Test
public void testAcquireWithMultipleBatchesAndMaxFetchRecords() { public void testAcquireWithMultipleBatchesAndMaxFetchRecords() throws InterruptedException {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
// Create 3 batches of records. // Create 3 batches of records.
ByteBuffer buffer = ByteBuffer.allocate(4096); ByteBuffer buffer = ByteBuffer.allocate(4096);
@ -1177,6 +1228,12 @@ public class SharePartitionTest {
assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId()); assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId());
assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(10L).offsetState()); assertNull(sharePartition.cachedState().get(10L).offsetState());
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
"In-flight batch count should be 1.");
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 20,
"In-flight message count should be 20.");
assertEquals(20, sharePartitionMetrics.inFlightBatchMessageCount().sum());
} }
@Test @Test
@ -1378,8 +1435,12 @@ public class SharePartitionTest {
} }
@Test @Test
public void testAcquireWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset() { public void testAcquireWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset()
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); throws InterruptedException {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
sharePartition.updateCacheAndOffsets(4L); sharePartition.updateCacheAndOffsets(4L);
// Create 2 batches of records. // Create 2 batches of records.
@ -1407,6 +1468,15 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.cachedState().size()); assertEquals(2, sharePartition.cachedState().size());
assertTrue(sharePartition.cachedState().containsKey(4L)); assertTrue(sharePartition.cachedState().containsKey(4L));
assertTrue(sharePartition.cachedState().containsKey(10L)); assertTrue(sharePartition.cachedState().containsKey(10L));
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2,
"In-flight batch count should be 2.");
TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 13,
"In-flight message count should be 13.");
assertEquals(13, sharePartitionMetrics.inFlightBatchMessageCount().sum());
assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count());
assertEquals(6, sharePartitionMetrics.inFlightBatchMessageCount().min());
assertEquals(7, sharePartitionMetrics.inFlightBatchMessageCount().max());
} }
@Test @Test
@ -1507,15 +1577,82 @@ public class SharePartitionTest {
Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build(); Time time = mock(Time.class);
when(time.hiResClockMs())
.thenReturn(100L) // for tracking loadTimeMs
.thenReturn(110L) // for time when lock is acquired
.thenReturn(120L) // for time when lock is released
.thenReturn(140L) // for subsequent lock acquire
.thenReturn(170L); // for subsequent lock release
SharePartition sharePartition = SharePartitionBuilder.builder()
.withReplicaManager(replicaManager)
.withTime(time)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
sharePartition.maybeInitialize(); sharePartition.maybeInitialize();
assertTrue(sharePartition.maybeAcquireFetchLock()); assertTrue(sharePartition.maybeAcquireFetchLock());
// Lock cannot be acquired again, as already acquired. // Lock cannot be acquired again, as already acquired.
assertFalse(sharePartition.maybeAcquireFetchLock()); assertFalse(sharePartition.maybeAcquireFetchLock());
// Release the lock. // Release the lock.
sharePartition.releaseFetchLock(); sharePartition.releaseFetchLock();
assertEquals(1, sharePartitionMetrics.fetchLockTimeMs().count());
assertEquals(10, sharePartitionMetrics.fetchLockTimeMs().sum());
assertEquals(1, sharePartitionMetrics.fetchLockRatio().count());
// Since first request didn't have any lock idle wait time, the ratio should be 1.
assertEquals(100, sharePartitionMetrics.fetchLockRatio().mean());
// Lock can be acquired again. // Lock can be acquired again.
assertTrue(sharePartition.maybeAcquireFetchLock()); assertTrue(sharePartition.maybeAcquireFetchLock());
// Release lock to update metrics and verify.
sharePartition.releaseFetchLock();
assertEquals(2, sharePartitionMetrics.fetchLockTimeMs().count());
assertEquals(40, sharePartitionMetrics.fetchLockTimeMs().sum());
assertEquals(2, sharePartitionMetrics.fetchLockRatio().count());
// Since the second request had 20ms of idle wait time, the ratio should be 0.6 and mean as 0.8.
assertEquals(80, sharePartitionMetrics.fetchLockRatio().mean());
}
@Test
public void testRecordFetchLockRatioMetric() {
Time time = mock(Time.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withTime(time)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
// Acquired time and last lock acquisition time is 0;
sharePartition.recordFetchLockRatioMetric(0);
assertEquals(1, sharePartitionMetrics.fetchLockRatio().count());
assertEquals(100, sharePartitionMetrics.fetchLockRatio().mean());
when(time.hiResClockMs())
.thenReturn(10L) // for time when lock is acquired
.thenReturn(80L) // for time when lock is released
.thenReturn(160L); // to update lock idle duration while acquiring lock again.
assertTrue(sharePartition.maybeAcquireFetchLock());
sharePartition.releaseFetchLock();
// Acquired time is 70 but last lock acquisition time was still 0, as it's the first request
// when last acquisition time was recorded. The last acquisition time should be updated to 80.
assertEquals(2, sharePartitionMetrics.fetchLockRatio().count());
assertEquals(100, sharePartitionMetrics.fetchLockRatio().mean());
assertTrue(sharePartition.maybeAcquireFetchLock());
// Update metric again with 0 as acquire time and 80 as idle duration ms.
sharePartition.recordFetchLockRatioMetric(0);
assertEquals(3, sharePartitionMetrics.fetchLockRatio().count());
// Mean should be (100+100+1)/3 = 67, as when idle duration is 80, the ratio should be 1.
assertEquals(67, sharePartitionMetrics.fetchLockRatio().mean());
// Update metric again with 10 as acquire time and 80 as idle duration ms.
sharePartition.recordFetchLockRatioMetric(10);
assertEquals(4, sharePartitionMetrics.fetchLockRatio().count());
// Mean should be (100+100+1+11)/4 = 53, as when idle time is 80 and acquire time 10, the ratio should be 11.
assertEquals(53, sharePartitionMetrics.fetchLockRatio().mean());
} }
@Test @Test
@ -2762,6 +2899,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder() SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withState(SharePartitionState.ACTIVE) .withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.build(); .build();
fetchAcquiredRecords(sharePartition, memoryRecords(1), 1); fetchAcquiredRecords(sharePartition, memoryRecords(1), 1);
@ -2777,6 +2915,9 @@ public class SharePartitionTest {
sharePartition.timer().size() == 0, sharePartition.timer().size() == 0,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED); () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
assertEquals(1, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0);
} }
@Test @Test
@ -2784,6 +2925,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder() SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withState(SharePartitionState.ACTIVE) .withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.build(); .build();
fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
@ -2799,6 +2941,9 @@ public class SharePartitionTest {
&& sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null, && sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED); () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
assertEquals(5, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0);
} }
@Test @Test
@ -2806,6 +2951,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder() SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withState(SharePartitionState.ACTIVE) .withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.build(); .build();
fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5);
@ -2830,6 +2976,9 @@ public class SharePartitionTest {
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null, sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED); () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
assertEquals(10, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0);
} }
@Test @Test
@ -6587,6 +6736,19 @@ public class SharePartitionTest {
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
} }
private Number yammerMetricValue(String name) {
try {
Gauge gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
.filter(e -> e.getKey().getMBeanName().contains(name))
.findFirst()
.orElseThrow()
.getValue();
return (Number) gauge.value();
} catch (Exception e) {
return 0;
}
}
private static class SharePartitionBuilder { private static class SharePartitionBuilder {
private int defaultAcquisitionLockTimeoutMs = 30000; private int defaultAcquisitionLockTimeoutMs = 30000;
@ -6597,6 +6759,8 @@ public class SharePartitionTest {
private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
private SharePartitionState state = SharePartitionState.EMPTY; private SharePartitionState state = SharePartitionState.EMPTY;
private Time time = MOCK_TIME;
private SharePartitionMetrics sharePartitionMetrics = Mockito.mock(SharePartitionMetrics.class);
private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) { private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) {
this.maxInflightMessages = maxInflightMessages; this.maxInflightMessages = maxInflightMessages;
@ -6633,14 +6797,24 @@ public class SharePartitionTest {
return this; return this;
} }
private SharePartitionBuilder withTime(Time time) {
this.time = time;
return this;
}
private SharePartitionBuilder withSharePartitionMetrics(SharePartitionMetrics sharePartitionMetrics) {
this.sharePartitionMetrics = sharePartitionMetrics;
return this;
}
public static SharePartitionBuilder builder() { public static SharePartitionBuilder builder() {
return new SharePartitionBuilder(); return new SharePartitionBuilder();
} }
public SharePartition build() { public SharePartition build() {
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, maxInflightMessages, maxDeliveryCount, return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, maxInflightMessages, maxDeliveryCount,
defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager, defaultAcquisitionLockTimeoutMs, mockTimer, time, persister, replicaManager, groupConfigManager,
state, Mockito.mock(SharePartitionListener.class)); state, Mockito.mock(SharePartitionListener.class), sharePartitionMetrics);
} }
} }
} }

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.metrics;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* SharePartitionMetrics is used to track the broker-side metrics for the SharePartition.
*/
public class SharePartitionMetrics implements AutoCloseable {
public static final String IN_FLIGHT_MESSAGE_COUNT = "InFlightMessageCount";
public static final String IN_FLIGHT_BATCH_COUNT = "InFlightBatchCount";
private static final String ACQUISITION_LOCK_TIMEOUT_PER_SEC = "AcquisitionLockTimeoutPerSec";
private static final String IN_FLIGHT_BATCH_MESSAGE_COUNT = "InFlightBatchMessageCount";
private static final String FETCH_LOCK_TIME_MS = "FetchLockTimeMs";
private static final String FETCH_LOCK_RATIO = "FetchLockRatio";
/**
* Metric for the rate of acquisition lock timeouts for records.
*/
private final Meter acquisitionLockTimeoutPerSec;
/**
* Metric for the number of in-flight messages for the batch.
*/
private final Histogram inFlightBatchMessageCount;
/**
* Metric for the time the fetch lock is held.
*/
private final Histogram fetchLockTimeMs;
/**
* Metric for the ratio of fetch lock time to the total time.
*/
private final Histogram fetchLockRatio;
private final Map<String, String> tags;
private final KafkaMetricsGroup metricsGroup;
public SharePartitionMetrics(String groupId, String topic, int partition) {
this.tags = Utils.mkMap(
Utils.mkEntry("group", Objects.requireNonNull(groupId)),
Utils.mkEntry("topic", Objects.requireNonNull(topic)),
Utils.mkEntry("partition", String.valueOf(partition))
);
this.metricsGroup = new KafkaMetricsGroup("kafka.server", "SharePartitionMetrics");
this.acquisitionLockTimeoutPerSec = metricsGroup.newMeter(
ACQUISITION_LOCK_TIMEOUT_PER_SEC,
"acquisition lock timeout",
TimeUnit.SECONDS,
this.tags);
this.inFlightBatchMessageCount = metricsGroup.newHistogram(
IN_FLIGHT_BATCH_MESSAGE_COUNT,
true,
this.tags);
this.fetchLockTimeMs = metricsGroup.newHistogram(
FETCH_LOCK_TIME_MS,
true,
this.tags);
this.fetchLockRatio = metricsGroup.newHistogram(
FETCH_LOCK_RATIO,
true,
this.tags);
}
/**
* Register a gauge for the in-flight message count.
*
* @param messageCountSupplier The supplier for the in-flight message count.
*/
public void registerInFlightMessageCount(Supplier<Long> messageCountSupplier) {
metricsGroup.newGauge(
IN_FLIGHT_MESSAGE_COUNT,
messageCountSupplier,
this.tags
);
}
/**
* Register a gauge for the in-flight batch count.
*
* @param batchCountSupplier The supplier for the in-flight batch count.
*/
public void registerInFlightBatchCount(Supplier<Integer> batchCountSupplier) {
metricsGroup.newGauge(
IN_FLIGHT_BATCH_COUNT,
batchCountSupplier,
this.tags
);
}
public void recordAcquisitionLockTimeoutPerSec(long count) {
acquisitionLockTimeoutPerSec.mark(count);
}
public void recordInFlightBatchMessageCount(long count) {
inFlightBatchMessageCount.update(count);
}
public void recordFetchLockTimeMs(long timeMs) {
fetchLockTimeMs.update(timeMs);
}
public void recordFetchLockRatio(int value) {
fetchLockRatio.update(value);
}
// Visible for testing
public Meter acquisitionLockTimeoutPerSec() {
return acquisitionLockTimeoutPerSec;
}
// Visible for testing
public Histogram inFlightBatchMessageCount() {
return inFlightBatchMessageCount;
}
// Visible for testing
public Histogram fetchLockTimeMs() {
return fetchLockTimeMs;
}
// Visible for testing
public Histogram fetchLockRatio() {
return fetchLockRatio;
}
@Override
public void close() throws Exception {
List.of(ACQUISITION_LOCK_TIMEOUT_PER_SEC,
IN_FLIGHT_MESSAGE_COUNT,
IN_FLIGHT_BATCH_COUNT,
IN_FLIGHT_BATCH_MESSAGE_COUNT,
FETCH_LOCK_TIME_MS,
FETCH_LOCK_RATIO
).forEach(m -> metricsGroup.removeMetric(m, tags));
}
}