mirror of https://github.com/apache/kafka.git
KAFKA-3769: Create new sensors per-thread in KafkaStreams
Author: Guozhang Wang <wangguoz@gmail.com> Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Michael G. Noll <michael@confluent.io>, Greg Fodor <gfodor@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io> Closes #1530 from guozhangwang/K3769-per-thread-metrics
This commit is contained in:
parent
e49b3aee6b
commit
ae237be1bb
|
@ -807,8 +807,7 @@ public class Fetcher<K, V> {
|
||||||
String name = "topic." + topic + ".bytes-fetched";
|
String name = "topic." + topic + ".bytes-fetched";
|
||||||
Sensor bytesFetched = this.metrics.getSensor(name);
|
Sensor bytesFetched = this.metrics.getSensor(name);
|
||||||
if (bytesFetched == null) {
|
if (bytesFetched == null) {
|
||||||
Map<String, String> metricTags = new HashMap<>(1);
|
Map<String, String> metricTags = Collections.singletonMap("topic", topic.replace('.', '_'));
|
||||||
metricTags.put("topic", topic.replace('.', '_'));
|
|
||||||
|
|
||||||
bytesFetched = this.metrics.sensor(name);
|
bytesFetched = this.metrics.sensor(name);
|
||||||
bytesFetched.add(this.metrics.metricName("fetch-size-avg",
|
bytesFetched.add(this.metrics.metricName("fetch-size-avg",
|
||||||
|
|
|
@ -14,9 +14,9 @@ package org.apache.kafka.clients.producer.internals;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -457,14 +457,13 @@ public class Sender implements Runnable {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void maybeRegisterTopicMetrics(String topic) {
|
private void maybeRegisterTopicMetrics(String topic) {
|
||||||
// if one sensor of the metrics has been registered for the topic,
|
// if one sensor of the metrics has been registered for the topic,
|
||||||
// then all other sensors should have been registered; and vice versa
|
// then all other sensors should have been registered; and vice versa
|
||||||
String topicRecordsCountName = "topic." + topic + ".records-per-batch";
|
String topicRecordsCountName = "topic." + topic + ".records-per-batch";
|
||||||
Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
|
Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
|
||||||
if (topicRecordCount == null) {
|
if (topicRecordCount == null) {
|
||||||
Map<String, String> metricTags = new LinkedHashMap<String, String>();
|
Map<String, String> metricTags = Collections.singletonMap("topic", topic);
|
||||||
metricTags.put("topic", topic);
|
|
||||||
String metricGrpName = "producer-topic-metrics";
|
String metricGrpName = "producer-topic-metrics";
|
||||||
|
|
||||||
topicRecordCount = this.metrics.sensor(topicRecordsCountName);
|
topicRecordCount = this.metrics.sensor(topicRecordsCountName);
|
||||||
|
|
|
@ -20,6 +20,9 @@
|
||||||
# The id of the broker. This must be set to a unique integer for each broker.
|
# The id of the broker. This must be set to a unique integer for each broker.
|
||||||
broker.id=0
|
broker.id=0
|
||||||
|
|
||||||
|
# Switch to enable topic deletion or not, default value is false
|
||||||
|
#delete.topic.enable=true
|
||||||
|
|
||||||
############################# Socket Server Settings #############################
|
############################# Socket Server Settings #############################
|
||||||
|
|
||||||
# The address the socket server listens on. It will get the value returned from
|
# The address the socket server listens on. It will get the value returned from
|
||||||
|
|
|
@ -24,7 +24,19 @@ import org.apache.kafka.common.metrics.Sensor;
|
||||||
*/
|
*/
|
||||||
public interface StreamsMetrics {
|
public interface StreamsMetrics {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the latency sensor.
|
||||||
|
*
|
||||||
|
* @param scopeName Name of the scope, could be the type of the state store, etc.
|
||||||
|
* @param entityName Name of the entity, could be the name of the state store instance, etc.
|
||||||
|
* @param operationName Name of the operation, could be get / put / delete / etc.
|
||||||
|
* @param tags Additional tags of the sensor.
|
||||||
|
* @return The added sensor.
|
||||||
|
*/
|
||||||
Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
|
Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record the given latency value of the sensor.
|
||||||
|
*/
|
||||||
void recordLatency(Sensor sensor, long startNs, long endNs);
|
void recordLatency(Sensor sensor, long startNs, long endNs);
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,6 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -83,6 +82,7 @@ public class StreamThread extends Thread {
|
||||||
protected final Consumer<byte[], byte[]> consumer;
|
protected final Consumer<byte[], byte[]> consumer;
|
||||||
protected final Consumer<byte[], byte[]> restoreConsumer;
|
protected final Consumer<byte[], byte[]> restoreConsumer;
|
||||||
|
|
||||||
|
private final String threadClientId;
|
||||||
private final AtomicBoolean running;
|
private final AtomicBoolean running;
|
||||||
private final Map<TaskId, StreamTask> activeTasks;
|
private final Map<TaskId, StreamTask> activeTasks;
|
||||||
private final Map<TaskId, StandbyTask> standbyTasks;
|
private final Map<TaskId, StandbyTask> standbyTasks;
|
||||||
|
@ -98,8 +98,9 @@ public class StreamThread extends Thread {
|
||||||
|
|
||||||
private StreamPartitionAssignor partitionAssignor = null;
|
private StreamPartitionAssignor partitionAssignor = null;
|
||||||
|
|
||||||
private long lastClean;
|
private long timerStartedMs;
|
||||||
private long lastCommit;
|
private long lastCleanMs;
|
||||||
|
private long lastCommitMs;
|
||||||
private Throwable rebalanceException = null;
|
private Throwable rebalanceException = null;
|
||||||
|
|
||||||
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
|
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
|
||||||
|
@ -111,7 +112,7 @@ public class StreamThread extends Thread {
|
||||||
try {
|
try {
|
||||||
addStreamTasks(assignment);
|
addStreamTasks(assignment);
|
||||||
addStandbyTasks();
|
addStandbyTasks();
|
||||||
lastClean = time.milliseconds(); // start the cleaning cycle
|
lastCleanMs = time.milliseconds(); // start the cleaning cycle
|
||||||
streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
|
streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
rebalanceException = t;
|
rebalanceException = t;
|
||||||
|
@ -123,7 +124,7 @@ public class StreamThread extends Thread {
|
||||||
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
|
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
|
||||||
try {
|
try {
|
||||||
commitAll();
|
commitAll();
|
||||||
lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
|
lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
rebalanceException = t;
|
rebalanceException = t;
|
||||||
throw t;
|
throw t;
|
||||||
|
@ -160,7 +161,7 @@ public class StreamThread extends Thread {
|
||||||
|
|
||||||
// set the producer and consumer clients
|
// set the producer and consumer clients
|
||||||
String threadName = getName();
|
String threadName = getName();
|
||||||
String threadClientId = clientId + "-" + threadName;
|
threadClientId = clientId + "-" + threadName;
|
||||||
log.info("Creating producer client for stream thread [{}]", threadName);
|
log.info("Creating producer client for stream thread [{}]", threadName);
|
||||||
this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
|
this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
|
||||||
log.info("Creating consumer client for stream thread [{}]", threadName);
|
log.info("Creating consumer client for stream thread [{}]", threadName);
|
||||||
|
@ -187,9 +188,10 @@ public class StreamThread extends Thread {
|
||||||
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
|
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
|
||||||
this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
|
this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
|
||||||
|
|
||||||
this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
|
|
||||||
this.lastCommit = time.milliseconds();
|
|
||||||
this.time = time;
|
this.time = time;
|
||||||
|
this.timerStartedMs = time.milliseconds();
|
||||||
|
this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
|
||||||
|
this.lastCommitMs = timerStartedMs;
|
||||||
|
|
||||||
this.sensors = new StreamsMetricsImpl(metrics);
|
this.sensors = new StreamsMetricsImpl(metrics);
|
||||||
|
|
||||||
|
@ -274,10 +276,26 @@ public class StreamThread extends Thread {
|
||||||
log.info("Stream thread shutdown complete [" + this.getName() + "]");
|
log.info("Stream thread shutdown complete [" + this.getName() + "]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the latency based on the current marked timestamp,
|
||||||
|
* and update the marked timestamp with the current system timestamp.
|
||||||
|
*
|
||||||
|
* @return latency
|
||||||
|
*/
|
||||||
|
private long computeLatency() {
|
||||||
|
long previousTimeMs = this.timerStartedMs;
|
||||||
|
this.timerStartedMs = time.milliseconds();
|
||||||
|
|
||||||
|
return Math.max(this.timerStartedMs - previousTimeMs, 0);
|
||||||
|
}
|
||||||
|
|
||||||
private void runLoop() {
|
private void runLoop() {
|
||||||
int totalNumBuffered = 0;
|
int totalNumBuffered = 0;
|
||||||
long lastPoll = 0L;
|
|
||||||
boolean requiresPoll = true;
|
boolean requiresPoll = true;
|
||||||
|
boolean polledRecords = false;
|
||||||
|
|
||||||
|
// TODO: this can be removed after KIP-62
|
||||||
|
long lastPoll = 0L;
|
||||||
|
|
||||||
if (topicPattern != null) {
|
if (topicPattern != null) {
|
||||||
consumer.subscribe(topicPattern, rebalanceListener);
|
consumer.subscribe(topicPattern, rebalanceListener);
|
||||||
|
@ -287,13 +305,15 @@ public class StreamThread extends Thread {
|
||||||
|
|
||||||
|
|
||||||
while (stillRunning()) {
|
while (stillRunning()) {
|
||||||
|
this.timerStartedMs = time.milliseconds();
|
||||||
|
|
||||||
// try to fetch some records if necessary
|
// try to fetch some records if necessary
|
||||||
if (requiresPoll) {
|
if (requiresPoll) {
|
||||||
requiresPoll = false;
|
requiresPoll = false;
|
||||||
|
|
||||||
long startPoll = time.milliseconds();
|
boolean longPoll = totalNumBuffered == 0;
|
||||||
|
|
||||||
ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
|
ConsumerRecords<byte[], byte[]> records = consumer.poll(longPoll ? this.pollTimeMs : 0);
|
||||||
lastPoll = time.milliseconds();
|
lastPoll = time.milliseconds();
|
||||||
|
|
||||||
if (rebalanceException != null)
|
if (rebalanceException != null)
|
||||||
|
@ -304,41 +324,51 @@ public class StreamThread extends Thread {
|
||||||
StreamTask task = activeTasksByPartition.get(partition);
|
StreamTask task = activeTasksByPartition.get(partition);
|
||||||
task.addRecords(partition, records.records(partition));
|
task.addRecords(partition, records.records(partition));
|
||||||
}
|
}
|
||||||
|
polledRecords = true;
|
||||||
|
} else {
|
||||||
|
polledRecords = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
long endPoll = time.milliseconds();
|
// only record poll latency is long poll is required
|
||||||
sensors.pollTimeSensor.record(endPoll - startPoll);
|
if (longPoll) {
|
||||||
|
sensors.pollTimeSensor.record(computeLatency());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
totalNumBuffered = 0;
|
|
||||||
|
|
||||||
// try to process one fetch record from each task via the topology, and also trigger punctuate
|
// try to process one fetch record from each task via the topology, and also trigger punctuate
|
||||||
// functions if necessary, which may result in more records going through the topology in this loop
|
// functions if necessary, which may result in more records going through the topology in this loop
|
||||||
if (!activeTasks.isEmpty()) {
|
if (totalNumBuffered > 0 || polledRecords) {
|
||||||
for (StreamTask task : activeTasks.values()) {
|
totalNumBuffered = 0;
|
||||||
long startProcess = time.milliseconds();
|
|
||||||
|
|
||||||
totalNumBuffered += task.process();
|
if (!activeTasks.isEmpty()) {
|
||||||
requiresPoll = requiresPoll || task.requiresPoll();
|
for (StreamTask task : activeTasks.values()) {
|
||||||
|
|
||||||
sensors.processTimeSensor.record(time.milliseconds() - startProcess);
|
totalNumBuffered += task.process();
|
||||||
|
|
||||||
maybePunctuate(task);
|
requiresPoll = requiresPoll || task.requiresPoll();
|
||||||
|
|
||||||
if (task.commitNeeded())
|
sensors.processTimeSensor.record(computeLatency());
|
||||||
commitOne(task, time.milliseconds());
|
|
||||||
}
|
|
||||||
|
|
||||||
// if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
|
maybePunctuate(task);
|
||||||
// even when we paused all partitions.
|
|
||||||
if (lastPoll + this.pollTimeMs < time.milliseconds())
|
if (task.commitNeeded())
|
||||||
|
commitOne(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
// if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
|
||||||
|
// even when we paused all partitions.
|
||||||
|
if (lastPoll + this.pollTimeMs < this.timerStartedMs)
|
||||||
|
requiresPoll = true;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// even when no task is assigned, we must poll to get a task.
|
||||||
requiresPoll = true;
|
requiresPoll = true;
|
||||||
|
}
|
||||||
|
maybeCommit();
|
||||||
} else {
|
} else {
|
||||||
// even when no task is assigned, we must poll to get a task.
|
|
||||||
requiresPoll = true;
|
requiresPoll = true;
|
||||||
}
|
}
|
||||||
maybeCommit();
|
|
||||||
maybeUpdateStandbyTasks();
|
maybeUpdateStandbyTasks();
|
||||||
|
|
||||||
maybeClean();
|
maybeClean();
|
||||||
|
@ -401,12 +431,10 @@ public class StreamThread extends Thread {
|
||||||
|
|
||||||
private void maybePunctuate(StreamTask task) {
|
private void maybePunctuate(StreamTask task) {
|
||||||
try {
|
try {
|
||||||
long now = time.milliseconds();
|
|
||||||
|
|
||||||
// check whether we should punctuate based on the task's partition group timestamp;
|
// check whether we should punctuate based on the task's partition group timestamp;
|
||||||
// which are essentially based on record timestamp.
|
// which are essentially based on record timestamp.
|
||||||
if (task.maybePunctuate())
|
if (task.maybePunctuate())
|
||||||
sensors.punctuateTimeSensor.record(time.milliseconds() - now);
|
sensors.punctuateTimeSensor.record(computeLatency());
|
||||||
|
|
||||||
} catch (KafkaException e) {
|
} catch (KafkaException e) {
|
||||||
log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
|
log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
|
||||||
|
@ -414,35 +442,50 @@ public class StreamThread extends Thread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commit all tasks owned by this thread if specified interval time has elapsed
|
||||||
|
*/
|
||||||
protected void maybeCommit() {
|
protected void maybeCommit() {
|
||||||
long now = time.milliseconds();
|
long now = time.milliseconds();
|
||||||
|
|
||||||
if (commitTimeMs >= 0 && lastCommit + commitTimeMs < now) {
|
if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
|
||||||
log.trace("Committing processor instances because the commit interval has elapsed.");
|
log.trace("Committing processor instances because the commit interval has elapsed.");
|
||||||
|
|
||||||
commitAll();
|
commitAll();
|
||||||
lastCommit = now;
|
lastCommitMs = now;
|
||||||
|
|
||||||
processStandbyRecords = true;
|
processStandbyRecords = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup any states of the tasks that have been removed from this thread
|
||||||
|
*/
|
||||||
|
protected void maybeClean() {
|
||||||
|
long now = time.milliseconds();
|
||||||
|
|
||||||
|
if (now > lastCleanMs + cleanTimeMs) {
|
||||||
|
stateDirectory.cleanRemovedTasks();
|
||||||
|
lastCleanMs = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit the states of all its tasks
|
* Commit the states of all its tasks
|
||||||
*/
|
*/
|
||||||
private void commitAll() {
|
private void commitAll() {
|
||||||
for (StreamTask task : activeTasks.values()) {
|
for (StreamTask task : activeTasks.values()) {
|
||||||
commitOne(task, time.milliseconds());
|
commitOne(task);
|
||||||
}
|
}
|
||||||
for (StandbyTask task : standbyTasks.values()) {
|
for (StandbyTask task : standbyTasks.values()) {
|
||||||
commitOne(task, time.milliseconds());
|
commitOne(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit the state of a task
|
* Commit the state of a task
|
||||||
*/
|
*/
|
||||||
private void commitOne(AbstractTask task, long now) {
|
private void commitOne(AbstractTask task) {
|
||||||
try {
|
try {
|
||||||
task.commit();
|
task.commit();
|
||||||
} catch (CommitFailedException e) {
|
} catch (CommitFailedException e) {
|
||||||
|
@ -454,19 +497,7 @@ public class StreamThread extends Thread {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
sensors.commitTimeSensor.record(time.milliseconds() - now);
|
sensors.commitTimeSensor.record(computeLatency());
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Cleanup any states of the tasks that have been removed from this thread
|
|
||||||
*/
|
|
||||||
protected void maybeClean() {
|
|
||||||
long now = time.milliseconds();
|
|
||||||
|
|
||||||
if (now > lastClean + cleanTimeMs) {
|
|
||||||
stateDirectory.cleanRemovedTasks();
|
|
||||||
lastClean = now;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -682,6 +713,7 @@ public class StreamThread extends Thread {
|
||||||
private class StreamsMetricsImpl implements StreamsMetrics {
|
private class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
final Metrics metrics;
|
final Metrics metrics;
|
||||||
final String metricGrpName;
|
final String metricGrpName;
|
||||||
|
final String sensorNamePrefix;
|
||||||
final Map<String, String> metricTags;
|
final Map<String, String> metricTags;
|
||||||
|
|
||||||
final Sensor commitTimeSensor;
|
final Sensor commitTimeSensor;
|
||||||
|
@ -692,42 +724,41 @@ public class StreamThread extends Thread {
|
||||||
final Sensor taskDestructionSensor;
|
final Sensor taskDestructionSensor;
|
||||||
|
|
||||||
public StreamsMetricsImpl(Metrics metrics) {
|
public StreamsMetricsImpl(Metrics metrics) {
|
||||||
|
|
||||||
this.metrics = metrics;
|
this.metrics = metrics;
|
||||||
this.metricGrpName = "stream-metrics";
|
this.metricGrpName = "stream-metrics";
|
||||||
this.metricTags = new LinkedHashMap<>();
|
this.sensorNamePrefix = "thread." + threadClientId;
|
||||||
this.metricTags.put("client-id", clientId + "-" + getName());
|
this.metricTags = Collections.singletonMap("client-id", threadClientId);
|
||||||
|
|
||||||
this.commitTimeSensor = metrics.sensor("commit-time");
|
this.commitTimeSensor = metrics.sensor(sensorNamePrefix + ".commit-time");
|
||||||
this.commitTimeSensor.add(metrics.metricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
|
this.commitTimeSensor.add(metrics.metricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
|
||||||
this.commitTimeSensor.add(metrics.metricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
|
this.commitTimeSensor.add(metrics.metricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
|
||||||
this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
|
this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
|
||||||
|
|
||||||
this.pollTimeSensor = metrics.sensor("poll-time");
|
this.pollTimeSensor = metrics.sensor(sensorNamePrefix + ".poll-time");
|
||||||
this.pollTimeSensor.add(metrics.metricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
|
this.pollTimeSensor.add(metrics.metricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
|
||||||
this.pollTimeSensor.add(metrics.metricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
|
this.pollTimeSensor.add(metrics.metricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
|
||||||
this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
|
this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
|
||||||
|
|
||||||
this.processTimeSensor = metrics.sensor("process-time");
|
this.processTimeSensor = metrics.sensor(sensorNamePrefix + ".process-time");
|
||||||
this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
|
this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
|
||||||
this.processTimeSensor.add(metrics.metricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
|
this.processTimeSensor.add(metrics.metricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
|
||||||
this.processTimeSensor.add(metrics.metricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
|
this.processTimeSensor.add(metrics.metricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
|
||||||
|
|
||||||
this.punctuateTimeSensor = metrics.sensor("punctuate-time");
|
this.punctuateTimeSensor = metrics.sensor(sensorNamePrefix + ".punctuate-time");
|
||||||
this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
|
this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
|
||||||
this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
|
this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
|
||||||
this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
|
this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
|
||||||
|
|
||||||
this.taskCreationSensor = metrics.sensor("task-creation");
|
this.taskCreationSensor = metrics.sensor(sensorNamePrefix + ".task-creation");
|
||||||
this.taskCreationSensor.add(metrics.metricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
|
this.taskCreationSensor.add(metrics.metricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
|
||||||
|
|
||||||
this.taskDestructionSensor = metrics.sensor("task-destruction");
|
this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction");
|
||||||
this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
|
this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recordLatency(Sensor sensor, long startNs, long endNs) {
|
public void recordLatency(Sensor sensor, long startNs, long endNs) {
|
||||||
sensor.record((endNs - startNs) / 1000000, endNs);
|
sensor.record(endNs - startNs, timerStartedMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -746,11 +777,11 @@ public class StreamThread extends Thread {
|
||||||
String metricGroupName = "stream-" + scopeName + "-metrics";
|
String metricGroupName = "stream-" + scopeName + "-metrics";
|
||||||
|
|
||||||
// first add the global operation metrics if not yet, with the global tags only
|
// first add the global operation metrics if not yet, with the global tags only
|
||||||
Sensor parent = metrics.sensor(scopeName + "-" + operationName);
|
Sensor parent = metrics.sensor(sensorNamePrefix + "." + scopeName + "-" + operationName);
|
||||||
addLatencyMetrics(metricGroupName, parent, "all", operationName, this.metricTags);
|
addLatencyMetrics(metricGroupName, parent, "all", operationName, this.metricTags);
|
||||||
|
|
||||||
// add the store operation metrics with additional tags
|
// add the store operation metrics with additional tags
|
||||||
Sensor sensor = metrics.sensor(scopeName + "-" + entityName + "-" + operationName, parent);
|
Sensor sensor = metrics.sensor(sensorNamePrefix + "." + scopeName + "-" + entityName + "-" + operationName, parent);
|
||||||
addLatencyMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
|
addLatencyMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
|
||||||
|
|
||||||
return sensor;
|
return sensor;
|
||||||
|
|
|
@ -83,11 +83,6 @@ public class SimpleBenchmark {
|
||||||
final File rocksdbDir = new File(stateDir, "rocksdb-test");
|
final File rocksdbDir = new File(stateDir, "rocksdb-test");
|
||||||
rocksdbDir.mkdir();
|
rocksdbDir.mkdir();
|
||||||
|
|
||||||
System.out.println("SimpleBenchmark instance started");
|
|
||||||
System.out.println("kafka=" + kafka);
|
|
||||||
System.out.println("zookeeper=" + zookeeper);
|
|
||||||
System.out.println("stateDir=" + stateDir);
|
|
||||||
|
|
||||||
SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
|
SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
|
||||||
|
|
||||||
// producer performance
|
// producer performance
|
||||||
|
@ -232,6 +227,7 @@ public class SimpleBenchmark {
|
||||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
|
||||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
||||||
|
|
||||||
KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
|
KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
|
||||||
|
|
||||||
|
@ -250,7 +246,10 @@ public class SimpleBenchmark {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
for (ConsumerRecord<Long, byte[]> record : records) {
|
for (ConsumerRecord<Long, byte[]> record : records) {
|
||||||
key = record.key();
|
Long recKey = record.key();
|
||||||
|
|
||||||
|
if (key == null || key < recKey)
|
||||||
|
key = recKey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -306,14 +306,14 @@ public class StreamThreadTest {
|
||||||
|
|
||||||
List<TopicPartition> revokedPartitions;
|
List<TopicPartition> revokedPartitions;
|
||||||
List<TopicPartition> assignedPartitions;
|
List<TopicPartition> assignedPartitions;
|
||||||
Map<Integer, StreamTask> prevTasks;
|
Map<TaskId, StreamTask> prevTasks;
|
||||||
|
|
||||||
//
|
//
|
||||||
// Assign t1p1 and t1p2. This should create task1 & task2
|
// Assign t1p1 and t1p2. This should create task1 & task2
|
||||||
//
|
//
|
||||||
revokedPartitions = Collections.emptyList();
|
revokedPartitions = Collections.emptyList();
|
||||||
assignedPartitions = Arrays.asList(t1p1, t1p2);
|
assignedPartitions = Arrays.asList(t1p1, t1p2);
|
||||||
prevTasks = new HashMap(thread.tasks());
|
prevTasks = new HashMap<>(thread.tasks());
|
||||||
|
|
||||||
rebalanceListener.onPartitionsRevoked(revokedPartitions);
|
rebalanceListener.onPartitionsRevoked(revokedPartitions);
|
||||||
rebalanceListener.onPartitionsAssigned(assignedPartitions);
|
rebalanceListener.onPartitionsAssigned(assignedPartitions);
|
||||||
|
@ -345,7 +345,7 @@ public class StreamThreadTest {
|
||||||
//
|
//
|
||||||
revokedPartitions = assignedPartitions;
|
revokedPartitions = assignedPartitions;
|
||||||
assignedPartitions = Collections.emptyList();
|
assignedPartitions = Collections.emptyList();
|
||||||
prevTasks = new HashMap(thread.tasks());
|
prevTasks = new HashMap<>(thread.tasks());
|
||||||
|
|
||||||
rebalanceListener.onPartitionsRevoked(revokedPartitions);
|
rebalanceListener.onPartitionsRevoked(revokedPartitions);
|
||||||
rebalanceListener.onPartitionsAssigned(assignedPartitions);
|
rebalanceListener.onPartitionsAssigned(assignedPartitions);
|
||||||
|
|
|
@ -218,8 +218,7 @@ public class StreamThreadStateStoreProviderTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recordLatency(final Sensor sensor, final long startNs,
|
public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
|
||||||
final long endNs) {
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue