KAFKA-5597: Autogenerate producer sender metrics

Subtask of https://issues.apache.org/jira/browse/KAFKA-3480

The changes are very similar to what was done for the consumer in https://issues.apache.org/jira/browse/KAFKA-5191 (pull request https://github.com/apache/kafka/pull/2993)

Author: James Cheng <jylcheng@yahoo.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3535 from wushujames/producer_sender_metrics_docs

Fix one minor naming bug
This commit is contained in:
James Cheng 2017-09-05 17:36:53 -07:00 committed by Guozhang Wang
parent cd59976eea
commit 2fb5664bf4
9 changed files with 240 additions and 160 deletions

View File

@ -666,11 +666,18 @@ project(':core') {
standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream() standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream()
} }
task genProducerMetricsDocs(type: JavaExec) {
classpath = sourceSets.test.runtimeClasspath
main = 'org.apache.kafka.clients.producer.internals.ProducerMetrics'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "producer_metrics.html").newOutputStream()
}
task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs', task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genAdminClientConfigDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs', 'genAdminClientConfigDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs',
'genKafkaConfigDocs', 'genTopicConfigDocs', 'genKafkaConfigDocs', 'genTopicConfigDocs',
':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs'], type: Tar) { ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs'], type: Tar) {
classifier = 'site-docs' classifier = 'site-docs'
compression = Compression.GZIP compression = Compression.GZIP
from project.file("$rootDir/docs") from project.file("$rootDir/docs")

View File

@ -31,6 +31,8 @@
files="ConsumerCoordinator.java"/> files="ConsumerCoordinator.java"/>
<suppress checks="ParameterNumber" <suppress checks="ParameterNumber"
files="Fetcher.java"/> files="Fetcher.java"/>
<suppress checks="ParameterNumber"
files="Sender.java"/>
<suppress checks="ParameterNumber" <suppress checks="ParameterNumber"
files="ConfigDef.java"/> files="ConfigDef.java"/>
<suppress checks="ParameterNumber" <suppress checks="ParameterNumber"

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager; import org.apache.kafka.clients.producer.internals.TransactionManager;
@ -328,6 +329,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
MetricsReporter.class); MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX)); reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time); this.metrics = new Metrics(metricConfig, reporters, time);
ProducerMetrics metricsRegistry = new ProducerMetrics(metricTags.keySet(), "producer");
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) { if (keySerializer == null) {
@ -380,7 +382,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics); Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, metricsRegistry.senderMetrics);
NetworkClient client = new NetworkClient( NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder), this.metrics, time, "producer", channelBuilder),
@ -405,6 +407,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
acks, acks,
retries, retries,
this.metrics, this.metrics,
metricsRegistry.senderMetrics,
Time.SYSTEM, Time.SYSTEM,
this.requestTimeoutMs, this.requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.Metrics;
public class ProducerMetrics {
public SenderMetricsRegistry senderMetrics;
public ProducerMetrics(Set<String> tags, String metricGrpPrefix) {
this.senderMetrics = new SenderMetricsRegistry(tags);
}
private List<MetricNameTemplate> getAllTemplates() {
List<MetricNameTemplate> l = new ArrayList<>();
l.addAll(this.senderMetrics.getAllTemplates());
return l;
}
public static void main(String[] args) {
Set<String> tags = new HashSet<>();
tags.add("client-id");
ProducerMetrics metrics = new ProducerMetrics(tags, "producer");
System.out.println(Metrics.toHtmlTable("kafka.producer", metrics.getAllTemplates()));
}
}

View File

@ -129,6 +129,7 @@ public class Sender implements Runnable {
short acks, short acks,
int retries, int retries,
Metrics metrics, Metrics metrics,
SenderMetricsRegistry metricsRegistry,
Time time, Time time,
int requestTimeout, int requestTimeout,
long retryBackoffMs, long retryBackoffMs,
@ -144,7 +145,7 @@ public class Sender implements Runnable {
this.acks = acks; this.acks = acks;
this.retries = retries; this.retries = retries;
this.time = time; this.time = time;
this.sensors = new SenderMetrics(metrics); this.sensors = new SenderMetrics(metrics, metricsRegistry);
this.requestTimeout = requestTimeout; this.requestTimeout = requestTimeout;
this.retryBackoffMs = retryBackoffMs; this.retryBackoffMs = retryBackoffMs;
this.apiVersions = apiVersions; this.apiVersions = apiVersions;
@ -676,13 +677,12 @@ public class Sender implements Runnable {
this.client.wakeup(); this.client.wakeup();
} }
public static Sensor throttleTimeSensor(Metrics metrics) { public static Sensor throttleTimeSensor(Metrics metrics, SenderMetricsRegistry metricsRegistry) {
String metricGrpName = SenderMetrics.METRIC_GROUP_NAME;
Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time"); Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-avg", MetricName m = metrics.metricInstance(metricsRegistry.produceThrottleTimeAvg);
metricGrpName, "The average throttle time in ms"), new Avg()); produceThrottleTimeSensor.add(m, new Avg());
produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-max", m = metrics.metricInstance(metricsRegistry.produceThrottleTimeMax);
metricGrpName, "The maximum throttle time in ms"), new Max()); produceThrottleTimeSensor.add(m, new Max());
return produceThrottleTimeSensor; return produceThrottleTimeSensor;
} }
@ -690,8 +690,6 @@ public class Sender implements Runnable {
* A collection of sensors for the sender * A collection of sensors for the sender
*/ */
private class SenderMetrics { private class SenderMetrics {
private static final String METRIC_GROUP_NAME = "producer-metrics";
private final Metrics metrics; private final Metrics metrics;
public final Sensor retrySensor; public final Sensor retrySensor;
public final Sensor errorSensor; public final Sensor errorSensor;
@ -702,60 +700,61 @@ public class Sender implements Runnable {
public final Sensor compressionRateSensor; public final Sensor compressionRateSensor;
public final Sensor maxRecordSizeSensor; public final Sensor maxRecordSizeSensor;
public final Sensor batchSplitSensor; public final Sensor batchSplitSensor;
private SenderMetricsRegistry metricsRegistry;
public SenderMetrics(Metrics metrics) { public SenderMetrics(Metrics metrics, SenderMetricsRegistry metricsRegistry) {
this.metrics = metrics; this.metrics = metrics;
String metricGrpName = METRIC_GROUP_NAME; this.metricsRegistry = metricsRegistry;
this.batchSizeSensor = metrics.sensor("batch-size"); this.batchSizeSensor = metrics.sensor("batch-size");
MetricName m = metrics.metricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request."); MetricName m = metrics.metricInstance(metricsRegistry.batchSizeAvg);
this.batchSizeSensor.add(m, new Avg()); this.batchSizeSensor.add(m, new Avg());
m = metrics.metricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request."); m = metrics.metricInstance(metricsRegistry.batchSizeMax);
this.batchSizeSensor.add(m, new Max()); this.batchSizeSensor.add(m, new Max());
this.compressionRateSensor = metrics.sensor("compression-rate"); this.compressionRateSensor = metrics.sensor("compression-rate");
m = metrics.metricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches."); m = metrics.metricInstance(metricsRegistry.compressionRateAvg);
this.compressionRateSensor.add(m, new Avg()); this.compressionRateSensor.add(m, new Avg());
this.queueTimeSensor = metrics.sensor("queue-time"); this.queueTimeSensor = metrics.sensor("queue-time");
m = metrics.metricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator."); m = metrics.metricInstance(metricsRegistry.recordQueueTimeAvg);
this.queueTimeSensor.add(m, new Avg()); this.queueTimeSensor.add(m, new Avg());
m = metrics.metricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator."); m = metrics.metricInstance(metricsRegistry.recordQueueTimeMax);
this.queueTimeSensor.add(m, new Max()); this.queueTimeSensor.add(m, new Max());
this.requestTimeSensor = metrics.sensor("request-time"); this.requestTimeSensor = metrics.sensor("request-time");
m = metrics.metricName("request-latency-avg", metricGrpName, "The average request latency in ms"); m = metrics.metricInstance(metricsRegistry.requestLatencyAvg);
this.requestTimeSensor.add(m, new Avg()); this.requestTimeSensor.add(m, new Avg());
m = metrics.metricName("request-latency-max", metricGrpName, "The maximum request latency in ms"); m = metrics.metricInstance(metricsRegistry.requestLatencyMax);
this.requestTimeSensor.add(m, new Max()); this.requestTimeSensor.add(m, new Max());
this.recordsPerRequestSensor = metrics.sensor("records-per-request"); this.recordsPerRequestSensor = metrics.sensor("records-per-request");
m = metrics.metricName("record-send-rate", metricGrpName, "The average number of records sent per second."); m = metrics.metricInstance(metricsRegistry.recordSendRate);
this.recordsPerRequestSensor.add(m, new Rate()); this.recordsPerRequestSensor.add(m, new Rate());
m = metrics.metricName("records-per-request-avg", metricGrpName, "The average number of records per request."); m = metrics.metricInstance(metricsRegistry.recordsPerRequestAvg);
this.recordsPerRequestSensor.add(m, new Avg()); this.recordsPerRequestSensor.add(m, new Avg());
this.retrySensor = metrics.sensor("record-retries"); this.retrySensor = metrics.sensor("record-retries");
m = metrics.metricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends"); m = metrics.metricInstance(metricsRegistry.recordRetryRate);
this.retrySensor.add(m, new Rate()); this.retrySensor.add(m, new Rate());
this.errorSensor = metrics.sensor("errors"); this.errorSensor = metrics.sensor("errors");
m = metrics.metricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors"); m = metrics.metricInstance(metricsRegistry.recordErrorRate);
this.errorSensor.add(m, new Rate()); this.errorSensor.add(m, new Rate());
this.maxRecordSizeSensor = metrics.sensor("record-size-max"); this.maxRecordSizeSensor = metrics.sensor("record-size");
m = metrics.metricName("record-size-max", metricGrpName, "The maximum record size"); m = metrics.metricInstance(metricsRegistry.recordSizeMax);
this.maxRecordSizeSensor.add(m, new Max()); this.maxRecordSizeSensor.add(m, new Max());
m = metrics.metricName("record-size-avg", metricGrpName, "The average record size"); m = metrics.metricInstance(metricsRegistry.recordSizeAvg);
this.maxRecordSizeSensor.add(m, new Avg()); this.maxRecordSizeSensor.add(m, new Avg());
m = metrics.metricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response."); m = metrics.metricInstance(metricsRegistry.requestsInFlight);
this.metrics.addMetric(m, new Measurable() { this.metrics.addMetric(m, new Measurable() {
public double measure(MetricConfig config, long now) { public double measure(MetricConfig config, long now) {
return client.inFlightRequestCount(); return client.inFlightRequestCount();
} }
}); });
m = metrics.metricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used."); m = metrics.metricInstance(metricsRegistry.metadataAge);
metrics.addMetric(m, new Measurable() { metrics.addMetric(m, new Measurable() {
public double measure(MetricConfig config, long now) { public double measure(MetricConfig config, long now) {
return (now - metadata.lastSuccessfulUpdate()) / 1000.0; return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
@ -763,7 +762,7 @@ public class Sender implements Runnable {
}); });
this.batchSplitSensor = metrics.sensor("batch-split-rate"); this.batchSplitSensor = metrics.sensor("batch-split-rate");
m = metrics.metricName("batch-split-rate", metricGrpName, "The rate of record batch split"); m = metrics.metricInstance(metricsRegistry.batchSplitRate);
this.batchSplitSensor.add(m, new Rate()); this.batchSplitSensor.add(m, new Rate());
} }
@ -774,30 +773,29 @@ public class Sender implements Runnable {
Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
if (topicRecordCount == null) { if (topicRecordCount == null) {
Map<String, String> metricTags = Collections.singletonMap("topic", topic); Map<String, String> metricTags = Collections.singletonMap("topic", topic);
String metricGrpName = "producer-topic-metrics";
topicRecordCount = this.metrics.sensor(topicRecordsCountName); topicRecordCount = this.metrics.sensor(topicRecordsCountName);
MetricName m = this.metrics.metricName("record-send-rate", metricGrpName, metricTags); MetricName m = this.metrics.metricInstance(metricsRegistry.topicRecordSendRate, metricTags);
topicRecordCount.add(m, new Rate()); topicRecordCount.add(m, new Rate());
String topicByteRateName = "topic." + topic + ".bytes"; String topicByteRateName = "topic." + topic + ".bytes";
Sensor topicByteRate = this.metrics.sensor(topicByteRateName); Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
m = this.metrics.metricName("byte-rate", metricGrpName, metricTags); m = this.metrics.metricInstance(metricsRegistry.topicByteRate, metricTags);
topicByteRate.add(m, new Rate()); topicByteRate.add(m, new Rate());
String topicCompressionRateName = "topic." + topic + ".compression-rate"; String topicCompressionRateName = "topic." + topic + ".compression-rate";
Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName); Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
m = this.metrics.metricName("compression-rate", metricGrpName, metricTags); m = this.metrics.metricInstance(metricsRegistry.topicCompressionRate, metricTags);
topicCompressionRate.add(m, new Avg()); topicCompressionRate.add(m, new Avg());
String topicRetryName = "topic." + topic + ".record-retries"; String topicRetryName = "topic." + topic + ".record-retries";
Sensor topicRetrySensor = this.metrics.sensor(topicRetryName); Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
m = this.metrics.metricName("record-retry-rate", metricGrpName, metricTags); m = this.metrics.metricInstance(metricsRegistry.topicRecordRetryRate, metricTags);
topicRetrySensor.add(m, new Rate()); topicRetrySensor.add(m, new Rate());
String topicErrorName = "topic." + topic + ".record-errors"; String topicErrorName = "topic." + topic + ".record-errors";
Sensor topicErrorSensor = this.metrics.sensor(topicErrorName); Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
m = this.metrics.metricName("record-error-rate", metricGrpName, metricTags); m = this.metrics.metricInstance(metricsRegistry.topicRecordErrorRate, metricTags);
topicErrorSensor.add(m, new Rate()); topicErrorSensor.add(m, new Rate());
} }
} }

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.MetricNameTemplate;
public class SenderMetricsRegistry {
final static String METRIC_GROUP_NAME = "producer-metrics";
final static String TOPIC_METRIC_GROUP_NAME = "producer-topic-metrics";
public MetricNameTemplate batchSizeAvg;
public MetricNameTemplate batchSizeMax;
public MetricNameTemplate compressionRateAvg;
public MetricNameTemplate recordQueueTimeAvg;
public MetricNameTemplate recordQueueTimeMax;
public MetricNameTemplate requestLatencyAvg;
public MetricNameTemplate requestLatencyMax;
public MetricNameTemplate produceThrottleTimeAvg;
public MetricNameTemplate produceThrottleTimeMax;
public MetricNameTemplate recordSendRate;
public MetricNameTemplate recordsPerRequestAvg;
public MetricNameTemplate recordRetryRate;
public MetricNameTemplate recordErrorRate;
public MetricNameTemplate recordSizeMax;
public MetricNameTemplate recordSizeAvg;
public MetricNameTemplate requestsInFlight;
public MetricNameTemplate metadataAge;
public MetricNameTemplate topicRecordSendRate;
public MetricNameTemplate topicByteRate;
public MetricNameTemplate topicCompressionRate;
public MetricNameTemplate topicRecordRetryRate;
public MetricNameTemplate topicRecordErrorRate;
public MetricNameTemplate batchSplitRate;
public SenderMetricsRegistry() {
this(new HashSet<String>());
}
public SenderMetricsRegistry(Set<String> tags) {
/* ***** Client level *****/
this.batchSizeAvg = new MetricNameTemplate("batch-size-avg", METRIC_GROUP_NAME, "The average number of bytes sent per partition per-request.", tags);
this.batchSizeMax = new MetricNameTemplate("batch-size-max", METRIC_GROUP_NAME, "The max number of bytes sent per partition per-request.", tags);
this.compressionRateAvg = new MetricNameTemplate("compression-rate-avg", METRIC_GROUP_NAME, "The average compression rate of record batches.", tags);
this.recordQueueTimeAvg = new MetricNameTemplate("record-queue-time-avg", METRIC_GROUP_NAME, "The average time in ms record batches spent in the send buffer.", tags);
this.recordQueueTimeMax = new MetricNameTemplate("record-queue-time-max", METRIC_GROUP_NAME, "The maximum time in ms record batches spent in the send buffer.", tags);
this.requestLatencyAvg = new MetricNameTemplate("request-latency-avg", METRIC_GROUP_NAME, "The average request latency in ms", tags);
this.requestLatencyMax = new MetricNameTemplate("request-latency-max", METRIC_GROUP_NAME, "The maximum request latency in ms", tags);
this.recordSendRate = new MetricNameTemplate("record-send-rate", METRIC_GROUP_NAME, "The average number of records sent per second.", tags);
this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", METRIC_GROUP_NAME, "The average number of records per request.", tags);
this.recordRetryRate = new MetricNameTemplate("record-retry-rate", METRIC_GROUP_NAME, "The average per-second number of retried record sends", tags);
this.recordErrorRate = new MetricNameTemplate("record-error-rate", METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors", tags);
this.recordSizeMax = new MetricNameTemplate("record-size-max", METRIC_GROUP_NAME, "The maximum record size", tags);
this.recordSizeAvg = new MetricNameTemplate("record-size-avg", METRIC_GROUP_NAME, "The average record size", tags);
this.requestsInFlight = new MetricNameTemplate("requests-in-flight", METRIC_GROUP_NAME, "The current number of in-flight requests awaiting a response.", tags);
this.metadataAge = new MetricNameTemplate("metadata-age", METRIC_GROUP_NAME, "The age in seconds of the current producer metadata being used.", tags);
this.batchSplitRate = new MetricNameTemplate("batch-split-rate", METRIC_GROUP_NAME, "The average number of batch splits per second", tags);
this.produceThrottleTimeAvg = new MetricNameTemplate("produce-throttle-time-avg", METRIC_GROUP_NAME, "The average time in ms a request was throttled by a broker", tags);
this.produceThrottleTimeMax = new MetricNameTemplate("produce-throttle-time-max", METRIC_GROUP_NAME, "The maximum time in ms a request was throttled by a broker", tags);
/* ***** Topic level *****/
Set<String> topicTags = new HashSet<String>(tags);
topicTags.add("topic");
this.topicRecordSendRate = new MetricNameTemplate("record-send-rate", TOPIC_METRIC_GROUP_NAME, "The average number of records sent per second for a topic.", topicTags);
this.topicByteRate = new MetricNameTemplate("byte-rate", TOPIC_METRIC_GROUP_NAME, "The average number of bytes sent per second for a topic.", topicTags);
this.topicCompressionRate = new MetricNameTemplate("compression-rate", TOPIC_METRIC_GROUP_NAME, "The average compression rate of record batches for a topic.", topicTags);
this.topicRecordRetryRate = new MetricNameTemplate("record-retry-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of retried record sends for a topic", topicTags);
this.topicRecordErrorRate = new MetricNameTemplate("record-error-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors for a topic", topicTags);
}
public List<MetricNameTemplate> getAllTemplates() {
return Arrays.asList(this.batchSizeAvg,
this.batchSizeMax,
this.compressionRateAvg,
this.recordQueueTimeAvg,
this.recordQueueTimeMax,
this.requestLatencyAvg,
this.requestLatencyMax,
this.recordSendRate,
this.recordsPerRequestAvg,
this.recordRetryRate,
this.recordErrorRate,
this.recordSizeMax,
this.recordSizeAvg,
this.requestsInFlight,
this.metadataAge,
this.batchSplitRate,
this.produceThrottleTimeAvg,
this.produceThrottleTimeMax,
// per-topic metrics
this.topicRecordSendRate,
this.topicByteRate,
this.topicCompressionRate,
this.topicRecordRetryRate,
this.topicRecordErrorRate
);
}
}

View File

@ -88,7 +88,6 @@ public class SenderTest {
private static final short ACKS_ALL = -1; private static final short ACKS_ALL = -1;
private static final int MAX_RETRIES = 0; private static final int MAX_RETRIES = 0;
private static final String CLIENT_ID = "clientId"; private static final String CLIENT_ID = "clientId";
private static final String METRIC_GROUP = "producer-metrics";
private static final double EPS = 0.0001; private static final double EPS = 0.0001;
private static final int MAX_BLOCK_TIMEOUT = 1000; private static final int MAX_BLOCK_TIMEOUT = 1000;
private static final int REQUEST_TIMEOUT = 1000; private static final int REQUEST_TIMEOUT = 1000;
@ -104,6 +103,7 @@ public class SenderTest {
private Metrics metrics = null; private Metrics metrics = null;
private RecordAccumulator accumulator = null; private RecordAccumulator accumulator = null;
private Sender sender = null; private Sender sender = null;
private SenderMetricsRegistry senderMetricsRegistry = null;
private final LogContext loggerFactory = new LogContext(); private final LogContext loggerFactory = new LogContext();
@Before @Before
@ -235,7 +235,7 @@ public class SenderTest {
@Test @Test
public void testQuotaMetrics() throws Exception { public void testQuotaMetrics() throws Exception {
MockSelector selector = new MockSelector(time); MockSelector selector = new MockSelector(time);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics); Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, this.senderMetricsRegistry);
Cluster cluster = TestUtils.singletonCluster("test", 1); Cluster cluster = TestUtils.singletonCluster("test", 1);
Node node = cluster.nodes().get(0); Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
@ -263,8 +263,8 @@ public class SenderTest {
selector.clear(); selector.clear();
} }
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, "")); KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(this.senderMetricsRegistry.produceThrottleTimeAvg));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, "")); KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(this.senderMetricsRegistry.produceThrottleTimeMax));
// Throttle times are ApiVersions=400, Produce=(100, 200, 300) // Throttle times are ApiVersions=400, Produce=(100, 200, 300)
assertEquals(250, avgMetric.value(), EPS); assertEquals(250, avgMetric.value(), EPS);
assertEquals(400, maxMetric.value(), EPS); assertEquals(400, maxMetric.value(), EPS);
@ -278,7 +278,7 @@ public class SenderTest {
Metrics m = new Metrics(); Metrics m = new Metrics();
try { try {
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, m, time, REQUEST_TIMEOUT, 50, null, apiVersions); maxRetries, m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
// do a successful retry // do a successful retry
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // connect
@ -325,7 +325,7 @@ public class SenderTest {
Metrics m = new Metrics(); Metrics m = new Metrics();
try { try {
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 50, null, apiVersions); m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, "test", 2); Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@ -577,7 +577,7 @@ public class SenderTest {
int maxRetries = 10; int maxRetries = 10;
Metrics m = new Metrics(); Metrics m = new Metrics();
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
client.prepareResponse(new MockClient.RequestMatcher() { client.prepareResponse(new MockClient.RequestMatcher() {
@ -617,8 +617,9 @@ public class SenderTest {
int maxRetries = 10; int maxRetries = 10;
Metrics m = new Metrics(); Metrics m = new Metrics();
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); m, metricsRegistry, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect. sender.run(time.milliseconds()); // connect.
@ -637,7 +638,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors. sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount()); assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, "")); KafkaMetric recordErrors = m.metrics().get(m.metricInstance(metricsRegistry.recordErrorRate));
assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0); assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
assertTrue(responseFuture.isDone()); assertTrue(responseFuture.isDone());
@ -655,7 +656,7 @@ public class SenderTest {
int maxRetries = 10; int maxRetries = 10;
Metrics m = new Metrics(); Metrics m = new Metrics();
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect. sender.run(time.milliseconds()); // connect.
@ -706,8 +707,9 @@ public class SenderTest {
try (Metrics m = new Metrics()) { try (Metrics m = new Metrics()) {
accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
new ApiVersions(), txnManager); new ApiVersions(), txnManager);
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); m, metricsRegistry, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, topic, 2); Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@ -769,7 +771,7 @@ public class SenderTest {
assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty()); assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
assertTrue("There should be a split", assertTrue("There should be a split",
m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0); m.metrics().get(m.metricInstance(metricsRegistry.batchSplitRate)).value() > 0);
} }
} }
@ -826,8 +828,9 @@ public class SenderTest {
this.metrics = new Metrics(metricConfig, time); this.metrics = new Metrics(metricConfig, time);
this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
apiVersions, transactionManager); apiVersions, transactionManager);
this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, this.metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
} }

View File

@ -124,7 +124,7 @@ public class TransactionManagerTest {
Metrics metrics = new Metrics(metricConfig, time); Metrics metrics = new Metrics(metricConfig, time);
this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); MAX_RETRIES, metrics, new SenderMetricsRegistry(metricTags.keySet()), this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
client.setNode(brokerNode); client.setNode(brokerNode);
} }

View File

@ -1039,119 +1039,13 @@
<td>The fraction of time an appender waits for space allocation.</td> <td>The fraction of time an appender waits for space allocation.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr> </tr>
<tr>
<td>batch-size-avg</td>
<td>The average number of bytes sent per partition per-request.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>batch-size-max</td>
<td>The max number of bytes sent per partition per-request.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>compression-rate-avg</td>
<td>The average compression rate of record batches.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-queue-time-avg</td>
<td>The average time in ms record batches spent in the record accumulator.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-queue-time-max</td>
<td>The maximum time in ms record batches spent in the record accumulator.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>request-latency-avg</td>
<td>The average request latency in ms.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>request-latency-max</td>
<td>The maximum request latency in ms.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-send-rate</td>
<td>The average number of records sent per second.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>records-per-request-avg</td>
<td>The average number of records per request.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-retry-rate</td>
<td>The average per-second number of retried record sends.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-error-rate</td>
<td>The average per-second number of record sends that resulted in errors.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-size-max</td>
<td>The maximum record size.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-size-avg</td>
<td>The average record size.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>requests-in-flight</td>
<td>The current number of in-flight requests awaiting a response.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>metadata-age</td>
<td>The age in seconds of the current producer metadata being used.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>produce-throttle-time-max</td>
<td>The maximum time in ms a request was throttled by a broker.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>produce-throttle-time-avg</td>
<td>The average time in ms a request was throttled by a broker.</td>
<td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-send-rate</td>
<td>The average number of records sent per second for a topic.</td>
<td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
<td>byte-rate</td>
<td>The average number of bytes sent per second for a topic.</td>
<td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
<td>compression-rate</td>
<td>The average compression rate of record batches for a topic.</td>
<td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
<td>record-retry-rate</td>
<td>The average per-second number of retried record sends for a topic.</td>
<td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
<tr>
<td>record-error-rate</td>
<td>The average per-second number of record sends that resulted in errors for a topic.</td>
<td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
</tr>
</tbody></table> </tbody></table>
<h5><a id="producer_sender_monitoring" href="#producer_sender_monitoring">Producer Sender Metrics</a></h5>
<!--#include virtual="generated/producer_metrics.html" -->
<h4><a id="new_consumer_monitoring" href="#new_consumer_monitoring">New consumer monitoring</a></h4> <h4><a id="new_consumer_monitoring" href="#new_consumer_monitoring">New consumer monitoring</a></h4>