mirror of https://github.com/apache/kafka.git
KAFKA-1359: Ensure all topic/server metrics registered at once.
This commit is contained in:
parent
9a6f7113ed
commit
ec075c5a85
|
@ -774,27 +774,35 @@ public class Sender implements Runnable {
|
|||
|
||||
public SenderMetrics(Metrics metrics) {
|
||||
this.metrics = metrics;
|
||||
|
||||
this.batchSizeSensor = metrics.sensor("batch-size");
|
||||
this.queueTimeSensor = metrics.sensor("queue-time");
|
||||
this.requestTimeSensor = metrics.sensor("request-time");
|
||||
this.recordsPerRequestSensor = metrics.sensor("records-per-request");
|
||||
this.retrySensor = metrics.sensor("record-retries");
|
||||
this.errorSensor = metrics.sensor("errors");
|
||||
this.maxRecordSizeSensor = metrics.sensor("record-size-max");
|
||||
this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
|
||||
this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
|
||||
this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
|
||||
this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max());
|
||||
|
||||
this.queueTimeSensor = metrics.sensor("queue-time");
|
||||
this.queueTimeSensor.add("record-queue-time-avg",
|
||||
"The average time in ms record batches spent in the record accumulator.",
|
||||
new Avg());
|
||||
this.queueTimeSensor.add("record-queue-time-max",
|
||||
"The maximum time in ms record batches spent in the record accumulator.",
|
||||
new Max());
|
||||
this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
|
||||
|
||||
this.requestTimeSensor = metrics.sensor("request-time");
|
||||
this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
|
||||
this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max());
|
||||
|
||||
this.recordsPerRequestSensor = metrics.sensor("records-per-request");
|
||||
this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate());
|
||||
this.recordsPerRequestSensor.add("records-per-request-avg", "The average number of records per request.", new Avg());
|
||||
|
||||
this.retrySensor = metrics.sensor("record-retries");
|
||||
this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
|
||||
|
||||
this.errorSensor = metrics.sensor("errors");
|
||||
this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
|
||||
|
||||
this.maxRecordSizeSensor = metrics.sensor("record-size-max");
|
||||
this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
|
||||
|
||||
this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
|
||||
public double measure(MetricConfig config, long now) {
|
||||
return inFlightRequests.totalInFlightRequests();
|
||||
|
@ -807,32 +815,53 @@ public class Sender implements Runnable {
|
|||
});
|
||||
}
|
||||
|
||||
public void maybeRegisterTopicMetrics(String topic) {
|
||||
// if one sensor of the metrics has been registered for the topic,
|
||||
// then all other sensors should have been registered; and vice versa
|
||||
String topicRecordsCountName = "topic." + topic + ".records-per-batch";
|
||||
Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
|
||||
if (topicRecordCount == null) {
|
||||
topicRecordCount = this.metrics.sensor(topicRecordsCountName);
|
||||
topicRecordCount.add("topic." + topic + ".record-send-rate", new Rate());
|
||||
|
||||
String topicByteRateName = "topic." + topic + ".bytes";
|
||||
Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
|
||||
topicByteRate.add("topic." + topic + ".byte-rate", new Rate());
|
||||
|
||||
String topicRetryName = "topic." + topic + ".record-retries";
|
||||
Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
|
||||
topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
|
||||
|
||||
String topicErrorName = "topic." + topic + ".record-errors";
|
||||
Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
|
||||
topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
|
||||
}
|
||||
}
|
||||
|
||||
public void updateProduceRequestMetrics(List<InFlightRequest> requests) {
|
||||
long ns = time.nanoseconds();
|
||||
for (int i = 0; i < requests.size(); i++) {
|
||||
InFlightRequest request = requests.get(i);
|
||||
int records = 0;
|
||||
|
||||
if (request.batches != null) {
|
||||
for (RecordBatch batch : request.batches.values()) {
|
||||
|
||||
// per-topic record count
|
||||
String topicRecordsCountName = "topic." + batch.topicPartition.topic() + ".records-per-batch";
|
||||
Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
|
||||
if (topicRecordCount == null) {
|
||||
topicRecordCount = this.metrics.sensor(topicRecordsCountName);
|
||||
topicRecordCount.add("topic." + batch.topicPartition.topic() + ".record-send-rate", new Rate());
|
||||
}
|
||||
// register all per-topic metrics at once
|
||||
String topic = batch.topicPartition.topic();
|
||||
maybeRegisterTopicMetrics(topic);
|
||||
|
||||
// per-topic record send rate
|
||||
String topicRecordsCountName = "topic." + topic + ".records-per-batch";
|
||||
Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName));
|
||||
topicRecordCount.record(batch.recordCount);
|
||||
|
||||
// per-topic bytes-per-second
|
||||
String topicByteRateName = "topic." + batch.topicPartition.topic() + ".bytes";
|
||||
Sensor topicByteRate = this.metrics.getSensor(topicByteRateName);
|
||||
if (topicByteRate == null) {
|
||||
topicByteRate = this.metrics.sensor(topicByteRateName);
|
||||
topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate());
|
||||
}
|
||||
// per-topic bytes send rate
|
||||
String topicByteRateName = "topic." + topic + ".bytes";
|
||||
Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
|
||||
topicByteRate.record(batch.records.sizeInBytes());
|
||||
|
||||
// global metrics
|
||||
this.batchSizeSensor.record(batch.records.sizeInBytes(), ns);
|
||||
this.queueTimeSensor.record(batch.drained - batch.created, ns);
|
||||
this.maxRecordSizeSensor.record(batch.maxRecordSize, ns);
|
||||
|
@ -847,35 +876,22 @@ public class Sender implements Runnable {
|
|||
this.retrySensor.record(count);
|
||||
String topicRetryName = "topic." + topic + ".record-retries";
|
||||
Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
|
||||
if (topicRetrySensor == null) {
|
||||
topicRetrySensor = this.metrics.sensor(topicRetryName);
|
||||
topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
|
||||
}
|
||||
topicRetrySensor.record(count);
|
||||
if (topicRetrySensor != null) topicRetrySensor.record(count);
|
||||
}
|
||||
|
||||
public void recordErrors(String topic, int count) {
|
||||
this.errorSensor.record(count);
|
||||
String topicErrorName = "topic." + topic + ".record-errors";
|
||||
Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
|
||||
if (topicErrorSensor == null) {
|
||||
topicErrorSensor = this.metrics.sensor(topicErrorName);
|
||||
topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
|
||||
}
|
||||
topicErrorSensor.record(count);
|
||||
if (topicErrorSensor != null) topicErrorSensor.record(count);
|
||||
}
|
||||
|
||||
public void recordLatency(int node, long latency, long nowNs) {
|
||||
this.requestTimeSensor.record(latency, nowNs);
|
||||
if (node >= 0) {
|
||||
String nodeTimeName = "server." + node + ".latency";
|
||||
String nodeTimeName = "node-" + node + ".latency";
|
||||
Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
|
||||
if (nodeRequestTime == null) {
|
||||
nodeRequestTime = this.metrics.sensor(nodeTimeName);
|
||||
nodeRequestTime.add("node-" + node + ".latency-avg", new Avg());
|
||||
nodeRequestTime.add("node-" + node + ".latency-max", new Max());
|
||||
}
|
||||
nodeRequestTime.record(latency, nowNs);
|
||||
if (nodeRequestTime != null) nodeRequestTime.record(latency, nowNs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -221,6 +221,10 @@ public class Selector implements Selectable {
|
|||
|
||||
Transmissions transmissions = transmissions(key);
|
||||
SocketChannel channel = channel(key);
|
||||
|
||||
// register all per-broker metrics at once
|
||||
sensors.maybeRegisterNodeMetrics(transmissions.id);
|
||||
|
||||
try {
|
||||
/* complete any connections that have finished their handshake */
|
||||
if (key.isConnectable()) {
|
||||
|
@ -401,33 +405,41 @@ public class Selector implements Selectable {
|
|||
|
||||
public SelectorMetrics(Metrics metrics) {
|
||||
this.metrics = metrics;
|
||||
|
||||
this.connectionClosed = this.metrics.sensor("connections-closed");
|
||||
this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
|
||||
|
||||
this.connectionCreated = this.metrics.sensor("connections-created");
|
||||
this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
|
||||
|
||||
this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
|
||||
this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
|
||||
this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
|
||||
this.selectTime = this.metrics.sensor("select-time");
|
||||
this.ioTime = this.metrics.sensor("io-time");
|
||||
bytesTransferred.add("network-io-rate",
|
||||
"The average number of network operations (reads or writes) on all connections per second.",
|
||||
new Rate(new Count()));
|
||||
|
||||
this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
|
||||
this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate());
|
||||
this.bytesSent.add("request-rate", "The average number of requests sent per second.", new Rate(new Count()));
|
||||
this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg());
|
||||
this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max());
|
||||
|
||||
this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
|
||||
this.bytesReceived.add("incoming-byte-rate", "Bytes/second read off all sockets", new Rate());
|
||||
this.bytesReceived.add("response-rate", "Responses received sent per second.", new Rate(new Count()));
|
||||
this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
|
||||
this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
|
||||
|
||||
this.selectTime = this.metrics.sensor("select-time");
|
||||
this.selectTime.add("select-rate",
|
||||
"Number of times the I/O layer checked for new I/O to perform per second",
|
||||
new Rate(new Count()));
|
||||
this.selectTime.add("io-wait-time-ns-avg",
|
||||
"The average length of time the I/O thread speant waiting for a socket ready for reads or writes in nanoseconds.",
|
||||
"The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.",
|
||||
new Avg());
|
||||
this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
|
||||
|
||||
this.ioTime = this.metrics.sensor("io-time");
|
||||
this.ioTime.add("io-time-ns-avg", "The average length of time for I/O per select call in nanoseconds.", new Avg());
|
||||
this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
|
||||
|
||||
this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
|
||||
public double measure(MetricConfig config, long now) {
|
||||
return keys.size();
|
||||
|
@ -435,35 +447,49 @@ public class Selector implements Selectable {
|
|||
});
|
||||
}
|
||||
|
||||
public void maybeRegisterNodeMetrics(int node) {
|
||||
if (node >= 0) {
|
||||
// if one sensor of the metrics has been registered for the node,
|
||||
// then all other sensors should have been registered; and vice versa
|
||||
String nodeRequestName = "node-" + node + ".bytes-sent";
|
||||
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
|
||||
if (nodeRequest == null) {
|
||||
nodeRequest = this.metrics.sensor(nodeRequestName);
|
||||
nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate());
|
||||
nodeRequest.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count()));
|
||||
nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
|
||||
nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
|
||||
|
||||
String nodeResponseName = "node-" + node + ".bytes-received";
|
||||
Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
|
||||
nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate());
|
||||
nodeResponse.add("node-" + node + ".response-rate",
|
||||
"The average number of responses received per second.",
|
||||
new Rate(new Count()));
|
||||
|
||||
String nodeTimeName = "node-" + node + ".latency";
|
||||
Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
|
||||
nodeRequestTime.add("node-" + node + ".request-latency-avg", new Avg());
|
||||
nodeRequestTime.add("node-" + node + ".request-latency-max", new Max());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void recordBytesSent(int node, int bytes) {
|
||||
this.bytesSent.record(bytes);
|
||||
if (node >= 0) {
|
||||
String name = "node-" + node + ".bytes-sent";
|
||||
Sensor sensor = this.metrics.getSensor(name);
|
||||
if (sensor == null) {
|
||||
sensor = this.metrics.sensor(name);
|
||||
sensor.add("node-" + node + ".outgoing-byte-rate", new Rate());
|
||||
sensor.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count()));
|
||||
sensor.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
|
||||
sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
|
||||
}
|
||||
sensor.record(bytes);
|
||||
String nodeRequestName = "node-" + node + ".bytes-sent";
|
||||
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
|
||||
if (nodeRequest != null) nodeRequest.record(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
public void recordBytesReceived(int node, int bytes) {
|
||||
this.bytesReceived.record(bytes);
|
||||
if (node >= 0) {
|
||||
String name = "node-" + node + ".bytes-received";
|
||||
Sensor sensor = this.metrics.getSensor(name);
|
||||
if (sensor == null) {
|
||||
sensor = this.metrics.sensor(name);
|
||||
sensor.add("node-" + node + ".incoming-byte-rate", new Rate());
|
||||
sensor.add("node-" + node + ".response-rate",
|
||||
"The average number of responses received per second.",
|
||||
new Rate(new Count()));
|
||||
}
|
||||
sensor.record(bytes);
|
||||
String nodeRequestName = "node-" + node + ".bytes-received";
|
||||
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
|
||||
if (nodeRequest != null) nodeRequest.record(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue