diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java index 8f5691d17c0..4802eb7a120 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java @@ -230,7 +230,7 @@ public abstract class AbstractFetch implements Closeable { ); } - metricsManager.recordLatency(resp.requestLatencyMs()); + metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs()); } finally { removePendingFetchRequest(fetchTarget, data.metadata().sessionId()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java index 49801da3b7c..153279162bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java @@ -81,8 +81,14 @@ public class FetchMetricsManager { return throttleTime; } - void recordLatency(long requestLatencyMs) { + void recordLatency(String node, long requestLatencyMs) { fetchLatency.record(requestLatencyMs); + if (!node.isEmpty()) { + String nodeTimeName = "node-" + node + ".latency"; + Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); + if (nodeRequestTime != null) + nodeRequestTime.record(requestLatencyMs); + } } void recordBytesFetched(int bytes) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java index 4fec94d96a4..c7daeb53343 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java @@ -21,6 +21,9 @@ import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -66,14 +69,41 @@ public class FetchMetricsManagerTest { @Test public void testLatency() { - metricsManager.recordLatency(123); + metricsManager.recordLatency("", 123); time.sleep(metrics.config().timeWindowMs() + 1); - metricsManager.recordLatency(456); + metricsManager.recordLatency("", 456); assertEquals(289.5, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON); assertEquals(456, metricValue(metricsRegistry.fetchLatencyMax), EPSILON); } + @Test + public void testNodeLatency() { + String connectionId = "0"; + MetricName nodeLatencyAvg = metrics.metricName("request-latency-avg", "group"); + MetricName nodeLatencyMax = metrics.metricName("request-latency-max", "group"); + registerNodeLatencyMetric(connectionId, nodeLatencyAvg, nodeLatencyMax); + + metricsManager.recordLatency(connectionId, 123); + time.sleep(metrics.config().timeWindowMs() + 1); + metricsManager.recordLatency(connectionId, 456); + + assertEquals(289.5, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON); + assertEquals(456, metricValue(metricsRegistry.fetchLatencyMax), EPSILON); + + assertEquals(289.5, metricValue(nodeLatencyAvg), EPSILON); + assertEquals(456, metricValue(nodeLatencyMax), EPSILON); + + // Record metric against another node. + metricsManager.recordLatency("1", 501); + + assertEquals(360, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON); + assertEquals(501, metricValue(metricsRegistry.fetchLatencyMax), EPSILON); + // Node specific metric should not be affected. + assertEquals(289.5, metricValue(nodeLatencyAvg), EPSILON); + assertEquals(456, metricValue(nodeLatencyMax), EPSILON); + } + @Test public void testBytesFetched() { metricsManager.recordBytesFetched(2); @@ -157,14 +187,24 @@ public class FetchMetricsManagerTest { assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, tags), EPSILON); } + private void registerNodeLatencyMetric(String connectionId, MetricName nodeLatencyAvg, MetricName nodeLatencyMax) { + String nodeTimeName = "node-" + connectionId + ".latency"; + Sensor nodeRequestTime = metrics.sensor(nodeTimeName); + nodeRequestTime.add(nodeLatencyAvg, new Avg()); + nodeRequestTime.add(nodeLatencyMax, new Max()); + } + private double metricValue(MetricNameTemplate name) { MetricName metricName = metrics.metricInstance(name); - KafkaMetric metric = metrics.metric(metricName); - return (Double) metric.metricValue(); + return metricValue(metricName); } private double metricValue(MetricNameTemplate name, Map tags) { MetricName metricName = metrics.metricInstance(name, tags); + return metricValue(metricName); + } + + private double metricValue(MetricName metricName) { KafkaMetric metric = metrics.metric(metricName); return (Double) metric.metricValue(); }