KAFKA-4000; Collect and record per-topic consumer metrics

Improve consumer metric collection by collecting and recording metrics per topic.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #1684 from vahidhashemian/KAFKA-4000
This commit is contained in:
Vahid Hashemian 2016-12-09 14:54:30 -08:00 committed by Jason Gustafson
parent 055ca9b7a2
commit 7f8edbc8e8
1 changed files with 34 additions and 11 deletions

View File

@ -688,7 +688,6 @@ public class Fetcher<K, V> {
}
recordsCount = parsed.size();
this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
if (!parsed.isEmpty()) {
log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
@ -833,11 +832,11 @@ public class Fetcher<K, V> {
private final FetchManagerMetrics sensors;
private final Set<TopicPartition> unrecordedPartitions;
private int totalBytes;
private int totalRecords;
private final FetchMetrics fetchMetrics = new FetchMetrics();
private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<>();
public FetchResponseMetricAggregator(FetchManagerMetrics sensors,
Set<TopicPartition> partitions) {
private FetchResponseMetricAggregator(FetchManagerMetrics sensors,
Set<TopicPartition> partitions) {
this.sensors = sensors;
this.unrecordedPartitions = partitions;
}
@ -847,14 +846,38 @@ public class Fetcher<K, V> {
* and number of records parsed. After all partitions have reported, we write the metric.
*/
public void record(TopicPartition partition, int bytes, int records) {
unrecordedPartitions.remove(partition);
totalBytes += bytes;
totalRecords += records;
this.unrecordedPartitions.remove(partition);
this.fetchMetrics.increment(bytes, records);
if (unrecordedPartitions.isEmpty()) {
// collect and aggregate per-topic metrics
String topic = partition.topic();
FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
if (topicFetchMetric == null) {
topicFetchMetric = new FetchMetrics();
this.topicFetchMetrics.put(topic, topicFetchMetric);
}
topicFetchMetric.increment(bytes, records);
if (this.unrecordedPartitions.isEmpty()) {
// once all expected partitions from the fetch have reported in, record the metrics
sensors.bytesFetched.record(totalBytes);
sensors.recordsFetched.record(totalRecords);
this.sensors.bytesFetched.record(topicFetchMetric.fetchBytes);
this.sensors.recordsFetched.record(topicFetchMetric.fetchRecords);
// also record per-topic metrics
for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) {
FetchMetrics metric = entry.getValue();
this.sensors.recordTopicFetchMetrics(entry.getKey(), metric.fetchBytes, metric.fetchRecords);
}
}
}
private static class FetchMetrics {
private int fetchBytes;
private int fetchRecords;
protected void increment(int bytes, int records) {
this.fetchBytes += bytes;
this.fetchRecords += records;
}
}
}