KAFKA-5890; records.lag should use tags for topic and partition rather than using metric name.

This is the implementation of KIP-225.
It marks the previous metrics as deprecated in the documentation and adds new metrics using tags.

The tests verify that the new and the deprecated metrics report the same value.

Author: cmolter <cmolter@apple.com>

Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>

Closes #4362 from lahabana/kafka-5890
Authored by cmolter on 2018-01-14 16:18:39 -08:00; committed by Jiangjie Qin
parent 1d1c857596
commit 5d81639907
5 changed files with 85 additions and 22 deletions
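
Both the tag-based and the deprecated name-based form stay visible through the consumer's metrics() map until the old one is removed in 2.0.0. Below is a minimal sketch of reading the new tag-based records-lag for one partition, assuming an already-running KafkaConsumer in the same process; the RecordsLagLookup class and the lookup-by-iteration approach are illustrative, not part of this patch. Note that the fetcher stores the topic tag with '.' replaced by '_'.

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public final class RecordsLagLookup {

    /**
     * Reads the tag-based "records-lag" metric introduced by KIP-225 for one partition.
     * Returns NaN if the partition is not assigned or nothing has been fetched yet.
     */
    public static double recordsLag(KafkaConsumer<?, ?> consumer, String topic, int partition) {
        // The fetcher registers the topic tag with '.' replaced by '_' (see the Fetcher hunk below).
        String topicTag = topic.replace('.', '_');
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        for (MetricName name : metrics.keySet()) {
            if ("consumer-fetch-manager-metrics".equals(name.group())
                    && "records-lag".equals(name.name())
                    && topicTag.equals(name.tags().get("topic"))
                    && String.valueOf(partition).equals(name.tags().get("partition"))) {
                return ((Number) metrics.get(name).metricValue()).doubleValue();
            }
        }
        return Double.NaN;
    }
}

Looking the metric up by iterating over the map avoids having to know the client-id tag that the consumer also attaches to every metric.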

View File

@@ -1327,16 +1327,25 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             String name = partitionLagMetricName(tp);
             Sensor recordsLag = this.metrics.getSensor(name);
             if (recordsLag == null) {
+                Map<String, String> metricTags = new HashMap<>(2);
+                metricTags.put("topic", tp.topic().replace('.', '_'));
+                metricTags.put("partition", String.valueOf(tp.partition()));
                 recordsLag = this.metrics.sensor(name);
+                recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value());
+                recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax, metricTags), new Max());
+                recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg, metricTags), new Avg());
                 recordsLag.add(this.metrics.metricName(name,
-                        metricsRegistry.partitionRecordsLag.group(),
-                        metricsRegistry.partitionRecordsLag.description()), new Value());
+                        metricsRegistry.partitionRecordsLagDeprecated.group(),
+                        metricsRegistry.partitionRecordsLagDeprecated.description()), new Value());
                 recordsLag.add(this.metrics.metricName(name + "-max",
-                        metricsRegistry.partitionRecordsLagMax.group(),
-                        metricsRegistry.partitionRecordsLagMax.description()), new Max());
+                        metricsRegistry.partitionRecordsLagMaxDeprecated.group(),
+                        metricsRegistry.partitionRecordsLagMaxDeprecated.description()), new Max());
                 recordsLag.add(this.metrics.metricName(name + "-avg",
-                        metricsRegistry.partitionRecordsLagAvg.group(),
-                        metricsRegistry.partitionRecordsLagAvg.description()), new Avg());
+                        metricsRegistry.partitionRecordsLagAvgDeprecated.group(),
+                        metricsRegistry.partitionRecordsLagAvgDeprecated.description()), new Avg());
             }
             recordsLag.record(lag);
         }

View File

@@ -49,6 +49,10 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate partitionRecordsLag;
     public MetricNameTemplate partitionRecordsLagMax;
     public MetricNameTemplate partitionRecordsLagAvg;
+    // To remove in 2.0
+    public MetricNameTemplate partitionRecordsLagDeprecated;
+    public MetricNameTemplate partitionRecordsLagMaxDeprecated;
+    public MetricNameTemplate partitionRecordsLagAvgDeprecated;

     public FetcherMetricsRegistry() {
         this(new HashSet<String>(), "");
@@ -117,13 +121,22 @@ public class FetcherMetricsRegistry {
         this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
                 "The total number of records consumed for a topic", topicTags);
+        this.partitionRecordsLagDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
+                "The latest lag of the partition (DEPRECATED use the tag based version instead)", tags);
+        this.partitionRecordsLagMaxDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
+                "The max lag of the partition (DEPRECATED use the tag based version instead)", tags);
+        this.partitionRecordsLagAvgDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
+                "The average lag of the partition (DEPRECATED use the tag based version instead)", tags);

         /***** Partition level *****/
-        this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
-                "The latest lag of the partition", tags);
-        this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
-                "The max lag of the partition", tags);
-        this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
-                "The average lag of the partition", tags);
+        Set<String> partitionTags = new HashSet<>(topicTags);
+        partitionTags.add("partition");
+        this.partitionRecordsLag = new MetricNameTemplate("records-lag", groupName,
+                "The latest lag of the partition", partitionTags);
+        this.partitionRecordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
+                "The max lag of the partition", partitionTags);
+        this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
+                "The average lag of the partition", partitionTags);
     }
@@ -151,6 +164,9 @@ public class FetcherMetricsRegistry {
             topicRecordsPerRequestAvg,
             topicRecordsConsumedRate,
             topicRecordsConsumedTotal,
+            partitionRecordsLagDeprecated,
+            partitionRecordsLagAvgDeprecated,
+            partitionRecordsLagMaxDeprecated,
             partitionRecordsLag,
             partitionRecordsLagAvg,
             partitionRecordsLagMax

View File

@@ -1171,7 +1171,10 @@ public class FetcherTest {
         subscriptions.seek(tp0, 0);
         MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
-        MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup);
+        Map<String, String> tags = new HashMap<>();
+        tags.put("topic", tp0.topic());
+        tags.put("partition", String.valueOf(tp0.partition()));
+        MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags);
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1210,7 +1213,12 @@ public class FetcherTest {
         subscriptions.seek(tp0, 0);
         MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
-        MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup);
+        Map<String, String> tags = new HashMap<>();
+        tags.put("topic", tp0.topic());
+        tags.put("partition", String.valueOf(tp0.partition()));
+        MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags);
+        MetricName partitionLagMetricDeprecated = metrics.metricName(tp0 + ".records-lag", metricGroup);
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1225,6 +1233,9 @@ public class FetcherTest {
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
         assertEquals(50, partitionLag.value(), EPSILON);
+        KafkaMetric partitionLagDeprecated = allMetrics.get(partitionLagMetricDeprecated);
+        assertEquals(50, partitionLagDeprecated.value(), EPSILON);
         // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
@@ -1237,6 +1248,7 @@ public class FetcherTest {
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
         assertFalse(allMetrics.containsKey(partitionLagMetric));
+        assertFalse(allMetrics.containsKey(partitionLagMetricDeprecated));
     }

     @Test

View File

@@ -1398,8 +1398,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
      }, "Consumer did not consume any message before timeout.")
      assertEquals("should be assigned once", 1, listener0.callsToAssigned)
      // Verify the metric exist.
-     val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
-     val fetchLag0 = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+     val tags1 = new util.HashMap[String, String]()
+     tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+     tags1.put("topic", tp.topic())
+     tags1.put("partition", String.valueOf(tp.partition()))
+     val tags2 = new util.HashMap[String, String]()
+     tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+     tags2.put("topic", tp2.topic())
+     tags2.put("partition", String.valueOf(tp2.partition()))
+     val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))
      assertNotNull(fetchLag0)
      val expectedLag = numMessages - records.count
      assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon)
@@ -1411,8 +1419,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
        listener0.callsToAssigned >= 2
      }, "Expected rebalance did not occur.")
      // Verify the metric has gone
-     assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
-     assertNull(consumer.metrics.get(new MetricName(tp2 + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
+     assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)))
+     assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2)))
    } finally {
      consumer.close()
    }
@@ -1436,15 +1444,24 @@ class PlaintextConsumerTest extends BaseConsumerTest {
        !records.records(tp).isEmpty
      }, "Consumer did not consume any message before timeout.")
      // Verify the metric exist.
-     val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
-     val fetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+     val tags = new util.HashMap[String, String]()
+     tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+     tags.put("topic", tp.topic())
+     tags.put("partition", String.valueOf(tp.partition()))
+     val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
      assertNotNull(fetchLag)
+     val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+     val oldFetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
+     assertEquals(fetchLag.metricValue(), oldFetchLag.metricValue())
      val expectedLag = numMessages - records.count
      assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
      consumer.assign(List(tp2).asJava)
      TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
-     assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
+     assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)))
    } finally {
      consumer.close()
    }
@@ -1466,8 +1483,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
        records = consumer.poll(100)
        !records.isEmpty
      }, "Consumer did not consume any message before timeout.")
-     val tags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
-     val lag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+     val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
+     val oldLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
+     val tags = new util.HashMap[String, String]()
+     tags.put("client-id", "testPerPartitionLagWithMaxPollRecords")
+     tags.put("topic", tp.topic())
+     tags.put("partition", String.valueOf(tp.partition()))
+     val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
+     assertEquals(oldLag.metricValue(), lag.metricValue())
      assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon)
    } finally {
      consumer.close()

View File

@@ -66,6 +66,7 @@
     <li>The kafka artifact in Maven no longer depends on log4j or slf4j-log4j12. Similarly to the kafka-clients artifact, users
         can now choose the logging back-end by including the appropriate slf4j module (slf4j-log4j12, logback, etc.). The release
         tarball still includes log4j and slf4j-log4j12.</li>
+    <li><a href="https://cwiki.apache.org/confluence/x/uaBzB">KIP-225</a> changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" is deprecated and will be removed in 2.0.0.</li>
 </ul>

 <h5><a id="upgrade_110_new_protocols" href="#upgrade_110_new_protocols">New Protocol Versions</a></h5>
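
Through JMX (JmxReporter is enabled by default), the change described in the upgrade note above shows up as a different MBean layout: the tags become ObjectName properties and the metric name becomes the attribute. The sketch below is illustrative only and assumes a consumer running in the same JVM with a hypothetical client id "my-client", topic "my-topic" and partition 0:

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public final class RecordsLagViaJmx {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();

        // Deprecated form: the lag is an attribute named "<topic>-<partition>.records-lag"
        // on the per-client fetch-manager MBean; it goes away in 2.0.0.
        ObjectName oldBean = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=my-client");
        Object deprecatedLag = server.getAttribute(oldBean, "my-topic-0.records-lag");

        // Tag-based form from KIP-225: topic and partition become ObjectName properties
        // and the attribute is simply "records-lag".
        ObjectName newBean = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,"
                        + "client-id=my-client,topic=my-topic,partition=0");
        Object taggedLag = server.getAttribute(newBean, "records-lag");

        System.out.println("deprecated=" + deprecatedLag + " tag-based=" + taggedLag);
    }
}

Until 2.0.0 both attributes are reported and, as the test changes above verify, carry the same value, so dashboards can be migrated before the deprecated name disappears.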