KAFKA-18232: Add share group state topic prune metrics. (#18174)

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-01-20 20:47:15 +05:30 committed by GitHub
parent 71495a2013
commit 06a5e258e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 157 additions and 9 deletions

View File

@ -319,6 +319,10 @@ public class ShareCoordinatorService implements ShareCoordinator {
fut.completeExceptionally(exp);
return;
}
shareCoordinatorMetrics.recordPrune(
off,
tp
);
fut.complete(null);
// Best effort prevention of issuing duplicate delete calls.
lastPrunedOffsets.put(tp, off);

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
@ -46,6 +47,8 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME = "ShareCoordinatorWrite";
public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME = "ShareCoordinatorWriteLatency";
public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME = "ShareCoordinatorStateTopicPruneSensorName";
/**
 * Per-partition prune metrics, keyed by topic partition. Entries are created
 * on demand by {@code recordPrune} the first time a partition is pruned.
 * The reference is never reassigned, so it is declared {@code final}.
 */
private final Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics = new ConcurrentHashMap<>();
/**
* Global sensors. These are shared across all metrics shards.
@ -92,6 +95,7 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
SHARE_COORDINATOR_WRITE_SENSOR_NAME,
SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME
).forEach(metrics::removeSensor);
pruneMetrics.values().forEach(v -> metrics.removeSensor(v.pruneSensor.name()));
}
@Override
@ -153,4 +157,31 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
globalSensors.get(sensorName).record();
}
}
/**
 * Records a prune of the share-group state topic for the given partition.
 * The per-partition metrics holder is created on first use and reused on
 * subsequent calls for the same partition.
 *
 * @param value the offset value to record on the partition's prune sensor
 * @param tp    the topic partition that was pruned
 */
public void recordPrune(double value, TopicPartition tp) {
    ShareGroupPruneMetrics partitionMetrics =
        pruneMetrics.computeIfAbsent(tp, ShareGroupPruneMetrics::new);
    partitionMetrics.pruneSensor.record(value);
}
/**
 * Holder for the sensor that exposes the {@code last-pruned-offset} metric of a
 * single topic partition. This is deliberately a non-static inner class: it
 * registers its sensor through the enclosing class's {@code metrics}.
 */
private class ShareGroupPruneMetrics {
    private final Sensor pruneSensor;

    /**
     * Registers a sensor dedicated to {@code tp} and attaches the
     * {@code last-pruned-offset} metric, tagged with the topic and partition.
     *
     * @param tp the topic partition this holder reports on
     */
    ShareGroupPruneMetrics(TopicPartition tp) {
        // The topic-partition string is appended so each partition gets its own sensor name.
        String sensorName = SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME + tp.toString();
        Map<String, String> partitionTags = Map.of(
            "topic", tp.topic(),
            "partition", String.valueOf(tp.partition())
        );

        pruneSensor = metrics.sensor(sensorName);
        pruneSensor.add(
            metrics.metricName(
                "last-pruned-offset",
                METRICS_GROUP,
                "The offset at which the share-group state topic was last pruned.",
                partitionTags),
            new Value()
        );
    }
}
}

View File

@ -52,6 +52,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -971,11 +972,13 @@ class ShareCoordinatorServiceTest {
CompletableFuture.completedFuture(Optional.of(11L))
);
Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@ -1007,6 +1010,10 @@ class ShareCoordinatorServiceTest {
verify(writer, times(2))
.deleteRecords(any(), anyLong());
checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
service.shutdown();
}
@ -1058,11 +1065,13 @@ class ShareCoordinatorServiceTest {
CompletableFuture.completedFuture(Optional.of(21L))
);
Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@ -1094,6 +1103,11 @@ class ShareCoordinatorServiceTest {
verify(writer, times(4))
.deleteRecords(any(), anyLong());
checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1, false);
service.shutdown();
}
@ -1111,11 +1125,13 @@ class ShareCoordinatorServiceTest {
any()
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@ -1139,6 +1155,10 @@ class ShareCoordinatorServiceTest {
verify(writer, times(0))
.deleteRecords(any(), anyLong());
checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, false);
service.shutdown();
}
@ -1156,11 +1176,13 @@ class ShareCoordinatorServiceTest {
any()
)).thenReturn(CompletableFuture.completedFuture(Optional.of(20L)));
Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@ -1184,6 +1206,9 @@ class ShareCoordinatorServiceTest {
verify(writer, times(1))
.deleteRecords(any(), eq(20L));
checkMetrics(metrics);
service.shutdown();
}
@ -1201,11 +1226,12 @@ class ShareCoordinatorServiceTest {
any()
)).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@ -1229,6 +1255,10 @@ class ShareCoordinatorServiceTest {
verify(writer, times(0))
.deleteRecords(any(), anyLong());
checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, false);
service.shutdown();
}
@ -1257,11 +1287,12 @@ class ShareCoordinatorServiceTest {
CompletableFuture.completedFuture(Optional.of(10L))
);
Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@ -1293,6 +1324,10 @@ class ShareCoordinatorServiceTest {
verify(writer, times(1))
.deleteRecords(any(), anyLong());
checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
service.shutdown();
}
@ -1325,11 +1360,13 @@ class ShareCoordinatorServiceTest {
CompletableFuture.completedFuture(Optional.of(10L))
);
Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
new ShareCoordinatorMetrics(),
new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@ -1361,6 +1398,36 @@ class ShareCoordinatorServiceTest {
verify(writer, times(2))
.deleteRecords(any(), anyLong());
checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
service.shutdown();
}
/**
 * Asserts that the write rate/total and write latency avg/max metrics of the
 * share coordinator group are present in the given registry.
 *
 * @param metrics the metrics registry handed to the coordinator under test
 */
private void checkMetrics(Metrics metrics) {
    String group = ShareCoordinatorMetrics.METRICS_GROUP;
    Set<MetricName> expectedNames = new HashSet<>(Arrays.asList(
        metrics.metricName("write-latency-avg", group),
        metrics.metricName("write-latency-max", group),
        metrics.metricName("write-rate", group),
        metrics.metricName("write-total", group)
    ));

    for (MetricName expected : expectedNames) {
        assertTrue(metrics.metrics().containsKey(expected));
    }
}
/**
 * Asserts whether the {@code last-pruned-offset} metric for a topic partition
 * is registered.
 *
 * @param metrics       the metrics registry to inspect
 * @param topic         value of the metric's {@code topic} tag
 * @param partition     value of the metric's {@code partition} tag
 * @param checkPresence {@code true} if the metric is expected to exist,
 *                      {@code false} if it is expected to be absent
 */
private void checkPruneMetric(Metrics metrics, String topic, int partition, boolean checkPresence) {
    Map<String, String> tags = Map.of(
        "topic", topic,
        "partition", Integer.toString(partition)
    );
    MetricName prunedOffsetName = metrics.metricName(
        "last-pruned-offset",
        ShareCoordinatorMetrics.METRICS_GROUP,
        "The offset at which the share-group state topic was last pruned.",
        tags
    );

    boolean registered = metrics.metrics().containsKey(prunedOffsetName);
    assertEquals(checkPresence, registered);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.share.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@ -27,10 +28,12 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME;
import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareCoordinatorMetricsTest {
@ -46,14 +49,21 @@ public class ShareCoordinatorMetricsTest {
metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP)
));
ShareCoordinatorMetrics ignored = new ShareCoordinatorMetrics(metrics);
ShareCoordinatorMetrics coordMetrics = new ShareCoordinatorMetrics(metrics);
for (MetricName metricName : expectedMetrics) {
assertTrue(metrics.metrics().containsKey(metricName));
}
assertFalse(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
coordMetrics.recordPrune(
10.0,
new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)
);
assertTrue(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
}
@Test
public void testGlobalSensors() {
public void testShardGlobalSensors() {
MockTime time = new MockTime();
Metrics metrics = new Metrics(time);
ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics);
@ -71,7 +81,43 @@ public class ShareCoordinatorMetricsTest {
assertMetricValue(metrics, metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), 30.0);
}
/**
 * Exercises the coordinator-level recording methods: the write sensor, the
 * write latency sensor, and the per-partition prune metric, which must not
 * exist until the first prune is recorded.
 */
@Test
public void testCoordinatorGlobalSensors() {
    MockTime mockTime = new MockTime();
    Metrics registry = new Metrics(mockTime);
    ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(registry);
    String group = ShareCoordinatorMetrics.METRICS_GROUP;

    // One write: the rate is a sampled stat, the total is a plain count.
    coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_SENSOR_NAME);
    assertMetricValue(registry, registry.metricName("write-rate", group), 1.0 / 30);
    assertMetricValue(registry, registry.metricName("write-total", group), 1.0);

    // Two latency samples: the average is their mean and the max is the larger one.
    coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 20);
    coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 30);
    assertMetricValue(registry, registry.metricName("write-latency-avg", group), 50.0 / 2);
    assertMetricValue(registry, registry.metricName("write-latency-max", group), 30.0);

    // The prune metric appears only after a prune has been recorded for the partition.
    TopicPartition statePartition = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
    MetricName prunedOffsetName = pruneMetricName(registry, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
    assertFalse(registry.metrics().containsKey(prunedOffsetName));

    coordinatorMetrics.recordPrune(10.0, statePartition);
    assertMetricValue(registry, prunedOffsetName, 10.0);
}
/**
 * Asserts that the metric registered under {@code metricName} currently
 * reports {@code val}.
 *
 * @param metrics    the registry holding the metric
 * @param metricName the name of the metric to read
 * @param val        the expected metric value
 */
private void assertMetricValue(Metrics metrics, MetricName metricName, double val) {
    Object actual = metrics.metric(metricName).metricValue();
    assertEquals(val, actual);
}
/**
 * Builds the name of the {@code last-pruned-offset} metric for a topic partition,
 * using the same group, description and tags the coordinator metrics register it with.
 *
 * @param metrics   the registry used to construct the metric name
 * @param topic     value of the {@code topic} tag
 * @param partition value of the {@code partition} tag; a primitive {@code int}
 *                  because the value is only ever formatted as a string, so a
 *                  boxed {@code Integer} added nothing but an unboxing step
 * @return the fully tagged metric name
 */
private MetricName pruneMetricName(Metrics metrics, String topic, int partition) {
    Map<String, String> tags = Map.of(
        "topic", topic,
        "partition", Integer.toString(partition)
    );
    return metrics.metricName(
        "last-pruned-offset",
        ShareCoordinatorMetrics.METRICS_GROUP,
        "The offset at which the share-group state topic was last pruned.",
        tags
    );
}
}