KAFKA-17230: Fix for consumer node latency metrics (#16755)

The Kafka consumer client registers node/connection latency metrics in Selector.java, but values are never recorded against those metrics. This seems to have been an issue since inception.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Apoorv Mittal 2024-08-01 20:04:19 +01:00 committed by GitHub
parent 15fa241cea
commit 902fc33b27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 52 additions and 6 deletions

View File

@ -230,7 +230,7 @@ public abstract class AbstractFetch implements Closeable {
); );
} }
metricsManager.recordLatency(resp.requestLatencyMs()); metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
} finally { } finally {
removePendingFetchRequest(fetchTarget, data.metadata().sessionId()); removePendingFetchRequest(fetchTarget, data.metadata().sessionId());
} }

View File

@ -81,8 +81,14 @@ public class FetchMetricsManager {
return throttleTime; return throttleTime;
} }
void recordLatency(long requestLatencyMs) { void recordLatency(String node, long requestLatencyMs) {
fetchLatency.record(requestLatencyMs); fetchLatency.record(requestLatencyMs);
if (!node.isEmpty()) {
String nodeTimeName = "node-" + node + ".latency";
Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
if (nodeRequestTime != null)
nodeRequestTime.record(requestLatencyMs);
}
} }
void recordBytesFetched(int bytes) { void recordBytesFetched(int bytes) {

View File

@ -21,6 +21,9 @@ import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
@ -66,14 +69,41 @@ public class FetchMetricsManagerTest {
@Test @Test
public void testLatency() { public void testLatency() {
metricsManager.recordLatency(123); metricsManager.recordLatency("", 123);
time.sleep(metrics.config().timeWindowMs() + 1); time.sleep(metrics.config().timeWindowMs() + 1);
metricsManager.recordLatency(456); metricsManager.recordLatency("", 456);
assertEquals(289.5, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON); assertEquals(289.5, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON);
assertEquals(456, metricValue(metricsRegistry.fetchLatencyMax), EPSILON); assertEquals(456, metricValue(metricsRegistry.fetchLatencyMax), EPSILON);
} }
/**
 * Checks that a latency recorded with a connection id is reflected both in the overall
 * fetch latency metrics and in the per-node sensor named "node-<connectionId>.latency",
 * and that recording against a different connection id leaves the first node's metrics
 * unchanged.
 */
@Test
public void testNodeLatency() {
String connectionId = "0";
MetricName nodeLatencyAvg = metrics.metricName("request-latency-avg", "group");
MetricName nodeLatencyMax = metrics.metricName("request-latency-max", "group");
// The per-node sensor must exist before recording; this test registers it itself
// through the helper rather than relying on production code to do so.
registerNodeLatencyMetric(connectionId, nodeLatencyAvg, nodeLatencyMax);
metricsManager.recordLatency(connectionId, 123);
// Advance the mock clock past one metrics time window between the two samples.
time.sleep(metrics.config().timeWindowMs() + 1);
metricsManager.recordLatency(connectionId, 456);
// Overall fetch latency: average of the two samples, (123 + 456) / 2 = 289.5; max = 456.
assertEquals(289.5, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON);
assertEquals(456, metricValue(metricsRegistry.fetchLatencyMax), EPSILON);
// The node-level metrics are expected to have seen the same two samples.
assertEquals(289.5, metricValue(nodeLatencyAvg), EPSILON);
assertEquals(456, metricValue(nodeLatencyMax), EPSILON);
// Record metric against another node.
// No sensor was registered for connection id "1" in this test, so only the overall
// fetch latency is expected to change: (123 + 456 + 501) / 3 = 360, max = 501.
metricsManager.recordLatency("1", 501);
assertEquals(360, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON);
assertEquals(501, metricValue(metricsRegistry.fetchLatencyMax), EPSILON);
// Node specific metric should not be affected.
assertEquals(289.5, metricValue(nodeLatencyAvg), EPSILON);
assertEquals(456, metricValue(nodeLatencyMax), EPSILON);
}
@Test @Test
public void testBytesFetched() { public void testBytesFetched() {
metricsManager.recordBytesFetched(2); metricsManager.recordBytesFetched(2);
@ -157,14 +187,24 @@ public class FetchMetricsManagerTest {
assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, tags), EPSILON); assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, tags), EPSILON);
} }
/**
 * Test helper that sets up a per-node latency sensor so the metrics manager has something
 * to record into.
 *
 * @param connectionId   connection/node id used to build the sensor name
 * @param nodeLatencyAvg metric name under which an {@code Avg} stat is added to the sensor
 * @param nodeLatencyMax metric name under which a {@code Max} stat is added to the sensor
 */
private void registerNodeLatencyMetric(String connectionId, MetricName nodeLatencyAvg, MetricName nodeLatencyMax) {
// Same "node-<id>.latency" naming that FetchMetricsManager.recordLatency uses for its lookup.
String nodeTimeName = "node-" + connectionId + ".latency";
Sensor nodeRequestTime = metrics.sensor(nodeTimeName);
nodeRequestTime.add(nodeLatencyAvg, new Avg());
nodeRequestTime.add(nodeLatencyMax, new Max());
}
private double metricValue(MetricNameTemplate name) { private double metricValue(MetricNameTemplate name) {
MetricName metricName = metrics.metricInstance(name); MetricName metricName = metrics.metricInstance(name);
KafkaMetric metric = metrics.metric(metricName); return metricValue(metricName);
return (Double) metric.metricValue();
} }
private double metricValue(MetricNameTemplate name, Map<String, String> tags) { private double metricValue(MetricNameTemplate name, Map<String, String> tags) {
MetricName metricName = metrics.metricInstance(name, tags); MetricName metricName = metrics.metricInstance(name, tags);
return metricValue(metricName);
}
private double metricValue(MetricName metricName) {
KafkaMetric metric = metrics.metric(metricName); KafkaMetric metric = metrics.metric(metricName);
return (Double) metric.metricValue(); return (Double) metric.metricValue();
} }