KAFKA-18733: Implemented fetch ratio and partition acquire time metrics (3/N) (#18959)

PR implements the final set of ShareGroupMetrics,
RequestTopicPartitionsFetchRatio and TopicPartitionsAcquireTimeMs, as
defined in KIP-1103:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1103%3A+Additional+metrics+for+cooperative+consumption

Note: Metric `RequestTopicPartitionsFetchRatio` is calculated as
percentage as Histogram API doesn't record double.


Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit <adixit@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-02-21 17:01:39 +00:00 committed by GitHub
parent 8f13e7c207
commit f543eac4fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 214 additions and 26 deletions

View File

@ -24,11 +24,13 @@ import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@ -37,7 +39,6 @@ import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@ -62,25 +63,43 @@ public class DelayedShareFetch extends DelayedOperation {
private final ReplicaManager replicaManager;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
private final ShareGroupMetrics shareGroupMetrics;
private final Time time;
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
// Tracks the start time to acquire any share partition for a fetch request.
private long acquireStartTimeMs;
private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
/**
* This function constructs an instance of delayed share fetch operation for completing share fetch requests instantaneously or with delay.
* @param shareFetch - The share fetch parameters of the share fetch request.
* @param replicaManager - The replica manager instance used to read from log/complete the request.
* @param exceptionHandler - The handler to complete share fetch requests with exception.
* @param sharePartitions - The share partitions referenced in the share fetch request.
* This function constructs an instance of delayed share fetch operation for completing share fetch
* requests instantaneously or with delay.
*
* @param shareFetch The share fetch parameters of the share fetch request.
* @param replicaManager The replica manager instance used to read from log/complete the request.
* @param exceptionHandler The handler to complete share fetch requests with exception.
* @param sharePartitions The share partitions referenced in the share fetch request.
* @param shareGroupMetrics The share group metrics to record the metrics.
* @param time The system time.
*/
public DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
ShareGroupMetrics shareGroupMetrics,
Time time
) {
this(shareFetch,
replicaManager,
exceptionHandler,
sharePartitions,
PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM),
shareGroupMetrics,
time
);
}
DelayedShareFetch(
@ -88,7 +107,10 @@ public class DelayedShareFetch extends DelayedOperation {
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
PartitionMaxBytesStrategy partitionMaxBytesStrategy) {
PartitionMaxBytesStrategy partitionMaxBytesStrategy,
ShareGroupMetrics shareGroupMetrics,
Time time
) {
super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
@ -97,6 +119,9 @@ public class DelayedShareFetch extends DelayedOperation {
this.exceptionHandler = exceptionHandler;
this.sharePartitions = sharePartitions;
this.partitionMaxBytesStrategy = partitionMaxBytesStrategy;
this.shareGroupMetrics = shareGroupMetrics;
this.time = time;
this.acquireStartTimeMs = time.hiResClockMs();
}
@Override
@ -120,16 +145,28 @@ public class DelayedShareFetch extends DelayedOperation {
try {
LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (partitionsAcquired.isEmpty())
if (partitionsAcquired.isEmpty()) {
topicPartitionData = acquirablePartitions();
// tryComplete invoked forceComplete, so we can use the data from tryComplete.
else
// The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
// for the share partition, hence if no partitions are yet acquired by tryComplete,
// we record the metric here. Do not check if the request has successfully acquired any
// partitions now or not, as then the upper bound of request timeout shall be recorded
// for the metric.
updateAcquireElapsedTimeMetric();
} else {
// tryComplete invoked forceComplete, so we can use the data from tryComplete.
topicPartitionData = partitionsAcquired;
}
if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetch.maybeComplete(Collections.emptyMap());
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
shareFetch.maybeComplete(Map.of());
return;
} else {
// Update metric to record acquired to requested partitions.
double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.partitionMaxBytes().size();
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
@ -183,6 +220,8 @@ public class DelayedShareFetch extends DelayedOperation {
try {
if (!topicPartitionData.isEmpty()) {
// Update the metric to record the time taken to acquire the locks for the share partitions.
updateAcquireElapsedTimeMetric();
// In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
@ -417,6 +456,20 @@ public class DelayedShareFetch extends DelayedOperation {
shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
}
/**
* The method updates the metric for the time taken to acquire the share partition locks. Also,
* it resets the acquireStartTimeMs to the current time, so that the metric records the time taken
* to acquire the locks for the re-try, if the partitions are re-acquired. The partitions can be
* re-acquired if the fetch request is not completed because of the minBytes or some other condition.
*/
private void updateAcquireElapsedTimeMetric() {
long currentTimeMs = time.hiResClockMs();
shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(shareFetch.groupId(), currentTimeMs - acquireStartTimeMs);
// Reset the acquireStartTimeMs to the current time. If the fetch request is not completed
// and the partitions are re-acquired then metric should record value from the last acquire time.
acquireStartTimeMs = currentTimeMs;
}
// Visible for testing.
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {

View File

@ -659,7 +659,7 @@ public class SharePartitionManager implements AutoCloseable {
// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
// The request will be added irrespective of whether the share partition is initialized or not.
// Once the share partition is initialized, the delayed share fetch will be completed.
addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, fencedSharePartitionHandler(), sharePartitions), delayedShareFetchWatchKeys);
addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time), delayedShareFetchWatchKeys);
}
private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.SharePartitionKey;
@ -36,9 +37,11 @@ import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
@ -74,6 +77,7 @@ import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDel
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@ -100,6 +104,7 @@ public class DelayedShareFetchTest {
@BeforeEach
public void setUp() {
kafka.utils.TestUtils.clearYammerMetrics();
mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper",
new SystemTimer("DelayedShareFetchTestTimer"));
}
@ -133,9 +138,12 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(new MockTime());
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withShareGroupMetrics(shareGroupMetrics)
.build());
// Since there is no partition that can be acquired, tryComplete should return false.
@ -143,6 +151,10 @@ public class DelayedShareFetchTest {
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
// Metrics shall not be recorded as no partition is acquired.
assertNull(shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId));
assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
delayedShareFetch.lock().unlock();
}
@ -190,12 +202,17 @@ public class DelayedShareFetchTest {
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.withExceptionHandler(exceptionHandler)
.withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.withShareGroupMetrics(shareGroupMetrics)
.withTime(time)
.build());
assertFalse(delayedShareFetch.isCompleted());
@ -204,6 +221,12 @@ public class DelayedShareFetchTest {
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
// Though the request is not completed but sp0 was acquired and hence the metric should be recorded.
assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
assertEquals(10, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
// Since the request is not completed, the fetch ratio should be null.
assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
delayedShareFetch.lock().unlock();
Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
}
@ -297,11 +320,16 @@ public class DelayedShareFetchTest {
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(120L).thenReturn(140L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.withShareGroupMetrics(shareGroupMetrics)
.withTime(time)
.build());
assertFalse(delayedShareFetch.isCompleted());
@ -310,6 +338,11 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
assertEquals(20, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
assertEquals(50, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
delayedShareFetch.lock().unlock();
}
@ -338,10 +371,16 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(90L).thenReturn(140L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.withShareGroupMetrics(shareGroupMetrics)
.withTime(time)
.build());
assertFalse(delayedShareFetch.isCompleted());
delayedShareFetch.forceComplete();
@ -353,6 +392,12 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
// As the request is completed by onComplete then both metrics shall be recorded.
assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
assertEquals(50, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
assertEquals(0, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
delayedShareFetch.lock().unlock();
}
@ -387,11 +432,16 @@ public class DelayedShareFetchTest {
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(10L).thenReturn(140L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.withShareGroupMetrics(shareGroupMetrics)
.withTime(time)
.build());
assertFalse(delayedShareFetch.isCompleted());
delayedShareFetch.forceComplete();
@ -405,6 +455,11 @@ public class DelayedShareFetchTest {
assertTrue(shareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
assertEquals(130, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
assertEquals(50, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
delayedShareFetch.lock().unlock();
}
@ -428,10 +483,12 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(new MockTime());
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.withShareGroupMetrics(shareGroupMetrics)
.build());
assertFalse(delayedShareFetch.isCompleted());
@ -452,6 +509,10 @@ public class DelayedShareFetchTest {
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
// Assert both metrics shall be recorded only once.
assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
delayedShareFetch.lock().unlock();
}
@ -639,12 +700,17 @@ public class DelayedShareFetchTest {
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L).thenReturn(170L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.withExceptionHandler(exceptionHandler)
.withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.withShareGroupMetrics(shareGroupMetrics)
.withTime(time)
.build());
// Try complete should return false as the share partition has errored out.
@ -669,6 +735,13 @@ public class DelayedShareFetchTest {
any(), any(), any(ReplicaQuota.class), anyBoolean());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
assertEquals(2, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
assertEquals(70, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
assertEquals(10, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).min());
assertEquals(60, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).max());
assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
assertEquals(0, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
delayedShareFetch.lock().unlock();
Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
}
@ -1078,11 +1151,13 @@ public class DelayedShareFetchTest {
}
static class DelayedShareFetchBuilder {
ShareFetch shareFetch = mock(ShareFetch.class);
private ShareFetch shareFetch = mock(ShareFetch.class);
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
private LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = mock(LinkedHashMap.class);
private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class);
private Time time = new MockTime();
private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class);
DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) {
this.shareFetch = shareFetch;
@ -1109,6 +1184,16 @@ public class DelayedShareFetchTest {
return this;
}
private DelayedShareFetchBuilder withShareGroupMetrics(ShareGroupMetrics shareGroupMetrics) {
this.shareGroupMetrics = shareGroupMetrics;
return this;
}
private DelayedShareFetchBuilder withTime(Time time) {
this.time = time;
return this;
}
public static DelayedShareFetchBuilder builder() {
return new DelayedShareFetchBuilder();
}
@ -1119,7 +1204,9 @@ public class DelayedShareFetchTest {
replicaManager,
exceptionHandler,
sharePartitions,
partitionMaxBytesStrategy);
partitionMaxBytesStrategy,
shareGroupMetrics,
time);
}
}
}

View File

@ -2259,7 +2259,7 @@ public class SharePartitionManagerTest {
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(100L).thenReturn(200L);
when(time.hiResClockMs()).thenReturn(100L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
@ -2292,9 +2292,9 @@ public class SharePartitionManagerTest {
pendingInitializationFuture2.complete(null);
// Verify the partition load time metrics for both partitions.
assertEquals(2, shareGroupMetrics.partitionLoadTimeMs().count());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().min());
assertEquals(160.0, shareGroupMetrics.partitionLoadTimeMs().max());
assertEquals(250.0, shareGroupMetrics.partitionLoadTimeMs().sum());
assertEquals(60.0, shareGroupMetrics.partitionLoadTimeMs().min());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().max());
assertEquals(150.0, shareGroupMetrics.partitionLoadTimeMs().sum());
shareGroupMetrics.close();
}

View File

@ -63,6 +63,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
import org.apache.kafka.server.share.SharePartitionKey
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}
import org.apache.kafka.server.share.metrics.ShareGroupMetrics
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
@ -6101,7 +6102,9 @@ class ReplicaManagerTest {
shareFetch,
rm,
mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
sharePartitions))
sharePartitions,
mock(classOf[ShareGroupMetrics]),
time))
val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new util.ArrayList[DelayedShareFetchKey]
partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) => delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId, topicIdPartition.partition)))

View File

@ -26,6 +26,7 @@ import com.yammer.metrics.core.Meter;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -33,16 +34,32 @@ import java.util.stream.Collectors;
* ShareGroupMetrics is used to track the broker-side metrics for the ShareGroup.
*/
public class ShareGroupMetrics implements AutoCloseable {
// Rate of records acknowledged per acknowledgement type.
private static final String RECORD_ACKNOWLEDGEMENTS_PER_SEC = "RecordAcknowledgementsPerSec";
// The time in milliseconds to load the share partitions.
private static final String PARTITION_LOAD_TIME_MS = "PartitionLoadTimeMs";
private static final String TOPIC_PARTITIONS_FETCH_RATIO = "RequestTopicPartitionsFetchRatio";
private static final String TOPIC_PARTITIONS_ACQUIRE_TIME_MS = "TopicPartitionsAcquireTimeMs";
private static final String ACK_TYPE_TAG = "ackType";
/**
* Metric for the rate of records acknowledged per acknowledgement type.
*/
private final Map<Byte, Meter> recordAcknowledgementMeterMap;
/**
* Metric for the time taken to load the share partitions.
*/
private final Histogram partitionLoadTimeMs;
/**
* Metric for the ratio of topic partitions fetched to the total number of topic partitions requested, per group.
*/
private final Map<String, Histogram> topicPartitionsFetchRatio;
/**
* Metric for the time taken to acquire topic partitions for a group.
*/
private final Map<String, Histogram> topicPartitionsAcquireTimeMs;
private final KafkaMetricsGroup metricsGroup;
private final Time time;
private final Map<Byte, Meter> recordAcknowledgementMeterMap;
private final Histogram partitionLoadTimeMs;
public ShareGroupMetrics(Time time) {
this.time = time;
@ -58,7 +75,9 @@ public class ShareGroupMetrics implements AutoCloseable {
)
)
);
partitionLoadTimeMs = metricsGroup.newHistogram(PARTITION_LOAD_TIME_MS);
this.partitionLoadTimeMs = metricsGroup.newHistogram(PARTITION_LOAD_TIME_MS);
this.topicPartitionsFetchRatio = new ConcurrentHashMap<>();
this.topicPartitionsAcquireTimeMs = new ConcurrentHashMap<>();
}
public void recordAcknowledgement(byte ackType) {
@ -76,19 +95,45 @@ public class ShareGroupMetrics implements AutoCloseable {
partitionLoadTimeMs.update(time.hiResClockMs() - start);
}
public void recordTopicPartitionsFetchRatio(String groupId, long value) {
topicPartitionsFetchRatio.computeIfAbsent(groupId,
k -> metricsGroup.newHistogram(TOPIC_PARTITIONS_FETCH_RATIO, true, Map.of("group", groupId)));
topicPartitionsFetchRatio.get(groupId).update(value);
}
public void recordTopicPartitionsAcquireTimeMs(String groupId, long timeMs) {
topicPartitionsAcquireTimeMs.computeIfAbsent(groupId,
k -> metricsGroup.newHistogram(TOPIC_PARTITIONS_ACQUIRE_TIME_MS, true, Map.of("group", groupId)));
topicPartitionsAcquireTimeMs.get(groupId).update(timeMs);
}
// Visible for testing
public Meter recordAcknowledgementMeter(byte ackType) {
return recordAcknowledgementMeterMap.get(ackType);
}
// Visible for testing
public Histogram partitionLoadTimeMs() {
return partitionLoadTimeMs;
}
// Visible for testing
public Histogram topicPartitionsFetchRatio(String groupId) {
return topicPartitionsFetchRatio.get(groupId);
}
// Visible for testing
public Histogram topicPartitionsAcquireTimeMs(String groupId) {
return topicPartitionsAcquireTimeMs.get(groupId);
}
@Override
public void close() throws Exception {
Arrays.stream(AcknowledgeType.values()).forEach(
m -> metricsGroup.removeMetric(RECORD_ACKNOWLEDGEMENTS_PER_SEC, Map.of(ACK_TYPE_TAG, m.toString())));
metricsGroup.removeMetric(PARTITION_LOAD_TIME_MS);
topicPartitionsFetchRatio.forEach((k, v) -> metricsGroup.removeMetric(TOPIC_PARTITIONS_FETCH_RATIO, Map.of("group", k)));
topicPartitionsAcquireTimeMs.forEach((k, v) -> metricsGroup.removeMetric(TOPIC_PARTITIONS_ACQUIRE_TIME_MS, Map.of("group", k)));
}
private static String capitalize(String string) {