KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) (#14620)

The PR outlines classes to collect metrics for client by KafkaMetricsCollector implementation. The MetricsCollector defines mechanism to collect client metrics in sum and gauge metrics format. This requires to define cumulative and delta telemetry metrics while collecting raw metrics.

Singl point metric class helps creating OTLP format Metric object wrapped over Single point metric class itself.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Xavier Léauté <xavier@confluent.io>, Philip Nee <pnee@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Apoorv Mittal 2023-11-29 11:37:22 +05:30 committed by GitHub
parent 10555ec6de
commit 009b57d870
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1504 additions and 3 deletions

View File

@ -198,6 +198,7 @@
</subpackage> </subpackage>
<subpackage name="telemetry"> <subpackage name="telemetry">
<allow pkg="io.opentelemetry.proto"/>
<allow pkg="org.apache.kafka.common" /> <allow pkg="org.apache.kafka.common" />
</subpackage> </subpackage>

View File

@ -0,0 +1,346 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Frequencies;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* All metrics implement the {@link MetricValueProvider} interface. They are divided into
* two base types:
*
* <ol>
* <li>{@link Gauge}</li>
* <li>{@link Measurable}</li>
* </ol>
*
* {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
* {@link Measurable Measurables} are divided into simple types with single values
* ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
* {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
* {@link Meter}, and {@link Percentiles}).
*
* <p>
*
* We can safely assume that a {@link CumulativeCount count} always increases in steady state. It
* should be a bug if a count metric decreases.
*
* <p>
*
* Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type
* say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in
* the codebase seem to be cumulative metrics that will always increase. The Total metric underlying
* Meter type is mostly a Total of a Count metric.
* We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both
* negative or positive).
* For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
*
* <p>
*
* The Sum as it is a sample sum which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
*
* <p>
*
* The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types
* which are reported. A compound metric is never reported as-is.
*
* <p>
*
* A Meter metric is always created with and reported as 2 metrics: a rate and a count. For eg:
* org.apache.kafka.common.network.Selector has Meter metric for "connection-close" but it has to be
* created with a "connection-close-rate" metric of type rate and a "connection-close-total"
* metric of type total.
*
* <p>
*
* Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each
* member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric
* is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the
* compound type, each component measurables is converted to a GAUGE_DOUBLE.
*
* <p>
*
* Percentiles work the same way as Frequencies. The only difference is that it is composed of Percentile
* types instead. So, we should treat the component measurable as GAUGE_DOUBLE.
*
* <p>
*
* Some metrics are defined as either anonymous inner classes or lambdas implementing the Measurable
* interface. As we do not have any information on how to treat them, we should fallback to treating
* them as GAUGE_DOUBLE.
*
* <p>
*
* OpenTelemetry mapping for measurables:
* Avg / Rate / Min / Max / Total / Sum -> Gauge
* Count -> Sum
* Meter has 2 elements :
* Total -> Sum
* Rate -> Gauge
* Frequencies -> each component is Gauge
* Percentiles -> each component is Gauge
*/
public class KafkaMetricsCollector implements MetricsCollector {
private static final Logger log = LoggerFactory.getLogger(KafkaMetricsCollector.class);
private final StateLedger ledger;
private final Time time;
private final MetricNamingStrategy<MetricName> metricNamingStrategy;
private static final Field METRIC_VALUE_PROVIDER_FIELD;
static {
try {
METRIC_VALUE_PROVIDER_FIELD = KafkaMetric.class.getDeclaredField("metricValueProvider");
METRIC_VALUE_PROVIDER_FIELD.setAccessible(true);
} catch (Exception e) {
throw new KafkaException(e);
}
}
public KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy) {
this(metricNamingStrategy, Time.SYSTEM);
}
// Visible for testing
KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy, Time time) {
this.metricNamingStrategy = metricNamingStrategy;
this.time = time;
this.ledger = new StateLedger();
}
public void init(List<KafkaMetric> metrics) {
ledger.init(metrics);
}
/**
* This is called whenever a metric is updated or added.
*/
public void metricChange(KafkaMetric metric) {
ledger.metricChange(metric);
}
/**
* This is called whenever a metric is removed.
*/
public void metricRemoval(KafkaMetric metric) {
ledger.metricRemoval(metric);
}
/**
* This is called whenever temporality changes, resets the value tracker for metrics.
*/
public void metricsReset() {
ledger.metricsStateReset();
}
// Visible for testing
Set<MetricKey> getTrackedMetrics() {
return ledger.metricMap.keySet();
}
@Override
public void collect(MetricsEmitter metricsEmitter) {
for (Map.Entry<MetricKey, KafkaMetric> entry : ledger.getMetrics()) {
MetricKey metricKey = entry.getKey();
KafkaMetric metric = entry.getValue();
try {
collectMetric(metricsEmitter, metricKey, metric);
} catch (Exception e) {
// catch and log to continue processing remaining metrics
log.error("Error processing Kafka metric {}", metricKey, e);
}
}
}
protected void collectMetric(MetricsEmitter metricsEmitter, MetricKey metricKey, KafkaMetric metric) {
Object metricValue;
try {
metricValue = metric.metricValue();
} catch (Exception e) {
// If an exception occurs when retrieving value, log warning and continue to process the rest of metrics
log.warn("Failed to retrieve metric value {}", metricKey.name(), e);
return;
}
Instant now = Instant.ofEpochMilli(time.milliseconds());
if (isMeasurable(metric)) {
Measurable measurable = metric.measurable();
Double value = (Double) metricValue;
if (measurable instanceof WindowedCount || measurable instanceof CumulativeSum) {
collectSum(metricKey, value, metricsEmitter, now);
} else {
collectGauge(metricKey, value, metricsEmitter, now);
}
} else {
// It is non-measurable Gauge metric.
// Collect the metric only if its value is a number.
if (metricValue instanceof Number) {
Number value = (Number) metricValue;
collectGauge(metricKey, value, metricsEmitter, now);
} else {
// skip non-measurable metrics
log.debug("Skipping non-measurable gauge metric {}", metricKey.name());
}
}
}
private void collectSum(MetricKey metricKey, double value, MetricsEmitter metricsEmitter, Instant timestamp) {
if (!metricsEmitter.shouldEmitMetric(metricKey)) {
return;
}
if (metricsEmitter.shouldEmitDeltaMetrics()) {
InstantAndValue<Double> instantAndValue = ledger.delta(metricKey, timestamp, value);
metricsEmitter.emitMetric(
SinglePointMetric.deltaSum(metricKey, instantAndValue.getValue(), true, timestamp,
instantAndValue.getIntervalStart())
);
} else {
metricsEmitter.emitMetric(
SinglePointMetric.sum(metricKey, value, true, timestamp, ledger.instantAdded(metricKey))
);
}
}
private void collectGauge(MetricKey metricKey, Number value, MetricsEmitter metricsEmitter, Instant timestamp) {
if (!metricsEmitter.shouldEmitMetric(metricKey)) {
return;
}
metricsEmitter.emitMetric(
SinglePointMetric.gauge(metricKey, value, timestamp)
);
}
private static boolean isMeasurable(KafkaMetric metric) {
// KafkaMetric does not expose the internal MetricValueProvider and throws an IllegalStateException
// exception, if measurable() is called for a Gauge.
// There are 2 ways to find the type of internal MetricValueProvider for a KafkaMetric - use reflection or
// get the information based on whether a IllegalStateException exception is thrown.
// We use reflection so that we can avoid the cost of generating the stack trace when it's
// not a measurable.
try {
Object provider = METRIC_VALUE_PROVIDER_FIELD.get(metric);
return provider instanceof Measurable;
} catch (Exception e) {
throw new KafkaException(e);
}
}
/**
* Keeps track of the state of metrics, e.g. when they were added, what their getAndSet value is,
* and clearing them out when they're removed.
*/
private class StateLedger {
private final Map<MetricKey, KafkaMetric> metricMap = new ConcurrentHashMap<>();
private final LastValueTracker<Double> doubleDeltas = new LastValueTracker<>();
private final Map<MetricKey, Instant> metricAdded = new ConcurrentHashMap<>();
private Instant instantAdded(MetricKey metricKey) {
// lookup when the metric was added to use it as the interval start. That should always
// exist, but if it doesn't (e.g. changed metrics temporality) then we use now.
return metricAdded.computeIfAbsent(metricKey, x -> Instant.ofEpochMilli(time.milliseconds()));
}
private void init(List<KafkaMetric> metrics) {
log.info("initializing Kafka metrics collector");
for (KafkaMetric m : metrics) {
metricMap.put(metricNamingStrategy.metricKey(m.metricName()), m);
}
}
private void metricChange(KafkaMetric metric) {
MetricKey metricKey = metricNamingStrategy.metricKey(metric.metricName());
metricMap.put(metricKey, metric);
if (doubleDeltas.contains(metricKey)) {
log.warn("Registering a new metric {} which already has a last value tracked. " +
"Removing metric from delta register.", metric.metricName(), new Exception());
/*
This scenario shouldn't occur while registering a metric since it should
have already been cleared out on cleanup/shutdown.
We remove the metric here to clear out the delta register because we are running
into an issue where old metrics are being re-registered which causes us to
record a negative delta
*/
doubleDeltas.remove(metricKey);
}
metricAdded.put(metricKey, Instant.ofEpochMilli(time.milliseconds()));
}
private void metricRemoval(KafkaMetric metric) {
log.debug("removing kafka metric : {}", metric.metricName());
MetricKey metricKey = metricNamingStrategy.metricKey(metric.metricName());
metricMap.remove(metricKey);
doubleDeltas.remove(metricKey);
metricAdded.remove(metricKey);
}
private Iterable<? extends Entry<MetricKey, KafkaMetric>> getMetrics() {
return metricMap.entrySet();
}
private InstantAndValue<Double> delta(MetricKey metricKey, Instant now, Double value) {
Optional<InstantAndValue<Double>> lastValue = doubleDeltas.getAndSet(metricKey, now, value);
return lastValue
.map(last -> new InstantAndValue<>(last.getIntervalStart(), value - last.getValue()))
.orElse(new InstantAndValue<>(instantAdded(metricKey), value));
}
private void metricsStateReset() {
metricAdded.clear();
doubleDeltas.reset();
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* A LastValueTracker uses a ConcurrentHashMap to maintain historic values for a given key, and return
* a previous value and an Instant for that value.
*
* @param <T> The type of the value.
*/
public class LastValueTracker<T> {
private final Map<MetricKey, InstantAndValue<T>> counters = new ConcurrentHashMap<>();
/**
* Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one.
*
* @param metricKey the key for which to calculate a getAndSet.
* @param now the timestamp for the new value.
* @param value the current value.
* @return the timestamp of the previous entry and its value. If there
* isn't a previous entry, then this method returns {@link Optional#empty()}
*/
public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
InstantAndValue<T> valueOrNull = counters.put(metricKey, instantAndValue);
// there wasn't already an entry, so return empty.
if (valueOrNull == null) {
return Optional.empty();
}
// Return the previous instance and the value.
return Optional.of(valueOrNull);
}
public InstantAndValue<T> remove(MetricKey metricKey) {
return counters.remove(metricKey);
}
public boolean contains(MetricKey metricKey) {
return counters.containsKey(metricKey);
}
public void reset() {
counters.clear();
}
public static class InstantAndValue<T> {
private final Instant intervalStart;
private final T value;
public InstantAndValue(Instant intervalStart, T value) {
this.intervalStart = Objects.requireNonNull(intervalStart);
this.value = Objects.requireNonNull(value);
}
public Instant getIntervalStart() {
return intervalStart;
}
public T getValue() {
return value;
}
}
}

View File

@ -50,6 +50,13 @@ public interface MetricsEmitter extends Closeable {
*/ */
boolean shouldEmitMetric(MetricKeyable metricKeyable); boolean shouldEmitMetric(MetricKeyable metricKeyable);
/**
* Determines if the delta aggregation temporality metrics are to be emitted.
*
* @return {@code true} if the delta metric should be emitted, {@code false} otherwise
*/
boolean shouldEmitDeltaMetrics();
/** /**
* Emits the metric in an implementation-specific fashion. Depending on the implementation, * Emits the metric in an implementation-specific fashion. Depending on the implementation,
* calls made to this after {@link #close()} has been invoked will fail. * calls made to this after {@link #close()} has been invoked will fail.

View File

@ -16,16 +16,28 @@
*/ */
package org.apache.kafka.common.telemetry.internals; package org.apache.kafka.common.telemetry.internals;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/** /**
* This class represents a metric that does not yet contain resource tags. * This class represents a telemetry metric that does not yet contain resource tags.
* These additional resource tags will be added before emitting metrics by the telemetry reporter. * These additional resource tags will be added before emitting metrics by the telemetry reporter.
*/ */
public class SinglePointMetric implements MetricKeyable { public class SinglePointMetric implements MetricKeyable {
private final MetricKey key; private final MetricKey key;
private final Metric.Builder metricBuilder;
private SinglePointMetric(MetricKey key) { private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
this.key = key; this.key = key;
this.metricBuilder = metricBuilder;
} }
@Override @Override
@ -33,5 +45,102 @@ public class SinglePointMetric implements MetricKeyable {
return key; return key;
} }
// TODO: Implement methods for serializing/deserializing metrics in required format. public Metric.Builder builder() {
return metricBuilder;
}
/*
Methods to construct gauge metric type.
*/
public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) {
NumberDataPoint.Builder point = point(timestamp, value);
return gauge(metricKey, point);
}
public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) {
NumberDataPoint.Builder point = point(timestamp, value);
return gauge(metricKey, point);
}
/*
Methods to construct sum metric type.
*/
public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) {
return sum(metricKey, value, monotonic, timestamp, null);
}
public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp,
Instant startTimestamp) {
NumberDataPoint.Builder point = point(timestamp, value);
if (startTimestamp != null) {
point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
}
return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
}
public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic,
Instant timestamp, Instant startTimestamp) {
NumberDataPoint.Builder point = point(timestamp, value)
.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
}
/*
Helper methods to support metric construction.
*/
private static SinglePointMetric sum(MetricKey metricKey, AggregationTemporality aggregationTemporality,
boolean monotonic, NumberDataPoint.Builder point) {
point.addAllAttributes(asAttributes(metricKey.tags()));
Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
metric
.getSumBuilder()
.setAggregationTemporality(aggregationTemporality)
.setIsMonotonic(monotonic)
.addDataPoints(point);
return new SinglePointMetric(metricKey, metric);
}
private static SinglePointMetric gauge(MetricKey metricKey, NumberDataPoint.Builder point) {
point.addAllAttributes(asAttributes(metricKey.tags()));
Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
metric.getGaugeBuilder().addDataPoints(point);
return new SinglePointMetric(metricKey, metric);
}
private static NumberDataPoint.Builder point(Instant timestamp, Number value) {
if (value instanceof Long || value instanceof Integer) {
return point(timestamp, value.longValue());
}
return point(timestamp, value.doubleValue());
}
private static NumberDataPoint.Builder point(Instant timestamp, long value) {
return NumberDataPoint.newBuilder()
.setTimeUnixNano(toTimeUnixNanos(timestamp))
.setAsInt(value);
}
private static NumberDataPoint.Builder point(Instant timestamp, double value) {
return NumberDataPoint.newBuilder()
.setTimeUnixNano(toTimeUnixNanos(timestamp))
.setAsDouble(value);
}
private static Iterable<KeyValue> asAttributes(Map<String, String> labels) {
return labels.entrySet().stream().map(
entry -> KeyValue.newBuilder()
.setKey(entry.getKey())
.setValue(AnyValue.newBuilder().setStringValue(entry.getValue())).build()
)::iterator;
}
private static long toTimeUnixNanos(Instant t) {
return TimeUnit.SECONDS.toNanos(t.getEpochSecond()) + t.getNano();
}
} }

View File

@ -0,0 +1,605 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KafkaMetricsCollectorTest {
private static final String DOMAIN = "test.domain";
private MetricName metricName;
private Map<String, String> tags;
private Metrics metrics;
private MetricNamingStrategy<MetricName> metricNamingStrategy;
private KafkaMetricsCollector collector;
private TestEmitter testEmitter;
private MockTime time;
@BeforeEach
public void setUp() {
metrics = new Metrics();
tags = Collections.singletonMap("tag", "value");
metricName = metrics.metricName("name1", "group1", tags);
time = new MockTime(0, 1000L, TimeUnit.MILLISECONDS.toNanos(1000L));
testEmitter = new TestEmitter();
// Define metric naming strategy.
metricNamingStrategy = TelemetryMetricNamingConvention.getClientTelemetryMetricNamingStrategy(DOMAIN);
// Define collector to test.
collector = new KafkaMetricsCollector(
metricNamingStrategy,
time
);
// Add reporter to metrics.
metrics.addReporter(getTestMetricsReporter());
}
@Test
public void testMeasurableCounter() {
Sensor sensor = metrics.sensor("test");
sensor.add(metricName, new WindowedCount());
sensor.record();
sensor.record();
time.sleep(60 * 1000L);
// Collect metrics.
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
assertEquals(2, result.size());
Metric counter = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
assertTrue(counter.hasSum());
assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList()));
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality());
assertTrue(counter.getSum().getIsMonotonic());
NumberDataPoint point = counter.getSum().getDataPoints(0);
assertEquals(2d, point.getAsDouble(), 0.0);
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) +
Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
}
@Test
public void testMeasurableCounterDeltaMetrics() {
Sensor sensor = metrics.sensor("test");
sensor.add(metricName, new WindowedCount());
sensor.record();
sensor.record();
time.sleep(60 * 1000L);
// Collect delta metrics.
testEmitter.onlyDeltaMetrics(true);
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
assertEquals(2, result.size());
Metric counter = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
assertTrue(counter.hasSum());
assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList()));
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality());
assertTrue(counter.getSum().getIsMonotonic());
NumberDataPoint point = counter.getSum().getDataPoints(0);
assertEquals(2d, point.getAsDouble(), 0.0);
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) +
Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
}
@Test
public void testMeasurableTotal() {
Sensor sensor = metrics.sensor("test");
sensor.add(metricName, new CumulativeSum());
sensor.record(10L);
sensor.record(5L);
// Collect metrics.
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
assertEquals(2, result.size());
Metric counter = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
assertTrue(counter.hasSum());
assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList()));
assertEquals(15, counter.getSum().getDataPoints(0).getAsDouble(), 0.0);
}
@Test
public void testMeasurableTotalDeltaMetrics() {
Sensor sensor = metrics.sensor("test");
sensor.add(metricName, new CumulativeSum());
sensor.record(10L);
sensor.record(5L);
// Collect metrics.
testEmitter.onlyDeltaMetrics(true);
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
assertEquals(2, result.size());
Metric counter = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
assertTrue(counter.hasSum());
assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList()));
assertEquals(15, counter.getSum().getDataPoints(0).getAsDouble(), 0.0);
}
@Test
public void testMeasurableGauge() {
metrics.addMetric(metricName, (config, now) -> 100.0);
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
assertEquals(2, result.size());
Metric counter = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
assertTrue(counter.hasGauge());
assertEquals(tags, getTags(counter.getGauge().getDataPoints(0).getAttributesList()));
assertEquals(100L, counter.getGauge().getDataPoints(0).getAsDouble(), 0.0);
}
@Test
public void testNonMeasurable() {
metrics.addMetric(metrics.metricName("float", "group1", tags), (Gauge<Float>) (config, now) -> 99f);
metrics.addMetric(metrics.metricName("double", "group1", tags), (Gauge<Double>) (config, now) -> 99d);
metrics.addMetric(metrics.metricName("int", "group1", tags), (Gauge<Integer>) (config, now) -> 100);
metrics.addMetric(metrics.metricName("long", "group1", tags), (Gauge<Long>) (config, now) -> 100L);
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// Should get exactly 5 Kafka measurables since Metrics always includes a count measurable.
assertEquals(5, result.size());
result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.(float|double)")).forEach(
doubleGauge -> {
assertTrue(doubleGauge.hasGauge());
assertEquals(tags, getTags(doubleGauge.getGauge().getDataPoints(0).getAttributesList()));
assertEquals(99d, doubleGauge.getGauge().getDataPoints(0).getAsDouble(), 0.0);
});
result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.(int|long)")).forEach(
intGauge -> {
assertTrue(intGauge.hasGauge());
assertEquals(tags, getTags(intGauge.getGauge().getDataPoints(0).getAttributesList()));
assertEquals(100, intGauge.getGauge().getDataPoints(0).getAsDouble(), 0.0);
});
}
@Test
public void testMeasurableWithException() {
metrics.addMetric(metricName, null, (config, now) -> {
throw new RuntimeException();
});
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
//Verify only the global count of metrics exist
assertEquals(1, result.size());
// Group is registered as kafka-metrics-count
assertEquals("test.domain.kafka.count.count", result.get(0).builder().build().getName());
//Verify metrics with measure() method throw exception is not returned
assertFalse(result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.anyMatch(metric -> metric.getName().equals("test.domain.group1.name1")));
}
@Test
public void testMetricRemoval() {
metrics.addMetric(metricName, (config, now) -> 100.0);
collector.collect(testEmitter);
assertEquals(2, testEmitter.emittedMetrics().size());
metrics.removeMetric(metricName);
assertFalse(collector.getTrackedMetrics().contains(metricNamingStrategy.metricKey(metricName)));
// verify that the metric was removed.
testEmitter.reset();
collector.collect(testEmitter);
List<SinglePointMetric> collected = testEmitter.emittedMetrics();
assertEquals(1, collected.size());
assertEquals("test.domain.kafka.count.count", collected.get(0).builder().build().getName());
}
@Test
public void testSecondCollectCumulative() {
Sensor sensor = metrics.sensor("test");
sensor.add(metricName, new CumulativeSum());
sensor.record();
sensor.record();
time.sleep(60 * 1000L);
collector.collect(testEmitter);
// Update it again by 5 and advance time by another 60 seconds.
sensor.record();
sensor.record();
sensor.record();
sensor.record();
sensor.record();
time.sleep(60 * 1000L);
testEmitter.reset();
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
assertEquals(2, result.size());
Metric cumulative = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
NumberDataPoint point = cumulative.getSum().getDataPoints(0);
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, cumulative.getSum().getAggregationTemporality());
assertTrue(cumulative.getSum().getIsMonotonic());
assertEquals(7d, point.getAsDouble(), 0.0);
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) +
Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
}
@Test
public void testSecondDeltaCollectDouble() {
Sensor sensor = metrics.sensor("test");
sensor.add(metricName, new CumulativeSum());
sensor.record();
sensor.record();
time.sleep(60 * 1000L);
testEmitter.onlyDeltaMetrics(true);
collector.collect(testEmitter);
// Update it again by 5 and advance time by another 60 seconds.
sensor.record();
sensor.record();
sensor.record();
sensor.record();
sensor.record();
time.sleep(60 * 1000L);
testEmitter.reset();
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
assertEquals(2, result.size());
Metric cumulative = result.stream()
.flatMap(metrics -> Stream.of(metrics.builder().build()))
.filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
NumberDataPoint point = cumulative.getSum().getDataPoints(0);
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, cumulative.getSum().getAggregationTemporality());
assertTrue(cumulative.getSum().getIsMonotonic());
assertEquals(5d, point.getAsDouble(), 0.0);
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) +
Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) +
Instant.ofEpochSecond(61L).getNano(), point.getStartTimeUnixNano());
}
@Test
public void testCollectFilter() {
metrics.addMetric(metricName, (config, now) -> 100.0);
testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// Should get exactly 1 Kafka measurables because we excluded the count measurable
assertEquals(1, result.size());
Metric counter = result.get(0).builder().build();
assertTrue(counter.hasGauge());
assertEquals(100L, counter.getGauge().getDataPoints(0).getAsDouble(), 0.0);
}
@Test
public void testCollectFilterWithCumulativeTemporality() {
MetricName name1 = metrics.metricName("nonMeasurable", "group1", tags);
MetricName name2 = metrics.metricName("windowed", "group1", tags);
MetricName name3 = metrics.metricName("cumulative", "group1", tags);
metrics.addMetric(name1, (Gauge<Double>) (config, now) -> 99d);
Sensor sensor = metrics.sensor("test");
sensor.add(name2, new WindowedCount());
sensor.add(name3, new CumulativeSum());
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// no-filter shall result in all 4 data metrics.
assertEquals(4, result.size());
testEmitter.reset();
testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
// Drop metrics for Count type.
assertEquals(3, result.size());
testEmitter.reset();
testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".nonmeasurable"));
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
// Drop non-measurable metric.
assertEquals(3, result.size());
testEmitter.reset();
testEmitter.reconfigurePredicate(key -> true);
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
// Again no filter.
assertEquals(4, result.size());
}
@Test
public void testCollectFilterWithDeltaTemporality() {
MetricName name1 = metrics.metricName("nonMeasurable", "group1", tags);
MetricName name2 = metrics.metricName("windowed", "group1", tags);
MetricName name3 = metrics.metricName("cumulative", "group1", tags);
metrics.addMetric(name1, (Gauge<Double>) (config, now) -> 99d);
Sensor sensor = metrics.sensor("test");
sensor.add(name2, new WindowedCount());
sensor.add(name3, new CumulativeSum());
testEmitter.onlyDeltaMetrics(true);
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
// no-filter shall result in all 4 data metrics.
assertEquals(4, result.size());
testEmitter.reset();
testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
// Drop metrics for Count type.
assertEquals(3, result.size());
testEmitter.reset();
testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".nonmeasurable"));
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
// Drop non-measurable metric.
assertEquals(3, result.size());
testEmitter.reset();
testEmitter.reconfigurePredicate(key -> true);
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
// Again no filter.
assertEquals(4, result.size());
}
@Test
public void testCollectMetricsWithTemporalityChange() {
Sensor sensor = metrics.sensor("test");
sensor.add(metricName, new WindowedCount());
testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));
// Emit metrics as cumulative, verify the current time is 60 seconds ahead of start time.
sensor.record();
time.sleep(60 * 1000L);
collector.collect(testEmitter);
List<SinglePointMetric> result = testEmitter.emittedMetrics();
Metric counter = result.get(0).builder().build();
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality());
NumberDataPoint point = counter.getSum().getDataPoints(0);
assertEquals(1d, point.getAsDouble());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) +
Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
// Again emit metrics as cumulative, verify the start time is unchanged and current time is
// advanced by 60 seconds again.
time.sleep(60 * 1000L);
sensor.record();
testEmitter.reset();
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
counter = result.get(0).builder().build();
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality());
point = counter.getSum().getDataPoints(0);
assertEquals(2d, point.getAsDouble(), 0.0);
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) +
Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
// Change Temporality. Emit metrics as delta, verify the temporality changes to delta and start time is reset to
// current time.
time.sleep(60 * 1000L);
sensor.record();
testEmitter.reset();
testEmitter.onlyDeltaMetrics(true);
collector.metricsReset();
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
counter = result.get(0).builder().build();
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality());
point = counter.getSum().getDataPoints(0);
assertEquals(3d, point.getAsDouble(), 0.0);
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) +
Instant.ofEpochSecond(181L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) +
Instant.ofEpochSecond(181L).getNano(), point.getStartTimeUnixNano());
// Again emit metrics as delta, verify the start time is tracked properly and only delta value
// is present on response.
time.sleep(60 * 1000L);
sensor.record();
testEmitter.reset();
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
counter = result.get(0).builder().build();
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality());
point = counter.getSum().getDataPoints(0);
assertEquals(1d, point.getAsDouble(), 0.0);
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(241L).getEpochSecond()) +
Instant.ofEpochSecond(241L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) +
Instant.ofEpochSecond(181L).getNano(), point.getStartTimeUnixNano());
// Change Temporality. Emit metrics as cumulative, verify the temporality changes to cumulative
// and start time is reset to current time.
time.sleep(60 * 1000L);
sensor.record();
testEmitter.reset();
testEmitter.onlyDeltaMetrics(false);
collector.metricsReset();
collector.collect(testEmitter);
result = testEmitter.emittedMetrics();
counter = result.get(0).builder().build();
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality());
point = counter.getSum().getDataPoints(0);
assertEquals(5d, point.getAsDouble(), 0.0);
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(301L).getEpochSecond()) +
Instant.ofEpochSecond(301L).getNano(), point.getTimeUnixNano());
assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(301L).getEpochSecond()) +
Instant.ofEpochSecond(301L).getNano(), point.getStartTimeUnixNano());
}
private MetricsReporter getTestMetricsReporter() {
// Inline implementation of MetricsReporter for testing.
return new MetricsReporter() {
@Override
public void init(List<KafkaMetric> metrics) {
collector.init(metrics);
}
@Override
public void metricChange(KafkaMetric metric) {
collector.metricChange(metric);
}
@Override
public void metricRemoval(KafkaMetric metric) {
collector.metricRemoval(metric);
}
@Override
public void close() {
// do nothing
}
@Override
public void configure(Map<String, ?> configs) {
// do nothing
}
};
}
public static Map<String, String> getTags(List<KeyValue> attributes) {
return attributes.stream()
.filter(attr -> attr.getValue().hasStringValue())
.collect(Collectors.toMap(
KeyValue::getKey,
attr -> attr.getValue().getStringValue()
));
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.Collections;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class LastValueTrackerTest {
private static final MetricKey METRIC_NAME = new MetricKey("test-metric", Collections.emptyMap());
private final Instant instant1 = Instant.now();
private final Instant instant2 = instant1.plusMillis(1);
@Test
public void testGetAndSetDouble() {
LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
Optional<InstantAndValue<Double>> result = lastValueTracker.getAndSet(METRIC_NAME, instant1, 1d);
assertFalse(result.isPresent());
}
@Test
public void testGetAndSetDoubleWithTrackedValue() {
LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
lastValueTracker.getAndSet(METRIC_NAME, instant1, 1d);
Optional<InstantAndValue<Double>> result = lastValueTracker
.getAndSet(METRIC_NAME, instant2, 1000d);
assertTrue(result.isPresent());
assertEquals(instant1, result.get().getIntervalStart());
assertEquals(1d, result.get().getValue(), 1e-6);
}
@Test
public void testGetAndSetLong() {
LastValueTracker<Long> lastValueTracker = new LastValueTracker<>();
Optional<InstantAndValue<Long>> result = lastValueTracker.getAndSet(METRIC_NAME, instant1, 1L);
assertFalse(result.isPresent());
}
@Test
public void testGetAndSetLongWithTrackedValue() {
LastValueTracker<Long> lastValueTracker = new LastValueTracker<>();
lastValueTracker.getAndSet(METRIC_NAME, instant1, 2L);
Optional<InstantAndValue<Long>> result = lastValueTracker
.getAndSet(METRIC_NAME, instant2, 10000L);
assertTrue(result.isPresent());
assertEquals(instant1, result.get().getIntervalStart());
assertEquals(2L, result.get().getValue().longValue());
}
@Test
public void testRemove() {
LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
lastValueTracker.getAndSet(METRIC_NAME, instant1, 1d);
assertTrue(lastValueTracker.contains(METRIC_NAME));
InstantAndValue<Double> result = lastValueTracker.remove(METRIC_NAME);
assertNotNull(result);
assertEquals(instant1, result.getIntervalStart());
assertEquals(1d, result.getValue().longValue());
}
@Test
public void testRemoveWithNullKey() {
LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
assertThrows(NullPointerException.class, () -> lastValueTracker.remove(null));
}
@Test
public void testRemoveWithInvalidKey() {
LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
assertFalse(lastValueTracker.contains(METRIC_NAME));
InstantAndValue<Double> result = lastValueTracker.remove(METRIC_NAME);
assertNull(result);
}
}

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SinglePointMetricTest {
private MetricKey metricKey;
private Instant now;
/*
Test compares the metric representation from returned builder to ensure that the metric is
constructed correctly.
For example: Gauge metric with name "name" and double value 1.0 at certain time is represented as:
name: "name"
gauge {
data_points {
time_unix_nano: 1698063981021420000
as_double: 1.0
}
}
*/
@BeforeEach
public void setUp() {
metricKey = new MetricKey("name", Collections.emptyMap());
now = Instant.now();
}
@Test
public void testGaugeWithNumberValue() {
SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now);
MetricKey metricKey = gaugeNumber.key();
assertEquals("name", metricKey.name());
Metric metric = gaugeNumber.builder().build();
assertEquals(1, metric.getGauge().getDataPointsCount());
NumberDataPoint point = metric.getGauge().getDataPoints(0);
assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
assertEquals(0, point.getStartTimeUnixNano());
assertEquals(1, point.getAsInt());
assertEquals(0, point.getAttributesCount());
}
@Test
public void testGaugeWithDoubleValue() {
SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now);
MetricKey metricKey = gaugeNumber.key();
assertEquals("name", metricKey.name());
Metric metric = gaugeNumber.builder().build();
assertEquals(1, metric.getGauge().getDataPointsCount());
NumberDataPoint point = metric.getGauge().getDataPoints(0);
assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
assertEquals(0, point.getStartTimeUnixNano());
assertEquals(1.0, point.getAsDouble());
assertEquals(0, point.getAttributesCount());
}
@Test
public void testGaugeWithMetricTags() {
MetricKey metricKey = new MetricKey("name", Collections.singletonMap("tag", "value"));
SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now);
MetricKey key = gaugeNumber.key();
assertEquals("name", key.name());
Metric metric = gaugeNumber.builder().build();
assertEquals(1, metric.getGauge().getDataPointsCount());
NumberDataPoint point = metric.getGauge().getDataPoints(0);
assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
assertEquals(0, point.getStartTimeUnixNano());
assertEquals(1.0, point.getAsDouble());
assertEquals(1, point.getAttributesCount());
assertEquals("tag", point.getAttributes(0).getKey());
assertEquals("value", point.getAttributes(0).getValue().getStringValue());
}
@Test
public void testSum() {
SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, false, now);
MetricKey key = sum.key();
assertEquals("name", key.name());
Metric metric = sum.builder().build();
assertFalse(metric.getSum().getIsMonotonic());
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, metric.getSum().getAggregationTemporality());
assertEquals(1, metric.getSum().getDataPointsCount());
NumberDataPoint point = metric.getSum().getDataPoints(0);
assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
assertEquals(0, point.getStartTimeUnixNano());
assertEquals(1.0, point.getAsDouble());
assertEquals(0, point.getAttributesCount());
}
@Test
public void testSumWithStartTimeAndTags() {
MetricKey metricKey = new MetricKey("name", Collections.singletonMap("tag", "value"));
SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, true, now, now);
MetricKey key = sum.key();
assertEquals("name", key.name());
Metric metric = sum.builder().build();
assertTrue(metric.getSum().getIsMonotonic());
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, metric.getSum().getAggregationTemporality());
assertEquals(1, metric.getSum().getDataPointsCount());
NumberDataPoint point = metric.getSum().getDataPoints(0);
assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getStartTimeUnixNano());
assertEquals(1.0, point.getAsDouble());
assertEquals(1, point.getAttributesCount());
assertEquals("tag", point.getAttributes(0).getKey());
assertEquals("value", point.getAttributes(0).getValue().getStringValue());
}
@Test
public void testDeltaSum() {
SinglePointMetric sum = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now);
MetricKey key = sum.key();
assertEquals("name", key.name());
Metric metric = sum.builder().build();
assertTrue(metric.getSum().getIsMonotonic());
assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, metric.getSum().getAggregationTemporality());
assertEquals(1, metric.getSum().getDataPointsCount());
NumberDataPoint point = metric.getSum().getDataPoints(0);
assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getStartTimeUnixNano());
assertEquals(1.0, point.getAsDouble());
assertEquals(0, point.getAttributesCount());
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
public class TestEmitter implements MetricsEmitter {
private final List<SinglePointMetric> emittedMetrics;
private Predicate<? super MetricKeyable> metricsPredicate = metricKeyable -> true;
private boolean onlyDeltaMetrics;
public TestEmitter() {
this(false);
}
public TestEmitter(boolean onlyDeltaMetrics) {
this.emittedMetrics = new ArrayList<>();
this.onlyDeltaMetrics = onlyDeltaMetrics;
}
@Override
public boolean shouldEmitMetric(MetricKeyable metricKeyable) {
return metricsPredicate.test(metricKeyable);
}
@Override
public boolean shouldEmitDeltaMetrics() {
return onlyDeltaMetrics;
}
@Override
public boolean emitMetric(SinglePointMetric metric) {
return emittedMetrics.add(metric);
}
@Override
public List<SinglePointMetric> emittedMetrics() {
return Collections.unmodifiableList(emittedMetrics);
}
public void reset() {
this.emittedMetrics.clear();
}
public void onlyDeltaMetrics(boolean onlyDeltaMetrics) {
this.onlyDeltaMetrics = onlyDeltaMetrics;
}
public void reconfigurePredicate(Predicate<? super MetricKeyable> metricsPredicate) {
this.metricsPredicate = metricsPredicate;
}
}