KAFKA-2733: Standardize metric name for Kafka Streams

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Matsuda, Jun Rao

Closes #643 from guozhangwang/K2733
This commit is contained in:
Guozhang Wang 2015-12-09 22:15:34 -08:00
parent 6e5bd2497a
commit ec466d358d
2 changed files with 20 additions and 18 deletions

View File

@ -721,23 +721,25 @@ public class StreamThread extends Thread {
for (int i = 0; i < tags.length; i += 2)
tagMap.put(tags[i], tags[i + 1]);
String metricGroupName = "streaming-" + scopeName + "-metrics";
// first add the global operation metrics if not yet, with the global tags only
Sensor parent = metrics.sensor(operationName);
addLatencyMetrics(this.metricGrpName, parent, "all", operationName, this.metricTags);
Sensor parent = metrics.sensor(scopeName + "-" + operationName);
addLatencyMetrics(metricGroupName, parent, "all", operationName, this.metricTags);
// add the store operation metrics with additional tags
Sensor sensor = metrics.sensor(entityName + "-" + operationName, parent);
addLatencyMetrics("streaming-" + scopeName + "-metrics", sensor, entityName, operationName, tagMap);
Sensor sensor = metrics.sensor(scopeName + "-" + entityName + "-" + operationName, parent);
addLatencyMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
return sensor;
}
private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, metrics.metricName(opName + "-avg-latency-ms", metricGrpName,
maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-avg-latency-ms", metricGrpName,
"The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
maybeAddMetric(sensor, metrics.metricName(opName + "-max-latency-ms", metricGrpName,
maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-max-latency-ms", metricGrpName,
"The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
maybeAddMetric(sensor, metrics.metricName(opName + "-qps", metricGrpName,
maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-qps", metricGrpName,
"The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
}

View File

@ -31,7 +31,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final KeyValueStore<K, V> inner;
protected final Serdes<K, V> serialization;
protected final String metricGrp;
protected final String metricScope;
protected final Time time;
private Sensor putTime;
@ -48,10 +48,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private KeyValueStoreChangeLogger<K, V> changeLogger = null;
// always wrap the store with the metered store
public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
this.inner = inner;
this.serialization = serialization;
this.metricGrp = metricGrp;
this.metricScope = metricScope;
this.time = time != null ? time : new SystemTime();
}
@ -69,14 +69,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
public void init(ProcessorContext context) {
final String name = name();
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
this.deleteTime = this.metrics.addLatencySensor(metricGrp, name, "delete", "store-name", name);
this.putAllTime = this.metrics.addLatencySensor(metricGrp, name, "put-all", "store-name", name);
this.allTime = this.metrics.addLatencySensor(metricGrp, name, "all", "store-name", name);
this.rangeTime = this.metrics.addLatencySensor(metricGrp, name, "range", "store-name", name);
this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
serialization.init(context);
this.changeLogger = this.loggingEnabled ? new KeyValueStoreChangeLogger<>(name, context, serialization) : null;