diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 4c68f1f8483..5dc0b26f8fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1327,16 +1327,25 @@ public class Fetcher implements SubscriptionState.Listener, Closeable { String name = partitionLagMetricName(tp); Sensor recordsLag = this.metrics.getSensor(name); if (recordsLag == null) { + Map metricTags = new HashMap<>(2); + metricTags.put("topic", tp.topic().replace('.', '_')); + metricTags.put("partition", String.valueOf(tp.partition())); + recordsLag = this.metrics.sensor(name); + + recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value()); + recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax, metricTags), new Max()); + recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg, metricTags), new Avg()); + recordsLag.add(this.metrics.metricName(name, - metricsRegistry.partitionRecordsLag.group(), - metricsRegistry.partitionRecordsLag.description()), new Value()); + metricsRegistry.partitionRecordsLagDeprecated.group(), + metricsRegistry.partitionRecordsLagDeprecated.description()), new Value()); recordsLag.add(this.metrics.metricName(name + "-max", - metricsRegistry.partitionRecordsLagMax.group(), - metricsRegistry.partitionRecordsLagMax.description()), new Max()); + metricsRegistry.partitionRecordsLagMaxDeprecated.group(), + metricsRegistry.partitionRecordsLagMaxDeprecated.description()), new Max()); recordsLag.add(this.metrics.metricName(name + "-avg", - metricsRegistry.partitionRecordsLagAvg.group(), - metricsRegistry.partitionRecordsLagAvg.description()), new Avg()); + metricsRegistry.partitionRecordsLagAvgDeprecated.group(), + metricsRegistry.partitionRecordsLagAvgDeprecated.description()), new Avg()); } recordsLag.record(lag); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java index acf42ec339f..f15ac062abf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java @@ -49,6 +49,10 @@ public class FetcherMetricsRegistry { public MetricNameTemplate partitionRecordsLag; public MetricNameTemplate partitionRecordsLagMax; public MetricNameTemplate partitionRecordsLagAvg; + // To remove in 2.0 + public MetricNameTemplate partitionRecordsLagDeprecated; + public MetricNameTemplate partitionRecordsLagMaxDeprecated; + public MetricNameTemplate partitionRecordsLagAvgDeprecated; public FetcherMetricsRegistry() { this(new HashSet(), ""); @@ -117,13 +121,22 @@ public class FetcherMetricsRegistry { this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName, "The total number of records consumed for a topic", topicTags); + this.partitionRecordsLagDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, + "The latest lag of the partition (DEPRECATED use the tag based version instead)", tags); + this.partitionRecordsLagMaxDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName, + "The max lag of the partition (DEPRECATED use the tag based version instead)", tags); + this.partitionRecordsLagAvgDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName, + "The average lag of the partition (DEPRECATED use the tag based version instead)", tags); + /***** Partition level *****/ - this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, - "The latest lag of the partition", tags); - this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName, - "The max lag of the partition", tags); - this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName, - "The average lag of the partition", tags); + Set partitionTags = new HashSet<>(topicTags); + partitionTags.add("partition"); + this.partitionRecordsLag = new MetricNameTemplate("records-lag", groupName, + "The latest lag of the partition", partitionTags); + this.partitionRecordsLagMax = new MetricNameTemplate("records-lag-max", groupName, + "The max lag of the partition", partitionTags); + this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName, + "The average lag of the partition", partitionTags); } @@ -151,6 +164,9 @@ public class FetcherMetricsRegistry { topicRecordsPerRequestAvg, topicRecordsConsumedRate, topicRecordsConsumedTotal, + partitionRecordsLagDeprecated, + partitionRecordsLagAvgDeprecated, + partitionRecordsLagMaxDeprecated, partitionRecordsLag, partitionRecordsLagAvg, partitionRecordsLagMax diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 7c1d9ba4208..26d7a50cf60 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1171,7 +1171,10 @@ public class FetcherTest { subscriptions.seek(tp0, 0); MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax); - MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup); + Map tags = new HashMap<>(); + tags.put("topic", tp0.topic()); + tags.put("partition", String.valueOf(tp0.partition())); + MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags); Map allMetrics = metrics.metrics(); KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric); @@ -1210,7 +1213,12 @@ public class FetcherTest { subscriptions.seek(tp0, 0); MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax); - MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup); + + Map tags = new HashMap<>(); + tags.put("topic", tp0.topic()); + tags.put("partition", String.valueOf(tp0.partition())); + MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags); + MetricName partitionLagMetricDeprecated = metrics.metricName(tp0 + ".records-lag", metricGroup); Map allMetrics = metrics.metrics(); KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric); @@ -1225,6 +1233,9 @@ public class FetcherTest { KafkaMetric partitionLag = allMetrics.get(partitionLagMetric); assertEquals(50, partitionLag.value(), EPSILON); + KafkaMetric partitionLagDeprecated = allMetrics.get(partitionLagMetricDeprecated); + assertEquals(50, partitionLagDeprecated.value(), EPSILON); + // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L); @@ -1237,6 +1248,7 @@ public class FetcherTest { // verify de-registration of partition lag subscriptions.unsubscribe(); assertFalse(allMetrics.containsKey(partitionLagMetric)); + assertFalse(allMetrics.containsKey(partitionLagMetricDeprecated)); } @Test diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 04935f83649..103a28ae67b 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -1398,8 +1398,16 @@ class PlaintextConsumerTest extends BaseConsumerTest { }, "Consumer did not consume any message before timeout.") assertEquals("should be assigned once", 1, listener0.callsToAssigned) // Verify the metric exist. - val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe") - val fetchLag0 = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)) + val tags1 = new util.HashMap[String, String]() + tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe") + tags1.put("topic", tp.topic()) + tags1.put("partition", String.valueOf(tp.partition())) + + val tags2 = new util.HashMap[String, String]() + tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe") + tags2.put("topic", tp2.topic()) + tags2.put("partition", String.valueOf(tp2.partition())) + val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)) assertNotNull(fetchLag0) val expectedLag = numMessages - records.count assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon) @@ -1411,8 +1419,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { listener0.callsToAssigned >= 2 }, "Expected rebalance did not occur.") // Verify the metric has gone - assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))) - assertNull(consumer.metrics.get(new MetricName(tp2 + ".records-lag", "consumer-fetch-manager-metrics", "", tags))) + assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))) + assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2))) } finally { consumer.close() } @@ -1436,15 +1444,24 @@ class PlaintextConsumerTest extends BaseConsumerTest { !records.records(tp).isEmpty }, "Consumer did not consume any message before timeout.") // Verify the metric exist. - val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign") - val fetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)) + val tags = new util.HashMap[String, String]() + tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign") + tags.put("topic", tp.topic()) + tags.put("partition", String.valueOf(tp.partition())) + val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)) assertNotNull(fetchLag) + + val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign") + val oldFetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags)) + assertEquals(fetchLag.metricValue(), oldFetchLag.metricValue()) + val expectedLag = numMessages - records.count assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon) consumer.assign(List(tp2).asJava) TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.") assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))) + assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))) } finally { consumer.close() } @@ -1466,8 +1483,16 @@ class PlaintextConsumerTest extends BaseConsumerTest { records = consumer.poll(100) !records.isEmpty }, "Consumer did not consume any message before timeout.") - val tags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords") - val lag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)) + val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords") + val oldLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags)) + + val tags = new util.HashMap[String, String]() + tags.put("client-id", "testPerPartitionLagWithMaxPollRecords") + tags.put("topic", tp.topic()) + tags.put("partition", String.valueOf(tp.partition())) + val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)) + assertEquals(oldLag.metricValue(), lag.metricValue()) + assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon) } finally { consumer.close() diff --git a/docs/upgrade.html b/docs/upgrade.html index b8b88b995a5..61400598e96 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -66,6 +66,7 @@
  • The kafka artifact in Maven no longer depends on log4j or slf4j-log4j12. Similarly to the kafka-clients artifact, users can now choose the logging back-end by including the appropriate slf4j module (slf4j-log4j12, logback, etc.). The release tarball still includes log4j and slf4j-log4j12.
  • +
  • KIP-225 changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" is deprecated and will be removed in 2.0.0.
  • New Protocol Versions