KAFKA-9924: Add RocksDB metric num-entries-active-mem-table (#9177)

* Add the first RocksDB metric that exposes a RocksDB property: num-entries-active-mem-table.
* Add code to StreamsMetricsImpl in support of exposing RocksDB properties
* Add unit tests and integration tests

This commit contains only one metric to keep the PR at a reasonable size.
All other RocksDB metrics described in KIP-607 will be added in other PRs.
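Once registered, the new gauge is exposed through the regular Kafka Streams metrics API. A minimal sketch of reading it (assuming a running KafkaStreams instance and the state-store-level group name stream-state-metrics used by StreamsMetricsImpl; the helper class is illustrative):

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    import java.math.BigInteger;
    import java.util.Map;

    public class MemtableMetricReader {

        // Sums num-entries-active-mem-table across all state stores of a
        // running KafkaStreams instance; the gauge value is a BigInteger
        // because the underlying RocksDB property is an unsigned long.
        public static BigInteger totalActiveMemtableEntries(final KafkaStreams streams) {
            BigInteger total = BigInteger.ZERO;
            for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                final MetricName name = entry.getKey();
                if (name.name().equals("num-entries-active-mem-table")
                        && name.group().equals("stream-state-metrics")) {
                    total = total.add((BigInteger) entry.getValue().metricValue());
                }
            }
            return total;
        }
    }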

Implements: KIP-607
Reviewers: Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
Bruno Cadonna 2020-08-28 01:04:28 +02:00 committed by GitHub
parent b6ba67482f
commit 9da32b6bd0
23 changed files with 639 additions and 268 deletions

View File

@ -45,6 +45,8 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@ -95,7 +97,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<>();
private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
private final Map<String, Deque<String>> storeLevelSensors = new HashMap<>();
private final ConcurrentMap<String, Deque<String>> storeLevelSensors = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Deque<MetricName>> storeLevelMetrics = new ConcurrentHashMap<>();
private final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger;
@ -274,11 +277,10 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return tagMap;
}
public Map<String, String> storeLevelTagMap(final String threadId,
final String taskName,
public Map<String, String> storeLevelTagMap(final String taskName,
final String storeType,
final String storeName) {
final Map<String, String> tagMap = taskLevelTagMap(threadId, taskName);
final Map<String, String> tagMap = taskLevelTagMap(Thread.currentThread().getName(), taskName);
tagMap.put(storeType + "-" + STORE_ID_TAG, storeName);
return tagMap;
}
@ -402,34 +404,73 @@ public class StreamsMetricsImpl implements StreamsMetrics {
+ SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName;
}
public final Sensor storeLevelSensor(final String threadId,
final String taskId,
public final Sensor storeLevelSensor(final String taskId,
final String storeName,
final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final RecordingLevel recordingLevel,
final Sensor... parents) {
final String key = storeSensorPrefix(threadId, taskId, storeName);
synchronized (storeLevelSensors) {
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.getSensor(fullSensorName);
if (sensor == null) {
storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
} else {
return sensor;
}
final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.getSensor(fullSensorName);
if (sensor == null) {
// since the keys in the map storeLevelSensors contain the name of the current thread and threads only
// access keys in which their name is contained, the values in the map do not need to be thread-safe
// and we can use a LinkedList here.
// TODO: In the future, we could use thread-local maps since each thread will exclusively access the set
// of keys that contain its name. The same is true for the other metric levels. Thread-level metrics need
// some special attention, since they are created before the thread is constructed. The creation of those
// metrics could be moved into the run() method of the thread.
storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
}
return sensor;
}
public <T> void addStoreLevelMutableMetric(final String taskId,
final String metricsScope,
final String storeName,
final String name,
final String description,
final RecordingLevel recordingLevel,
final Gauge<T> valueProvider) {
final MetricName metricName = metrics.metricName(
name,
STATE_STORE_LEVEL_GROUP,
description,
storeLevelTagMap(taskId, metricsScope, storeName)
);
if (metrics.metric(metricName) == null) {
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
metrics.addMetric(metricName, metricConfig, valueProvider);
storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
}
}
public final void removeAllStoreLevelSensors(final String threadId,
final String taskId,
final String storeName) {
public final void removeAllStoreLevelSensorsAndMetrics(final String taskId,
final String storeName) {
final String threadId = Thread.currentThread().getName();
removeAllStoreLevelSensors(threadId, taskId, storeName);
removeAllStoreLevelMetrics(threadId, taskId, storeName);
}
private void removeAllStoreLevelSensors(final String threadId,
final String taskId,
final String storeName) {
final String key = storeSensorPrefix(threadId, taskId, storeName);
synchronized (storeLevelSensors) {
final Deque<String> sensors = storeLevelSensors.remove(key);
while (sensors != null && !sensors.isEmpty()) {
metrics.removeSensor(sensors.pop());
}
final Deque<String> sensors = storeLevelSensors.remove(key);
while (sensors != null && !sensors.isEmpty()) {
metrics.removeSensor(sensors.pop());
}
}
private void removeAllStoreLevelMetrics(final String threadId,
final String taskId,
final String storeName) {
final String key = storeSensorPrefix(threadId, taskId, storeName);
final Deque<MetricName> metricNames = storeLevelMetrics.remove(key);
while (metricNames != null && !metricNames.isEmpty()) {
metrics.removeMetric(metricNames.pop());
}
}
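The synchronized blocks removed above are no longer needed because every sensor key embeds the calling thread's name, so two threads can never contend on the same value deque. A minimal sketch of that scheme (ThreadScopedRegistry and its key format are illustrative, not the PR's actual code):

    import java.util.Deque;
    import java.util.LinkedList;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class ThreadScopedRegistry {

        // The ConcurrentMap guards concurrent insertion and removal of keys;
        // the unsynchronized LinkedList values are safe because only the
        // thread whose name is embedded in the key ever touches them.
        private final ConcurrentMap<String, Deque<String>> entries = new ConcurrentHashMap<>();

        public void register(final String taskId, final String storeName, final String sensorName) {
            final String key = Thread.currentThread().getName() + "." + taskId + "." + storeName;
            entries.computeIfAbsent(key, ignored -> new LinkedList<>()).push(sensorName);
        }

        public Deque<String> removeAll(final String taskId, final String storeName) {
            // Only the owning thread can reconstruct this key.
            return entries.remove(Thread.currentThread().getName() + "." + taskId + "." + storeName);
        }
    }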

View File

@ -227,7 +227,7 @@ public class TaskMetrics {
final String storeName,
final StreamsMetricsImpl streamsMetrics) {
if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
return StateStoreMetrics.expiredWindowRecordDropSensor(threadId, taskId, storeType, storeName, streamsMetrics);
return StateStoreMetrics.expiredWindowRecordDropSensor(taskId, storeType, storeName, streamsMetrics);
}
return droppedRecordsSensor(threadId, taskId, streamsMetrics);
}

View File

@ -236,7 +236,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
memBufferSize = 0;
minTimestamp = Long.MAX_VALUE;
updateBufferMetrics();
streamsMetrics.removeAllStoreLevelSensors(threadId, taskId, storeName);
streamsMetrics.removeAllStoreLevelSensorsAndMetrics(taskId, storeName);
}
@Override

View File

@ -32,7 +32,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
final long retentionPeriod,
final long segmentInterval) {
super(name, retentionPeriod, segmentInterval);
metricsRecorder = new RocksDBMetricsRecorder(metricsScope, Thread.currentThread().getName(), name);
metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
}
@Override

View File

@ -99,7 +99,7 @@ public class MeteredKeyValueStore<K, V>
rangeSensor = StateStoreMetrics.rangeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
deleteSensor = StateStoreMetrics.deleteSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
@ -229,7 +229,7 @@ public class MeteredKeyValueStore<K, V>
try {
wrapped().close();
} finally {
streamsMetrics.removeAllStoreLevelSensors(threadId, taskId, name());
streamsMetrics.removeAllStoreLevelSensorsAndMetrics(taskId, name());
}
}

View File

@ -80,7 +80,7 @@ public class MeteredSessionStore<K, V>
fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
@ -246,7 +246,7 @@ public class MeteredSessionStore<K, V>
try {
wrapped().close();
} finally {
streamsMetrics.removeAllStoreLevelSensors(threadId, taskId, name());
streamsMetrics.removeAllStoreLevelSensorsAndMetrics(taskId, name());
}
}

View File

@ -80,7 +80,7 @@ public class MeteredWindowStore<K, V>
putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
@ -213,7 +213,7 @@ public class MeteredWindowStore<K, V>
try {
wrapped().close();
} finally {
streamsMetrics.removeAllStoreLevelSensors(threadId, taskId, name());
streamsMetrics.removeAllStoreLevelSensorsAndMetrics(taskId, name());
}
}

View File

@ -110,7 +110,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
RocksDBStore(final String name,
final String metricsScope) {
this(name, DB_FILE_DIR, new RocksDBMetricsRecorder(metricsScope, Thread.currentThread().getName(), name));
this(name, DB_FILE_DIR, new RocksDBMetricsRecorder(metricsScope, name));
}
RocksDBStore(final String name,

View File

@ -32,7 +32,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
final long retentionPeriod,
final long segmentInterval) {
super(name, retentionPeriod, segmentInterval);
metricsRecorder = new RocksDBMetricsRecorder(metricsScope, Thread.currentThread().getName(), name);
metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
}
@Override

View File

@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.math.BigInteger;
import java.util.Objects;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_SUFFIX;
@ -56,6 +58,7 @@ public class RocksDBMetrics {
private static final String COMPACTION_TIME_MAX = COMPACTION_TIME + MAX_SUFFIX;
private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors";
static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
private static final String BYTES_WRITTEN_TO_DB_RATE_DESCRIPTION =
"Average number of bytes written per second to the RocksDB state store";
@ -94,26 +97,22 @@ public class RocksDBMetrics {
private static final String COMPACTION_TIME_MAX_DESCRIPTION = "Maximum time spent on compaction in ms";
private static final String NUMBER_OF_OPEN_FILES_DESCRIPTION = "Number of currently open files";
private static final String NUMBER_OF_FILE_ERRORS_DESCRIPTION = "Total number of file errors occurred";
private static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE_DESCRIPTION =
"Current total number of entries in the active memtable";
public static class RocksDBMetricContext {
private final String threadId;
private final String taskName;
private final String metricsScope;
private final String storeName;
public RocksDBMetricContext(final String threadId,
final String taskName,
public RocksDBMetricContext(final String taskName,
final String metricsScope,
final String storeName) {
this.threadId = threadId;
this.taskName = taskName;
this.metricsScope = metricsScope;
this.storeName = storeName;
}
public String threadId() {
return threadId;
}
public String taskName() {
return taskName;
}
@ -133,15 +132,14 @@ public class RocksDBMetrics {
return false;
}
final RocksDBMetricContext that = (RocksDBMetricContext) o;
return Objects.equals(threadId, that.threadId) &&
Objects.equals(taskName, that.taskName) &&
return Objects.equals(taskName, that.taskName) &&
Objects.equals(metricsScope, that.metricsScope) &&
Objects.equals(storeName, that.storeName);
}
@Override
public int hashCode() {
return Objects.hash(threadId, taskName, metricsScope, storeName);
return Objects.hash(taskName, metricsScope, storeName);
}
}
@ -152,7 +150,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -171,7 +168,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -190,7 +186,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -209,7 +204,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -227,7 +221,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -245,7 +238,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -263,7 +255,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -281,7 +272,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -300,7 +290,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -318,7 +307,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -336,7 +324,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -354,7 +341,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -372,7 +358,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -390,7 +375,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -408,7 +392,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -426,7 +409,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -444,7 +426,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -463,7 +444,6 @@ public class RocksDBMetrics {
sensor,
STATE_STORE_LEVEL_GROUP,
streamsMetrics.storeLevelTagMap(
metricContext.threadId(),
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName()
@ -474,11 +454,24 @@ public class RocksDBMetrics {
return sensor;
}
public static void addNumEntriesActiveMemTableMetric(final StreamsMetricsImpl streamsMetrics,
final RocksDBMetricContext metricContext,
final Gauge<BigInteger> valueProvider) {
streamsMetrics.addStoreLevelMutableMetric(
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName(),
NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE_DESCRIPTION,
RecordingLevel.INFO,
valueProvider
);
}
private static Sensor createSensor(final StreamsMetricsImpl streamsMetrics,
final RocksDBMetricContext metricContext,
final String sensorName) {
return streamsMetrics.storeLevelSensor(
metricContext.threadId(),
metricContext.taskName(),
metricContext.storeName(),
sensorName,
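A hypothetical caller of the new addNumEntriesActiveMemTableMetric helper; in this PR the gauge is wired up by RocksDBMetricsRecorder#initGauges, and the task id "0_0", scope "rocksdb", store name, and constant value provider below are illustrative only:

    import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
    import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics;
    import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;

    import java.math.BigInteger;

    public class GaugeRegistrationSketch {

        static void register(final StreamsMetricsImpl streamsMetrics) {
            final RocksDBMetricContext context =
                new RocksDBMetricContext("0_0", "rocksdb", "my-store");
            RocksDBMetrics.addNumEntriesActiveMemTableMetric(
                streamsMetrics,
                context,
                (config, now) -> BigInteger.valueOf(42L)  // stand-in value provider
            );
        }
    }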

View File

@ -18,20 +18,26 @@ package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
import org.rocksdb.Cache;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.rocksdb.TickerType;
import org.slf4j.Logger;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE;
public class RocksDBMetricsRecorder {
private static class DbAndCacheAndStatistics {
@ -56,6 +62,8 @@ public class RocksDBMetricsRecorder {
}
}
private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
private final Logger logger;
private Sensor bytesWrittenToDatabaseSensor;
@ -74,15 +82,12 @@ public class RocksDBMetricsRecorder {
private final Map<String, DbAndCacheAndStatistics> storeToValueProviders = new ConcurrentHashMap<>();
private final String metricsScope;
private final String storeName;
private final String threadId;
private TaskId taskId;
private StreamsMetricsImpl streamsMetrics;
public RocksDBMetricsRecorder(final String metricsScope,
final String threadId,
final String storeName) {
this.metricsScope = metricsScope;
this.threadId = threadId;
this.storeName = storeName;
final LogContext logContext = new LogContext(String.format("[RocksDB Metrics Recorder for %s] ", storeName));
logger = logContext.logger(RocksDBMetricsRecorder.class);
@ -113,7 +118,9 @@ public class RocksDBMetricsRecorder {
+ "This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
initSensors(streamsMetrics, taskId);
final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName);
initSensors(streamsMetrics, metricContext);
initGauges(streamsMetrics, metricContext);
this.taskId = taskId;
this.streamsMetrics = streamsMetrics;
}
@ -151,9 +158,7 @@ public class RocksDBMetricsRecorder {
}
}
private void initSensors(final StreamsMetricsImpl streamsMetrics, final TaskId taskId) {
final RocksDBMetricContext metricContext =
new RocksDBMetricContext(threadId, taskId.toString(), metricsScope, storeName);
private void initSensors(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext) {
bytesWrittenToDatabaseSensor = RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricContext);
bytesReadFromDatabaseSensor = RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricContext);
memtableBytesFlushedSensor = RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricContext);
@ -169,6 +174,30 @@ public class RocksDBMetricsRecorder {
numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
}
private void initGauges(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext) {
RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, metricContext, (metricsConfig, now) -> {
BigInteger result = BigInteger.valueOf(0);
for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
try {
// values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
// BigInteger and construct the object from the byte representation of the value
result = result.add(new BigInteger(1, longToBytes(
valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))));
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error adding RocksDB metric " + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, e);
}
}
return result;
});
}
private static byte[] longToBytes(final long data) {
final ByteBuffer conversionBuffer = ByteBuffer.allocate(Long.BYTES);
conversionBuffer.putLong(0, data);
return conversionBuffer.array();
}
public void removeValueProviders(final String segmentName) {
logger.debug("Removing value providers for store {} of task {}", segmentName, taskId);
final DbAndCacheAndStatistics removedValueProviders = storeToValueProviders.remove(segmentName);
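The signum-1 BigInteger constructor used in initGauges is what makes the conversion correct: Java longs are signed, so a property value above Long.MAX_VALUE comes back from getAggregatedLongProperty as a negative long, and interpreting its raw bytes as an unsigned magnitude restores the intended value. A small self-contained demonstration:

    import java.math.BigInteger;
    import java.nio.ByteBuffer;

    public class UnsignedLongDemo {
        public static void main(final String[] args) {
            // A C++ unsigned long of 2^64 - 1 arrives in Java as -1L.
            final long raw = -1L;
            final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(0, raw);
            // Signum 1 tells BigInteger to read the bytes as an unsigned magnitude.
            final BigInteger unsigned = new BigInteger(1, buffer.array());
            System.out.println(unsigned);  // prints 18446744073709551615
        }
    }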

View File

@ -386,13 +386,11 @@ public class StateStoreMetrics {
);
}
public static Sensor expiredWindowRecordDropSensor(final String threadId,
final String taskId,
public static Sensor expiredWindowRecordDropSensor(final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.storeLevelSensor(
threadId,
taskId,
storeName,
EXPIRED_WINDOW_RECORD_DROP,
@ -401,7 +399,7 @@ public class StateStoreMetrics {
addInvocationRateAndCountToSensor(
sensor,
"stream-" + storeType + "-metrics",
streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName),
streamsMetrics.storeLevelTagMap(taskId, storeType, storeName),
EXPIRED_WINDOW_RECORD_DROP,
EXPIRED_WINDOW_RECORD_DROP_RATE_DESCRIPTION,
EXPIRED_WINDOW_RECORD_DROP_TOTAL_DESCRIPTION
@ -447,13 +445,12 @@ public class StateStoreMetrics {
);
}
public static Sensor e2ELatencySensor(final String threadId,
final String taskId,
public static Sensor e2ELatencySensor(final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
final Sensor sensor = streamsMetrics.storeLevelSensor(taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);
addAvgAndMinAndMaxToSensor(
sensor,
STATE_STORE_LEVEL_GROUP,
@ -477,7 +474,7 @@ public class StateStoreMetrics {
final RecordingLevel recordingLevel,
final StreamsMetricsImpl streamsMetrics) {
final Version version = streamsMetrics.version();
final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, metricName, recordingLevel);
final Sensor sensor = streamsMetrics.storeLevelSensor(taskId, storeName, metricName, recordingLevel);
final String group;
final Map<String, String> tagMap;
if (version == Version.FROM_0100_TO_24) {
@ -487,7 +484,7 @@ public class StateStoreMetrics {
} else {
group = STATE_STORE_LEVEL_GROUP;
tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
tagMap = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);
}
addAvgAndMaxToSensor(sensor, group, tagMap, metricName, descriptionOfAvg, descriptionOfMax);
return sensor;
@ -507,7 +504,7 @@ public class StateStoreMetrics {
final Sensor sensor;
final String latencyMetricName = metricName + LATENCY_SUFFIX;
final Version version = streamsMetrics.version();
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);
final String stateStoreLevelGroup = stateStoreLevelGroup(storeType, version);
if (version == Version.FROM_0100_TO_24) {
final Sensor parentSensor = parentSensor(
@ -524,7 +521,7 @@ public class StateStoreMetrics {
recordingLevel,
streamsMetrics
);
sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, metricName, recordingLevel, parentSensor);
sensor = streamsMetrics.storeLevelSensor(taskId, storeName, metricName, recordingLevel, parentSensor);
addInvocationRateAndCountToSensor(
sensor,
stateStoreLevelGroup,
@ -534,7 +531,7 @@ public class StateStoreMetrics {
descriptionOfCount
);
} else {
sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, metricName, recordingLevel);
sensor = streamsMetrics.storeLevelSensor(taskId, storeName, metricName, recordingLevel);
addInvocationRateToSensor(sensor, stateStoreLevelGroup, tagMap, metricName, descriptionOfRate);
}
addAvgAndMaxToSensor(
@ -561,7 +558,7 @@ public class StateStoreMetrics {
final RecordingLevel recordingLevel,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, metricName, recordingLevel);
final Map<String, String> allTagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, ROLLUP_VALUE);
final Map<String, String> allTagMap = streamsMetrics.storeLevelTagMap(taskId, storeType, ROLLUP_VALUE);
addAvgAndMaxToSensor(
sensor,
stateStoreLevelGroup,

View File

@ -100,6 +100,7 @@ public class RocksDBMetricsIntegrationTest {
private static final String BYTES_WRITTEN_DURING_COMPACTION_RATE = "bytes-written-compaction-rate";
private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
private static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
@ -255,6 +256,7 @@ public class RocksDBMetricsIntegrationTest {
checkMetricByName(listMetricStore, BYTES_WRITTEN_DURING_COMPACTION_RATE, 1);
checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, 1);
checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
checkMetricByName(listMetricStore, NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, 1);
}
private void checkMetricByName(final List<Metric> listMetric,

View File

@ -30,6 +30,8 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IArgumentMatcher;
import org.junit.Test;
@ -54,13 +56,16 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.newCapture;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.resetToDefault;
@ -68,6 +73,7 @@ import static org.easymock.EasyMock.verify;
import static org.easymock.EasyMock.eq;
import static org.hamcrest.CoreMatchers.equalToObject;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@ -78,48 +84,56 @@ import static org.junit.Assert.assertTrue;
import static org.powermock.api.easymock.PowerMock.createMock;
@RunWith(PowerMockRunner.class)
@PrepareForTest(Sensor.class)
@PrepareForTest({Sensor.class, KafkaMetric.class})
public class StreamsMetricsImplTest {
private final static String SENSOR_PREFIX_DELIMITER = ".";
private final static String SENSOR_NAME_DELIMITER = ".s.";
private final static String SENSOR_NAME_1 = "sensor1";
private final static String SENSOR_NAME_2 = "sensor2";
private final static String INTERNAL_PREFIX = "internal";
private final static String VERSION = StreamsConfig.METRICS_LATEST;
private final static String CLIENT_ID = "test-client";
private final static String THREAD_ID = "test-thread";
private final static String TASK_ID = "test-task";
private final static String THREAD_ID1 = "test-thread-1";
private final static String TASK_ID1 = "test-task-1";
private final static String TASK_ID2 = "test-task-2";
private final static String METRIC_NAME1 = "test-metric1";
private final static String METRIC_NAME2 = "test-metric2";
private final static String THREAD_ID_TAG = "thread-id";
private final static String THREAD_ID_TAG_0100_TO_24 = "client-id";
private final static String TASK_ID_TAG = "task-id";
private final static String STORE_ID_TAG = "state-id";
private final static String RECORD_CACHE_ID_TAG = "record-cache-id";
private final static String SCOPE_NAME = "test-scope";
private final static String STORE_ID_TAG = "-state-id";
private final static String STORE_NAME1 = "store1";
private final static String STORE_NAME2 = "store2";
private final static String RECORD_CACHE_ID_TAG = "record-cache-id";
private final static String ENTITY_NAME = "test-entity";
private final static String OPERATION_NAME = "test-operation";
private final static String CUSTOM_TAG_KEY1 = "test-key1";
private final static String CUSTOM_TAG_VALUE1 = "test-value1";
private final static String CUSTOM_TAG_KEY2 = "test-key2";
private final static String CUSTOM_TAG_VALUE2 = "test-value2";
private final static RecordingLevel INFO_RECORDING_LEVEL = RecordingLevel.INFO;
private final static String DESCRIPTION1 = "description number one";
private final static String DESCRIPTION2 = "description number two";
private final static String DESCRIPTION3 = "description number three";
private final static Gauge<String> VALUE_PROVIDER = (config, now) -> "mutable-value";
private final Metrics metrics = new Metrics();
private final Sensor sensor = metrics.sensor("dummy");
private final String storeName = "store";
private final String sensorName1 = "sensor1";
private final String sensorName2 = "sensor2";
private final String metricNamePrefix = "metric";
private final String group = "group";
private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
private final String description1 = "description number one";
private final String description2 = "description number two";
private final String description3 = "description number three";
private final String description4 = "description number four";
private final Map<String, String> clientLevelTags = mkMap(mkEntry("client-id", CLIENT_ID));
private final Map<String, String> clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID));
private final Map<String, String> storeLevelTagMap = mkMap(
mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
mkEntry(TASK_ID_TAG, TASK_ID1),
mkEntry(SCOPE_NAME + STORE_ID_TAG, STORE_NAME1)
);
private final MetricName metricName1 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, description1, clientLevelTags);
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags);
private final MetricName metricName2 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, description2, clientLevelTags);
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags);
private final MockTime time = new MockTime(0);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
@ -176,57 +190,78 @@ public class StreamsMetricsImplTest {
return null;
}
private void addSensorsOnAllLevels(final Metrics metrics, final StreamsMetricsImpl streamsMetrics) {
expect(metrics.sensor(anyString(), anyObject(RecordingLevel.class), anyObject(Sensor[].class)))
private Capture<String> addSensorsOnAllLevels(final Metrics metrics, final StreamsMetricsImpl streamsMetrics) {
final Capture<String> sensorKeys = newCapture(CaptureType.ALL);
final Sensor[] parents = {};
expect(metrics.sensor(capture(sensorKeys), eq(INFO_RECORDING_LEVEL), parents))
.andStubReturn(sensor);
expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, description1, clientLevelTags))
expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags))
.andReturn(metricName1);
expect(metrics.metricName(METRIC_NAME2, CLIENT_LEVEL_GROUP, description2, clientLevelTags))
expect(metrics.metricName(METRIC_NAME2, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags))
.andReturn(metricName2);
replay(metrics);
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, description1, RecordingLevel.INFO, "value");
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME2, description2, RecordingLevel.INFO, "value");
streamsMetrics.threadLevelSensor(THREAD_ID, sensorName1, RecordingLevel.INFO);
streamsMetrics.threadLevelSensor(THREAD_ID, sensorName2, RecordingLevel.INFO);
streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, sensorName1, RecordingLevel.INFO);
streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, sensorName2, RecordingLevel.INFO);
streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, storeName, sensorName1, RecordingLevel.INFO);
streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, storeName, sensorName2, RecordingLevel.INFO);
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, INFO_RECORDING_LEVEL, "value");
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME2, DESCRIPTION2, INFO_RECORDING_LEVEL, "value");
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_2, INFO_RECORDING_LEVEL);
streamsMetrics.taskLevelSensor(THREAD_ID1, TASK_ID1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
streamsMetrics.taskLevelSensor(THREAD_ID1, TASK_ID1, SENSOR_NAME_2, INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
SCOPE_NAME,
STORE_NAME1,
METRIC_NAME1,
DESCRIPTION1,
INFO_RECORDING_LEVEL,
VALUE_PROVIDER
);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
SCOPE_NAME,
STORE_NAME1,
METRIC_NAME2,
DESCRIPTION2,
INFO_RECORDING_LEVEL,
VALUE_PROVIDER
);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
SCOPE_NAME,
STORE_NAME2,
METRIC_NAME1,
DESCRIPTION1,
INFO_RECORDING_LEVEL,
VALUE_PROVIDER
);
return sensorKeys;
}
private void setupGetNewSensorTest(final Metrics metrics,
final String level,
final RecordingLevel recordingLevel) {
final String fullSensorName = fullSensorName(level);
expect(metrics.getSensor(fullSensorName)).andStubReturn(null);
private Capture<String> setupGetNewSensorTest(final Metrics metrics,
final RecordingLevel recordingLevel) {
final Capture<String> sensorKey = newCapture(CaptureType.ALL);
expect(metrics.getSensor(capture(sensorKey))).andStubReturn(null);
final Sensor[] parents = {};
expect(metrics.sensor(fullSensorName, recordingLevel, parents)).andReturn(sensor);
expect(metrics.sensor(capture(sensorKey), eq(recordingLevel), parents)).andReturn(sensor);
replay(metrics);
return sensorKey;
}
private void setupGetExistingSensorTest(final Metrics metrics,
final String level) {
final String fullSensorName = fullSensorName(level);
expect(metrics.getSensor(fullSensorName)).andStubReturn(sensor);
private void setupGetExistingSensorTest(final Metrics metrics) {
expect(metrics.getSensor(anyString())).andStubReturn(sensor);
replay(metrics);
}
private String fullSensorName(final String level) {
return INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER + sensorName1;
}
@Test
public void shouldGetNewThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, THREAD_ID, recordingLevel);
setupGetNewSensorTest(metrics, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.threadLevelSensor(
THREAD_ID,
sensorName1,
recordingLevel
);
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
@ -236,14 +271,10 @@ public class StreamsMetricsImplTest {
public void shouldGetExistingThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics, THREAD_ID);
setupGetExistingSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.threadLevelSensor(
THREAD_ID,
sensorName1,
recordingLevel
);
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
@ -253,13 +284,13 @@ public class StreamsMetricsImplTest {
public void shouldGetNewTaskLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, THREAD_ID + ".task." + TASK_ID, recordingLevel);
setupGetNewSensorTest(metrics, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID,
TASK_ID,
sensorName1,
THREAD_ID1,
TASK_ID1,
SENSOR_NAME_1,
recordingLevel
);
@ -271,13 +302,13 @@ public class StreamsMetricsImplTest {
public void shouldGetExistingTaskLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics, THREAD_ID + ".task." + TASK_ID);
setupGetExistingSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID,
TASK_ID,
sensorName1,
THREAD_ID1,
TASK_ID1,
SENSOR_NAME_1,
recordingLevel
);
@ -286,44 +317,35 @@ public class StreamsMetricsImplTest {
}
@Test
public void shouldGetNewStoreLevelSensor() {
public void shouldGetNewStoreLevelSensorIfNoneExists() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(
metrics,
THREAD_ID + ".task." + storeName + SENSOR_PREFIX_DELIMITER + storeName + SENSOR_PREFIX_DELIMITER
+ TASK_ID,
recordingLevel
);
final Capture<String> sensorKeys = setupGetNewSensorTest(metrics, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
THREAD_ID,
storeName,
TASK_ID,
sensorName1,
TASK_ID1,
STORE_NAME1,
SENSOR_NAME_1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
assertThat(sensorKeys.getValues().get(0), is(sensorKeys.getValues().get(1)));
}
@Test
public void shouldGetExistingStoreLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + storeName + SENSOR_PREFIX_DELIMITER + storeName + SENSOR_PREFIX_DELIMITER
+ TASK_ID
);
setupGetExistingSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
THREAD_ID,
storeName,
TASK_ID,
sensorName1,
TASK_ID1,
STORE_NAME1,
SENSOR_NAME_1,
recordingLevel
);
@ -331,22 +353,164 @@ public class StreamsMetricsImplTest {
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
final Metrics metrics = niceMock(Metrics.class);
final Capture<String> sensorKeys = setUpSensorKeyTests(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL);
assertThat(sensorKeys.getValues().get(0), not(sensorKeys.getValues().get(1)));
}
@Test
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
final Metrics metrics = niceMock(Metrics.class);
final Capture<String> sensorKeys = setUpSensorKeyTests(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
assertThat(sensorKeys.getValues().get(0), not(sensorKeys.getValues().get(1)));
}
@Test
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
final Metrics metrics = niceMock(Metrics.class);
final Capture<String> sensorKeys = setUpSensorKeyTests(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
assertThat(sensorKeys.getValues().get(0), not(sensorKeys.getValues().get(1)));
}
@Test
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException {
final Metrics metrics = niceMock(Metrics.class);
final Capture<String> sensorKeys = setUpSensorKeyTests(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
final Thread otherThread =
new Thread(() -> streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL));
otherThread.start();
otherThread.join();
assertThat(sensorKeys.getValues().get(0), not(sensorKeys.getValues().get(1)));
}
@Test
public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
final Metrics metrics = niceMock(Metrics.class);
final Capture<String> sensorKeys = setUpSensorKeyTests(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL);
assertThat(sensorKeys.getValues().get(0), is(sensorKeys.getValues().get(1)));
}
private Capture<String> setUpSensorKeyTests(final Metrics metrics) {
final Capture<String> sensorKeys = newCapture(CaptureType.ALL);
expect(metrics.getSensor(capture(sensorKeys))).andStubReturn(sensor);
replay(metrics);
return sensorKeys;
}
@Test
public void shouldAddNewStoreLevelMutableMetric() {
final Metrics metrics = mock(Metrics.class);
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
.andReturn(metricName);
expect(metrics.metric(metricName)).andReturn(null);
metrics.addMetric(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER));
replay(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
SCOPE_NAME,
STORE_NAME1,
METRIC_NAME1,
DESCRIPTION1,
INFO_RECORDING_LEVEL,
VALUE_PROVIDER
);
verify(metrics);
}
@Test
public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() {
final Metrics metrics = mock(Metrics.class);
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
.andReturn(metricName);
expect(metrics.metric(metricName)).andReturn(mock(KafkaMetric.class));
replay(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
SCOPE_NAME,
STORE_NAME1,
METRIC_NAME1,
DESCRIPTION1,
INFO_RECORDING_LEVEL,
VALUE_PROVIDER
);
verify(metrics);
}
@Test
public void shouldRemoveStateStoreLevelSensors() {
final Metrics metrics = niceMock(Metrics.class);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final MetricName metricName1 =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
final MetricName metricName2 =
new MetricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, storeLevelTagMap);
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
.andReturn(metricName1);
expect(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, storeLevelTagMap))
.andReturn(metricName2);
final Capture<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
resetToDefault(metrics);
metrics.removeSensor(sensorKeys.getValues().get(4));
metrics.removeSensor(sensorKeys.getValues().get(5));
expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class));
expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class));
replay(metrics);
streamsMetrics.removeAllStoreLevelSensorsAndMetrics(TASK_ID1, STORE_NAME1);
verify(metrics);
}
@Test
public void shouldGetNewNodeLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorNodeName = "processorNodeName";
setupGetNewSensorTest(metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER + "node"
+ SENSOR_PREFIX_DELIMITER + processorNodeName,
recordingLevel
);
setupGetNewSensorTest(metrics, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID,
TASK_ID,
THREAD_ID1,
TASK_ID1,
processorNodeName,
sensorName1,
SENSOR_NAME_1,
recordingLevel
);
@ -359,17 +523,14 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorNodeName = "processorNodeName";
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "node" + SENSOR_PREFIX_DELIMITER + processorNodeName
);
setupGetExistingSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID,
TASK_ID,
THREAD_ID1,
TASK_ID1,
processorNodeName,
sensorName1,
SENSOR_NAME_1,
recordingLevel
);
@ -382,18 +543,14 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetNewSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "cache" + SENSOR_PREFIX_DELIMITER + processorCacheName,
recordingLevel
);
setupGetNewSensorTest(metrics, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID,
TASK_ID,
THREAD_ID1,
TASK_ID1,
processorCacheName,
sensorName1,
SENSOR_NAME_1,
recordingLevel
);
@ -406,16 +563,13 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "cache" + SENSOR_PREFIX_DELIMITER + processorCacheName
);
setupGetExistingSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID, TASK_ID,
THREAD_ID1, TASK_ID1,
processorCacheName,
sensorName1,
SENSOR_NAME_1,
recordingLevel
);
@ -430,13 +584,13 @@ public class StreamsMetricsImplTest {
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
final String value = "immutable-value";
final ImmutableMetricValue immutableValue = new ImmutableMetricValue<>(value);
expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, description1, clientLevelTags))
expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags))
.andReturn(metricName1);
metrics.addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue));
replay(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, description1, recordingLevel, value);
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, value);
verify(metrics);
}
@ -447,13 +601,13 @@ public class StreamsMetricsImplTest {
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
final Gauge<String> valueProvider = (config, now) -> "mutable-value";
expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, description1, clientLevelTags))
expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags))
.andReturn(metricName1);
metrics.addMetric(EasyMock.eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider));
replay(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, description1, recordingLevel, valueProvider);
streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, valueProvider);
verify(metrics);
}
@ -469,8 +623,8 @@ public class StreamsMetricsImplTest {
final RecordingLevel recordingLevel) {
final String fullSensorNamePrefix = INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER;
resetToDefault(metrics);
metrics.removeSensor(fullSensorNamePrefix + sensorName1);
metrics.removeSensor(fullSensorNamePrefix + sensorName2);
metrics.removeSensor(fullSensorNamePrefix + SENSOR_NAME_1);
metrics.removeSensor(fullSensorNamePrefix + SENSOR_NAME_2);
replay(metrics);
}
@ -494,9 +648,9 @@ public class StreamsMetricsImplTest {
final Metrics metrics = niceMock(Metrics.class);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
addSensorsOnAllLevels(metrics, streamsMetrics);
setupRemoveSensorsTest(metrics, THREAD_ID, RecordingLevel.INFO);
setupRemoveSensorsTest(metrics, THREAD_ID1, RecordingLevel.INFO);
streamsMetrics.removeAllThreadLevelSensors(THREAD_ID);
streamsMetrics.removeAllThreadLevelSensors(THREAD_ID1);
verify(metrics);
}
@ -536,7 +690,7 @@ public class StreamsMetricsImplTest {
@Test
public void testMultiLevelSensorRemoval() {
final Metrics registry = new Metrics();
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID, VERSION, time);
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, VERSION, time);
for (final MetricName defaultMetric : registry.metrics().keySet()) {
registry.removeMetric(defaultMetric);
}
@ -548,39 +702,39 @@ public class StreamsMetricsImplTest {
final String processorNodeName = "processorNodeName";
final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
final Sensor parent1 = metrics.taskLevelSensor(THREAD_ID, taskName, operation, RecordingLevel.DEBUG);
final Sensor parent1 = metrics.taskLevelSensor(THREAD_ID1, taskName, operation, RecordingLevel.DEBUG);
addAvgAndMaxLatencyToSensor(parent1, PROCESSOR_NODE_LEVEL_GROUP, taskTags, operation);
addInvocationRateAndCountToSensor(parent1, PROCESSOR_NODE_LEVEL_GROUP, taskTags, operation, "", "");
final int numberOfTaskMetrics = registry.metrics().size();
final Sensor sensor1 = metrics.nodeLevelSensor(THREAD_ID, taskName, processorNodeName, operation, RecordingLevel.DEBUG, parent1);
final Sensor sensor1 = metrics.nodeLevelSensor(THREAD_ID1, taskName, processorNodeName, operation, RecordingLevel.DEBUG, parent1);
addAvgAndMaxLatencyToSensor(sensor1, PROCESSOR_NODE_LEVEL_GROUP, nodeTags, operation);
addInvocationRateAndCountToSensor(sensor1, PROCESSOR_NODE_LEVEL_GROUP, nodeTags, operation, "", "");
assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
metrics.removeAllNodeLevelSensors(THREAD_ID, taskName, processorNodeName);
metrics.removeAllNodeLevelSensors(THREAD_ID1, taskName, processorNodeName);
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
final Sensor parent2 = metrics.taskLevelSensor(THREAD_ID, taskName, operation, RecordingLevel.DEBUG);
final Sensor parent2 = metrics.taskLevelSensor(THREAD_ID1, taskName, operation, RecordingLevel.DEBUG);
addAvgAndMaxLatencyToSensor(parent2, PROCESSOR_NODE_LEVEL_GROUP, taskTags, operation);
addInvocationRateAndCountToSensor(parent2, PROCESSOR_NODE_LEVEL_GROUP, taskTags, operation, "", "");
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
final Sensor sensor2 = metrics.nodeLevelSensor(THREAD_ID, taskName, processorNodeName, operation, RecordingLevel.DEBUG, parent2);
final Sensor sensor2 = metrics.nodeLevelSensor(THREAD_ID1, taskName, processorNodeName, operation, RecordingLevel.DEBUG, parent2);
addAvgAndMaxLatencyToSensor(sensor2, PROCESSOR_NODE_LEVEL_GROUP, nodeTags, operation);
addInvocationRateAndCountToSensor(sensor2, PROCESSOR_NODE_LEVEL_GROUP, nodeTags, operation, "", "");
assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
metrics.removeAllNodeLevelSensors(THREAD_ID, taskName, processorNodeName);
metrics.removeAllNodeLevelSensors(THREAD_ID1, taskName, processorNodeName);
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
metrics.removeAllTaskLevelSensors(THREAD_ID, taskName);
metrics.removeAllTaskLevelSensors(THREAD_ID1, taskName);
assertThat(registry.metrics().size(), equalTo(0));
}
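The count assertions above rely on parent/child sensor semantics: removing the node-level (child) sensors drops only their own metrics, so the registry falls back to the task-level count, and removing the task-level (parent) sensors empties it. A hedged illustration with the plain Metrics API:

final Metrics registry = new Metrics();
final Sensor parent = registry.sensor("task-sensor");
final Sensor child = registry.sensor("node-sensor", parent); // values recorded on the child also feed the parent
registry.removeSensor("node-sensor"); // removes the child's metrics; the parent stays registered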
@ -840,15 +994,15 @@ public class StreamsMetricsImplTest {
final String taskName = "test-task";
final String storeType = "remote-window";
final String storeName = "window-keeper";
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID, builtInMetricsVersion, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, builtInMetricsVersion, time);
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(THREAD_ID, taskName, storeType, storeName);
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
assertThat(tagMap.size(), equalTo(3));
final boolean isMetricsLatest = builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST);
assertThat(
tagMap.get(isMetricsLatest ? StreamsMetricsImpl.THREAD_ID_TAG : StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_24),
equalTo(THREAD_ID));
equalTo(Thread.currentThread().getName()));
assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName));
assertThat(tagMap.get(storeType + "-" + StreamsMetricsImpl.STORE_ID_TAG), equalTo(storeName));
}
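This hunk captures the signature change: storeLevelTagMap no longer takes a threadId and instead tags with the calling thread's name. A sketch of the new call shape (the literal tag keys are assumptions; the test only references the constants):

final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap("0_0", "rocksdb", "my-store");
// three entries:
//   THREAD_ID_TAG             -> Thread.currentThread().getName()
//   TASK_ID_TAG               -> "0_0"
//   "rocksdb-" + STORE_ID_TAG -> "my-store"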
@ -865,17 +1019,17 @@ public class StreamsMetricsImplTest {
private void shouldGetCacheLevelTagMap(final String builtInMetricsVersion) {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, THREAD_ID, builtInMetricsVersion, time);
new StreamsMetricsImpl(metrics, THREAD_ID1, builtInMetricsVersion, time);
final String taskName = "taskName";
final String storeName = "storeName";
final Map<String, String> tagMap = streamsMetrics.cacheLevelTagMap(THREAD_ID, taskName, storeName);
final Map<String, String> tagMap = streamsMetrics.cacheLevelTagMap(THREAD_ID1, taskName, storeName);
assertThat(tagMap.size(), equalTo(3));
final boolean isMetricsLatest = builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST);
assertThat(
tagMap.get(isMetricsLatest ? StreamsMetricsImpl.THREAD_ID_TAG : StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_24),
equalTo(THREAD_ID)
equalTo(THREAD_ID1)
);
assertThat(tagMap.get(TASK_ID_TAG), equalTo(taskName));
assertThat(tagMap.get(RECORD_CACHE_ID_TAG), equalTo(storeName));
@ -892,26 +1046,26 @@ public class StreamsMetricsImplTest {
}
private void shouldGetThreadLevelTagMap(final String builtInMetricsVersion) {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID, builtInMetricsVersion, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, builtInMetricsVersion, time);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(THREAD_ID);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(THREAD_ID1);
assertThat(tagMap.size(), equalTo(1));
assertThat(
tagMap.get(builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? THREAD_ID_TAG
: THREAD_ID_TAG_0100_TO_24),
equalTo(THREAD_ID)
equalTo(THREAD_ID1)
);
}
@Test
public void shouldAddInvocationRateToSensor() {
final Sensor sensor = createMock(Sensor.class);
final MetricName expectedMetricName = new MetricName(METRIC_NAME1 + "-rate", group, description1, tags);
final MetricName expectedMetricName = new MetricName(METRIC_NAME1 + "-rate", group, DESCRIPTION1, tags);
expect(sensor.add(eq(expectedMetricName), anyObject(Rate.class))).andReturn(true);
replay(sensor);
StreamsMetricsImpl.addInvocationRateToSensor(sensor, group, tags, METRIC_NAME1, description1);
StreamsMetricsImpl.addInvocationRateToSensor(sensor, group, tags, METRIC_NAME1, DESCRIPTION1);
verify(sensor);
}
@ -919,46 +1073,46 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddAmountRateAndSum() {
StreamsMetricsImpl
.addRateOfSumAndSumMetricsToSensor(sensor, group, tags, metricNamePrefix, description1, description2);
.addRateOfSumAndSumMetricsToSensor(sensor, group, tags, metricNamePrefix, DESCRIPTION1, DESCRIPTION2);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 72.0;
final long defaultWindowSizeInSeconds = Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds();
final double expectedRateMetricValue = (valueToRecord1 + valueToRecord2) / defaultWindowSizeInSeconds;
verifyMetric(metricNamePrefix + "-rate", description1, valueToRecord1, valueToRecord2, expectedRateMetricValue);
verifyMetric(metricNamePrefix + "-rate", DESCRIPTION1, valueToRecord1, valueToRecord2, expectedRateMetricValue);
final double expectedSumMetricValue = 2 * valueToRecord1 + 2 * valueToRecord2; // values are recorded once for each metric verification
verifyMetric(metricNamePrefix + "-total", description2, valueToRecord1, valueToRecord2, expectedSumMetricValue);
verifyMetric(metricNamePrefix + "-total", DESCRIPTION2, valueToRecord1, valueToRecord2, expectedSumMetricValue);
assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
}
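For readers checking the arithmetic, assuming MetricConfig's default time window of 30 seconds (timeWindowMs() == 30_000):

// expectedRateMetricValue = (18.0 + 72.0) / 30 = 3.0
// expectedSumMetricValue  = 2 * 18.0 + 2 * 72.0 = 180.0  (each value is recorded once per verified metric)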
@Test
public void shouldAddSum() {
StreamsMetricsImpl.addSumMetricToSensor(sensor, group, tags, metricNamePrefix, description1);
StreamsMetricsImpl.addSumMetricToSensor(sensor, group, tags, metricNamePrefix, DESCRIPTION1);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 42.0;
final double expectedSumMetricValue = valueToRecord1 + valueToRecord2;
verifyMetric(metricNamePrefix + "-total", description1, valueToRecord1, valueToRecord2, expectedSumMetricValue);
verifyMetric(metricNamePrefix + "-total", DESCRIPTION1, valueToRecord1, valueToRecord2, expectedSumMetricValue);
assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldAddAmountRate() {
StreamsMetricsImpl.addRateOfSumMetricToSensor(sensor, group, tags, metricNamePrefix, description1);
StreamsMetricsImpl.addRateOfSumMetricToSensor(sensor, group, tags, metricNamePrefix, DESCRIPTION1);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 72.0;
final long defaultWindowSizeInSeconds = Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds();
final double expectedRateMetricValue = (valueToRecord1 + valueToRecord2) / defaultWindowSizeInSeconds;
verifyMetric(metricNamePrefix + "-rate", description1, valueToRecord1, valueToRecord2, expectedRateMetricValue);
verifyMetric(metricNamePrefix + "-rate", DESCRIPTION1, valueToRecord1, valueToRecord2, expectedRateMetricValue);
assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldAddValue() {
StreamsMetricsImpl.addValueMetricToSensor(sensor, group, tags, metricNamePrefix, description1);
StreamsMetricsImpl.addValueMetricToSensor(sensor, group, tags, metricNamePrefix, DESCRIPTION1);
final KafkaMetric ratioMetric = metrics.metric(new MetricName(metricNamePrefix, group, description1, tags));
final KafkaMetric ratioMetric = metrics.metric(new MetricName(metricNamePrefix, group, DESCRIPTION1, tags));
assertThat(ratioMetric, is(notNullValue()));
final MetricConfig metricConfig = new MetricConfig();
final double value1 = 42.0;
@ -973,47 +1127,47 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddAvgAndTotalMetricsToSensor() {
StreamsMetricsImpl
.addAvgAndSumMetricsToSensor(sensor, group, tags, metricNamePrefix, description1, description2);
.addAvgAndSumMetricsToSensor(sensor, group, tags, metricNamePrefix, DESCRIPTION1, DESCRIPTION2);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 42.0;
final double expectedAvgMetricValue = (valueToRecord1 + valueToRecord2) / 2;
verifyMetric(metricNamePrefix + "-avg", description1, valueToRecord1, valueToRecord2, expectedAvgMetricValue);
verifyMetric(metricNamePrefix + "-avg", DESCRIPTION1, valueToRecord1, valueToRecord2, expectedAvgMetricValue);
final double expectedSumMetricValue = 2 * valueToRecord1 + 2 * valueToRecord2; // values are recorded once for each metric verification
verifyMetric(metricNamePrefix + "-total", description2, valueToRecord1, valueToRecord2, expectedSumMetricValue);
verifyMetric(metricNamePrefix + "-total", DESCRIPTION2, valueToRecord1, valueToRecord2, expectedSumMetricValue);
assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldAddAvgAndMinAndMaxMetricsToSensor() {
StreamsMetricsImpl
.addAvgAndMinAndMaxToSensor(sensor, group, tags, metricNamePrefix, description1, description2, description3);
.addAvgAndMinAndMaxToSensor(sensor, group, tags, metricNamePrefix, DESCRIPTION1, DESCRIPTION2, DESCRIPTION3);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 42.0;
final double expectedAvgMetricValue = (valueToRecord1 + valueToRecord2) / 2;
verifyMetric(metricNamePrefix + "-avg", description1, valueToRecord1, valueToRecord2, expectedAvgMetricValue);
verifyMetric(metricNamePrefix + "-min", description2, valueToRecord1, valueToRecord2, valueToRecord1);
verifyMetric(metricNamePrefix + "-max", description3, valueToRecord1, valueToRecord2, valueToRecord2);
verifyMetric(metricNamePrefix + "-avg", DESCRIPTION1, valueToRecord1, valueToRecord2, expectedAvgMetricValue);
verifyMetric(metricNamePrefix + "-min", DESCRIPTION2, valueToRecord1, valueToRecord2, valueToRecord1);
verifyMetric(metricNamePrefix + "-max", DESCRIPTION3, valueToRecord1, valueToRecord2, valueToRecord2);
assertThat(metrics.metrics().size(), equalTo(3 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldAddMinAndMaxMetricsToSensor() {
StreamsMetricsImpl
.addMinAndMaxToSensor(sensor, group, tags, metricNamePrefix, description1, description2);
.addMinAndMaxToSensor(sensor, group, tags, metricNamePrefix, DESCRIPTION1, DESCRIPTION2);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 42.0;
verifyMetric(metricNamePrefix + "-min", description1, valueToRecord1, valueToRecord2, valueToRecord1);
verifyMetric(metricNamePrefix + "-max", description2, valueToRecord1, valueToRecord2, valueToRecord2);
verifyMetric(metricNamePrefix + "-min", DESCRIPTION1, valueToRecord1, valueToRecord2, valueToRecord1);
verifyMetric(metricNamePrefix + "-max", DESCRIPTION2, valueToRecord1, valueToRecord2, valueToRecord2);
assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
}
@Test
public void shouldReturnMetricsVersionCurrent() {
assertThat(
new StreamsMetricsImpl(metrics, THREAD_ID, StreamsConfig.METRICS_LATEST, time).version(),
new StreamsMetricsImpl(metrics, THREAD_ID1, StreamsConfig.METRICS_LATEST, time).version(),
equalTo(Version.LATEST)
);
}
@ -1021,7 +1175,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldReturnMetricsVersionFrom100To24() {
assertThat(
new StreamsMetricsImpl(metrics, THREAD_ID, StreamsConfig.METRICS_0100_TO_24, time).version(),
new StreamsMetricsImpl(metrics, THREAD_ID1, StreamsConfig.METRICS_0100_TO_24, time).version(),
equalTo(Version.FROM_0100_TO_24)
);
}

View File

@ -303,7 +303,6 @@ public class TaskMetricsTest {
mockStatic(StateStoreMetrics.class);
if (builtInMetricsVersion == Version.FROM_0100_TO_24) {
expect(StateStoreMetrics.expiredWindowRecordDropSensor(
THREAD_ID,
TASK_ID,
storeType,
storeName,

View File

@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
public class KeyValueSegmentTest {
private final RocksDBMetricsRecorder metricsRecorder =
new RocksDBMetricsRecorder("metrics-scope", "thread-id", "store-name");
new RocksDBMetricsRecorder("metrics-scope", "store-name");
@Before
public void setUp() {

View File

@ -56,6 +56,7 @@ import org.rocksdb.Statistics;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -71,6 +72,7 @@ import static org.easymock.EasyMock.notNull;
import static org.easymock.EasyMock.reset;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
@ -578,7 +580,7 @@ public class RocksDBStoreTest {
}
@Test
public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
public void shouldVerifyThatMetricsRecordedFromStatisticsGetMeasurementsFromRocksDB() {
final TaskId taskId = new TaskId(0, 0);
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
@ -604,11 +606,47 @@ public class RocksDBStoreTest {
"bytes-written-total",
StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
"description is not verified",
streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), taskId.toString(), METRICS_SCOPE, DB_NAME)
streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
));
assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
}
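The recording level is the main difference between this test and the new one that follows: statistics-based sensors only record at DEBUG, while the property-based gauge added by this commit registers at INFO, so it stays visible under the default level. Side by side:

new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG)); // needed for statistics-based metrics
new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));  // sufficient for num-entries-active-mem-table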
@Test
public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
final TaskId taskId = new TaskId(0, 0);
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
context = EasyMock.niceMock(InternalMockProcessorContext.class);
EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
EasyMock.expect(context.taskId()).andStubReturn(taskId);
EasyMock.expect(context.appConfigs())
.andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
EasyMock.expect(context.stateDir()).andStubReturn(dir);
EasyMock.replay(context);
rocksDBStore.init(context, rocksDBStore);
final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
"num-entries-active-mem-table",
StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
"description is not verified",
streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
));
assertThat(numberOfEntriesActiveMemTable, notNullValue());
assertThat(numberOfEntriesActiveMemTable.metricValue(), is(BigInteger.valueOf(0)));
final byte[] key = "hello".getBytes();
final byte[] value = "world".getBytes();
rocksDBStore.put(Bytes.wrap(key), value);
assertThat(numberOfEntriesActiveMemTable, notNullValue());
assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));
}
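The gauge value asserted above ultimately comes from a RocksDB property. For reference, the same number can be read directly through the RocksJava API (a sketch outside this commit; the path is made up, and the enclosing method must handle RocksDBException):

try (final Options options = new Options().setCreateIfMissing(true);
     final RocksDB db = RocksDB.open(options, "/tmp/example-db")) {
    final long entries = db.getAggregatedLongProperty("rocksdb.num-entries-active-mem-table");
    System.out.println("entries in active memtable(s): " + entries);
}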
public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
static boolean called;

View File

@ -42,7 +42,7 @@ import static org.junit.Assert.assertTrue;
public class SegmentIteratorTest {
private final RocksDBMetricsRecorder rocksDBMetricsRecorder =
new RocksDBMetricsRecorder("metrics-scope", "thread-id", "store-name");
new RocksDBMetricsRecorder("metrics-scope", "store-name");
private final KeyValueSegment segmentOne =
new KeyValueSegment("one", "one", 0, rocksDBMetricsRecorder);
private final KeyValueSegment segmentTwo =

View File

@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
public class TimestampedSegmentTest {
private final RocksDBMetricsRecorder metricsRecorder =
new RocksDBMetricsRecorder("metrics-scope", "thread-id", "store-name");
new RocksDBMetricsRecorder("metrics-scope", "store-name");
@Before
public void setUp() {

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.junit.Test;
import org.rocksdb.Cache;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import java.math.BigInteger;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STORE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.powermock.api.easymock.PowerMock.replay;
public class RocksDBMetricsRecorderGaugesTest {
private static final String METRICS_SCOPE = "metrics-scope";
private static final TaskId TASK_ID = new TaskId(0, 0);
private static final String STORE_NAME = "store-name";
private static final String SEGMENT_STORE_NAME_1 = "segment-store-name-1";
private static final String SEGMENT_STORE_NAME_2 = "segment-store-name-2";
private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
private final RocksDB dbToAdd1 = mock(RocksDB.class);
private final RocksDB dbToAdd2 = mock(RocksDB.class);
private final Cache cacheToAdd1 = mock(Cache.class);
private final Cache cacheToAdd2 = mock(Cache.class);
private final Statistics statisticsToAdd1 = mock(Statistics.class);
private final Statistics statisticsToAdd2 = mock(Statistics.class);
@Test
public void shouldGetNumberOfEntriesActiveMemTable() throws Exception {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
expect(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))
.andStubReturn(5L);
expect(dbToAdd2.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))
.andStubReturn(3L);
replay(dbToAdd1, dbToAdd2);
recorder.init(streamsMetrics, TASK_ID);
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
final Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
final Map<String, String> tagMap = mkMap(
mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
mkEntry(TASK_ID_TAG, TASK_ID.toString()),
mkEntry(METRICS_SCOPE + "-" + STORE_ID_TAG, STORE_NAME)
);
final KafkaMetric metric = (KafkaMetric) metrics.get(new MetricName(
NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
STATE_STORE_LEVEL_GROUP,
"description is ignored",
tagMap
));
assertThat(metric, notNullValue());
assertThat(metric.metricValue(), is(BigInteger.valueOf(8)));
}
}
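The expected value 8 is the sum of the stubbed per-segment readings (5 + 3), which pins down how the recorder must compute the gauge: by summing the property over all registered value providers. A hedged sketch of such a gauge; storeToValueProviders and its value type are assumptions, only ROCKSDB_PROPERTIES_PREFIX and getAggregatedLongProperty appear in the test:

private Gauge<BigInteger> gaugeToComputeSumOfProperty(final String propertyName) {
    return (metricsConfig, now) -> {
        BigInteger result = BigInteger.ZERO;
        for (final DbAndCacheAndStatistics provider : storeToValueProviders.values()) {
            try {
                // aggregate the property across all column families of this segment's DB
                result = result.add(BigInteger.valueOf(
                    provider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)));
            } catch (final RocksDBException e) {
                throw new ProcessorStateException("Error reading RocksDB property " + propertyName, e);
            }
        }
        return result;
    };
}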

View File

@ -83,7 +83,7 @@ public class RocksDBMetricsRecorderTest {
private final StreamsMetricsImpl streamsMetrics = niceMock(StreamsMetricsImpl.class);
private final RocksDBMetricsRecordingTrigger recordingTrigger = mock(RocksDBMetricsRecordingTrigger.class);
private final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, THREAD_ID, STORE_NAME);
private final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
@Before
public void setUp() {
@ -446,7 +446,7 @@ public class RocksDBMetricsRecorderTest {
private void setUpMetricsMock() {
mockStatic(RocksDBMetrics.class);
final RocksDBMetricContext metricsContext =
new RocksDBMetricContext(THREAD_ID, TASK_ID1.toString(), METRICS_SCOPE, STORE_NAME);
new RocksDBMetricContext(TASK_ID1.toString(), METRICS_SCOPE, STORE_NAME);
expect(RocksDBMetrics.bytesWrittenToDatabaseSensor(eq(streamsMetrics), eq(metricsContext)))
.andReturn(bytesWrittenToDatabaseSensor);
expect(RocksDBMetrics.bytesReadFromDatabaseSensor(eq(streamsMetrics), eq(metricsContext)))
@ -471,13 +471,14 @@ public class RocksDBMetricsRecorderTest {
.andReturn(numberOfOpenFilesSensor);
expect(RocksDBMetrics.numberOfFileErrorsSensor(eq(streamsMetrics), eq(metricsContext)))
.andReturn(numberOfFileErrorsSensor);
RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
replay(RocksDBMetrics.class);
}
private void setUpMetricsStubMock() {
mockStatic(RocksDBMetrics.class);
final RocksDBMetricContext metricsContext =
new RocksDBMetricContext(THREAD_ID, TASK_ID1.toString(), METRICS_SCOPE, STORE_NAME);
new RocksDBMetricContext(TASK_ID1.toString(), METRICS_SCOPE, STORE_NAME);
expect(RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext))
.andStubReturn(bytesWrittenToDatabaseSensor);
expect(RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricsContext))
@ -502,6 +503,7 @@ public class RocksDBMetricsRecorderTest {
.andStubReturn(numberOfOpenFilesSensor);
expect(RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext))
.andStubReturn(numberOfFileErrorsSensor);
RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
replay(RocksDBMetrics.class);
}
}
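A note on the two new expectation lines: addNumEntriesActiveMemTableMetric returns void, and with PowerMock/EasyMock recording the call itself registers the expectation; expectLastCall() is only needed to customize it, e.g.:

RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
expectLastCall().anyTimes(); // optional; without it the call is expected exactly once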

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
@ -26,9 +27,11 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -44,10 +47,12 @@ import static org.powermock.api.easymock.PowerMock.verifyAll;
public class RocksDBMetricsTest {
private static final String STATE_LEVEL_GROUP = "stream-state-metrics";
private static final String THREAD_ID = "test-thread";
private static final String TASK_ID = "test-task";
private static final String STORE_TYPE = "test-store-type";
private static final String STORE_NAME = "store";
private static final RocksDBMetricContext ROCKSDB_METRIC_CONTEXT =
new RocksDBMetricContext(TASK_ID, STORE_TYPE, STORE_NAME);
private final Metrics metrics = new Metrics();
private final Sensor sensor = metrics.sensor("dummy");
private final StreamsMetricsImpl streamsMetrics = createStrictMock(StreamsMetricsImpl.class);
@ -215,6 +220,27 @@ public class RocksDBMetricsTest {
verifySumSensor(metricNamePrefix, true, description, RocksDBMetrics::numberOfFileErrorsSensor);
}
@Test
public void shouldAddNumEntriesActiveMemTableMetric() {
final String name = "num-entries-active-mem-table";
final String description = "Current total number of entries in the active memtable";
final Gauge<BigInteger> valueProvider = (config, now) -> BigInteger.valueOf(10);
streamsMetrics.addStoreLevelMutableMetric(
eq(TASK_ID),
eq(STORE_TYPE),
eq(STORE_NAME),
eq(name),
eq(description),
eq(RecordingLevel.INFO),
eq(valueProvider)
);
replay(streamsMetrics);
RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, valueProvider);
verify(streamsMetrics);
}
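The verification above implies that the production method simply forwards the gauge to addStoreLevelMutableMetric with the metric's name, description, and INFO level. A sketch under that assumption (the RocksDBMetricContext accessor names are guesses):

public static void addNumEntriesActiveMemTableMetric(final StreamsMetricsImpl streamsMetrics,
                                                     final RocksDBMetricContext metricContext,
                                                     final Gauge<BigInteger> valueProvider) {
    streamsMetrics.addStoreLevelMutableMetric(
        metricContext.taskName(),
        metricContext.metricsScope(),
        metricContext.storeName(),
        NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
        "Current total number of entries in the active memtable",
        RecordingLevel.INFO,
        valueProvider
    );
}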
private void verifyRateAndTotalSensor(final String metricNamePrefix,
final String descriptionOfTotal,
final String descriptionOfRate,
@ -268,14 +294,12 @@ public class RocksDBMetricsTest {
private void setupStreamsMetricsMock(final String metricNamePrefix) {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.storeLevelSensor(
THREAD_ID,
TASK_ID,
STORE_NAME,
metricNamePrefix,
RecordingLevel.DEBUG
)).andReturn(sensor);
expect(streamsMetrics.storeLevelTagMap(
THREAD_ID,
TASK_ID,
STORE_TYPE,
STORE_NAME
@ -286,8 +310,7 @@ public class RocksDBMetricsTest {
replayAll();
replay(StreamsMetricsImpl.class);
final Sensor sensor =
sensorCreator.sensor(streamsMetrics, new RocksDBMetricContext(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME));
final Sensor sensor = sensorCreator.sensor(streamsMetrics, ROCKSDB_METRIC_CONTEXT);
verifyAll();
verify(StreamsMetricsImpl.class);

View File

@ -307,9 +307,9 @@ public class StateStoreMetricsTest {
final String metricName = "expired-window-record-drop";
final String descriptionOfRate = "The average number of dropped records due to an expired window per second";
final String descriptionOfCount = "The total number of dropped records due to an expired window";
expect(streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, STORE_NAME, metricName, RecordingLevel.INFO))
expect(streamsMetrics.storeLevelSensor(TASK_ID, STORE_NAME, metricName, RecordingLevel.INFO))
.andReturn(expectedSensor);
expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
expect(streamsMetrics.storeLevelTagMap(TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
"stream-" + STORE_TYPE + "-metrics",
@ -321,7 +321,7 @@ public class StateStoreMetricsTest {
replay(StreamsMetricsImpl.class, streamsMetrics);
final Sensor sensor =
StateStoreMetrics.expiredWindowRecordDropSensor(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics);
StateStoreMetrics.expiredWindowRecordDropSensor(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics);
verify(StreamsMetricsImpl.class, streamsMetrics);
assertThat(sensor, is(expectedSensor));
@ -338,9 +338,9 @@ public class StateStoreMetricsTest {
final String descriptionOfMin = "The minimum " + e2eLatencyDescription;
final String descriptionOfMax = "The maximum " + e2eLatencyDescription;
expect(streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, STORE_NAME, metricName, RecordingLevel.TRACE))
expect(streamsMetrics.storeLevelSensor(TASK_ID, STORE_NAME, metricName, RecordingLevel.TRACE))
.andReturn(expectedSensor);
expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
expect(streamsMetrics.storeLevelTagMap(TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
expectedSensor,
STORE_LEVEL_GROUP,
@ -353,7 +353,7 @@ public class StateStoreMetricsTest {
replay(StreamsMetricsImpl.class, streamsMetrics);
final Sensor sensor =
StateStoreMetrics.e2ELatencySensor(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics);
StateStoreMetrics.e2ELatencySensor(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics);
verify(StreamsMetricsImpl.class, streamsMetrics);
assertThat(sensor, is(expectedSensor));
@ -368,7 +368,6 @@ public class StateStoreMetricsTest {
if (builtInMetricsVersion == Version.FROM_0100_TO_24) {
setUpParentSensor(metricName, descriptionOfRate, descriptionOfCount, descriptionOfAvg, descriptionOfMax);
expect(streamsMetrics.storeLevelSensor(
THREAD_ID,
TASK_ID,
STORE_NAME,
metricName,
@ -385,7 +384,6 @@ public class StateStoreMetricsTest {
);
} else {
expect(streamsMetrics.storeLevelSensor(
THREAD_ID,
TASK_ID,
STORE_NAME,
metricName,
@ -399,7 +397,7 @@ public class StateStoreMetricsTest {
descriptionOfRate
);
}
expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
expect(streamsMetrics.storeLevelTagMap(TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
StreamsMetricsImpl.addAvgAndMaxToSensor(
expectedSensor,
storeLevelGroup,
@ -422,7 +420,7 @@ public class StateStoreMetricsTest {
final String descriptionOfMax) {
expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, metricName, RecordingLevel.DEBUG))
.andReturn(parentSensor);
expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, StreamsMetricsImpl.ROLLUP_VALUE))
expect(streamsMetrics.storeLevelTagMap(TASK_ID, STORE_TYPE, StreamsMetricsImpl.ROLLUP_VALUE))
.andReturn(allTagMap);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
parentSensor,
@ -454,7 +452,7 @@ public class StateStoreMetricsTest {
final String currentMetricName = metricName + "-current";
final String group;
final Map<String, String> tagMap;
expect(streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, BUFFER_NAME, metricName, RecordingLevel.DEBUG)).andReturn(expectedSensor);
expect(streamsMetrics.storeLevelSensor(TASK_ID, BUFFER_NAME, metricName, RecordingLevel.DEBUG)).andReturn(expectedSensor);
if (builtInMetricsVersion == Version.FROM_0100_TO_24) {
group = BUFFER_LEVEL_GROUP_FROM_0100_TO_24;
tagMap = bufferTagMap;
@ -469,7 +467,7 @@ public class StateStoreMetricsTest {
} else {
group = STORE_LEVEL_GROUP;
tagMap = storeTagMap;
expect(streamsMetrics.storeLevelTagMap(THREAD_ID, TASK_ID, STORE_TYPE, BUFFER_NAME)).andReturn(tagMap);
expect(streamsMetrics.storeLevelTagMap(TASK_ID, STORE_TYPE, BUFFER_NAME)).andReturn(tagMap);
}
StreamsMetricsImpl.addAvgAndMaxToSensor(
expectedSensor,