KAFKA-2668; Add a metric that records the total number of metrics

onurkaraman becketqin Do you have time to review this patch? It addresses the ticket that jjkoshy filed in KAFKA-2668.

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Onur Karaman <okaraman@linkedin.com>, Joel Koshy <jjkoshy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #328 from lindong28/KAFKA-2668
This commit is contained in:
Dong Lin 2015-12-08 19:43:05 -08:00 committed by Jun Rao
parent ee6b5e044c
commit ef92a8ae74
35 changed files with 458 additions and 375 deletions

View File

@ -531,12 +531,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
this.time = new SystemTime(); this.time = new SystemTime();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0) if (clientId.length() <= 0)
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
Map<String, String> metricsTags = new LinkedHashMap<String, String>();
metricsTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class); MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX)); reporters.add(new JmxReporter(JMX_PREFIX));
@ -546,11 +548,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0); this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "consumer"; String metricGrpPrefix = "consumer";
Map<String, String> metricsTags = new LinkedHashMap<String, String>();
metricsTags.put("client-id", clientId);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient netClient = new NetworkClient( NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder), new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata, this.metadata,
clientId, clientId,
100, // a fixed large enough value will suffice 100, // a fixed large enough value will suffice
@ -573,7 +573,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.subscriptions, this.subscriptions,
metrics, metrics,
metricGrpPrefix, metricGrpPrefix,
metricsTags,
this.time, this.time,
retryBackoffMs, retryBackoffMs,
new ConsumerCoordinator.DefaultOffsetCommitCallback(), new ConsumerCoordinator.DefaultOffsetCommitCallback(),
@ -606,7 +605,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.subscriptions, this.subscriptions,
metrics, metrics,
metricGrpPrefix, metricGrpPrefix,
metricsTags,
this.time, this.time,
this.retryBackoffMs); this.retryBackoffMs);

View File

@ -14,7 +14,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupAuthorizationException;
@ -107,7 +106,6 @@ public abstract class AbstractCoordinator implements Closeable {
int heartbeatIntervalMs, int heartbeatIntervalMs,
Metrics metrics, Metrics metrics,
String metricGrpPrefix, String metricGrpPrefix,
Map<String, String> metricTags,
Time time, Time time,
long retryBackoffMs) { long retryBackoffMs) {
this.client = client; this.client = client;
@ -119,7 +117,7 @@ public abstract class AbstractCoordinator implements Closeable {
this.sessionTimeoutMs = sessionTimeoutMs; this.sessionTimeoutMs = sessionTimeoutMs;
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds()); this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
this.heartbeatTask = new HeartbeatTask(); this.heartbeatTask = new HeartbeatTask();
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs; this.retryBackoffMs = retryBackoffMs;
} }
@ -679,47 +677,39 @@ public abstract class AbstractCoordinator implements Closeable {
public final Sensor joinLatency; public final Sensor joinLatency;
public final Sensor syncLatency; public final Sensor syncLatency;
public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) { public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics; this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.heartbeatLatency = metrics.sensor("heartbeat-latency"); this.heartbeatLatency = metrics.sensor("heartbeat-latency");
this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max", this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max",
this.metricGrpName, this.metricGrpName,
"The max time taken to receive a response to a heartbeat request", "The max time taken to receive a response to a heartbeat request"), new Max());
tags), new Max()); this.heartbeatLatency.add(metrics.metricName("heartbeat-rate",
this.heartbeatLatency.add(new MetricName("heartbeat-rate",
this.metricGrpName, this.metricGrpName,
"The average number of heartbeats per second", "The average number of heartbeats per second"), new Rate(new Count()));
tags), new Rate(new Count()));
this.joinLatency = metrics.sensor("join-latency"); this.joinLatency = metrics.sensor("join-latency");
this.joinLatency.add(new MetricName("join-time-avg", this.joinLatency.add(metrics.metricName("join-time-avg",
this.metricGrpName, this.metricGrpName,
"The average time taken for a group rejoin", "The average time taken for a group rejoin"), new Avg());
tags), new Avg()); this.joinLatency.add(metrics.metricName("join-time-max",
this.joinLatency.add(new MetricName("join-time-max",
this.metricGrpName, this.metricGrpName,
"The max time taken for a group rejoin", "The max time taken for a group rejoin"), new Avg());
tags), new Avg()); this.joinLatency.add(metrics.metricName("join-rate",
this.joinLatency.add(new MetricName("join-rate",
this.metricGrpName, this.metricGrpName,
"The number of group joins per second", "The number of group joins per second"), new Rate(new Count()));
tags), new Rate(new Count()));
this.syncLatency = metrics.sensor("sync-latency"); this.syncLatency = metrics.sensor("sync-latency");
this.syncLatency.add(new MetricName("sync-time-avg", this.syncLatency.add(metrics.metricName("sync-time-avg",
this.metricGrpName, this.metricGrpName,
"The average time taken for a group sync", "The average time taken for a group sync"), new Avg());
tags), new Avg()); this.syncLatency.add(metrics.metricName("sync-time-max",
this.syncLatency.add(new MetricName("sync-time-max",
this.metricGrpName, this.metricGrpName,
"The max time taken for a group sync", "The max time taken for a group sync"), new Avg());
tags), new Avg()); this.syncLatency.add(metrics.metricName("sync-rate",
this.syncLatency.add(new MetricName("sync-rate",
this.metricGrpName, this.metricGrpName,
"The number of group syncs per second", "The number of group syncs per second"), new Rate(new Count()));
tags), new Rate(new Count()));
Measurable lastHeartbeat = Measurable lastHeartbeat =
new Measurable() { new Measurable() {
@ -727,10 +717,9 @@ public abstract class AbstractCoordinator implements Closeable {
return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS); return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
} }
}; };
metrics.addMetric(new MetricName("last-heartbeat-seconds-ago", metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago",
this.metricGrpName, this.metricGrpName,
"The number of seconds since the last controller heartbeat", "The number of seconds since the last controller heartbeat"),
tags),
lastHeartbeat); lastHeartbeat);
} }
} }

View File

@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException;
@ -82,7 +81,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
SubscriptionState subscriptions, SubscriptionState subscriptions,
Metrics metrics, Metrics metrics,
String metricGrpPrefix, String metricGrpPrefix,
Map<String, String> metricTags,
Time time, Time time,
long retryBackoffMs, long retryBackoffMs,
OffsetCommitCallback defaultOffsetCommitCallback, OffsetCommitCallback defaultOffsetCommitCallback,
@ -94,7 +92,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
heartbeatIntervalMs, heartbeatIntervalMs,
metrics, metrics,
metricGrpPrefix, metricGrpPrefix,
metricTags,
time, time,
retryBackoffMs); retryBackoffMs);
this.metadata = metadata; this.metadata = metadata;
@ -109,7 +106,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
addMetadataListener(); addMetadataListener();
this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null; this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
} }
@Override @Override
@ -639,23 +636,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public final Sensor commitLatency; public final Sensor commitLatency;
public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) { public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics; this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.commitLatency = metrics.sensor("commit-latency"); this.commitLatency = metrics.sensor("commit-latency");
this.commitLatency.add(new MetricName("commit-latency-avg", this.commitLatency.add(metrics.metricName("commit-latency-avg",
this.metricGrpName, this.metricGrpName,
"The average time taken for a commit request", "The average time taken for a commit request"), new Avg());
tags), new Avg()); this.commitLatency.add(metrics.metricName("commit-latency-max",
this.commitLatency.add(new MetricName("commit-latency-max",
this.metricGrpName, this.metricGrpName,
"The max time taken for a commit request", "The max time taken for a commit request"), new Max());
tags), new Max()); this.commitLatency.add(metrics.metricName("commit-rate",
this.commitLatency.add(new MetricName("commit-rate",
this.metricGrpName, this.metricGrpName,
"The number of commit calls per second", "The number of commit calls per second"), new Rate(new Count()));
tags), new Rate(new Count()));
Measurable numParts = Measurable numParts =
new Measurable() { new Measurable() {
@ -663,11 +657,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return subscriptions.assignedPartitions().size(); return subscriptions.assignedPartitions().size();
} }
}; };
metrics.addMetric(new MetricName("assigned-partitions", metrics.addMetric(metrics.metricName("assigned-partitions",
this.metricGrpName, this.metricGrpName,
"The number of partitions currently assigned to this consumer", "The number of partitions currently assigned to this consumer"), numParts);
tags),
numParts);
} }
} }

View File

@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
@ -99,7 +98,6 @@ public class Fetcher<K, V> {
SubscriptionState subscriptions, SubscriptionState subscriptions,
Metrics metrics, Metrics metrics,
String metricGrpPrefix, String metricGrpPrefix,
Map<String, String> metricTags,
Time time, Time time,
long retryBackoffMs) { long retryBackoffMs) {
@ -120,7 +118,7 @@ public class Fetcher<K, V> {
this.unauthorizedTopics = new HashSet<>(); this.unauthorizedTopics = new HashSet<>();
this.recordTooLargePartitions = new HashMap<>(); this.recordTooLargePartitions = new HashMap<>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags); this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs; this.retryBackoffMs = retryBackoffMs;
} }
@ -656,64 +654,53 @@ public class Fetcher<K, V> {
public final Sensor fetchThrottleTimeSensor; public final Sensor fetchThrottleTimeSensor;
public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) { public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics; this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics"; this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
this.bytesFetched = metrics.sensor("bytes-fetched"); this.bytesFetched = metrics.sensor("bytes-fetched");
this.bytesFetched.add(new MetricName("fetch-size-avg", this.bytesFetched.add(metrics.metricName("fetch-size-avg",
this.metricGrpName, this.metricGrpName,
"The average number of bytes fetched per request", "The average number of bytes fetched per request"), new Avg());
tags), new Avg()); this.bytesFetched.add(metrics.metricName("fetch-size-max",
this.bytesFetched.add(new MetricName("fetch-size-max",
this.metricGrpName, this.metricGrpName,
"The maximum number of bytes fetched per request", "The maximum number of bytes fetched per request"), new Max());
tags), new Max()); this.bytesFetched.add(metrics.metricName("bytes-consumed-rate",
this.bytesFetched.add(new MetricName("bytes-consumed-rate",
this.metricGrpName, this.metricGrpName,
"The average number of bytes consumed per second", "The average number of bytes consumed per second"), new Rate());
tags), new Rate());
this.recordsFetched = metrics.sensor("records-fetched"); this.recordsFetched = metrics.sensor("records-fetched");
this.recordsFetched.add(new MetricName("records-per-request-avg", this.recordsFetched.add(metrics.metricName("records-per-request-avg",
this.metricGrpName, this.metricGrpName,
"The average number of records in each request", "The average number of records in each request"), new Avg());
tags), new Avg()); this.recordsFetched.add(metrics.metricName("records-consumed-rate",
this.recordsFetched.add(new MetricName("records-consumed-rate",
this.metricGrpName, this.metricGrpName,
"The average number of records consumed per second", "The average number of records consumed per second"), new Rate());
tags), new Rate());
this.fetchLatency = metrics.sensor("fetch-latency"); this.fetchLatency = metrics.sensor("fetch-latency");
this.fetchLatency.add(new MetricName("fetch-latency-avg", this.fetchLatency.add(metrics.metricName("fetch-latency-avg",
this.metricGrpName, this.metricGrpName,
"The average time taken for a fetch request.", "The average time taken for a fetch request."), new Avg());
tags), new Avg()); this.fetchLatency.add(metrics.metricName("fetch-latency-max",
this.fetchLatency.add(new MetricName("fetch-latency-max",
this.metricGrpName, this.metricGrpName,
"The max time taken for any fetch request.", "The max time taken for any fetch request."), new Max());
tags), new Max()); this.fetchLatency.add(metrics.metricName("fetch-rate",
this.fetchLatency.add(new MetricName("fetch-rate",
this.metricGrpName, this.metricGrpName,
"The number of fetch requests per second.", "The number of fetch requests per second."), new Rate(new Count()));
tags), new Rate(new Count()));
this.recordsFetchLag = metrics.sensor("records-lag"); this.recordsFetchLag = metrics.sensor("records-lag");
this.recordsFetchLag.add(new MetricName("records-lag-max", this.recordsFetchLag.add(metrics.metricName("records-lag-max",
this.metricGrpName, this.metricGrpName,
"The maximum lag in terms of number of records for any partition in this window", "The maximum lag in terms of number of records for any partition in this window"), new Max());
tags), new Max());
this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time"); this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg", this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
this.metricGrpName, this.metricGrpName,
"The average throttle time in ms", "The average throttle time in ms"), new Avg());
tags), new Avg());
this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max", this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
this.metricGrpName, this.metricGrpName,
"The maximum throttle time in ms", "The maximum throttle time in ms"), new Max());
tags), new Max());
} }
public void recordTopicFetchMetrics(String topic, int bytes, int records) { public void recordTopicFetchMetrics(String topic, int bytes, int records) {

View File

@ -203,12 +203,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.producerConfig = config; this.producerConfig = config;
this.time = new SystemTime(); this.time = new SystemTime();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0) if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class); MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX)); reporters.add(new JmxReporter(JMX_PREFIX));
@ -256,21 +258,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
} }
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize, this.totalMemorySize,
this.compressionType, this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG), config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs, retryBackoffMs,
metrics, metrics,
time, time);
metricTags);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient client = new NetworkClient( NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder), new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata, this.metadata,
clientId, clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),

View File

@ -19,7 +19,6 @@ package org.apache.kafka.clients.producer.internals;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Deque; import java.util.Deque;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -62,9 +61,8 @@ public final class BufferPool {
* @param metrics instance of Metrics * @param metrics instance of Metrics
* @param time time instance * @param time time instance
* @param metricGrpName logical group name for metrics * @param metricGrpName logical group name for metrics
* @param metricTags additional key/val attributes for metrics
*/ */
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time , String metricGrpName , Map<String, String> metricTags) { public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
this.poolableSize = poolableSize; this.poolableSize = poolableSize;
this.lock = new ReentrantLock(); this.lock = new ReentrantLock();
this.free = new ArrayDeque<ByteBuffer>(); this.free = new ArrayDeque<ByteBuffer>();
@ -74,10 +72,9 @@ public final class BufferPool {
this.metrics = metrics; this.metrics = metrics;
this.time = time; this.time = time;
this.waitTime = this.metrics.sensor("bufferpool-wait-time"); this.waitTime = this.metrics.sensor("bufferpool-wait-time");
MetricName metricName = new MetricName("bufferpool-wait-ratio", MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
metricGrpName, metricGrpName,
"The fraction of time an appender waits for space allocation.", "The fraction of time an appender waits for space allocation.");
metricTags);
this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
} }

View File

@ -85,7 +85,6 @@ public final class RecordAccumulator {
* exhausting all retries in a short period of time. * exhausting all retries in a short period of time.
* @param metrics The metrics * @param metrics The metrics
* @param time The time instance to use * @param time The time instance to use
* @param metricTags additional key/value attributes of the metric
*/ */
public RecordAccumulator(int batchSize, public RecordAccumulator(int batchSize,
long totalSize, long totalSize,
@ -93,8 +92,7 @@ public final class RecordAccumulator {
long lingerMs, long lingerMs,
long retryBackoffMs, long retryBackoffMs,
Metrics metrics, Metrics metrics,
Time time, Time time) {
Map<String, String> metricTags) {
this.drainIndex = 0; this.drainIndex = 0;
this.closed = false; this.closed = false;
this.flushesInProgress = new AtomicInteger(0); this.flushesInProgress = new AtomicInteger(0);
@ -105,14 +103,14 @@ public final class RecordAccumulator {
this.retryBackoffMs = retryBackoffMs; this.retryBackoffMs = retryBackoffMs;
this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>(); this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
String metricGrpName = "producer-metrics"; String metricGrpName = "producer-metrics";
this.free = new BufferPool(totalSize, batchSize, metrics, time , metricGrpName , metricTags); this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
this.incomplete = new IncompleteRecordBatches(); this.incomplete = new IncompleteRecordBatches();
this.time = time; this.time = time;
registerMetrics(metrics, metricGrpName, metricTags); registerMetrics(metrics, metricGrpName);
} }
private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) { private void registerMetrics(Metrics metrics, String metricGrpName) {
MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags); MetricName metricName = metrics.metricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records");
Measurable waitingThreads = new Measurable() { Measurable waitingThreads = new Measurable() {
public double measure(MetricConfig config, long now) { public double measure(MetricConfig config, long now) {
return free.queued(); return free.queued();
@ -120,7 +118,7 @@ public final class RecordAccumulator {
}; };
metrics.addMetric(metricName, waitingThreads); metrics.addMetric(metricName, waitingThreads);
metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags); metricName = metrics.metricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).");
Measurable totalBytes = new Measurable() { Measurable totalBytes = new Measurable() {
public double measure(MetricConfig config, long now) { public double measure(MetricConfig config, long now) {
return free.totalMemory(); return free.totalMemory();
@ -128,7 +126,7 @@ public final class RecordAccumulator {
}; };
metrics.addMetric(metricName, totalBytes); metrics.addMetric(metricName, totalBytes);
metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags); metricName = metrics.metricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).");
Measurable availableBytes = new Measurable() { Measurable availableBytes = new Measurable() {
public double measure(MetricConfig config, long now) { public double measure(MetricConfig config, long now) {
return free.availableMemory(); return free.availableMemory();
@ -137,7 +135,7 @@ public final class RecordAccumulator {
metrics.addMetric(metricName, availableBytes); metrics.addMetric(metricName, availableBytes);
Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records"); Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
metricName = new MetricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags); metricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
bufferExhaustedRecordSensor.add(metricName, new Rate()); bufferExhaustedRecordSensor.add(metricName, new Rate());
} }

View File

@ -367,65 +367,63 @@ public class Sender implements Runnable {
public SenderMetrics(Metrics metrics) { public SenderMetrics(Metrics metrics) {
this.metrics = metrics; this.metrics = metrics;
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
String metricGrpName = "producer-metrics"; String metricGrpName = "producer-metrics";
this.batchSizeSensor = metrics.sensor("batch-size"); this.batchSizeSensor = metrics.sensor("batch-size");
MetricName m = new MetricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request.", metricTags); MetricName m = metrics.metricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request.");
this.batchSizeSensor.add(m, new Avg()); this.batchSizeSensor.add(m, new Avg());
m = new MetricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request.", metricTags); m = metrics.metricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request.");
this.batchSizeSensor.add(m, new Max()); this.batchSizeSensor.add(m, new Max());
this.compressionRateSensor = metrics.sensor("compression-rate"); this.compressionRateSensor = metrics.sensor("compression-rate");
m = new MetricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches.", metricTags); m = metrics.metricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches.");
this.compressionRateSensor.add(m, new Avg()); this.compressionRateSensor.add(m, new Avg());
this.queueTimeSensor = metrics.sensor("queue-time"); this.queueTimeSensor = metrics.sensor("queue-time");
m = new MetricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator.", metricTags); m = metrics.metricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator.");
this.queueTimeSensor.add(m, new Avg()); this.queueTimeSensor.add(m, new Avg());
m = new MetricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator.", metricTags); m = metrics.metricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator.");
this.queueTimeSensor.add(m, new Max()); this.queueTimeSensor.add(m, new Max());
this.requestTimeSensor = metrics.sensor("request-time"); this.requestTimeSensor = metrics.sensor("request-time");
m = new MetricName("request-latency-avg", metricGrpName, "The average request latency in ms", metricTags); m = metrics.metricName("request-latency-avg", metricGrpName, "The average request latency in ms");
this.requestTimeSensor.add(m, new Avg()); this.requestTimeSensor.add(m, new Avg());
m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags); m = metrics.metricName("request-latency-max", metricGrpName, "The maximum request latency in ms");
this.requestTimeSensor.add(m, new Max()); this.requestTimeSensor.add(m, new Max());
this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time"); this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
m = new MetricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); m = metrics.metricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms");
this.produceThrottleTimeSensor.add(m, new Avg()); this.produceThrottleTimeSensor.add(m, new Avg());
m = new MetricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); m = metrics.metricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms");
this.produceThrottleTimeSensor.add(m, new Max()); this.produceThrottleTimeSensor.add(m, new Max());
this.recordsPerRequestSensor = metrics.sensor("records-per-request"); this.recordsPerRequestSensor = metrics.sensor("records-per-request");
m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags); m = metrics.metricName("record-send-rate", metricGrpName, "The average number of records sent per second.");
this.recordsPerRequestSensor.add(m, new Rate()); this.recordsPerRequestSensor.add(m, new Rate());
m = new MetricName("records-per-request-avg", metricGrpName, "The average number of records per request.", metricTags); m = metrics.metricName("records-per-request-avg", metricGrpName, "The average number of records per request.");
this.recordsPerRequestSensor.add(m, new Avg()); this.recordsPerRequestSensor.add(m, new Avg());
this.retrySensor = metrics.sensor("record-retries"); this.retrySensor = metrics.sensor("record-retries");
m = new MetricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends", metricTags); m = metrics.metricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends");
this.retrySensor.add(m, new Rate()); this.retrySensor.add(m, new Rate());
this.errorSensor = metrics.sensor("errors"); this.errorSensor = metrics.sensor("errors");
m = new MetricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors", metricTags); m = metrics.metricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors");
this.errorSensor.add(m, new Rate()); this.errorSensor.add(m, new Rate());
this.maxRecordSizeSensor = metrics.sensor("record-size-max"); this.maxRecordSizeSensor = metrics.sensor("record-size-max");
m = new MetricName("record-size-max", metricGrpName, "The maximum record size", metricTags); m = metrics.metricName("record-size-max", metricGrpName, "The maximum record size");
this.maxRecordSizeSensor.add(m, new Max()); this.maxRecordSizeSensor.add(m, new Max());
m = new MetricName("record-size-avg", metricGrpName, "The average record size", metricTags); m = metrics.metricName("record-size-avg", metricGrpName, "The average record size");
this.maxRecordSizeSensor.add(m, new Avg()); this.maxRecordSizeSensor.add(m, new Avg());
m = new MetricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.", metricTags); m = metrics.metricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.");
this.metrics.addMetric(m, new Measurable() { this.metrics.addMetric(m, new Measurable() {
public double measure(MetricConfig config, long now) { public double measure(MetricConfig config, long now) {
return client.inFlightRequestCount(); return client.inFlightRequestCount();
} }
}); });
m = new MetricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.", metricTags); m = metrics.metricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.");
metrics.addMetric(m, new Measurable() { metrics.addMetric(m, new Measurable() {
public double measure(MetricConfig config, long now) { public double measure(MetricConfig config, long now) {
return (now - metadata.lastSuccessfulUpdate()) / 1000.0; return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
@ -440,32 +438,31 @@ public class Sender implements Runnable {
Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
if (topicRecordCount == null) { if (topicRecordCount == null) {
Map<String, String> metricTags = new LinkedHashMap<String, String>(); Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
metricTags.put("topic", topic); metricTags.put("topic", topic);
String metricGrpName = "producer-topic-metrics"; String metricGrpName = "producer-topic-metrics";
topicRecordCount = this.metrics.sensor(topicRecordsCountName); topicRecordCount = this.metrics.sensor(topicRecordsCountName);
MetricName m = new MetricName("record-send-rate", metricGrpName , metricTags); MetricName m = this.metrics.metricName("record-send-rate", metricGrpName, metricTags);
topicRecordCount.add(m, new Rate()); topicRecordCount.add(m, new Rate());
String topicByteRateName = "topic." + topic + ".bytes"; String topicByteRateName = "topic." + topic + ".bytes";
Sensor topicByteRate = this.metrics.sensor(topicByteRateName); Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
m = new MetricName("byte-rate", metricGrpName , metricTags); m = this.metrics.metricName("byte-rate", metricGrpName, metricTags);
topicByteRate.add(m, new Rate()); topicByteRate.add(m, new Rate());
String topicCompressionRateName = "topic." + topic + ".compression-rate"; String topicCompressionRateName = "topic." + topic + ".compression-rate";
Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName); Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
m = new MetricName("compression-rate", metricGrpName , metricTags); m = this.metrics.metricName("compression-rate", metricGrpName, metricTags);
topicCompressionRate.add(m, new Avg()); topicCompressionRate.add(m, new Avg());
String topicRetryName = "topic." + topic + ".record-retries"; String topicRetryName = "topic." + topic + ".record-retries";
Sensor topicRetrySensor = this.metrics.sensor(topicRetryName); Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
m = new MetricName("record-retry-rate", metricGrpName , metricTags); m = this.metrics.metricName("record-retry-rate", metricGrpName, metricTags);
topicRetrySensor.add(m, new Rate()); topicRetrySensor.add(m, new Rate());
String topicErrorName = "topic." + topic + ".record-errors"; String topicErrorName = "topic." + topic + ".record-errors";
Sensor topicErrorSensor = this.metrics.sensor(topicErrorName); Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
m = new MetricName("record-error-rate", metricGrpName , metricTags); m = this.metrics.metricName("record-error-rate", metricGrpName, metricTags);
topicErrorSensor.add(m, new Rate()); topicErrorSensor.add(m, new Rate());
} }
} }

View File

@ -18,7 +18,7 @@ import java.util.Map;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
/** /**
* The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes * The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes. It should be constructed using metrics.MetricName(...).
* <p> * <p>
* This class captures the following parameters * This class captures the following parameters
* <pre> * <pre>
@ -31,23 +31,27 @@ import org.apache.kafka.common.utils.Utils;
* <p> * <p>
* Ex: standard JMX MBean can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b> * Ex: standard JMX MBean can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b>
* <p> * <p>
*
* Usage looks something like this: * Usage looks something like this:
* <pre>{@code * <pre>{@code
* // set up metrics: * // set up metrics:
* Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
* Sensor sensor = metrics.sensor("message-sizes");
* *
* Map<String, String> metricTags = new LinkedHashMap<String, String>(); * Map<String, String> metricTags = new LinkedHashMap<String, String>();
* metricTags.put("client-id", "producer-1"); * metricTags.put("client-id", "producer-1");
* metricTags.put("topic", "topic"); * metricTags.put("topic", "topic");
* *
* MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags); * MetricConfig metricConfig = new MetricConfig().tags(metricTags);
* Metrics metrics = new Metrics(metricConfig); // this is the global repository of metrics and sensors
*
* Sensor sensor = metrics.sensor("message-sizes");
*
* MetricName metricName = metrics.metricName("message-size-avg", "producer-metrics", "average message size");
* sensor.add(metricName, new Avg()); * sensor.add(metricName, new Avg());
* *
* metricName = new MetricName("message-size-max", "producer-metrics", metricTags); * metricName = metrics.metricName("message-size-max", "producer-metrics");
* sensor.add(metricName, new Max()); * sensor.add(metricName, new Max());
* *
* metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic"); * metricName = metrics.metricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
* sensor.add(metricName, new Min()); * sensor.add(metricName, new Min());
* *
* // as messages are sent we record the sizes * // as messages are sent we record the sizes
@ -63,6 +67,8 @@ public final class MetricName {
private int hash = 0; private int hash = 0;
/** /**
* Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, String, Map<String, String>)}
*
* @param name The name of the metric * @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs * @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric * @param description A human-readable description to include in the metric
@ -76,11 +82,15 @@ public final class MetricName {
} }
/** /**
* @deprecated This method will be removed in a future release.
* Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, String, String...)}
*
* @param name The name of the metric * @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs * @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric * @param description A human-readable description to include in the metric
* @param keyValue additional key/value attributes of the metric (must come in pairs) * @param keyValue additional key/value attributes of the metric (must come in pairs)
*/ */
@Deprecated
public MetricName(String name, String group, String description, String... keyValue) { public MetricName(String name, String group, String description, String... keyValue) {
this(name, group, description, getTags(keyValue)); this(name, group, description, getTags(keyValue));
} }
@ -97,27 +107,39 @@ public final class MetricName {
} }
/** /**
* @deprecated This method will be removed in a future release.
* Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, Map<String, String>)}
*
* @param name The name of the metric * @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs * @param group logical group name of the metrics to which this metric belongs
* @param tags key/value attributes of the metric * @param tags key/value attributes of the metric
*/ */
@Deprecated
public MetricName(String name, String group, Map<String, String> tags) { public MetricName(String name, String group, Map<String, String> tags) {
this(name, group, "", tags); this(name, group, "", tags);
} }
/** /**
* @deprecated This method will be removed in a future release.
* Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, String)}
*
* @param name The name of the metric * @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs * @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric * @param description A human-readable description to include in the metric
*/ */
@Deprecated
public MetricName(String name, String group, String description) { public MetricName(String name, String group, String description) {
this(name, group, description, new HashMap<String, String>()); this(name, group, description, new HashMap<String, String>());
} }
/** /**
* @deprecated This method will be removed in a future release.
* Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String)}
*
* @param name The name of the metric * @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs * @param group logical group name of the metrics to which this metric belongs
*/ */
@Deprecated
public MetricName(String name, String group) { public MetricName(String name, String group) {
this(name, group, "", new HashMap<String, String>()); this(name, group, "", new HashMap<String, String>());
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.kafka.common.metrics; package org.apache.kafka.common.metrics;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -28,6 +30,7 @@ public class MetricConfig {
private long eventWindow; private long eventWindow;
private long timeWindowMs; private long timeWindowMs;
private TimeUnit unit; private TimeUnit unit;
private Map<String, String> tags;
public MetricConfig() { public MetricConfig() {
super(); super();
@ -36,6 +39,7 @@ public class MetricConfig {
this.eventWindow = Long.MAX_VALUE; this.eventWindow = Long.MAX_VALUE;
this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS); this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
this.unit = TimeUnit.SECONDS; this.unit = TimeUnit.SECONDS;
this.tags = new LinkedHashMap<>();
} }
public Quota quota() { public Quota quota() {
@ -65,6 +69,15 @@ public class MetricConfig {
return this; return this;
} }
public Map<String, String> tags() {
return this.tags;
}
public MetricConfig tags(Map<String, String> tags) {
this.tags = tags;
return this;
}
public int samples() { public int samples() {
return this.samples; return this.samples;
} }

View File

@ -15,6 +15,8 @@ package org.apache.kafka.common.metrics;
import java.io.Closeable; import java.io.Closeable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -81,6 +83,15 @@ public class Metrics implements Closeable {
} }
/** /**
* Create a metrics repository with no metric reporters and the given default configuration.
* Expiration of Sensors is disabled.
*/
public Metrics(MetricConfig defaultConfig, Time time) {
this(defaultConfig, new ArrayList<MetricsReporter>(0), time);
}
/**
* Create a metrics repository with no reporters and the given default config. This config will be used for any * Create a metrics repository with no reporters and the given default config. This config will be used for any
* metric that doesn't override its own config. Expiration of Sensors is disabled. * metric that doesn't override its own config. Expiration of Sensors is disabled.
* @param defaultConfig The default config to use for all metrics that don't override their config * @param defaultConfig The default config to use for all metrics that don't override their config
@ -130,6 +141,90 @@ public class Metrics implements Closeable {
} else { } else {
this.metricsScheduler = null; this.metricsScheduler = null;
} }
addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
new Measurable() {
@Override
public double measure(MetricConfig config, long now) {
return metrics.size();
}
});
}
/**
* Create a MetricName with the given name, group, description and tags, plus default tags specified in the metric
* configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
*
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric
* @param tags additional key/value attributes of the metric
*/
public MetricName metricName(String name, String group, String description, Map<String, String> tags) {
Map<String, String> combinedTag = new LinkedHashMap<>(config.tags());
combinedTag.putAll(tags);
return new MetricName(name, group, description, combinedTag);
}
/**
* Create a MetricName with the given name, group, description, and default tags
* specified in the metric configuration.
*
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric
*/
public MetricName metricName(String name, String group, String description) {
return metricName(name, group, description, new HashMap<String, String>());
}
/**
* Create a MetricName with the given name, group and default tags specified in the metric configuration.
*
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
*/
public MetricName metricName(String name, String group) {
return metricName(name, group, "", new HashMap<String, String>());
}
/**
* Create a MetricName with the given name, group, description, and keyValue as tags, plus default tags specified in the metric
* configuration. Tag in keyValue takes precedence if the same tag key is specified in the default metric configuration.
*
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric
* @param keyValue additional key/value attributes of the metric (must come in pairs)
*/
public MetricName metricName(String name, String group, String description, String... keyValue) {
return metricName(name, group, description, getTags(keyValue));
}
/**
* Create a MetricName with the given name, group and tags, plus default tags specified in the metric
* configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
*
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
* @param tags key/value attributes of the metric
*/
public MetricName metricName(String name, String group, Map<String, String> tags) {
return metricName(name, group, "", tags);
}
private static Map<String, String> getTags(String... keyValue) {
if ((keyValue.length % 2) != 0)
throw new IllegalArgumentException("keyValue needs to be specified in pairs");
Map<String, String> tags = new HashMap<String, String>();
for (int i = 0; i < keyValue.length; i += 2)
tags.put(keyValue[i], keyValue[i + 1]);
return tags;
}
public MetricConfig config() {
return config;
} }
/** /**

View File

@ -118,8 +118,8 @@ public class Selector implements Selectable {
this.metricsPerConnection = metricsPerConnection; this.metricsPerConnection = metricsPerConnection;
} }
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, ChannelBuilder channelBuilder) { public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true, channelBuilder); this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
} }
/** /**
@ -568,48 +568,48 @@ public class Selector implements Selectable {
} }
this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString()); this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags); MetricName metricName = metrics.metricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
this.connectionClosed.add(metricName, new Rate()); this.connectionClosed.add(metricName, new Rate());
this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString()); this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags); metricName = metrics.metricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
this.connectionCreated.add(metricName, new Rate()); this.connectionCreated.add(metricName, new Rate());
this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString()); this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags); metricName = metrics.metricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
bytesTransferred.add(metricName, new Rate(new Count())); bytesTransferred.add(metricName, new Rate(new Count()));
this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred); this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags); metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
this.bytesSent.add(metricName, new Rate()); this.bytesSent.add(metricName, new Rate());
metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags); metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
this.bytesSent.add(metricName, new Rate(new Count())); this.bytesSent.add(metricName, new Rate(new Count()));
metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags); metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
this.bytesSent.add(metricName, new Avg()); this.bytesSent.add(metricName, new Avg());
metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags); metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
this.bytesSent.add(metricName, new Max()); this.bytesSent.add(metricName, new Max());
this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred); this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags); metricName = metrics.metricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
this.bytesReceived.add(metricName, new Rate()); this.bytesReceived.add(metricName, new Rate());
metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags); metricName = metrics.metricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
this.bytesReceived.add(metricName, new Rate(new Count())); this.bytesReceived.add(metricName, new Rate(new Count()));
this.selectTime = sensor("select-time:" + tagsSuffix.toString()); this.selectTime = sensor("select-time:" + tagsSuffix.toString());
metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags); metricName = metrics.metricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
this.selectTime.add(metricName, new Rate(new Count())); this.selectTime.add(metricName, new Rate(new Count()));
metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags); metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
this.selectTime.add(metricName, new Avg()); this.selectTime.add(metricName, new Avg());
metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags); metricName = metrics.metricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
this.ioTime = sensor("io-time:" + tagsSuffix.toString()); this.ioTime = sensor("io-time:" + tagsSuffix.toString());
metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags); metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
this.ioTime.add(metricName, new Avg()); this.ioTime.add(metricName, new Avg());
metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags); metricName = metrics.metricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags); metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
topLevelMetricNames.add(metricName); topLevelMetricNames.add(metricName);
this.metrics.addMetric(metricName, new Measurable() { this.metrics.addMetric(metricName, new Measurable() {
public double measure(MetricConfig config, long now) { public double measure(MetricConfig config, long now) {
@ -637,27 +637,27 @@ public class Selector implements Selectable {
tags.put("node-id", "node-" + connectionId); tags.put("node-id", "node-" + connectionId);
nodeRequest = sensor(nodeRequestName); nodeRequest = sensor(nodeRequestName);
MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags); MetricName metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, tags);
nodeRequest.add(metricName, new Rate()); nodeRequest.add(metricName, new Rate());
metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags); metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
nodeRequest.add(metricName, new Rate(new Count())); nodeRequest.add(metricName, new Rate(new Count()));
metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags); metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
nodeRequest.add(metricName, new Avg()); nodeRequest.add(metricName, new Avg());
metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags); metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
nodeRequest.add(metricName, new Max()); nodeRequest.add(metricName, new Max());
String nodeResponseName = "node-" + connectionId + ".bytes-received"; String nodeResponseName = "node-" + connectionId + ".bytes-received";
Sensor nodeResponse = sensor(nodeResponseName); Sensor nodeResponse = sensor(nodeResponseName);
metricName = new MetricName("incoming-byte-rate", metricGrpName, tags); metricName = metrics.metricName("incoming-byte-rate", metricGrpName, tags);
nodeResponse.add(metricName, new Rate()); nodeResponse.add(metricName, new Rate());
metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags); metricName = metrics.metricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
nodeResponse.add(metricName, new Rate(new Count())); nodeResponse.add(metricName, new Rate(new Count()));
String nodeTimeName = "node-" + connectionId + ".latency"; String nodeTimeName = "node-" + connectionId + ".latency";
Sensor nodeRequestTime = sensor(nodeTimeName); Sensor nodeRequestTime = sensor(nodeTimeName);
metricName = new MetricName("request-latency-avg", metricGrpName, tags); metricName = metrics.metricName("request-latency-avg", metricGrpName, tags);
nodeRequestTime.add(metricName, new Avg()); nodeRequestTime.add(metricName, new Avg());
metricName = new MetricName("request-latency-max", metricGrpName, tags); metricName = metrics.metricName("request-latency-max", metricGrpName, tags);
nodeRequestTime.add(metricName, new Max()); nodeRequestTime.add(metricName, new Max());
} }
} }

View File

@ -60,7 +60,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -89,7 +88,6 @@ public class ConsumerCoordinatorTest {
private SubscriptionState subscriptions; private SubscriptionState subscriptions;
private Metadata metadata; private Metadata metadata;
private Metrics metrics; private Metrics metrics;
private Map<String, String> metricTags = new LinkedHashMap<>();
private ConsumerNetworkClient consumerClient; private ConsumerNetworkClient consumerClient;
private MockRebalanceListener rebalanceListener; private MockRebalanceListener rebalanceListener;
private MockCommitCallback defaultOffsetCommitCallback; private MockCommitCallback defaultOffsetCommitCallback;
@ -109,7 +107,6 @@ public class ConsumerCoordinatorTest {
this.partitionAssignor.clear(); this.partitionAssignor.clear();
client.setNode(node); client.setNode(node);
this.coordinator = buildCoordinator(metrics, assignors); this.coordinator = buildCoordinator(metrics, assignors);
} }
@ -912,7 +909,6 @@ public class ConsumerCoordinatorTest {
subscriptions, subscriptions,
metrics, metrics,
"consumer" + groupId, "consumer" + groupId,
metricTags,
time, time,
retryBackoffMs, retryBackoffMs,
defaultOffsetCommitCallback, defaultOffsetCommitCallback,

View File

@ -54,7 +54,6 @@ import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@ -83,7 +82,6 @@ public class FetcherTest {
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE); private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
private Metrics metrics = new Metrics(time); private Metrics metrics = new Metrics(time);
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
private static final double EPSILON = 0.0001; private static final double EPSILON = 0.0001;
private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
@ -484,8 +482,8 @@ public class FetcherTest {
} }
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags)); KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags)); KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
assertEquals(200, avgMetric.value(), EPSILON); assertEquals(200, avgMetric.value(), EPSILON);
assertEquals(300, maxMetric.value(), EPSILON); assertEquals(300, maxMetric.value(), EPSILON);
} }
@ -527,7 +525,6 @@ public class FetcherTest {
subscriptions, subscriptions,
metrics, metrics,
"consumer" + groupId, "consumer" + groupId,
metricTags,
time, time,
retryBackoffMs); retryBackoffMs);
} }

View File

@ -25,9 +25,7 @@ import org.junit.Test;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -38,7 +36,6 @@ public class BufferPoolTest {
private Metrics metrics = new Metrics(time); private Metrics metrics = new Metrics(time);
private final long maxBlockTimeMs = 2000; private final long maxBlockTimeMs = 2000;
String metricGroup = "TestMetrics"; String metricGroup = "TestMetrics";
Map<String, String> metricTags = new LinkedHashMap<String, String>();
@After @After
public void teardown() { public void teardown() {
@ -52,7 +49,7 @@ public class BufferPoolTest {
public void testSimple() throws Exception { public void testSimple() throws Exception {
long totalMemory = 64 * 1024; long totalMemory = 64 * 1024;
int size = 1024; int size = 1024;
BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup, metricTags); BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup);
ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs); ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
assertEquals("Buffer size should equal requested size.", size, buffer.limit()); assertEquals("Buffer size should equal requested size.", size, buffer.limit());
assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory()); assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
@ -79,7 +76,7 @@ public class BufferPoolTest {
*/ */
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testCantAllocateMoreMemoryThanWeHave() throws Exception { public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup, metricTags); BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup);
ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs); ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
assertEquals(1024, buffer.limit()); assertEquals(1024, buffer.limit());
pool.deallocate(buffer); pool.deallocate(buffer);
@ -91,7 +88,7 @@ public class BufferPoolTest {
*/ */
@Test @Test
public void testDelayedAllocation() throws Exception { public void testDelayedAllocation() throws Exception {
BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup, metricTags); BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup);
ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs); ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
CountDownLatch doDealloc = asyncDeallocate(pool, buffer); CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
CountDownLatch allocation = asyncAllocate(pool, 5 * 1024); CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
@ -140,7 +137,7 @@ public class BufferPoolTest {
*/ */
@Test @Test
public void testBlockTimeout() throws Exception { public void testBlockTimeout() throws Exception {
BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup, metricTags); BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
pool.allocate(1, maxBlockTimeMs); pool.allocate(1, maxBlockTimeMs);
try { try {
pool.allocate(2, maxBlockTimeMs); pool.allocate(2, maxBlockTimeMs);
@ -159,7 +156,7 @@ public class BufferPoolTest {
final int iterations = 50000; final int iterations = 50000;
final int poolableSize = 1024; final int poolableSize = 1024;
final long totalMemory = numThreads / 2 * poolableSize; final long totalMemory = numThreads / 2 * poolableSize;
final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup, metricTags); final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup);
List<StressTestThread> threads = new ArrayList<StressTestThread>(); List<StressTestThread> threads = new ArrayList<StressTestThread>();
for (int i = 0; i < numThreads; i++) for (int i = 0; i < numThreads; i++)
threads.add(new StressTestThread(pool, iterations)); threads.add(new StressTestThread(pool, iterations));

View File

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -65,7 +64,6 @@ public class RecordAccumulatorTest {
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet()); private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet());
private Metrics metrics = new Metrics(time); private Metrics metrics = new Metrics(time);
Map<String, String> metricTags = new LinkedHashMap<String, String>();
private final long maxBlockTimeMs = 1000; private final long maxBlockTimeMs = 1000;
@After @After
@ -76,7 +74,7 @@ public class RecordAccumulatorTest {
@Test @Test
public void testFull() throws Exception { public void testFull() throws Exception {
long now = time.milliseconds(); long now = time.milliseconds();
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
int appends = 1024 / msgSize; int appends = 1024 / msgSize;
for (int i = 0; i < appends; i++) { for (int i = 0; i < appends; i++) {
accum.append(tp1, key, value, null, maxBlockTimeMs); accum.append(tp1, key, value, null, maxBlockTimeMs);
@ -100,7 +98,7 @@ public class RecordAccumulatorTest {
@Test @Test
public void testAppendLarge() throws Exception { public void testAppendLarge() throws Exception {
int batchSize = 512; int batchSize = 512;
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags); RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs); accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
} }
@ -108,7 +106,7 @@ public class RecordAccumulatorTest {
@Test @Test
public void testLinger() throws Exception { public void testLinger() throws Exception {
long lingerMs = 10L; long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
accum.append(tp1, key, value, null, maxBlockTimeMs); accum.append(tp1, key, value, null, maxBlockTimeMs);
assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
time.sleep(10); time.sleep(10);
@ -126,7 +124,7 @@ public class RecordAccumulatorTest {
@Test @Test
public void testPartialDrain() throws Exception { public void testPartialDrain() throws Exception {
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
int appends = 1024 / msgSize + 1; int appends = 1024 / msgSize + 1;
List<TopicPartition> partitions = asList(tp1, tp2); List<TopicPartition> partitions = asList(tp1, tp2);
for (TopicPartition tp : partitions) { for (TopicPartition tp : partitions) {
@ -145,7 +143,7 @@ public class RecordAccumulatorTest {
final int numThreads = 5; final int numThreads = 5;
final int msgs = 10000; final int msgs = 10000;
final int numParts = 2; final int numParts = 2;
final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags); final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
List<Thread> threads = new ArrayList<Thread>(); List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < numThreads; i++) { for (int i = 0; i < numThreads; i++) {
threads.add(new Thread() { threads.add(new Thread() {
@ -185,7 +183,7 @@ public class RecordAccumulatorTest {
public void testNextReadyCheckDelay() throws Exception { public void testNextReadyCheckDelay() throws Exception {
// Next check time will use lingerMs since this test won't trigger any retries/backoff // Next check time will use lingerMs since this test won't trigger any retries/backoff
long lingerMs = 10L; long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
// Just short of going over the limit so we trigger linger time // Just short of going over the limit so we trigger linger time
int appends = 1024 / msgSize; int appends = 1024 / msgSize;
@ -219,7 +217,7 @@ public class RecordAccumulatorTest {
public void testRetryBackoff() throws Exception { public void testRetryBackoff() throws Exception {
long lingerMs = Long.MAX_VALUE / 4; long lingerMs = Long.MAX_VALUE / 4;
long retryBackoffMs = Long.MAX_VALUE / 2; long retryBackoffMs = Long.MAX_VALUE / 2;
final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, metricTags); final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
long now = time.milliseconds(); long now = time.milliseconds();
accum.append(tp1, key, value, null, maxBlockTimeMs); accum.append(tp1, key, value, null, maxBlockTimeMs);
@ -256,7 +254,7 @@ public class RecordAccumulatorTest {
@Test @Test
public void testFlush() throws Exception { public void testFlush() throws Exception {
long lingerMs = Long.MAX_VALUE; long lingerMs = Long.MAX_VALUE;
final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
for (int i = 0; i < 100; i++) for (int i = 0; i < 100; i++)
accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs); accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@ -280,7 +278,7 @@ public class RecordAccumulatorTest {
public void testAbortIncompleteBatches() throws Exception { public void testAbortIncompleteBatches() throws Exception {
long lingerMs = Long.MAX_VALUE; long lingerMs = Long.MAX_VALUE;
final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
class TestCallback implements Callback { class TestCallback implements Callback {
@Override @Override
public void onCompletion(RecordMetadata metadata, Exception exception) { public void onCompletion(RecordMetadata metadata, Exception exception) {
@ -303,7 +301,7 @@ public class RecordAccumulatorTest {
public void testExpiredBatches() throws InterruptedException { public void testExpiredBatches() throws InterruptedException {
Time time = new SystemTime(); Time time = new SystemTime();
long now = time.milliseconds(); long now = time.milliseconds();
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time, metricTags); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
int appends = 1024 / msgSize; int appends = 1024 / msgSize;
for (int i = 0; i < appends; i++) { for (int i = 0; i < appends; i++) {
accum.append(tp1, key, value, null, maxBlockTimeMs); accum.append(tp1, key, value, null, maxBlockTimeMs);

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
@ -57,24 +58,29 @@ public class SenderTest {
private int batchSize = 16 * 1024; private int batchSize = 16 * 1024;
private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Cluster cluster = TestUtils.singletonCluster("test", 1); private Cluster cluster = TestUtils.singletonCluster("test", 1);
private Metrics metrics = new Metrics(time); private Metrics metrics = null;
Map<String, String> metricTags = new LinkedHashMap<String, String>(); private RecordAccumulator accumulator = null;
private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, metricTags); private Sender sender = null;
private Sender sender = new Sender(client,
metadata,
this.accumulator,
MAX_REQUEST_SIZE,
ACKS_ALL,
MAX_RETRIES,
metrics,
time,
CLIENT_ID,
REQUEST_TIMEOUT);
@Before @Before
public void setup() { public void setup() {
metadata.update(cluster, time.milliseconds()); Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", CLIENT_ID); metricTags.put("client-id", CLIENT_ID);
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
metrics = new Metrics(metricConfig, time);
accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time);
sender = new Sender(client,
metadata,
this.accumulator,
MAX_REQUEST_SIZE,
ACKS_ALL,
MAX_RETRIES,
metrics,
time,
CLIENT_ID,
REQUEST_TIMEOUT);
metadata.update(cluster, time.milliseconds());
} }
@After @After
@ -110,8 +116,8 @@ public class SenderTest {
sender.run(time.milliseconds()); sender.run(time.milliseconds());
} }
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(new MetricName("produce-throttle-time-avg", METRIC_GROUP, "", metricTags)); KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, ""));
KafkaMetric maxMetric = allMetrics.get(new MetricName("produce-throttle-time-max", METRIC_GROUP, "", metricTags)); KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, ""));
assertEquals(200, avgMetric.value(), EPS); assertEquals(200, avgMetric.value(), EPS);
assertEquals(300, maxMetric.value(), EPS); assertEquals(300, maxMetric.value(), EPS);
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.kafka.common.metrics; package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.metrics.stats.Total;
import org.junit.Test; import org.junit.Test;
@ -29,11 +28,11 @@ public class JmxReporterTest {
try { try {
metrics.addReporter(new JmxReporter()); metrics.addReporter(new JmxReporter());
Sensor sensor = metrics.sensor("kafka.requests"); Sensor sensor = metrics.sensor("kafka.requests");
sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg()); sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
sensor.add(new MetricName("pack.bean2.total", "grp2"), new Total()); sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total());
Sensor sensor2 = metrics.sensor("kafka.blah"); Sensor sensor2 = metrics.sensor("kafka.blah");
sensor2.add(new MetricName("pack.bean1.some", "grp1"), new Total()); sensor2.add(metrics.metricName("pack.bean1.some", "grp1"), new Total());
sensor2.add(new MetricName("pack.bean2.some", "grp1"), new Total()); sensor2.add(metrics.metricName("pack.bean2.some", "grp1"), new Total());
} finally { } finally {
metrics.close(); metrics.close();
} }

View File

@ -59,16 +59,16 @@ public class MetricsTest {
@Test @Test
public void testMetricName() { public void testMetricName() {
MetricName n1 = new MetricName("name", "group", "description", "key1", "value1", "key2", "value2"); MetricName n1 = metrics.metricName("name", "group", "description", "key1", "value1", "key2", "value2");
Map<String, String> tags = new HashMap<String, String>(); Map<String, String> tags = new HashMap<String, String>();
tags.put("key1", "value1"); tags.put("key1", "value1");
tags.put("key2", "value2"); tags.put("key2", "value2");
MetricName n2 = new MetricName("name", "group", "description", tags); MetricName n2 = metrics.metricName("name", "group", "description", tags);
assertEquals("metric names created in two different ways should be equal", n1, n2); assertEquals("metric names created in two different ways should be equal", n1, n2);
try { try {
new MetricName("name", "group", "description", "key1"); metrics.metricName("name", "group", "description", "key1");
fail("Creating MetricName with an old number of keyValue should fail"); fail("Creating MetricName with an odd number of keyValue should fail");
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// this is expected // this is expected
} }
@ -78,20 +78,20 @@ public class MetricsTest {
public void testSimpleStats() throws Exception { public void testSimpleStats() throws Exception {
ConstantMeasurable measurable = new ConstantMeasurable(); ConstantMeasurable measurable = new ConstantMeasurable();
metrics.addMetric(new MetricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable); metrics.addMetric(metrics.metricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
Sensor s = metrics.sensor("test.sensor"); Sensor s = metrics.sensor("test.sensor");
s.add(new MetricName("test.avg", "grp1"), new Avg()); s.add(metrics.metricName("test.avg", "grp1"), new Avg());
s.add(new MetricName("test.max", "grp1"), new Max()); s.add(metrics.metricName("test.max", "grp1"), new Max());
s.add(new MetricName("test.min", "grp1"), new Min()); s.add(metrics.metricName("test.min", "grp1"), new Min());
s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS)); s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
s.add(new MetricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count())); s.add(metrics.metricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
s.add(new MetricName("test.count", "grp1"), new Count()); s.add(metrics.metricName("test.count", "grp1"), new Count());
s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT, s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
new Percentile(new MetricName("test.median", "grp1"), 50.0), new Percentile(metrics.metricName("test.median", "grp1"), 50.0),
new Percentile(new MetricName("test.perc99_9", "grp1"), 99.9))); new Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9)));
Sensor s2 = metrics.sensor("test.sensor2"); Sensor s2 = metrics.sensor("test.sensor2");
s2.add(new MetricName("s2.total", "grp1"), new Total()); s2.add(metrics.metricName("s2.total", "grp1"), new Total());
s2.record(5.0); s2.record(5.0);
int sum = 0; int sum = 0;
@ -103,38 +103,38 @@ public class MetricsTest {
// prior to any time passing // prior to any time passing
double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0; double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs, assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs,
metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
// pretend 2 seconds passed... // pretend 2 seconds passed...
long sleepTimeMs = 2; long sleepTimeMs = 2;
time.sleep(sleepTimeMs * 1000); time.sleep(sleepTimeMs * 1000);
elapsedSecs += sleepTimeMs; elapsedSecs += sleepTimeMs;
assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS); assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(metrics.metricName("s2.total", "grp1")).value(), EPS);
assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS); assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(metrics.metricName("test.avg", "grp1")).value(), EPS);
assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS); assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(metrics.metricName("test.max", "grp1")).value(), EPS);
assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS); assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(metrics.metricName("test.min", "grp1")).value(), EPS);
assertEquals("Rate(0...9) = 1.40625", assertEquals("Rate(0...9) = 1.40625",
sum / elapsedSecs, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS); sum / elapsedSecs, metrics.metrics().get(metrics.metricName("test.rate", "grp1")).value(), EPS);
assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs),
count / elapsedSecs, count / elapsedSecs,
metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
assertEquals("Count(0...9) = 10", assertEquals("Count(0...9) = 10",
(double) count, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS); (double) count, metrics.metrics().get(metrics.metricName("test.count", "grp1")).value(), EPS);
} }
@Test @Test
public void testHierarchicalSensors() { public void testHierarchicalSensors() {
Sensor parent1 = metrics.sensor("test.parent1"); Sensor parent1 = metrics.sensor("test.parent1");
parent1.add(new MetricName("test.parent1.count", "grp1"), new Count()); parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
Sensor parent2 = metrics.sensor("test.parent2"); Sensor parent2 = metrics.sensor("test.parent2");
parent2.add(new MetricName("test.parent2.count", "grp1"), new Count()); parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
Sensor child1 = metrics.sensor("test.child1", parent1, parent2); Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
child1.add(new MetricName("test.child1.count", "grp1"), new Count()); child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
Sensor child2 = metrics.sensor("test.child2", parent1); Sensor child2 = metrics.sensor("test.child2", parent1);
child2.add(new MetricName("test.child2.count", "grp1"), new Count()); child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
Sensor grandchild = metrics.sensor("test.grandchild", child1); Sensor grandchild = metrics.sensor("test.grandchild", child1);
grandchild.add(new MetricName("test.grandchild.count", "grp1"), new Count()); grandchild.add(metrics.metricName("test.grandchild.count", "grp1"), new Count());
/* increment each sensor one time */ /* increment each sensor one time */
parent1.record(); parent1.record();
@ -167,75 +167,76 @@ public class MetricsTest {
@Test @Test
public void testRemoveSensor() { public void testRemoveSensor() {
int size = metrics.metrics().size();
Sensor parent1 = metrics.sensor("test.parent1"); Sensor parent1 = metrics.sensor("test.parent1");
parent1.add(new MetricName("test.parent1.count", "grp1"), new Count()); parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
Sensor parent2 = metrics.sensor("test.parent2"); Sensor parent2 = metrics.sensor("test.parent2");
parent2.add(new MetricName("test.parent2.count", "grp1"), new Count()); parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
Sensor child1 = metrics.sensor("test.child1", parent1, parent2); Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
child1.add(new MetricName("test.child1.count", "grp1"), new Count()); child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
Sensor child2 = metrics.sensor("test.child2", parent2); Sensor child2 = metrics.sensor("test.child2", parent2);
child2.add(new MetricName("test.child2.count", "grp1"), new Count()); child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
Sensor grandChild1 = metrics.sensor("test.gchild2", child2); Sensor grandChild1 = metrics.sensor("test.gchild2", child2);
grandChild1.add(new MetricName("test.gchild2.count", "grp1"), new Count()); grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new Count());
Sensor sensor = metrics.getSensor("test.parent1"); Sensor sensor = metrics.getSensor("test.parent1");
assertNotNull(sensor); assertNotNull(sensor);
metrics.removeSensor("test.parent1"); metrics.removeSensor("test.parent1");
assertNull(metrics.getSensor("test.parent1")); assertNull(metrics.getSensor("test.parent1"));
assertNull(metrics.metrics().get(new MetricName("test.parent1.count", "grp1"))); assertNull(metrics.metrics().get(metrics.metricName("test.parent1.count", "grp1")));
assertNull(metrics.getSensor("test.child1")); assertNull(metrics.getSensor("test.child1"));
assertNull(metrics.childrenSensors().get(sensor)); assertNull(metrics.childrenSensors().get(sensor));
assertNull(metrics.metrics().get(new MetricName("test.child1.count", "grp1"))); assertNull(metrics.metrics().get(metrics.metricName("test.child1.count", "grp1")));
sensor = metrics.getSensor("test.gchild2"); sensor = metrics.getSensor("test.gchild2");
assertNotNull(sensor); assertNotNull(sensor);
metrics.removeSensor("test.gchild2"); metrics.removeSensor("test.gchild2");
assertNull(metrics.getSensor("test.gchild2")); assertNull(metrics.getSensor("test.gchild2"));
assertNull(metrics.childrenSensors().get(sensor)); assertNull(metrics.childrenSensors().get(sensor));
assertNull(metrics.metrics().get(new MetricName("test.gchild2.count", "grp1"))); assertNull(metrics.metrics().get(metrics.metricName("test.gchild2.count", "grp1")));
sensor = metrics.getSensor("test.child2"); sensor = metrics.getSensor("test.child2");
assertNotNull(sensor); assertNotNull(sensor);
metrics.removeSensor("test.child2"); metrics.removeSensor("test.child2");
assertNull(metrics.getSensor("test.child2")); assertNull(metrics.getSensor("test.child2"));
assertNull(metrics.childrenSensors().get(sensor)); assertNull(metrics.childrenSensors().get(sensor));
assertNull(metrics.metrics().get(new MetricName("test.child2.count", "grp1"))); assertNull(metrics.metrics().get(metrics.metricName("test.child2.count", "grp1")));
sensor = metrics.getSensor("test.parent2"); sensor = metrics.getSensor("test.parent2");
assertNotNull(sensor); assertNotNull(sensor);
metrics.removeSensor("test.parent2"); metrics.removeSensor("test.parent2");
assertNull(metrics.getSensor("test.parent2")); assertNull(metrics.getSensor("test.parent2"));
assertNull(metrics.childrenSensors().get(sensor)); assertNull(metrics.childrenSensors().get(sensor));
assertNull(metrics.metrics().get(new MetricName("test.parent2.count", "grp1"))); assertNull(metrics.metrics().get(metrics.metricName("test.parent2.count", "grp1")));
assertEquals(0, metrics.metrics().size()); assertEquals(size, metrics.metrics().size());
} }
@Test @Test
public void testRemoveInactiveMetrics() { public void testRemoveInactiveMetrics() {
Sensor s1 = metrics.sensor("test.s1", null, 1); Sensor s1 = metrics.sensor("test.s1", null, 1);
s1.add(new MetricName("test.s1.count", "grp1"), new Count()); s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
Sensor s2 = metrics.sensor("test.s2", null, 3); Sensor s2 = metrics.sensor("test.s2", null, 3);
s2.add(new MetricName("test.s2.count", "grp1"), new Count()); s2.add(metrics.metricName("test.s2.count", "grp1"), new Count());
Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask(); Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask();
purger.run(); purger.run();
assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1")); assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
assertNotNull("MetricName test.s1.count must be present", assertNotNull("MetricName test.s1.count must be present",
metrics.metrics().get(new MetricName("test.s1.count", "grp1"))); metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2")); assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
assertNotNull("MetricName test.s2.count must be present", assertNotNull("MetricName test.s2.count must be present",
metrics.metrics().get(new MetricName("test.s2.count", "grp1"))); metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
time.sleep(1001); time.sleep(1001);
purger.run(); purger.run();
assertNull("Sensor test.s1 should have been purged", metrics.getSensor("test.s1")); assertNull("Sensor test.s1 should have been purged", metrics.getSensor("test.s1"));
assertNull("MetricName test.s1.count should have been purged", assertNull("MetricName test.s1.count should have been purged",
metrics.metrics().get(new MetricName("test.s1.count", "grp1"))); metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2")); assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
assertNotNull("MetricName test.s2.count must be present", assertNotNull("MetricName test.s2.count must be present",
metrics.metrics().get(new MetricName("test.s2.count", "grp1"))); metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
// record a value in sensor s2. This should reset the clock for that sensor. // record a value in sensor s2. This should reset the clock for that sensor.
// It should not get purged at the 3 second mark after creation // It should not get purged at the 3 second mark after creation
@ -244,36 +245,37 @@ public class MetricsTest {
purger.run(); purger.run();
assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2")); assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
assertNotNull("MetricName test.s2.count must be present", assertNotNull("MetricName test.s2.count must be present",
metrics.metrics().get(new MetricName("test.s2.count", "grp1"))); metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
// After another 1 second sleep, the metric should be purged // After another 1 second sleep, the metric should be purged
time.sleep(1000); time.sleep(1000);
purger.run(); purger.run();
assertNull("Sensor test.s2 should have been purged", metrics.getSensor("test.s1")); assertNull("Sensor test.s2 should have been purged", metrics.getSensor("test.s1"));
assertNull("MetricName test.s2.count should have been purged", assertNull("MetricName test.s2.count should have been purged",
metrics.metrics().get(new MetricName("test.s1.count", "grp1"))); metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
// After purging, it should be possible to recreate a metric // After purging, it should be possible to recreate a metric
s1 = metrics.sensor("test.s1", null, 1); s1 = metrics.sensor("test.s1", null, 1);
s1.add(new MetricName("test.s1.count", "grp1"), new Count()); s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1")); assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
assertNotNull("MetricName test.s1.count must be present", assertNotNull("MetricName test.s1.count must be present",
metrics.metrics().get(new MetricName("test.s1.count", "grp1"))); metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
} }
@Test @Test
public void testRemoveMetric() { public void testRemoveMetric() {
metrics.addMetric(new MetricName("test1", "grp1"), new Count()); int size = metrics.metrics().size();
metrics.addMetric(new MetricName("test2", "grp1"), new Count()); metrics.addMetric(metrics.metricName("test1", "grp1"), new Count());
metrics.addMetric(metrics.metricName("test2", "grp1"), new Count());
assertNotNull(metrics.removeMetric(new MetricName("test1", "grp1"))); assertNotNull(metrics.removeMetric(metrics.metricName("test1", "grp1")));
assertNull(metrics.metrics().get(new MetricName("test1", "grp1"))); assertNull(metrics.metrics().get(metrics.metricName("test1", "grp1")));
assertNotNull(metrics.metrics().get(new MetricName("test2", "grp1"))); assertNotNull(metrics.metrics().get(metrics.metricName("test2", "grp1")));
assertNotNull(metrics.removeMetric(new MetricName("test2", "grp1"))); assertNotNull(metrics.removeMetric(metrics.metricName("test2", "grp1")));
assertNull(metrics.metrics().get(new MetricName("test2", "grp1"))); assertNull(metrics.metrics().get(metrics.metricName("test2", "grp1")));
assertEquals(0, metrics.metrics().size()); assertEquals(size, metrics.metrics().size());
} }
@Test @Test
@ -313,15 +315,15 @@ public class MetricsTest {
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testDuplicateMetricName() { public void testDuplicateMetricName() {
metrics.sensor("test").add(new MetricName("test", "grp1"), new Avg()); metrics.sensor("test").add(metrics.metricName("test", "grp1"), new Avg());
metrics.sensor("test2").add(new MetricName("test", "grp1"), new Total()); metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new Total());
} }
@Test @Test
public void testQuotas() { public void testQuotas() {
Sensor sensor = metrics.sensor("test"); Sensor sensor = metrics.sensor("test");
sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0))); sensor.add(metrics.metricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0))); sensor.add(metrics.metricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
sensor.record(5.0); sensor.record(5.0);
try { try {
sensor.record(1.0); sensor.record(1.0);
@ -329,7 +331,7 @@ public class MetricsTest {
} catch (QuotaViolationException e) { } catch (QuotaViolationException e) {
// this is good // this is good
} }
assertEquals(6.0, metrics.metrics().get(new MetricName("test1.total", "grp1")).value(), EPS); assertEquals(6.0, metrics.metrics().get(metrics.metricName("test1.total", "grp1")).value(), EPS);
sensor.record(-6.0); sensor.record(-6.0);
try { try {
sensor.record(-1.0); sensor.record(-1.0);
@ -358,15 +360,15 @@ public class MetricsTest {
0.0, 0.0,
100.0, 100.0,
BucketSizing.CONSTANT, BucketSizing.CONSTANT,
new Percentile(new MetricName("test.p25", "grp1"), 25), new Percentile(metrics.metricName("test.p25", "grp1"), 25),
new Percentile(new MetricName("test.p50", "grp1"), 50), new Percentile(metrics.metricName("test.p50", "grp1"), 50),
new Percentile(new MetricName("test.p75", "grp1"), 75)); new Percentile(metrics.metricName("test.p75", "grp1"), 75));
MetricConfig config = new MetricConfig().eventWindow(50).samples(2); MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
Sensor sensor = metrics.sensor("test", config); Sensor sensor = metrics.sensor("test", config);
sensor.add(percs); sensor.add(percs);
Metric p25 = this.metrics.metrics().get(new MetricName("test.p25", "grp1")); Metric p25 = this.metrics.metrics().get(metrics.metricName("test.p25", "grp1"));
Metric p50 = this.metrics.metrics().get(new MetricName("test.p50", "grp1")); Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
Metric p75 = this.metrics.metrics().get(new MetricName("test.p75", "grp1")); Metric p75 = this.metrics.metrics().get(metrics.metricName("test.p75", "grp1"));
// record two windows worth of sequential values // record two windows worth of sequential values
for (int i = 0; i < buckets; i++) for (int i = 0; i < buckets; i++)
@ -389,7 +391,7 @@ public class MetricsTest {
// Use the default time window. Set 3 samples // Use the default time window. Set 3 samples
MetricConfig cfg = new MetricConfig().samples(3); MetricConfig cfg = new MetricConfig().samples(3);
Sensor s = metrics.sensor("test.sensor", cfg); Sensor s = metrics.sensor("test.sensor", cfg);
s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS)); s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
int sum = 0; int sum = 0;
int count = cfg.samples() - 1; int count = cfg.samples() - 1;
@ -406,7 +408,7 @@ public class MetricsTest {
// prior to any time passing // prior to any time passing
double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0; double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
KafkaMetric km = metrics.metrics().get(new MetricName("test.rate", "grp1")); KafkaMetric km = metrics.metrics().get(metrics.metricName("test.rate", "grp1"));
assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS); assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
assertEquals("Elapsed Time = 75 seconds", elapsedSecs, assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS); ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);

View File

@ -54,7 +54,7 @@ public class SelectorTest {
this.channelBuilder = new PlaintextChannelBuilder(); this.channelBuilder = new PlaintextChannelBuilder();
this.channelBuilder.configure(configs); this.channelBuilder.configure(configs);
this.metrics = new Metrics(); this.metrics = new Metrics();
this.selector = new Selector(5000, this.metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder); this.selector = new Selector(5000, this.metrics, time, "MetricGroup", channelBuilder);
} }
@After @After

View File

@ -18,7 +18,6 @@ import static org.junit.Assert.assertTrue;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.io.File; import java.io.File;
@ -56,7 +55,7 @@ public class SslSelectorTest extends SelectorTest {
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
this.channelBuilder.configure(sslClientConfigs); this.channelBuilder.configure(sslClientConfigs);
this.metrics = new Metrics(); this.metrics = new Metrics();
this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder); this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder);
} }
@After @After
@ -84,7 +83,7 @@ public class SslSelectorTest extends SelectorTest {
} }
}; };
channelBuilder.configure(sslClientConfigs); channelBuilder.configure(sslClientConfigs);
Selector selector = new Selector(5000, metrics, time, "MetricGroup2", new LinkedHashMap<String, String>(), channelBuilder); Selector selector = new Selector(5000, metrics, time, "MetricGroup2", channelBuilder);
try { try {
int reqs = 500; int reqs = 500;
String node = "0"; String node = "0";

View File

@ -21,7 +21,6 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -75,7 +74,7 @@ public class SslTransportLayerTest {
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
this.channelBuilder.configure(sslClientConfigs); this.channelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
} }
@After @After
@ -452,7 +451,7 @@ public class SslTransportLayerTest {
}; };
this.channelBuilder.configure(sslClientConfigs); this.channelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
} }
private static class CertStores { private static class CertStores {
@ -560,7 +559,7 @@ public class SslTransportLayerTest {
this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>()); this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER); SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER);
channelBuilder.configure(sslServerConfigs); channelBuilder.configure(sslServerConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
setName("echoserver"); setName("echoserver");
setDaemon(true); setDaemon(true);
acceptorThread = new AcceptorThread(); acceptorThread = new AcceptorThread();

View File

@ -14,7 +14,6 @@ package org.apache.kafka.test;
import java.util.Arrays; import java.util.Arrays;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Avg;
@ -33,15 +32,15 @@ public class MetricsBench {
Sensor parent = metrics.sensor("parent"); Sensor parent = metrics.sensor("parent");
Sensor child = metrics.sensor("child", parent); Sensor child = metrics.sensor("child", parent);
for (Sensor sensor : Arrays.asList(parent, child)) { for (Sensor sensor : Arrays.asList(parent, child)) {
sensor.add(new MetricName(sensor.name() + ".avg", "grp1"), new Avg()); sensor.add(metrics.metricName(sensor.name() + ".avg", "grp1"), new Avg());
sensor.add(new MetricName(sensor.name() + ".count", "grp1"), new Count()); sensor.add(metrics.metricName(sensor.name() + ".count", "grp1"), new Count());
sensor.add(new MetricName(sensor.name() + ".max", "grp1"), new Max()); sensor.add(metrics.metricName(sensor.name() + ".max", "grp1"), new Max());
sensor.add(new Percentiles(1024, sensor.add(new Percentiles(1024,
0.0, 0.0,
iters, iters,
BucketSizing.CONSTANT, BucketSizing.CONSTANT,
new Percentile(new MetricName(sensor.name() + ".median", "grp1"), 50.0), new Percentile(metrics.metricName(sensor.name() + ".median", "grp1"), 50.0),
new Percentile(new MetricName(sensor.name() + ".p_99", "grp1"), 99.0))); new Percentile(metrics.metricName(sensor.name() + ".p_99", "grp1"), 99.0)));
} }
long start = System.nanoTime(); long start = System.nanoTime();
for (int i = 0; i < iters; i++) for (int i = 0; i < iters; i++)

View File

@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator; import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
@ -67,7 +66,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
int heartbeatIntervalMs, int heartbeatIntervalMs,
Metrics metrics, Metrics metrics,
String metricGrpPrefix, String metricGrpPrefix,
Map<String, String> metricTags,
Time time, Time time,
long retryBackoffMs, long retryBackoffMs,
String restUrl, String restUrl,
@ -79,13 +77,12 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
heartbeatIntervalMs, heartbeatIntervalMs,
metrics, metrics,
metricGrpPrefix, metricGrpPrefix,
metricTags,
time, time,
retryBackoffMs); retryBackoffMs);
this.restUrl = restUrl; this.restUrl = restUrl;
this.configStorage = configStorage; this.configStorage = configStorage;
this.assignmentSnapshot = null; this.assignmentSnapshot = null;
this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
this.listener = listener; this.listener = listener;
this.rejoinRequested = false; this.rejoinRequested = false;
} }
@ -254,7 +251,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
public final Metrics metrics; public final Metrics metrics;
public final String metricGrpName; public final String metricGrpName;
public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) { public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics; this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
@ -270,16 +267,12 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
} }
}; };
metrics.addMetric(new MetricName("assigned-connectors", metrics.addMetric(metrics.metricName("assigned-connectors",
this.metricGrpName, this.metricGrpName,
"The number of connector instances currently assigned to this consumer", "The number of connector instances currently assigned to this consumer"), numConnectors);
tags), metrics.addMetric(metrics.metricName("assigned-tasks",
numConnectors); this.metricGrpName,
metrics.addMetric(new MetricName("assigned-tasks", "The number of tasks currently assigned to this consumer"), numTasks);
this.metricGrpName,
"The number of tasks currently assigned to this consumer",
tags),
numTasks);
} }
} }

View File

@ -71,10 +71,13 @@ public class WorkerGroupMember {
try { try {
this.time = new SystemTime(); this.time = new SystemTime();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
Map<String, String> metricsTags = new LinkedHashMap<>();
metricsTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX)); reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time); this.metrics = new Metrics(metricConfig, reporters, time);
@ -83,11 +86,9 @@ public class WorkerGroupMember {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0); this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "connect"; String metricGrpPrefix = "connect";
Map<String, String> metricsTags = new LinkedHashMap<>();
metricsTags.put("client-id", clientId);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient netClient = new NetworkClient( NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder), new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata, this.metadata,
clientId, clientId,
100, // a fixed large enough value will suffice 100, // a fixed large enough value will suffice
@ -102,7 +103,6 @@ public class WorkerGroupMember {
config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG), config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
metrics, metrics,
metricGrpPrefix, metricGrpPrefix,
metricsTags,
this.time, this.time,
retryBackoffMs, retryBackoffMs,
restUrl, restUrl,

View File

@ -47,7 +47,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -75,7 +74,6 @@ public class WorkerCoordinatorTest {
private Node node = cluster.nodes().get(0); private Node node = cluster.nodes().get(0);
private Metadata metadata; private Metadata metadata;
private Metrics metrics; private Metrics metrics;
private Map<String, String> metricTags = new LinkedHashMap<>();
private ConsumerNetworkClient consumerClient; private ConsumerNetworkClient consumerClient;
private MockRebalanceListener rebalanceListener; private MockRebalanceListener rebalanceListener;
@Mock private KafkaConfigStorage configStorage; @Mock private KafkaConfigStorage configStorage;
@ -103,7 +101,6 @@ public class WorkerCoordinatorTest {
heartbeatIntervalMs, heartbeatIntervalMs,
metrics, metrics,
"consumer" + groupId, "consumer" + groupId,
metricTags,
time, time,
retryBackoffMs, retryBackoffMs,
LEADER_URL, LEADER_URL,

View File

@ -226,7 +226,6 @@ object AdminClient {
metrics, metrics,
time, time,
"admin", "admin",
Map[String, String](),
channelBuilder) channelBuilder)
val networkClient = new NetworkClient( val networkClient = new NetworkClient(

View File

@ -111,6 +111,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId)) private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId))) private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))
newGauge(
"yammer-metrics-count",
new Gauge[Int] {
def value = {
com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
}
}
)
val consumerIdString = { val consumerIdString = {
var consumerUuid : String = null var consumerUuid : String = null
config.consumerId match { config.consumerId match {

View File

@ -70,7 +70,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private val allMetricNames = (0 until totalProcessorThreads).map { i => private val allMetricNames = (0 until totalProcessorThreads).map { i =>
val tags = new util.HashMap[String, String]() val tags = new util.HashMap[String, String]()
tags.put("networkProcessor", i.toString) tags.put("networkProcessor", i.toString)
new MetricName("io-wait-ratio", "socket-server-metrics", tags) metrics.metricName("io-wait-ratio", "socket-server-metrics", tags)
} }
/** /**
@ -384,7 +384,7 @@ private[kafka] class Processor(val id: Int,
newGauge("IdlePercent", newGauge("IdlePercent",
new Gauge[Double] { new Gauge[Double] {
def value = { def value = {
metrics.metrics().get(new MetricName("io-wait-ratio", "socket-server-metrics", metricTags)).value() metrics.metrics().get(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
} }
}, },
metricTags.asScala metricTags.asScala

View File

@ -76,7 +76,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
throttledRequestReaper.start() throttledRequestReaper.start()
private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue") private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
delayQueueSensor.add(new MetricName("queue-size", delayQueueSensor.add(metrics.metricName("queue-size",
apiKey, apiKey,
"Tracks the size of the delay queue"), new Total()) "Tracks the size of the delay queue"), new Total())
@ -206,7 +206,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
throttleTimeSensor = metrics.sensor(throttleTimeSensorName, throttleTimeSensor = metrics.sensor(throttleTimeSensorName,
null, null,
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds) ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
throttleTimeSensor.add(new MetricName("throttle-time", throttleTimeSensor.add(metrics.metricName("throttle-time",
apiKey, apiKey,
"Tracking average throttle-time per client", "Tracking average throttle-time per client",
"client-id", "client-id",
@ -271,7 +271,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
} }
private def clientRateMetricName(clientId: String): MetricName = { private def clientRateMetricName(clientId: String): MetricName = {
new MetricName("byte-rate", apiKey, metrics.metricName("byte-rate", apiKey,
"Tracking byte-rate per client", "Tracking byte-rate per client",
"client-id", clientId) "client-id", clientId)
} }

View File

@ -141,6 +141,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
} }
) )
newGauge(
"yammer-metrics-count",
new Gauge[Int] {
def value = {
com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
}
}
)
/** /**
* Start up API for bringing up a single instance of the Kafka server. * Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers

View File

@ -90,9 +90,6 @@ object JmxTool extends Logging {
List(null) List(null)
val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
val allAttributes: Iterable[(ObjectName, Array[String])] =
names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
val numExpectedAttributes: Map[ObjectName, Int] = val numExpectedAttributes: Map[ObjectName, Int] =
attributesWhitelistExists match { attributesWhitelistExists match {

View File

@ -125,10 +125,10 @@ class QuotasTest extends KafkaServerTestHarness {
val numRecords = 1000 val numRecords = 1000
produce(producers.head, numRecords) produce(producers.head, numRecords)
val producerMetricName = new MetricName("throttle-time", val producerMetricName = leaderNode.metrics.metricName("throttle-time",
ApiKeys.PRODUCE.name, ApiKeys.PRODUCE.name,
"Tracking throttle-time per client", "Tracking throttle-time per client",
"client-id", producerId1) "client-id", producerId1)
assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0) assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
// Consumer should read in a bursty manner and get throttled immediately // Consumer should read in a bursty manner and get throttled immediately
@ -136,10 +136,10 @@ class QuotasTest extends KafkaServerTestHarness {
// The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build() val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
replicaConsumers.head.fetch(request) replicaConsumers.head.fetch(request)
val consumerMetricName = new MetricName("throttle-time", val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
ApiKeys.FETCH.name, ApiKeys.FETCH.name,
"Tracking throttle-time per client", "Tracking throttle-time per client",
"client-id", consumerId1) "client-id", consumerId1)
assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0) assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
} }
@ -166,10 +166,10 @@ class QuotasTest extends KafkaServerTestHarness {
val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
val numRecords = 1000 val numRecords = 1000
produce(producers(1), numRecords) produce(producers(1), numRecords)
val producerMetricName = new MetricName("throttle-time", val producerMetricName = leaderNode.metrics.metricName("throttle-time",
ApiKeys.PRODUCE.name, ApiKeys.PRODUCE.name,
"Tracking throttle-time per client", "Tracking throttle-time per client",
"client-id", producerId2) "client-id", producerId2)
assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0) assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
// The "client" consumer does not get throttled. // The "client" consumer does not get throttled.
@ -177,10 +177,10 @@ class QuotasTest extends KafkaServerTestHarness {
// The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build() val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
replicaConsumers(1).fetch(request) replicaConsumers(1).fetch(request)
val consumerMetricName = new MetricName("throttle-time", val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
ApiKeys.FETCH.name, ApiKeys.FETCH.name,
"Tracking throttle-time per client", "Tracking throttle-time per client",
"client-id", consumerId2) "client-id", consumerId2)
assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0) assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
} }

View File

@ -79,7 +79,7 @@ class ClientQuotaManagerTest {
def testQuotaViolation() { def testQuotaViolation() {
val metrics = newMetrics val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
val queueSizeMetric = metrics.metrics().get(new MetricName("queue-size", "producer", "")) val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "producer", ""))
try { try {
/* We have 10 second windows. Make sure that there is no quota violation /* We have 10 second windows. Make sure that there is no quota violation
* if we produce under the quota * if we produce under the quota

View File

@ -680,30 +680,30 @@ public class StreamThread extends Thread {
this.metricTags.put("client-id", clientId + "-" + getName()); this.metricTags.put("client-id", clientId + "-" + getName());
this.commitTimeSensor = metrics.sensor("commit-time"); this.commitTimeSensor = metrics.sensor("commit-time");
this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg()); this.commitTimeSensor.add(metrics.metricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
this.commitTimeSensor.add(new MetricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max()); this.commitTimeSensor.add(metrics.metricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
this.commitTimeSensor.add(new MetricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count())); this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
this.pollTimeSensor = metrics.sensor("poll-time"); this.pollTimeSensor = metrics.sensor("poll-time");
this.pollTimeSensor.add(new MetricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg()); this.pollTimeSensor.add(metrics.metricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
this.pollTimeSensor.add(new MetricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max()); this.pollTimeSensor.add(metrics.metricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
this.pollTimeSensor.add(new MetricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count())); this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
this.processTimeSensor = metrics.sensor("process-time"); this.processTimeSensor = metrics.sensor("process-time");
this.processTimeSensor.add(new MetricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg()); this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
this.processTimeSensor.add(new MetricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max()); this.processTimeSensor.add(metrics.metricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
this.processTimeSensor.add(new MetricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count())); this.processTimeSensor.add(metrics.metricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
this.punctuateTimeSensor = metrics.sensor("punctuate-time"); this.punctuateTimeSensor = metrics.sensor("punctuate-time");
this.punctuateTimeSensor.add(new MetricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg()); this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
this.punctuateTimeSensor.add(new MetricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max()); this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
this.punctuateTimeSensor.add(new MetricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count())); this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
this.taskCreationSensor = metrics.sensor("task-creation"); this.taskCreationSensor = metrics.sensor("task-creation");
this.taskCreationSensor.add(new MetricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count())); this.taskCreationSensor.add(metrics.metricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
this.taskDestructionSensor = metrics.sensor("task-destruction"); this.taskDestructionSensor = metrics.sensor("task-destruction");
this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count())); this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
} }
@Override @Override
@ -733,11 +733,11 @@ public class StreamThread extends Thread {
} }
private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) { private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", metricGrpName, maybeAddMetric(sensor, metrics.metricName(opName + "-avg-latency-ms", metricGrpName,
"The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg()); "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", metricGrpName, maybeAddMetric(sensor, metrics.metricName(opName + "-max-latency-ms", metricGrpName,
"The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max()); "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
maybeAddMetric(sensor, new MetricName(opName + "-qps", metricGrpName, maybeAddMetric(sensor, metrics.metricName(opName + "-qps", metricGrpName,
"The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count())); "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
} }