MINOR: Remove deprecated per-partition lag metrics

It takes O(n^2) time to instantiate an MBean with n attributes, which can be very slow when an MBean has many attributes. To fix this performance issue, this PR removes the metrics whose attribute count can grow with the number of partitions in the cluster. These metrics were deprecated by KIP-225 and were already slated for removal in 2.0.
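
For code that reads consumer metrics programmatically, the change only affects how the per-partition lag metric is addressed. A minimal sketch, assuming a running KafkaConsumer and illustrative helper names that are not part of the Kafka API: before KIP-225 the topic and partition were baked into the metric name, afterwards they travel as tags on a plain "records-lag" metric.

// Minimal sketch; helper names and tag values are illustrative, not part of the Kafka API.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class LagMetricLookup {

    // Removed by this commit: "{topic}-{partition}" is embedded in the metric name,
    // and only the client-level tags (e.g. client-id) are attached.
    static Metric deprecatedLag(KafkaConsumer<?, ?> consumer, String topic, int partition,
                                Map<String, String> clientTags) {
        MetricName old = new MetricName(topic + "-" + partition + ".records-lag",
                "consumer-fetch-manager-metrics", "", clientTags);
        return consumer.metrics().get(old);
    }

    // Retained (KIP-225): the name is simply "records-lag", with topic and partition as tags.
    static Metric taggedLag(KafkaConsumer<?, ?> consumer, String topic, int partition,
                            Map<String, String> clientTags) {
        Map<String, String> tags = new HashMap<>(clientTags);
        tags.put("topic", topic);
        tags.put("partition", String.valueOf(partition));
        return consumer.metrics().get(new MetricName("records-lag",
                "consumer-fetch-manager-metrics", "", tags));
    }
}

The registration cost matters because every flat-named metric became another attribute on the same per-client MBean, so the attribute count, and with it the quadratic registration time, grew with the number of assigned partitions; the tag-based metrics instead live on separate per-partition MBeans.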

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #5172 from lindong28/remove-deprecated-metrics
Dong Lin, 2018-06-11 23:32:30 -07:00
commit 4580d9f16a, parent f0b1b46486
5 changed files with 32 additions and 67 deletions

Fetcher.java

@@ -1447,16 +1447,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value());
                 recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax, metricTags), new Max());
                 recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg, metricTags), new Avg());
-                recordsLag.add(this.metrics.metricName(name,
-                        metricsRegistry.partitionRecordsLagDeprecated.group(),
-                        metricsRegistry.partitionRecordsLagDeprecated.description()), new Value());
-                recordsLag.add(this.metrics.metricName(name + "-max",
-                        metricsRegistry.partitionRecordsLagMaxDeprecated.group(),
-                        metricsRegistry.partitionRecordsLagMaxDeprecated.description()), new Max());
-                recordsLag.add(this.metrics.metricName(name + "-avg",
-                        metricsRegistry.partitionRecordsLagAvgDeprecated.group(),
-                        metricsRegistry.partitionRecordsLagAvgDeprecated.description()), new Avg());
             }
             recordsLag.record(lag);
         }

FetcherMetricsRegistry.java

@@ -54,10 +54,6 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate partitionRecordsLead;
     public MetricNameTemplate partitionRecordsLeadMin;
     public MetricNameTemplate partitionRecordsLeadAvg;
-    // To remove in 2.0
-    public MetricNameTemplate partitionRecordsLagDeprecated;
-    public MetricNameTemplate partitionRecordsLagMaxDeprecated;
-    public MetricNameTemplate partitionRecordsLagAvgDeprecated;

     public FetcherMetricsRegistry() {
         this(new HashSet<String>(), "");
@@ -68,72 +64,65 @@ public class FetcherMetricsRegistry {
     }

     public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {

         /***** Client level *****/
         String groupName = metricGrpPrefix + "-fetch-manager-metrics";

         this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
                 "The average number of bytes fetched per request", tags);
         this.fetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
                 "The maximum number of bytes fetched per request", tags);
         this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
                 "The average number of bytes consumed per second", tags);
         this.bytesConsumedTotal = new MetricNameTemplate("bytes-consumed-total", groupName,
                 "The total number of bytes consumed", tags);
         this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
                 "The average number of records in each request", tags);
         this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
                 "The average number of records consumed per second", tags);
         this.recordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
                 "The total number of records consumed", tags);
         this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName,
                 "The average time taken for a fetch request.", tags);
         this.fetchLatencyMax = new MetricNameTemplate("fetch-latency-max", groupName,
                 "The max time taken for any fetch request.", tags);
         this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName,
                 "The number of fetch requests per second.", tags);
         this.fetchRequestTotal = new MetricNameTemplate("fetch-total", groupName,
                 "The total number of fetch requests.", tags);
         this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
                 "The maximum lag in terms of number of records for any partition in this window", tags);
         this.recordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
                 "The minimum lead in terms of number of records for any partition in this window", tags);
         this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName,
                 "The average throttle time in ms", tags);
         this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName,
                 "The maximum throttle time in ms", tags);

         /***** Topic level *****/
         Set<String> topicTags = new LinkedHashSet<>(tags);
         topicTags.add("topic");

         this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
                 "The average number of bytes fetched per request for a topic", topicTags);
         this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
                 "The maximum number of bytes fetched per request for a topic", topicTags);
         this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
                 "The average number of bytes consumed per second for a topic", topicTags);
         this.topicBytesConsumedTotal = new MetricNameTemplate("bytes-consumed-total", groupName,
                 "The total number of bytes consumed for a topic", topicTags);
         this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
                 "The average number of records in each request for a topic", topicTags);
         this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
                 "The average number of records consumed per second for a topic", topicTags);
         this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
                 "The total number of records consumed for a topic", topicTags);
-        this.partitionRecordsLagDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
-                "The latest lag of the partition (DEPRECATED use the tag based version instead)", tags);
-        this.partitionRecordsLagMaxDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
-                "The max lag of the partition (DEPRECATED use the tag based version instead)", tags);
-        this.partitionRecordsLagAvgDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
-                "The average lag of the partition (DEPRECATED use the tag based version instead)", tags);

         /***** Partition level *****/
         Set<String> partitionTags = new HashSet<>(topicTags);
@@ -150,9 +139,9 @@ public class FetcherMetricsRegistry {
                 "The min lead of the partition", partitionTags);
         this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
                 "The average lead of the partition", partitionTags);
     }

     public List<MetricNameTemplate> getAllTemplates() {
         return Arrays.asList(
             fetchSizeAvg,
@@ -177,9 +166,6 @@ public class FetcherMetricsRegistry {
             topicRecordsPerRequestAvg,
             topicRecordsConsumedRate,
             topicRecordsConsumedTotal,
-            partitionRecordsLagDeprecated,
-            partitionRecordsLagAvgDeprecated,
-            partitionRecordsLagMaxDeprecated,
             partitionRecordsLag,
             partitionRecordsLagAvg,
             partitionRecordsLagMax,

FetcherTest.java

@@ -1529,7 +1529,6 @@ public class FetcherTest {
         tags.put("topic", tp0.topic());
         tags.put("partition", String.valueOf(tp0.partition()));
         MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags);
-        MetricName partitionLagMetricDeprecated = metrics.metricName(tp0 + ".records-lag", metricGroup);

         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1544,9 +1543,6 @@ public class FetcherTest {
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
         assertEquals(50, partitionLag.value(), EPSILON);
-        KafkaMetric partitionLagDeprecated = allMetrics.get(partitionLagMetricDeprecated);
-        assertEquals(50, partitionLagDeprecated.value(), EPSILON);

         // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
@@ -1559,7 +1555,6 @@ public class FetcherTest {
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
         assertFalse(allMetrics.containsKey(partitionLagMetric));
-        assertFalse(allMetrics.containsKey(partitionLagMetricDeprecated));
     }

     @Test

PlaintextConsumerTest.scala

@@ -43,11 +43,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testHeaders() {
     val numRecords = 1
     val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
     record.headers().add("headerKey", "headerValue".getBytes)
     this.producers.head.send(record)
     assertEquals(0, this.consumers.head.assignment.size)
     this.consumers.head.assign(List(tp).asJava)
     assertEquals(1, this.consumers.head.assignment.size)

@@ -63,23 +63,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       assertEquals("headerValue", if (header == null) null else new String(header.value()))
     }
   }

   @Test
   def testHeadersExtendedSerializerDeserializer() {
     val numRecords = 1
     val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
     val extendedSerializer = new ExtendedSerializer[Array[Byte]] {
       var serializer = new ByteArraySerializer()
       override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
         headers.add("content-type", "application/octet-stream".getBytes)
         serializer.serialize(topic, data)
       }
       override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
       override def close(): Unit = serializer.close()
       override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {

@@ -90,9 +90,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] {
       var deserializer = new ByteArrayDeserializer()
       override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
         val header = headers.lastHeader("content-type")
         assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))

@@ -110,7 +110,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       }
     }
     val producer0 = new KafkaProducer(this.producerConfig, new ByteArraySerializer(), extendedSerializer)
     producers += producer0
     producer0.send(record)

@@ -127,7 +127,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(numRecords, records.size)
   }

   @Test
   def testMaxPollRecords() {
     val maxPollRecords = 2
@@ -1534,10 +1534,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
       assertNotNull(fetchLag)
-      val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
-      val oldFetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
-      assertEquals(fetchLag.metricValue(), oldFetchLag.metricValue())

       val expectedLag = numMessages - records.count
       assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
@@ -1594,15 +1590,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         records = consumer.poll(100)
         !records.isEmpty
       }, "Consumer did not consume any message before timeout.")
-      val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
-      val oldLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
       val tags = new util.HashMap[String, String]()
       tags.put("client-id", "testPerPartitionLagWithMaxPollRecords")
       tags.put("topic", tp.topic())
       tags.put("partition", String.valueOf(tp.partition()))
       val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
-      assertEquals(oldLag.metricValue(), lag.metricValue())
       assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon)
     } finally {

upgrade.html

@@ -80,6 +80,7 @@
         JMX monitoring tools that do not automatically aggregate. To get the total count for a specific request type, the tool needs to be
         updated to aggregate across different versions.
     </li>
+    <li><a href="https://cwiki.apache.org/confluence/x/uaBzB">KIP-225</a> changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" has been removed.</li>
     <li>The Scala producers, which have been deprecated since 0.10.0.0, have been removed. The Java producer has been the recommended option
         since 0.9.0.0. Note that the behaviour of the default partitioner in the Java producer differs from the default partitioner
         in the Scala producers. Users migrating should consider configuring a custom partitioner that retains the previous behaviour.</li>
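
The upgrade note added above tells operators to switch dashboards to the tag-based metric. A minimal sketch of what that migration looks like over JMX, assuming the usual mapping of consumer metric tags onto MBean key properties under the kafka.consumer domain; the client-id, topic, and partition values are placeholders:

// Minimal sketch; object names follow the assumed KIP-225 JMX pattern, values are placeholders.
import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxLagMigration {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();

        // Old (removed): lag was an attribute named "{topic}-{partition}.records-lag"
        // on the client-level fetch manager MBean.
        ObjectName oldBean = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=my-client");
        Object oldLag = server.getAttribute(oldBean, "my-topic-0.records-lag");

        // New (KIP-225): topic and partition become MBean key properties,
        // and the attribute is simply "records-lag".
        ObjectName newBean = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,"
                        + "client-id=my-client,topic=my-topic,partition=0");
        Object newLag = server.getAttribute(newBean, "records-lag");

        System.out.println(oldLag + " -> " + newLag);
    }
}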