KAFKA-18733: Updating share group metrics (1/N) (#18826)

Reviewers: Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Apoorv Mittal authored on 2025-02-14 08:48:41 +00:00 (committed by GitHub)
parent e7a2af8414
commit 53543bcf63
5 changed files with 239 additions and 195 deletions
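
The commit removes the org.apache.kafka.common.metrics.Metrics dependency from SharePartitionManager and threads a new, Yammer-backed ShareGroupMetrics through instead: the public constructor overloads build it internally via new ShareGroupMetrics(time), while tests pass an instance directly. A minimal wiring sketch under the new signature, with the argument order taken from the test builder later in this commit (all values are illustrative placeholders, and this particular constructor may be package-private):

    // Hypothetical wiring; real values come from broker configuration.
    ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
    SharePartitionManager sharePartitionManager = new SharePartitionManager(
        replicaManager,
        time,
        shareSessionCache,
        partitionCacheMap,
        recordLockDurationMs,
        timer,
        maxDeliveryCount,
        maxInFlightMessages,
        maxFetchRecords,
        persister,
        groupConfigManager,
        shareGroupMetrics,   // replaces the former Metrics argument
        brokerTopicStats);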

SharePartition.java

@ -275,6 +275,11 @@ public class SharePartition {
*/
private final SharePartitionListener listener;
/**
* The load start time is used to track the time taken to load the share partition.
*/
private final long loadStartTimeMs;
/**
* The share partition start offset specifies the partition start offset from which the records
* are cached in the cachedState of the sharePartition.
@ -359,6 +364,7 @@ public class SharePartition {
this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
this.timer = timer;
this.time = time;
this.loadStartTimeMs = time.hiResClockMs();
this.persister = persister;
this.partitionState = sharePartitionState;
this.replicaManager = replicaManager;
@ -911,6 +917,10 @@ public class SharePartition {
return future;
}
long loadStartTimeMs() {
return loadStartTimeMs;
}
private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String memberId,
InFlightBatch inFlightBatch,
RecordState recordState,
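
The new loadStartTimeMs field is captured in the constructor and exposed through the package-private loadStartTimeMs() accessor, so the manager can compute how long a share partition took to load once its initialization actually finishes. A short sketch of the flow implied by this diff (the second statement lives in SharePartitionManager, shown in the next file):

    // In the SharePartition constructor (from this diff):
    this.loadStartTimeMs = time.hiResClockMs();

    // When initialization completes, the manager records the elapsed time;
    // partitionLoadTime() internally computes time.hiResClockMs() - start.
    shareGroupMetrics.partitionLoadTime(sharePartition.loadStartTimeMs());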

SharePartitionManager.java

@ -19,7 +19,6 @@ package kafka.server.share;
import kafka.cluster.PartitionListener;
import kafka.server.ReplicaManager;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@ -30,11 +29,6 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareRequestMetadata;
@ -53,6 +47,7 @@ import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
import org.apache.kafka.server.share.fetch.PartitionRotateStrategy;
import org.apache.kafka.server.share.fetch.PartitionRotateStrategy.PartitionRotateMetadata;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
@ -75,7 +70,6 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -167,7 +161,6 @@ public class SharePartitionManager implements AutoCloseable {
int maxFetchRecords,
Persister persister,
GroupConfigManager groupConfigManager,
Metrics metrics,
BrokerTopicStats brokerTopicStats
) {
this(replicaManager,
@ -180,7 +173,7 @@ public class SharePartitionManager implements AutoCloseable {
maxFetchRecords,
persister,
groupConfigManager,
metrics,
new ShareGroupMetrics(time),
brokerTopicStats
);
}
@ -196,7 +189,7 @@ public class SharePartitionManager implements AutoCloseable {
int maxFetchRecords,
Persister persister,
GroupConfigManager groupConfigManager,
Metrics metrics,
ShareGroupMetrics shareGroupMetrics,
BrokerTopicStats brokerTopicStats
) {
this(replicaManager,
@ -211,7 +204,7 @@ public class SharePartitionManager implements AutoCloseable {
maxFetchRecords,
persister,
groupConfigManager,
metrics,
shareGroupMetrics,
brokerTopicStats
);
}
@ -229,7 +222,7 @@ public class SharePartitionManager implements AutoCloseable {
int maxFetchRecords,
Persister persister,
GroupConfigManager groupConfigManager,
Metrics metrics,
ShareGroupMetrics shareGroupMetrics,
BrokerTopicStats brokerTopicStats
) {
this.replicaManager = replicaManager;
@ -242,7 +235,7 @@ public class SharePartitionManager implements AutoCloseable {
this.maxInFlightMessages = maxInFlightMessages;
this.persister = persister;
this.groupConfigManager = groupConfigManager;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this.shareGroupMetrics = shareGroupMetrics;
this.maxFetchRecords = maxFetchRecords;
this.brokerTopicStats = brokerTopicStats;
}
@ -312,7 +305,7 @@ public class SharePartitionManager implements AutoCloseable {
future.complete(throwable);
return;
}
acknowledgePartitionBatches.forEach(batch -> batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement));
acknowledgePartitionBatches.forEach(batch -> batch.acknowledgeTypes().forEach(shareGroupMetrics::recordAcknowledgement));
future.complete(null);
});
@ -572,6 +565,7 @@ public class SharePartitionManager implements AutoCloseable {
@Override
public void close() throws Exception {
this.timer.close();
this.shareGroupMetrics.close();
}
private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
@ -633,8 +627,10 @@ public class SharePartitionManager implements AutoCloseable {
// immediately then the requests might be waiting in purgatory until the share partition
// is initialized. Hence, trigger the completion of all pending delayed share fetch requests
// for the share partition.
if (!initialized)
if (!initialized) {
shareGroupMetrics.partitionLoadTime(sharePartition.loadStartTimeMs());
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
}
});
sharePartitions.put(topicIdPartition, sharePartition);
}
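
Partition load time is now recorded in the initialization-completion callback, so the metric covers the full, possibly asynchronous, initialization of the share partition rather than just cache insertion (the old recording in getOrCreateSharePartition is removed in the next hunk). A hedged reconstruction of the surrounding callback, based on the context lines above and the tests in this commit; the exact enclosing code is not shown in this hunk:

    // Sketch only: error handling and the enclosing method are assumed, not shown in this diff.
    // 'initialized' appears to capture whether maybeInitialize() completed synchronously.
    sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
        // ... error handling ...
        if (!initialized) {
            shareGroupMetrics.partitionLoadTime(sharePartition.loadStartTimeMs());
            replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
        }
    });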
@ -661,7 +657,6 @@ public class SharePartitionManager implements AutoCloseable {
private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
return partitionCacheMap.computeIfAbsent(sharePartitionKey,
k -> {
long start = time.hiResClockMs();
int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition());
// Attach listener to Partition which shall invoke partition change handlers.
// However, as there could be multiple share partitions (per group name) for a single topic-partition,
@ -683,7 +678,6 @@ public class SharePartitionManager implements AutoCloseable {
groupConfigManager,
listener
);
this.shareGroupMetrics.partitionLoadTime(start);
return partition;
});
}
@ -811,77 +805,4 @@ public class SharePartitionManager implements AutoCloseable {
removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager);
}
}
static class ShareGroupMetrics {
/**
* share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) - The total number of offsets acknowledged for share groups (requests to be ack).
* record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count) - The number of records acknowledged per acknowledgement type.
* partition-load-time (partition-load-time-avg and partition-load-time-max) - The time taken to load the share partitions.
*/
public static final String METRICS_GROUP_NAME = "share-group-metrics";
public static final String RECORD_ACK_SENSOR_PREFIX = "record-acknowledgement";
public static final String RECORD_ACK_RATE = "record-acknowledgement-rate";
public static final String RECORD_ACK_COUNT = "record-acknowledgement-count";
public static final String ACK_TYPE = "ack-type";
public static final String PARTITION_LOAD_TIME_SENSOR = "partition-load-time-sensor";
public static final String PARTITION_LOAD_TIME_AVG = "partition-load-time-avg";
public static final String PARTITION_LOAD_TIME_MAX = "partition-load-time-max";
public static final Map<Byte, String> RECORD_ACKS_MAP = new HashMap<>();
private final Time time;
private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
private final Sensor partitionLoadTimeSensor;
static {
RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
}
public ShareGroupMetrics(Metrics metrics, Time time) {
this.time = time;
for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) {
recordAcksSensorMap.put(entry.getKey(), metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, entry.getValue())));
recordAcksSensorMap.get(entry.getKey())
.add(new Meter(
metrics.metricName(
RECORD_ACK_RATE,
METRICS_GROUP_NAME,
"Rate of records acknowledged per acknowledgement type.",
ACK_TYPE, entry.getValue()),
metrics.metricName(
RECORD_ACK_COUNT,
METRICS_GROUP_NAME,
"The number of records acknowledged per acknowledgement type.",
ACK_TYPE, entry.getValue())));
}
partitionLoadTimeSensor = metrics.sensor(PARTITION_LOAD_TIME_SENSOR);
partitionLoadTimeSensor.add(metrics.metricName(
PARTITION_LOAD_TIME_AVG,
METRICS_GROUP_NAME,
"The average time in milliseconds to load the share partitions."),
new Avg());
partitionLoadTimeSensor.add(metrics.metricName(
PARTITION_LOAD_TIME_MAX,
METRICS_GROUP_NAME,
"The maximum time in milliseconds to load the share partitions."),
new Max());
}
void recordAcknowledgement(byte ackType) {
// unknown ack types (such as gaps for control records) are intentionally ignored
if (recordAcksSensorMap.containsKey(ackType)) {
recordAcksSensorMap.get(ackType).record();
}
}
void partitionLoadTime(long start) {
partitionLoadTimeSensor.record(time.hiResClockMs() - start);
}
}
}
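
The removed inner class registered Kafka Metrics sensors (record-acknowledgement rate/count per ack type, plus partition-load-time avg/max) on the share-group-metrics group; the replacement, added at the end of this commit, registers Yammer metrics through KafkaMetricsGroup, so the metric names and types change (roughly kafka.server:type=ShareGroupMetrics,name=RecordAcknowledgementsPerSec and name=PartitionLoadTimeMs). A condensed before/after sketch of the registration, taken from the code removed above and the new class below:

    // Before: Kafka Metrics sensors (removed above).
    Sensor loadTimeSensor = metrics.sensor(PARTITION_LOAD_TIME_SENSOR);
    loadTimeSensor.add(metrics.metricName(PARTITION_LOAD_TIME_AVG, METRICS_GROUP_NAME,
        "The average time in milliseconds to load the share partitions."), new Avg());
    loadTimeSensor.add(metrics.metricName(PARTITION_LOAD_TIME_MAX, METRICS_GROUP_NAME,
        "The maximum time in milliseconds to load the share partitions."), new Max());

    // After: Yammer metrics via KafkaMetricsGroup (see ShareGroupMetrics.java below).
    KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "ShareGroupMetrics");
    Histogram partitionLoadTimeMs = metricsGroup.newHistogram(PARTITION_LOAD_TIME_MS);
    Meter acceptAcks = metricsGroup.newMeter(RECORD_ACKNOWLEDGEMENTS_PER_SEC,
        "records", TimeUnit.SECONDS, Map.of(ACK_TYPE_TAG, capitalize(type.toString())));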

BrokerServer.scala

@ -440,7 +440,6 @@ class BrokerServer(
config.shareGroupConfig.shareFetchMaxFetchRecords,
persister,
groupConfigManager,
metrics,
brokerTopicStats
)

SharePartitionManagerTest.java

@ -23,7 +23,6 @@ import kafka.server.ReplicaQuota;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@ -39,7 +38,6 @@ import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
@ -68,6 +66,7 @@ import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
@ -161,7 +160,8 @@ public class SharePartitionManagerTest {
static final int PARTITION_MAX_BYTES = 40000;
static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
private Time time;
private ReplicaManager mockReplicaManager;
private BrokerTopicStats brokerTopicStats;
private SharePartitionManager sharePartitionManager;
@ -170,6 +170,7 @@ public class SharePartitionManagerTest {
@BeforeEach
public void setUp() {
time = new MockTime();
kafka.utils.TestUtils.clearYammerMetrics();
brokerTopicStats = new BrokerTopicStats();
mockReplicaManager = mock(ReplicaManager.class);
@ -188,11 +189,9 @@ public class SharePartitionManagerTest {
@Test
public void testNewContextReturnsFinalContextWithoutRequestData() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.withTime(time)
.build();
Uuid tpId0 = Uuid.randomUuid();
@ -219,11 +218,9 @@ public class SharePartitionManagerTest {
@Test
public void testNewContextReturnsFinalContextWithRequestData() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.withTime(time)
.build();
Uuid tpId0 = Uuid.randomUuid();
@ -256,11 +253,9 @@ public class SharePartitionManagerTest {
@Test
public void testNewContextReturnsFinalContextError() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.withTime(time)
.build();
Uuid tpId0 = Uuid.randomUuid();
@ -293,11 +288,9 @@ public class SharePartitionManagerTest {
@Test
public void testNewContext() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.withTime(time)
.build();
Map<Uuid, String> topicNames = new HashMap<>();
@ -402,7 +395,6 @@ public class SharePartitionManagerTest {
@Test
public void testShareSessionExpiration() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(2, 1000);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
@ -710,11 +702,9 @@ public class SharePartitionManagerTest {
@Test
public void testGetErroneousAndValidTopicIdPartitions() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.withTime(time)
.build();
Uuid tpId0 = Uuid.randomUuid();
@ -810,11 +800,9 @@ public class SharePartitionManagerTest {
@Test
public void testShareFetchContextResponseSize() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.withTime(time)
.build();
Map<Uuid, String> topicNames = new HashMap<>();
@ -1084,9 +1072,6 @@ public class SharePartitionManagerTest {
mockFetchOffsetForTimestamp(mockReplicaManager);
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L);
Metrics metrics = new Metrics();
Timer mockTimer = systemTimerReaper();
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
@ -1102,8 +1087,6 @@ public class SharePartitionManagerTest {
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(mockReplicaManager)
.withTime(time)
.withMetrics(metrics)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -1128,20 +1111,6 @@ public class SharePartitionManagerTest {
Mockito.verify(mockReplicaManager, times(3)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
Map<MetricName, Consumer<Double>> expectedMetrics = new HashMap<>();
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.PARTITION_LOAD_TIME_AVG, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME),
val -> assertEquals((int) 100.0 / 7, val.intValue(), SharePartitionManager.ShareGroupMetrics.PARTITION_LOAD_TIME_AVG)
);
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.PARTITION_LOAD_TIME_MAX, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME),
val -> assertEquals(100.0, val, SharePartitionManager.ShareGroupMetrics.PARTITION_LOAD_TIME_MAX)
);
expectedMetrics.forEach((metric, test) -> {
assertTrue(metrics.metrics().containsKey(metric));
test.accept((Double) metrics.metrics().get(metric).metricValue());
});
// Should have 6 total fetches: 3 for topic foo (4 partitions, but only 3 fetches) and
// 3 for topic bar (3 partitions, 3 fetches).
validateBrokerTopicStatsMetrics(
@ -1163,8 +1132,6 @@ public class SharePartitionManagerTest {
TopicIdPartition tp3 = new TopicIdPartition(barId, new TopicPartition("bar", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1, tp2, tp3);
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
mockFetchOffsetForTimestamp(mockReplicaManager);
Timer mockTimer = systemTimerReaper();
@ -1179,7 +1146,6 @@ public class SharePartitionManagerTest {
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp3, 1);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withTime(time)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
@ -1335,18 +1301,20 @@ public class SharePartitionManagerTest {
@Test
public void testCloseSharePartitionManager() throws Exception {
Timer timer = Mockito.mock(SystemTimerReaper.class);
Persister persister = Mockito.mock(Persister.class);
ShareGroupMetrics shareGroupMetrics = Mockito.mock(ShareGroupMetrics.class);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withTimer(timer)
.withShareGroupPersister(persister)
.withShareGroupMetrics(shareGroupMetrics)
.build();
// Verify that 0 calls are made to timer.close() and persister.stop().
// Verify that 0 calls are made to timer.close() and shareGroupMetrics.close().
Mockito.verify(timer, times(0)).close();
Mockito.verify(persister, times(0)).stop();
Mockito.verify(shareGroupMetrics, times(0)).close();
// Closing the sharePartitionManager closes timer object in sharePartitionManager.
sharePartitionManager.close();
// Verify that the timer object in sharePartitionManager is closed by checking the calls to timer.close() and persister.stop().
// Verify that the timer object in sharePartitionManager is closed by checking the calls to timer.close() and shareGroupMetrics.close().
Mockito.verify(timer, times(1)).close();
Mockito.verify(shareGroupMetrics, times(1)).close();
}
@Test
@ -1548,7 +1516,7 @@ public class SharePartitionManagerTest {
}
@Test
public void testAcknowledgeMultiplePartition() {
public void testAcknowledgeMultiplePartition() throws Exception {
String groupId = "grp";
String memberId = Uuid.randomUuid().toString();
@ -1569,10 +1537,10 @@ public class SharePartitionManagerTest {
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
Metrics metrics = new Metrics();
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withMetrics(metrics)
.withShareGroupMetrics(shareGroupMetrics)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -1603,41 +1571,12 @@ public class SharePartitionManagerTest {
assertEquals(0, result.get(tp3).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp3).errorCode());
Map<MetricName, Consumer<Double>> expectedMetrics = new HashMap<>();
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.RECORD_ACK_COUNT, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME,
Collections.singletonMap(SharePartitionManager.ShareGroupMetrics.ACK_TYPE, AcknowledgeType.ACCEPT.toString())),
val -> assertEquals(2.0, val)
);
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.RECORD_ACK_COUNT, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME,
Collections.singletonMap(SharePartitionManager.ShareGroupMetrics.ACK_TYPE, AcknowledgeType.RELEASE.toString())),
val -> assertEquals(2.0, val)
);
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.RECORD_ACK_COUNT, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME,
Collections.singletonMap(SharePartitionManager.ShareGroupMetrics.ACK_TYPE, AcknowledgeType.REJECT.toString())),
val -> assertEquals(2.0, val)
);
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.RECORD_ACK_RATE, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME,
Collections.singletonMap(SharePartitionManager.ShareGroupMetrics.ACK_TYPE, AcknowledgeType.ACCEPT.toString())),
val -> assertTrue(val > 0)
);
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.RECORD_ACK_RATE, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME,
Collections.singletonMap(SharePartitionManager.ShareGroupMetrics.ACK_TYPE, AcknowledgeType.RELEASE.toString())),
val -> assertTrue(val > 0)
);
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.RECORD_ACK_RATE, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME,
Collections.singletonMap(SharePartitionManager.ShareGroupMetrics.ACK_TYPE, AcknowledgeType.REJECT.toString())),
val -> assertTrue(val > 0)
);
expectedMetrics.forEach((metric, test) -> {
assertTrue(metrics.metrics().containsKey(metric));
test.accept((Double) metrics.metrics().get(metric).metricValue());
});
assertEquals(2, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count());
assertEquals(2, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).count());
assertEquals(2, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).count());
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).meanRate() > 0);
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).meanRate() > 0);
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).meanRate() > 0);
// Should have 3 successful acknowledgements in total and 1 successful acknowledgement per topic.
validateBrokerTopicStatsMetrics(
@ -1645,6 +1584,7 @@ public class SharePartitionManagerTest {
new TopicMetrics(0, 0, 3, 0),
Map.of(tp1.topic(), new TopicMetrics(0, 0, 1, 0), tp2.topic(), new TopicMetrics(0, 0, 1, 0), tp3.topic(), new TopicMetrics(0, 0, 1, 0))
);
shareGroupMetrics.close();
}
@Test
@ -2183,6 +2123,7 @@ public class SharePartitionManagerTest {
// Keep the initialization future pending, so fetch request is stuck.
CompletableFuture<Void> pendingInitializationFuture = new CompletableFuture<>();
when(sp0.maybeInitialize()).thenReturn(pendingInitializationFuture);
when(sp0.loadStartTimeMs()).thenReturn(10L);
Timer mockTimer = systemTimerReaper();
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@ -2190,9 +2131,14 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(100L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager)
.withTime(time)
.withShareGroupMetrics(shareGroupMetrics)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -2210,14 +2156,89 @@ public class SharePartitionManagerTest {
Mockito.verify(mockReplicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertFalse(pendingInitializationFuture.isDone());
assertEquals(0, shareGroupMetrics.partitionLoadTimeMs().count());
// Complete the pending initialization future.
pendingInitializationFuture.complete(null);
// Verify the partition load time metrics.
assertEquals(1, shareGroupMetrics.partitionLoadTimeMs().count());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().min());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().max());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().sum());
// Should have 1 fetch recorded.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(1, 0, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(1, 0, 0, 0))
);
shareGroupMetrics.close();
}
@Test
public void testPartitionLoadTimeMetricWithMultiplePartitions() throws Exception {
String groupId = "grp";
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
// Keep the initialization future pending, so fetch request is stuck.
CompletableFuture<Void> pendingInitializationFuture1 = new CompletableFuture<>();
when(sp0.maybeInitialize()).thenReturn(pendingInitializationFuture1);
when(sp0.loadStartTimeMs()).thenReturn(10L);
CompletableFuture<Void> pendingInitializationFuture2 = new CompletableFuture<>();
when(sp1.maybeInitialize()).thenReturn(pendingInitializationFuture2);
when(sp1.loadStartTimeMs()).thenReturn(40L);
Timer mockTimer = systemTimerReaper();
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(100L).thenReturn(200L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager)
.withTime(time)
.withShareGroupMetrics(shareGroupMetrics)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
BATCH_SIZE, partitionMaxBytes);
// Verify that the fetch request is completed.
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
assertFalse(pendingInitializationFuture1.isDone());
assertFalse(pendingInitializationFuture2.isDone());
assertEquals(0, shareGroupMetrics.partitionLoadTimeMs().count());
// Complete the first pending initialization future.
pendingInitializationFuture1.complete(null);
// Verify the partition load time metrics for first partition.
assertEquals(1, shareGroupMetrics.partitionLoadTimeMs().count());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().min());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().max());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().sum());
// Complete the second pending initialization future.
pendingInitializationFuture2.complete(null);
// Verify the partition load time metrics for both partitions.
assertEquals(2, shareGroupMetrics.partitionLoadTimeMs().count());
assertEquals(90.0, shareGroupMetrics.partitionLoadTimeMs().min());
assertEquals(160.0, shareGroupMetrics.partitionLoadTimeMs().max());
assertEquals(250.0, shareGroupMetrics.partitionLoadTimeMs().sum());
shareGroupMetrics.close();
}
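For reference, the asserted values in these two tests follow directly from the mocked clock: hiResClockMs() is stubbed to return 100 (and then 200 in the multi-partition test) while the partitions report load start times of 10 and 40, so the recorded histogram samples are 100 - 10 = 90 ms and 200 - 40 = 160 ms, giving a count of 2, min 90.0, max 160.0 and sum 250.0.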
@Flaky("KAFKA-18657")
@ -2841,14 +2862,10 @@ public class SharePartitionManagerTest {
new SystemTimer(TIMER_NAME_PREFIX + "-test-timer"));
}
private void assertNoReaperThreadsPendingClose() {
List<String> threads = Thread.getAllStackTraces()
.keySet()
.stream()
.map(Thread::getName)
.filter(name -> name.contains(TIMER_NAME_PREFIX))
.toList();
assertTrue(threads.isEmpty(), "Found unexpected reaper threads: " + threads);
private void assertNoReaperThreadsPendingClose() throws InterruptedException {
TestUtils.waitForCondition(
() -> Thread.getAllStackTraces().keySet().stream().noneMatch(t -> t.getName().contains(TIMER_NAME_PREFIX)),
"Found unexpected reaper threads with name containing: " + TIMER_NAME_PREFIX);
}
private void testSharePartitionListener(
@ -3054,7 +3071,7 @@ public class SharePartitionManagerTest {
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private Persister persister = new NoOpShareStatePersister();
private Timer timer = new MockTimer();
private Metrics metrics = new Metrics();
private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
private BrokerTopicStats brokerTopicStats;
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
@ -3087,8 +3104,8 @@ public class SharePartitionManagerTest {
return this;
}
private SharePartitionManagerBuilder withMetrics(Metrics metrics) {
this.metrics = metrics;
private SharePartitionManagerBuilder withShareGroupMetrics(ShareGroupMetrics shareGroupMetrics) {
this.shareGroupMetrics = shareGroupMetrics;
return this;
}
@ -3103,18 +3120,19 @@ public class SharePartitionManagerTest {
public SharePartitionManager build() {
return new SharePartitionManager(replicaManager,
time,
cache,
partitionCacheMap,
DEFAULT_RECORD_LOCK_DURATION_MS,
timer,
MAX_DELIVERY_COUNT,
MAX_IN_FLIGHT_MESSAGES,
MAX_FETCH_RECORDS,
persister,
mock(GroupConfigManager.class),
metrics,
brokerTopicStats);
time,
cache,
partitionCacheMap,
DEFAULT_RECORD_LOCK_DURATION_MS,
timer,
MAX_DELIVERY_COUNT,
MAX_IN_FLIGHT_MESSAGES,
MAX_FETCH_RECORDS,
persister,
mock(GroupConfigManager.class),
shareGroupMetrics,
brokerTopicStats
);
}
}
}

ShareGroupMetrics.java (new file)

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.metrics;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* ShareGroupMetrics is used to track the broker-side metrics for the ShareGroup.
*/
public class ShareGroupMetrics implements AutoCloseable {
// Rate of records acknowledged per acknowledgement type.
private static final String RECORD_ACKNOWLEDGEMENTS_PER_SEC = "RecordAcknowledgementsPerSec";
// The time in milliseconds to load the share partitions.
private static final String PARTITION_LOAD_TIME_MS = "PartitionLoadTimeMs";
private static final String ACK_TYPE_TAG = "ackType";
private final KafkaMetricsGroup metricsGroup;
private final Time time;
private final Map<Byte, Meter> recordAcknowledgementMeterMap;
private final Histogram partitionLoadTimeMs;
public ShareGroupMetrics(Time time) {
this.time = time;
this.metricsGroup = new KafkaMetricsGroup("kafka.server", "ShareGroupMetrics");
this.recordAcknowledgementMeterMap = Arrays.stream(AcknowledgeType.values()).collect(
Collectors.toMap(
type -> type.id,
type -> metricsGroup.newMeter(
RECORD_ACKNOWLEDGEMENTS_PER_SEC,
"records",
TimeUnit.SECONDS,
Map.of(ACK_TYPE_TAG, capitalize(type.toString()))
)
)
);
partitionLoadTimeMs = metricsGroup.newHistogram(PARTITION_LOAD_TIME_MS);
}
public void recordAcknowledgement(byte ackType) {
// unknown ack types (such as gaps for control records) are intentionally ignored
if (recordAcknowledgementMeterMap.containsKey(ackType)) {
recordAcknowledgementMeterMap.get(ackType).mark();
}
}
public void partitionLoadTime(long start) {
partitionLoadTimeMs.update(time.hiResClockMs() - start);
}
public Meter recordAcknowledgementMeter(byte ackType) {
return recordAcknowledgementMeterMap.get(ackType);
}
public Histogram partitionLoadTimeMs() {
return partitionLoadTimeMs;
}
@Override
public void close() throws Exception {
Arrays.stream(AcknowledgeType.values()).forEach(
m -> metricsGroup.removeMetric(RECORD_ACKNOWLEDGEMENTS_PER_SEC, Map.of(ACK_TYPE_TAG, capitalize(m.toString()))));
metricsGroup.removeMetric(PARTITION_LOAD_TIME_MS);
}
private static String capitalize(String string) {
if (string == null || string.isEmpty()) {
return string;
}
return string.substring(0, 1).toUpperCase(Locale.ROOT) + string.substring(1);
}
}
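
A minimal, self-contained usage sketch of the new class, mirroring how the tests in this commit exercise it. MockTime here is the test clock from org.apache.kafka.common.utils (any Time implementation works); the class and method names are taken from the file above:

    import org.apache.kafka.clients.consumer.AcknowledgeType;
    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.server.share.metrics.ShareGroupMetrics;

    public class ShareGroupMetricsExample {
        public static void main(String[] args) throws Exception {
            MockTime time = new MockTime();
            ShareGroupMetrics metrics = new ShareGroupMetrics(time);

            // Record a couple of acknowledgements; unknown ack type ids are ignored.
            metrics.recordAcknowledgement(AcknowledgeType.ACCEPT.id);
            metrics.recordAcknowledgement(AcknowledgeType.RELEASE.id);

            // Record a partition load that started 25 ms ago on the mock clock.
            long start = time.hiResClockMs();
            time.sleep(25);
            metrics.partitionLoadTime(start);

            System.out.println(metrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count()); // 1
            System.out.println(metrics.partitionLoadTimeMs().max());                                   // 25.0

            // Yammer metrics live in a JVM-global registry, so remove them when done.
            metrics.close();
        }
    }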