KAFKA-5191: Autogenerate Consumer Fetcher metrics

Autogenerate docs for the Consumer Fetcher's metrics. This is a smaller subset of the original PR https://github.com/apache/kafka/pull/1202.

CC ijuma benstopford hachikuji

Author: James Cheng <jylcheng@yahoo.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>

Closes #2993 from wushujames/fetcher_metrics_docs
This commit is contained in:
James Cheng 2017-05-26 15:34:20 -07:00 committed by Guozhang Wang
parent ca8915d2ef
commit 0bc4f75eed
13 changed files with 498 additions and 187 deletions

View File

@ -617,10 +617,17 @@ project(':core') {
standardOutput = new File(generatedDocsDir, "topic_config.html").newOutputStream()
}
task genConsumerMetricsDocs(type: JavaExec) {
classpath = sourceSets.test.runtimeClasspath
main = 'org.apache.kafka.clients.consumer.internals.ConsumerMetrics'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream()
}
task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
':streams:genStreamsConfigDocs'], type: Tar) {
':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
from project.file("$rootDir/docs")

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.PollCondition;
import org.apache.kafka.clients.consumer.internals.Fetcher;
@ -662,11 +663,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricGrpPrefix);
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
@ -718,7 +720,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
metricsRegistry.fetcherMetrics,
this.time,
this.retryBackoffMs,
isolationLevel);

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.Metrics;
public class ConsumerMetrics {
public FetcherMetricsRegistry fetcherMetrics;
public ConsumerMetrics(Set<String> metricsTags, String metricGrpPrefix) {
this.fetcherMetrics = new FetcherMetricsRegistry(metricsTags, metricGrpPrefix);
}
public ConsumerMetrics(String metricGroupPrefix) {
this(new HashSet<String>(), metricGroupPrefix);
}
private List<MetricNameTemplate> getAllTemplates() {
List<MetricNameTemplate> l = new ArrayList<>();
l.addAll(this.fetcherMetrics.getAllTemplates());
return l;
}
public static void main(String[] args) {
Set<String> tags = new HashSet<>();
tags.add("client-id");
ConsumerMetrics metrics = new ConsumerMetrics(tags, "consumer");
System.out.println(Metrics.toHtmlTable("kafka.consumer", metrics.getAllTemplates()));
}
}

View File

@ -125,7 +125,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
Metadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
FetcherMetricsRegistry metricsRegistry,
Time time,
long retryBackoffMs,
IsolationLevel isolationLevel) {
@ -142,7 +142,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.keyDeserializer = ensureExtended(keyDeserializer);
this.valueDeserializer = ensureExtended(valueDeserializer);
this.completedFetches = new ConcurrentLinkedQueue<>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
this.retryBackoffMs = retryBackoffMs;
this.isolationLevel = isolationLevel;
@ -931,15 +931,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
sensors.updatePartitionLagSensors(assignment);
}
public static Sensor throttleTimeSensor(Metrics metrics, String metricGrpPrefix) {
String metricGrpName = metricGrpPrefix + FetchManagerMetrics.METRIC_GROUP_SUFFIX;
public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
metricGrpName,
"The average throttle time in ms"), new Avg());
fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
metricGrpName,
"The maximum throttle time in ms"), new Max());
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax), new Max());
return fetchThrottleTimeSensor;
}
@ -1225,54 +1222,35 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
private static class FetchManagerMetrics {
private static final String METRIC_GROUP_SUFFIX = "-fetch-manager-metrics";
private final Metrics metrics;
private final String metricGrpName;
private FetcherMetricsRegistry metricsRegistry;
private final Sensor bytesFetched;
private final Sensor recordsFetched;
private final Sensor fetchLatency;
private final Sensor recordsFetchLag;
private Set<TopicPartition> assignedPartitions;
private FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + METRIC_GROUP_SUFFIX;
this.metricsRegistry = metricsRegistry;
this.bytesFetched = metrics.sensor("bytes-fetched");
this.bytesFetched.add(metrics.metricName("fetch-size-avg",
this.metricGrpName,
"The average number of bytes fetched per request"), new Avg());
this.bytesFetched.add(metrics.metricName("fetch-size-max",
this.metricGrpName,
"The maximum number of bytes fetched per request"), new Max());
this.bytesFetched.add(metrics.metricName("bytes-consumed-rate",
this.metricGrpName,
"The average number of bytes consumed per second"), new Rate());
this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg());
this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new Max());
this.bytesFetched.add(metrics.metricInstance(metricsRegistry.bytesConsumedRate), new Rate());
this.recordsFetched = metrics.sensor("records-fetched");
this.recordsFetched.add(metrics.metricName("records-per-request-avg",
this.metricGrpName,
"The average number of records in each request"), new Avg());
this.recordsFetched.add(metrics.metricName("records-consumed-rate",
this.metricGrpName,
"The average number of records consumed per second"), new Rate());
this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg), new Avg());
this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsConsumedRate), new Rate());
this.fetchLatency = metrics.sensor("fetch-latency");
this.fetchLatency.add(metrics.metricName("fetch-latency-avg",
this.metricGrpName,
"The average time taken for a fetch request."), new Avg());
this.fetchLatency.add(metrics.metricName("fetch-latency-max",
this.metricGrpName,
"The max time taken for any fetch request."), new Max());
this.fetchLatency.add(metrics.metricName("fetch-rate",
this.metricGrpName,
"The number of fetch requests per second."), new Rate(new Count()));
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg());
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max());
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchRequestRate), new Rate(new Count()));
this.recordsFetchLag = metrics.sensor("records-lag");
this.recordsFetchLag.add(metrics.metricName("records-lag-max",
this.metricGrpName,
"The maximum lag in terms of number of records for any partition in this window"), new Max());
this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
}
private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@ -1283,17 +1261,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
Map<String, String> metricTags = Collections.singletonMap("topic", topic.replace('.', '_'));
bytesFetched = this.metrics.sensor(name);
bytesFetched.add(this.metrics.metricName("fetch-size-avg",
this.metricGrpName,
"The average number of bytes fetched per request for topic " + topic,
bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeAvg,
metricTags), new Avg());
bytesFetched.add(this.metrics.metricName("fetch-size-max",
this.metricGrpName,
"The maximum number of bytes fetched per request for topic " + topic,
bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeMax,
metricTags), new Max());
bytesFetched.add(this.metrics.metricName("bytes-consumed-rate",
this.metricGrpName,
"The average number of bytes consumed per second for topic " + topic,
bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicBytesConsumedRate,
metricTags), new Rate());
}
bytesFetched.record(bytes);
@ -1306,13 +1278,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
metricTags.put("topic", topic.replace('.', '_'));
recordsFetched = this.metrics.sensor(name);
recordsFetched.add(this.metrics.metricName("records-per-request-avg",
this.metricGrpName,
"The average number of records in each request for topic " + topic,
recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsPerRequestAvg,
metricTags), new Avg());
recordsFetched.add(this.metrics.metricName("records-consumed-rate",
this.metricGrpName,
"The average number of records consumed per second for topic " + topic,
recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedRate,
metricTags), new Rate());
}
recordsFetched.record(records);
@ -1335,14 +1303,15 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
Sensor recordsLag = this.metrics.getSensor(name);
if (recordsLag == null) {
recordsLag = this.metrics.sensor(name);
recordsLag.add(this.metrics.metricName(name, this.metricGrpName, "The latest lag of the partition"),
new Value());
recordsLag.add(this.metrics.metricName(name,
metricsRegistry.partitionRecordsLag.group(),
metricsRegistry.partitionRecordsLag.description()), new Value());
recordsLag.add(this.metrics.metricName(name + "-max",
this.metricGrpName,
"The max lag of the partition"), new Max());
metricsRegistry.partitionRecordsLagMax.group(),
metricsRegistry.partitionRecordsLagMax.description()), new Max());
recordsLag.add(this.metrics.metricName(name + "-avg",
this.metricGrpName,
"The average lag of the partition"), new Avg());
metricsRegistry.partitionRecordsLagAvg.group(),
metricsRegistry.partitionRecordsLagAvg.description()), new Avg());
}
recordsLag.record(lag);
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.MetricNameTemplate;
public class FetcherMetricsRegistry {
public MetricNameTemplate fetchSizeAvg;
public MetricNameTemplate fetchSizeMax;
public MetricNameTemplate bytesConsumedRate;
public MetricNameTemplate recordsPerRequestAvg;
public MetricNameTemplate recordsConsumedRate;
public MetricNameTemplate fetchLatencyAvg;
public MetricNameTemplate fetchLatencyMax;
public MetricNameTemplate fetchRequestRate;
public MetricNameTemplate recordsLagMax;
public MetricNameTemplate fetchThrottleTimeAvg;
public MetricNameTemplate fetchThrottleTimeMax;
public MetricNameTemplate topicFetchSizeAvg;
public MetricNameTemplate topicFetchSizeMax;
public MetricNameTemplate topicBytesConsumedRate;
public MetricNameTemplate topicRecordsPerRequestAvg;
public MetricNameTemplate topicRecordsConsumedRate;
public MetricNameTemplate partitionRecordsLag;
public MetricNameTemplate partitionRecordsLagMax;
public MetricNameTemplate partitionRecordsLagAvg;
public FetcherMetricsRegistry() {
this(new HashSet<String>(), "");
}
public FetcherMetricsRegistry(String metricGrpPrefix) {
this(new HashSet<String>(), metricGrpPrefix);
}
public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
/***** Client level *****/
String groupName = metricGrpPrefix + "-fetch-manager-metrics";
this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
"The average number of bytes fetched per request", tags);
this.fetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
"The maximum number of bytes fetched per request", tags);
this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
"The average number of bytes consumed per second", tags);
this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
"The average number of records in each request", tags);
this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
"The average number of records consumed per second", tags);
this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName,
"The average time taken for a fetch request.", tags);
this.fetchLatencyMax = new MetricNameTemplate("fetch-latency-max", groupName,
"The max time taken for any fetch request.", tags);
this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName,
"The number of fetch requests per second.", tags);
this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
"The maximum lag in terms of number of records for any partition in this window", tags);
this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName,
"The average throttle time in ms", tags);
this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName,
"The maximum throttle time in ms", tags);
/***** Topic level *****/
Set<String> topicTags = new HashSet<>(tags);
topicTags.add("topic");
this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
"The average number of bytes fetched per request for a topic", topicTags);
this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
"The maximum number of bytes fetched per request for a topic", topicTags);
this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
"The average number of bytes consumed per second for a topic", topicTags);
this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
"The average number of records in each request for a topic", topicTags);
this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
"The average number of records consumed per second for a topic", topicTags);
/***** Partition level *****/
this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
"The latest lag of the partition", tags);
this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
"The max lag of the partition", tags);
this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
"The average lag of the partition", tags);
}
public List<MetricNameTemplate> getAllTemplates() {
return Arrays.asList(
fetchSizeAvg,
fetchSizeMax,
bytesConsumedRate,
recordsPerRequestAvg,
recordsConsumedRate,
fetchLatencyAvg,
fetchLatencyMax,
fetchRequestRate,
recordsLagMax,
fetchThrottleTimeAvg,
fetchThrottleTimeMax,
topicFetchSizeAvg,
topicFetchSizeMax,
topicBytesConsumedRate,
topicRecordsPerRequestAvg,
topicRecordsConsumedRate,
partitionRecordsLag,
partitionRecordsLagAvg,
partitionRecordsLagMax
);
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;
/**
* A template for a MetricName. It contains a name, group, and description, as
* well as all the tags that will be used to create the mBean name. Tag values
* are omitted from the template, but are filled in at runtime with their
* specified values.
*/
public class MetricNameTemplate {
private final String name;
private final String group;
private final String description;
private Set<String> tags;
public MetricNameTemplate(String name, String group, String description, Set<String> tags) {
this.name = Utils.notNull(name);
this.group = Utils.notNull(group);
this.description = Utils.notNull(description);
this.tags = Utils.notNull(tags);
}
public MetricNameTemplate(String name, String group, String description, String... keys) {
this(name, group, description, getTags(keys));
}
private static Set<String> getTags(String... keys) {
Set<String> tags = new HashSet<String>();
for (int i = 0; i < keys.length; i++)
tags.add(keys[i]);
return tags;
}
public String name() {
return this.name;
}
public String group() {
return this.group;
}
public String description() {
return this.description;
}
public Set<String> tags() {
return tags;
}
}

View File

@ -97,7 +97,7 @@ public class JmxReporter implements MetricsReporter {
private KafkaMbean removeAttribute(KafkaMetric metric) {
MetricName metricName = metric.metricName();
String mBeanName = getMBeanName(metricName);
String mBeanName = getMBeanName(prefix, metricName);
KafkaMbean mbean = this.mbeans.get(mBeanName);
if (mbean != null)
mbean.removeAttribute(metricName.name());
@ -107,7 +107,7 @@ public class JmxReporter implements MetricsReporter {
private KafkaMbean addAttribute(KafkaMetric metric) {
try {
MetricName metricName = metric.metricName();
String mBeanName = getMBeanName(metricName);
String mBeanName = getMBeanName(prefix, metricName);
if (!this.mbeans.containsKey(mBeanName))
mbeans.put(mBeanName, new KafkaMbean(mBeanName));
KafkaMbean mbean = this.mbeans.get(mBeanName);
@ -122,7 +122,7 @@ public class JmxReporter implements MetricsReporter {
* @param metricName
* @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
*/
private String getMBeanName(MetricName metricName) {
static String getMBeanName(String prefix, MetricName metricName) {
StringBuilder mBeanName = new StringBuilder();
mBeanName.append(prefix);
mBeanName.append(":type=");

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@ -26,9 +27,13 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -226,6 +231,65 @@ public class Metrics implements Closeable {
return tags;
}
public static String toHtmlTable(String domain, List<MetricNameTemplate> allMetrics) {
Map<String, Map<String, String>> beansAndAttributes = new TreeMap<String, Map<String, String>>();
try (Metrics metrics = new Metrics()) {
for (MetricNameTemplate template : allMetrics) {
Map<String, String> tags = new TreeMap<String, String>();
for (String s : template.tags()) {
tags.put(s, "{" + s + "}");
}
MetricName metricName = metrics.metricName(template.name(), template.group(), template.description(), tags);
String mBeanName = JmxReporter.getMBeanName(domain, metricName);
if (!beansAndAttributes.containsKey(mBeanName)) {
beansAndAttributes.put(mBeanName, new TreeMap<String, String>());
}
Map<String, String> attrAndDesc = beansAndAttributes.get(mBeanName);
if (!attrAndDesc.containsKey(template.name())) {
attrAndDesc.put(template.name(), template.description());
} else {
throw new IllegalArgumentException("mBean '" + mBeanName + "' attribute '" + template.name() + "' is defined twice.");
}
}
}
StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
for (Entry<String, Map<String, String>> e : beansAndAttributes.entrySet()) {
b.append("<tr>\n");
b.append("<td colspan=3 class=\"mbeanName\" style=\"background-color:#ccc; font-weight: bold;\">");
b.append(e.getKey());
b.append("</td>");
b.append("</tr>\n");
b.append("<tr>\n");
b.append("<th style=\"width: 90px\"></th>\n");
b.append("<th>Attribute name</th>\n");
b.append("<th>Description</th>\n");
b.append("</tr>\n");
for (Entry<String, String> e2 : e.getValue().entrySet()) {
b.append("<tr>\n");
b.append("<td></td>");
b.append("<td>");
b.append(e2.getKey());
b.append("</td>");
b.append("<td>");
b.append(e2.getValue());
b.append("</td>");
b.append("</tr>\n");
}
}
b.append("</tbody></table>");
return b.toString();
}
public MetricConfig config() {
return config;
}
@ -484,6 +548,25 @@ public class Metrics implements Closeable {
return Collections.unmodifiableMap(childrenSensors);
}
public MetricName metricInstance(MetricNameTemplate template, String... keyValue) {
return metricInstance(template, getTags(keyValue));
}
public MetricName metricInstance(MetricNameTemplate template, Map<String, String> tags) {
// check to make sure that the runtime defined tags contain all the template tags.
Set<String> runtimeTagKeys = new HashSet<>(tags.keySet());
runtimeTagKeys.addAll(config().tags().keySet());
Set<String> templateTagKeys = template.tags();
if (!runtimeTagKeys.equals(templateTagKeys)) {
throw new IllegalArgumentException("For '" + template.name() + "', runtime-defined metric tags do not match the tags in the template. " + ""
+ "Runtime = " + runtimeTagKeys.toString() + " Template = " + templateTagKeys.toString());
}
return this.metricName(template.name(), template.group(), template.description(), tags);
}
/**
* Close this metrics repository.
*/

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
@ -1540,6 +1541,8 @@ public class KafkaConsumerTest {
ConsumerInterceptors<String, String> interceptors = null;
Metrics metrics = new Metrics();
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy);
ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, retryBackoffMs, requestTimeoutMs);
ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
@ -1574,7 +1577,7 @@ public class KafkaConsumerTest {
metadata,
subscriptions,
metrics,
metricGroupPrefix,
metricsRegistry.fetcherMetrics,
time,
retryBackoffMs,
IsolationLevel.READ_UNCOMMITTED);

View File

@ -122,6 +122,8 @@ public class FetcherTest {
private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
private Node node = cluster.nodes().get(0);
private Metrics metrics = new Metrics(time);
FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
private static final double EPSILON = 0.0001;
@ -1043,7 +1045,7 @@ public class FetcherTest {
@Test
public void testQuotaMetrics() throws Exception {
MockSelector selector = new MockSelector(time);
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, "consumer" + groupId);
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
Cluster cluster = TestUtils.singletonCluster("test", 1);
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
@ -1070,8 +1072,8 @@ public class FetcherTest {
selector.clear();
}
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg));
KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax));
// Throttle times are ApiVersions=400, Fetch=(100, 200, 300)
assertEquals(250, avgMetric.value(), EPSILON);
assertEquals(400, maxMetric.value(), EPSILON);
@ -1087,7 +1089,7 @@ public class FetcherTest {
subscriptions.assignFromUser(singleton(tp1));
subscriptions.seek(tp1, 0);
MetricName maxLagMetric = metrics.metricName("records-lag-max", metricGroup);
MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
@ -1122,8 +1124,8 @@ public class FetcherTest {
subscriptions.seek(tp1, 0);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
@ -1146,8 +1148,8 @@ public class FetcherTest {
subscriptions.seek(tp1, 1);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
@ -1173,8 +1175,8 @@ public class FetcherTest {
subscriptions.seek(tp2, 0);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
@ -1208,8 +1210,8 @@ public class FetcherTest {
subscriptions.seek(tp2, 0);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
// send the fetch and then seek to a new offset
assertEquals(1, fetcher.sendFetches());
@ -1816,7 +1818,7 @@ public class FetcherTest {
metadata,
subscriptions,
metrics,
"consumer" + groupId,
metricsRegistry,
time,
retryBackoffMs,
isolationLevel);

View File

@ -530,4 +530,58 @@ public class MetricsTest {
private Double measure(Measurable rate, MetricConfig config) {
return rate.measure(config, time.milliseconds());
}
@Test
public void testMetricInstances() {
MetricName n1 = metrics.metricInstance(SampleMetrics.METRIC1, "key1", "value1", "key2", "value2");
Map<String, String> tags = new HashMap<String, String>();
tags.put("key1", "value1");
tags.put("key2", "value2");
MetricName n2 = metrics.metricInstance(SampleMetrics.METRIC2, tags);
assertEquals("metric names created in two different ways should be equal", n1, n2);
try {
metrics.metricInstance(SampleMetrics.METRIC1, "key1");
fail("Creating MetricName with an odd number of keyValue should fail");
} catch (IllegalArgumentException e) {
// this is expected
}
Map<String, String> parentTagsWithValues = new HashMap<>();
parentTagsWithValues.put("parent-tag", "parent-tag-value");
Map<String, String> childTagsWithValues = new HashMap<>();
childTagsWithValues.put("child-tag", "child-tag-value");
try (Metrics inherited = new Metrics(new MetricConfig().tags(parentTagsWithValues), Arrays.asList((MetricsReporter) new JmxReporter()), time, true)) {
MetricName inheritedMetric = inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, childTagsWithValues);
Map<String, String> filledOutTags = inheritedMetric.tags();
assertEquals("parent-tag should be set properly", filledOutTags.get("parent-tag"), "parent-tag-value");
assertEquals("child-tag should be set properly", filledOutTags.get("child-tag"), "child-tag-value");
try {
inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, parentTagsWithValues);
fail("Creating MetricName should fail if the child metrics are not defined at runtime");
} catch (IllegalArgumentException e) {
// this is expected
}
try {
Map<String, String> runtimeTags = new HashMap<>();
runtimeTags.put("child-tag", "child-tag-value");
runtimeTags.put("tag-not-in-template", "unexpected-value");
inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, runtimeTags);
fail("Creating MetricName should fail if there is a tag at runtime that is not in the template");
} catch (IllegalArgumentException e) {
// this is expected
}
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricNameTemplate;
/**
* A registry of predefined Metrics for the MetricsTest.java class.
*/
public class SampleMetrics {
public static final MetricNameTemplate METRIC1 = new MetricNameTemplate("name", "group", "The first metric used in testMetricName()", "key1", "key2");
public static final MetricNameTemplate METRIC2 = new MetricNameTemplate("name", "group", "The second metric used in testMetricName()", "key1", "key2");
public static final MetricNameTemplate METRIC_WITH_INHERITED_TAGS = new MetricNameTemplate("inherited.tags", "group", "inherited.tags in testMetricName", "parent-tag", "child-tag");
}

View File

@ -1224,110 +1224,7 @@
<h5><a id="new_consumer_fetch_monitoring" href="#new_consumer_fetch_monitoring">Consumer Fetch Metrics</a></h5>
<table class="data-table">
<tbody>
<tr>
<th>Metric/Attribute name</th>
<th>Description</th>
<th>Mbean name</th>
</tr>
<tr>
<td>fetch-size-avg</td>
<td>The average number of bytes fetched per request</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>fetch-size-max</td>
<td>The maximum number of bytes fetched per request</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>bytes-consumed-rate</td>
<td>The average number of bytes consumed per second</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>records-per-request-avg</td>
<td>The average number of records in each request</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>records-consumed-rate</td>
<td>The average number of records consumed per second</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>fetch-latency-avg</td>
<td>The average time taken for a fetch request</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>fetch-latency-max</td>
<td>The max time taken for a fetch request</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>fetch-rate</td>
<td>The number of fetch requests per second</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>records-lag-max</td>
<td>The maximum lag in terms of number of records for any partition in this window</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>fetch-throttle-time-avg</td>
<td>The average throttle time in ms</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>fetch-throttle-time-max</td>
<td>The maximum throttle time in ms</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)</td>
</tr>
</tbody>
</table>
<h5><a id="topic_fetch_monitoring" href="#topic_fetch_monitoring">Topic-level Fetch Metrics</a></h5>
<table class="data-table">
<tbody>
<tr>
<th>Metric/Attribute name</th>
<th>Description</th>
<th>Mbean name</th>
</tr>
<tr>
<td>fetch-size-avg</td>
<td>The average number of bytes fetched per request for a specific topic.</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
<td>fetch-size-max</td>
<td>The maximum number of bytes fetched per request for a specific topic.</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
<td>bytes-consumed-rate</td>
<td>The average number of bytes consumed per second for a specific topic.</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
<td>records-per-request-avg</td>
<td>The average number of records in each request for a specific topic.</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
<td>records-consumed-rate</td>
<td>The average number of records consumed per second for a specific topic.</td>
<td>kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
</tbody>
</table>
<!--#include virtual="generated/consumer_metrics.html" -->
<h4><a id="kafka_streams_monitoring" href="#kafka_streams_monitoring">Streams Monitoring</a></h4>