diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 90424ba9f0a..fe8e28c7ad8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -198,6 +198,7 @@
+    <allow pkg="io.opentelemetry.proto" />
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java
new file mode 100644
index 00000000000..3aea4a11879
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ul>
+ * <li>{@link Gauge}</li>
+ * <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * <p>
+ * Total and Sum are treated as monotonically increasing counters. The javadoc for the Total metric
+ * type says "An un-windowed cumulative total maintained over all time.". The standalone Total
+ * metrics in the codebase seem to be cumulative metrics that will always increase. The Total metric
+ * underlying the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (though this is not guaranteed, as the sample
+ * values might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ * The Sum metric is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous
+ * measurable types, which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ * A Meter metric is always created with, and reported as, two metrics: a rate and a count. For
+ * example, org.apache.kafka.common.network.Selector has a Meter metric for "connection-close", but
+ * it has to be created with a "connection-close-rate" metric of type rate and a
+ * "connection-close-total" metric of type total.
+ *
+ * <p>
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is
+ * registered, each member Frequency object is converted into an anonymous Measurable and
+ * registered. So, a Frequencies metric is reported as a set of measurables with
+ * name = Frequency.name(). As there is no way to figure out the compound type, each component
+ * measurable is converted to a GAUGE_DOUBLE.
+ *
+ * <p>
+ * Percentiles work the same way as Frequencies. The only difference is that they are composed of
+ * Percentile types instead. So, we should treat each component measurable as GAUGE_DOUBLE.
+ *
+ * <p>
+ * Some metrics are defined as either anonymous inner classes or lambdas implementing the Measurable
+ * interface. As we do not have any information on how to treat them, we should fall back to
+ * treating them as GAUGE_DOUBLE.
+ *
+ * <p>
+ * OpenTelemetry mapping for measurables:
+ * <pre>
+ * Avg / Rate / Min / Max / Total / Sum -> Gauge
+ * Count -> Sum
+ * Meter has 2 elements:
+ *   Total -> Sum
+ *   Rate -> Gauge
+ * Frequencies -> each component is a Gauge
+ * Percentiles -> each component is a Gauge
+ * </pre>
+ */
+public class KafkaMetricsCollector implements MetricsCollector {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaMetricsCollector.class);
+
+ private final StateLedger ledger;
+ private final Time time;
+ private final MetricNamingStrategy<MetricName> metricNamingStrategy;
+
+ private static final Field METRIC_VALUE_PROVIDER_FIELD;
+
+ static {
+ try {
+ METRIC_VALUE_PROVIDER_FIELD = KafkaMetric.class.getDeclaredField("metricValueProvider");
+ METRIC_VALUE_PROVIDER_FIELD.setAccessible(true);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ public KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy) {
+ this(metricNamingStrategy, Time.SYSTEM);
+ }
+
+ // Visible for testing
+ KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy, Time time) {
+ this.metricNamingStrategy = metricNamingStrategy;
+ this.time = time;
+ this.ledger = new StateLedger();
+ }
+
+ public void init(List<KafkaMetric> metrics) {
+ ledger.init(metrics);
+ }
+
+ /**
+ * This is called whenever a metric is updated or added.
+ */
+ public void metricChange(KafkaMetric metric) {
+ ledger.metricChange(metric);
+ }
+
+ /**
+ * This is called whenever a metric is removed.
+ */
+ public void metricRemoval(KafkaMetric metric) {
+ ledger.metricRemoval(metric);
+ }
+
+ /**
+ * This is called whenever temporality changes, resets the value tracker for metrics.
+ */
+ public void metricsReset() {
+ ledger.metricsStateReset();
+ }
+
+ // Visible for testing
+ Set<MetricKey> getTrackedMetrics() {
+ return ledger.metricMap.keySet();
+ }
+
+ @Override
+ public void collect(MetricsEmitter metricsEmitter) {
+ for (Map.Entry<MetricKey, KafkaMetric> entry : ledger.getMetrics()) {
+ MetricKey metricKey = entry.getKey();
+ KafkaMetric metric = entry.getValue();
+
+ try {
+ collectMetric(metricsEmitter, metricKey, metric);
+ } catch (Exception e) {
+ // catch and log to continue processing remaining metrics
+ log.error("Error processing Kafka metric {}", metricKey, e);
+ }
+ }
+ }
+
+ protected void collectMetric(MetricsEmitter metricsEmitter, MetricKey metricKey, KafkaMetric metric) {
+ Object metricValue;
+
+ try {
+ metricValue = metric.metricValue();
+ } catch (Exception e) {
+ // If an exception occurs while retrieving the value, log a warning and continue processing the remaining metrics.
+ log.warn("Failed to retrieve metric value {}", metricKey.name(), e);
+ return;
+ }
+
+ Instant now = Instant.ofEpochMilli(time.milliseconds());
+ if (isMeasurable(metric)) {
+ Measurable measurable = metric.measurable();
+ Double value = (Double) metricValue;
+
+ if (measurable instanceof WindowedCount || measurable instanceof CumulativeSum) {
+ collectSum(metricKey, value, metricsEmitter, now);
+ } else {
+ collectGauge(metricKey, value, metricsEmitter, now);
+ }
+ } else {
+ // It is a non-measurable Gauge metric.
+ // Collect the metric only if its value is a number.
+ if (metricValue instanceof Number) {
+ Number value = (Number) metricValue;
+ collectGauge(metricKey, value, metricsEmitter, now);
+ } else {
+ // skip non-measurable metrics
+ log.debug("Skipping non-measurable gauge metric {}", metricKey.name());
+ }
+ }
+ }
+
+ private void collectSum(MetricKey metricKey, double value, MetricsEmitter metricsEmitter, Instant timestamp) {
+ if (!metricsEmitter.shouldEmitMetric(metricKey)) {
+ return;
+ }
+
+ if (metricsEmitter.shouldEmitDeltaMetrics()) {
+ InstantAndValue<Double> instantAndValue = ledger.delta(metricKey, timestamp, value);
+
+ metricsEmitter.emitMetric(
+ SinglePointMetric.deltaSum(metricKey, instantAndValue.getValue(), true, timestamp,
+ instantAndValue.getIntervalStart())
+ );
+ } else {
+ metricsEmitter.emitMetric(
+ SinglePointMetric.sum(metricKey, value, true, timestamp, ledger.instantAdded(metricKey))
+ );
+ }
+ }
+
+ private void collectGauge(MetricKey metricKey, Number value, MetricsEmitter metricsEmitter, Instant timestamp) {
+ if (!metricsEmitter.shouldEmitMetric(metricKey)) {
+ return;
+ }
+
+ metricsEmitter.emitMetric(
+ SinglePointMetric.gauge(metricKey, value, timestamp)
+ );
+ }
+
+ private static boolean isMeasurable(KafkaMetric metric) {
+ // KafkaMetric does not expose the internal MetricValueProvider and throws an
+ // IllegalStateException if measurable() is called for a Gauge.
+ // There are two ways to find the type of the internal MetricValueProvider for a KafkaMetric:
+ // use reflection, or infer it from whether an IllegalStateException is thrown.
+ // We use reflection so that we can avoid the cost of generating the stack trace when it's
+ // not a measurable.
+ try {
+ Object provider = METRIC_VALUE_PROVIDER_FIELD.get(metric);
+ return provider instanceof Measurable;
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ /**
+ * Keeps track of the state of metrics, e.g. when they were added, what their last reported
+ * value was, and clearing them out when they're removed.
+ */
+ private class StateLedger {
+
+ private final Map<MetricKey, KafkaMetric> metricMap = new ConcurrentHashMap<>();
+ private final LastValueTracker<Double> doubleDeltas = new LastValueTracker<>();
+ private final Map<MetricKey, Instant> metricAdded = new ConcurrentHashMap<>();
+
+ private Instant instantAdded(MetricKey metricKey) {
+ // Look up when the metric was added, to use as the interval start. That should always
+ // exist, but if it doesn't (e.g. changed metrics temporality) then we use now.
+ return metricAdded.computeIfAbsent(metricKey, x -> Instant.ofEpochMilli(time.milliseconds()));
+ }
+
+ private void init(List<KafkaMetric> metrics) {
+ log.info("initializing Kafka metrics collector");
+ for (KafkaMetric m : metrics) {
+ metricMap.put(metricNamingStrategy.metricKey(m.metricName()), m);
+ }
+ }
+
+ private void metricChange(KafkaMetric metric) {
+ MetricKey metricKey = metricNamingStrategy.metricKey(metric.metricName());
+ metricMap.put(metricKey, metric);
+ if (doubleDeltas.contains(metricKey)) {
+ log.warn("Registering a new metric {} which already has a last value tracked. " +
+ "Removing metric from delta register.", metric.metricName(), new Exception());
+
+ /*
+ This scenario shouldn't occur while registering a metric since it should
+ have already been cleared out on cleanup/shutdown.
+ We remove the metric here to clear out the delta register because we are running
+ into an issue where old metrics are being re-registered which causes us to
+ record a negative delta
+ */
+ doubleDeltas.remove(metricKey);
+ }
+ metricAdded.put(metricKey, Instant.ofEpochMilli(time.milliseconds()));
+ }
+
+ private void metricRemoval(KafkaMetric metric) {
+ log.debug("removing kafka metric : {}", metric.metricName());
+ MetricKey metricKey = metricNamingStrategy.metricKey(metric.metricName());
+ metricMap.remove(metricKey);
+ doubleDeltas.remove(metricKey);
+ metricAdded.remove(metricKey);
+ }
+
+ private Iterable<? extends Entry<MetricKey, KafkaMetric>> getMetrics() {
+ return metricMap.entrySet();
+ }
+
+ private InstantAndValue<Double> delta(MetricKey metricKey, Instant now, Double value) {
+ Optional<InstantAndValue<Double>> lastValue = doubleDeltas.getAndSet(metricKey, now, value);
+
+ return lastValue
+ .map(last -> new InstantAndValue<>(last.getIntervalStart(), value - last.getValue()))
+ .orElse(new InstantAndValue<>(instantAdded(metricKey), value));
+ }
+
+ private void metricsStateReset() {
+ metricAdded.clear();
+ doubleDeltas.reset();
+ }
+ }
+
+}
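The delta path in collectSum() above reduces to simple per-key bookkeeping: remember the previous reading, emit the difference from it, and use the previous reading's timestamp as the interval start. A minimal, self-contained sketch of that logic (illustrative class and metric names, not part of this patch):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Standalone illustration of StateLedger.delta(): the first reading has no
    // predecessor, so the whole value is reported; later readings report only
    // the increase since the previous collection.
    public class DeltaSketch {
        private final Map<String, Double> lastValues = new ConcurrentHashMap<>();

        double delta(String key, double current) {
            Double previous = lastValues.put(key, current);
            return previous == null ? current : current - previous;
        }

        public static void main(String[] args) {
            DeltaSketch sketch = new DeltaSketch();
            System.out.println(sketch.delta("connection-close-total", 2.0)); // 2.0 (first reading)
            System.out.println(sketch.delta("connection-close-total", 7.0)); // 5.0 (7.0 - 2.0)
        }
    }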
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java
new file mode 100644
index 00000000000..117b40656e1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A LastValueTracker uses a ConcurrentHashMap to maintain historic values for a given key, and
+ * returns a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+ private final Map<MetricKey, InstantAndValue<T>> counters = new ConcurrentHashMap<>();
+
+ /**
+ * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one.
+ *
+ * @param metricKey the key for which to get the previous value and set the new one.
+ * @param now the timestamp for the new value.
+ * @param value the current value.
+ * @return the timestamp of the previous entry and its value. If there
+ * isn't a previous entry, then this method returns {@link Optional#empty()}
+ */
+ public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+ InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+ InstantAndValue<T> valueOrNull = counters.put(metricKey, instantAndValue);
+
+ // there wasn't already an entry, so return empty.
+ if (valueOrNull == null) {
+ return Optional.empty();
+ }
+
+ // Return the previous instant and its value.
+ return Optional.of(valueOrNull);
+ }
+
+ public InstantAndValue<T> remove(MetricKey metricKey) {
+ return counters.remove(metricKey);
+ }
+
+ public boolean contains(MetricKey metricKey) {
+ return counters.containsKey(metricKey);
+ }
+
+ public void reset() {
+ counters.clear();
+ }
+
+ public static class InstantAndValue<T> {
+
+ private final Instant intervalStart;
+ private final T value;
+
+ public InstantAndValue(Instant intervalStart, T value) {
+ this.intervalStart = Objects.requireNonNull(intervalStart);
+ this.value = Objects.requireNonNull(value);
+ }
+
+ public Instant getIntervalStart() {
+ return intervalStart;
+ }
+
+ public T getValue() {
+ return value;
+ }
+ }
+
+}
\ No newline at end of file
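For reference, a small sketch of the getAndSet() contract using the classes above (values illustrative): the first call for a key returns empty, and every later call returns the instant/value pair it replaces.

    import java.time.Instant;
    import java.util.Collections;
    import java.util.Optional;

    import org.apache.kafka.common.telemetry.internals.LastValueTracker;
    import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
    import org.apache.kafka.common.telemetry.internals.MetricKey;

    public class LastValueTrackerExample {
        public static void main(String[] args) {
            LastValueTracker<Double> tracker = new LastValueTracker<>();
            MetricKey key = new MetricKey("requests-total", Collections.emptyMap());

            // First call: nothing tracked yet, so there is no previous value.
            Optional<InstantAndValue<Double>> first = tracker.getAndSet(key, Instant.ofEpochSecond(1), 2.0);
            System.out.println(first.isPresent());               // false

            // Second call: the previous instant/value pair is returned and replaced.
            Optional<InstantAndValue<Double>> second = tracker.getAndSet(key, Instant.ofEpochSecond(61), 7.0);
            System.out.println(second.get().getIntervalStart()); // 1970-01-01T00:00:01Z
            System.out.println(second.get().getValue());         // 2.0
        }
    }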
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
index 8ba3a13bd32..e9d978d6aa5 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
@@ -50,6 +50,13 @@ public interface MetricsEmitter extends Closeable {
*/
boolean shouldEmitMetric(MetricKeyable metricKeyable);
+ /**
+ * Determines whether metrics should be emitted with delta aggregation temporality.
+ *
+ * @return {@code true} if the delta metric should be emitted, {@code false} otherwise
+ */
+ boolean shouldEmitDeltaMetrics();
+
/**
* Emits the metric in an implementation-specific fashion. Depending on the implementation,
* calls made to this after {@link #close()} has been invoked will fail.
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java
index f81a9bfb60e..4194bd469bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java
@@ -16,16 +16,28 @@
*/
package org.apache.kafka.common.telemetry.internals;
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
/**
- * This class represents a metric that does not yet contain resource tags.
+ * This class represents a telemetry metric that does not yet contain resource tags.
* These additional resource tags will be added before emitting metrics by the telemetry reporter.
*/
public class SinglePointMetric implements MetricKeyable {
private final MetricKey key;
+ private final Metric.Builder metricBuilder;
- private SinglePointMetric(MetricKey key) {
+ private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
this.key = key;
+ this.metricBuilder = metricBuilder;
}
@Override
@@ -33,5 +45,102 @@ public class SinglePointMetric implements MetricKeyable {
return key;
}
- // TODO: Implement methods for serializing/deserializing metrics in required format.
+ public Metric.Builder builder() {
+ return metricBuilder;
+ }
+
+ /*
+ Methods to construct gauge metric type.
+ */
+ public static SinglePointMetric gauge(MetricKey metricKey, Number value, Instant timestamp) {
+ NumberDataPoint.Builder point = point(timestamp, value);
+ return gauge(metricKey, point);
+ }
+
+ public static SinglePointMetric gauge(MetricKey metricKey, double value, Instant timestamp) {
+ NumberDataPoint.Builder point = point(timestamp, value);
+ return gauge(metricKey, point);
+ }
+
+ /*
+ Methods to construct sum metric type.
+ */
+
+ public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp) {
+ return sum(metricKey, value, monotonic, timestamp, null);
+ }
+
+ public static SinglePointMetric sum(MetricKey metricKey, double value, boolean monotonic, Instant timestamp,
+ Instant startTimestamp) {
+ NumberDataPoint.Builder point = point(timestamp, value);
+ if (startTimestamp != null) {
+ point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+ }
+
+ return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+ }
+
+ public static SinglePointMetric deltaSum(MetricKey metricKey, double value, boolean monotonic,
+ Instant timestamp, Instant startTimestamp) {
+ NumberDataPoint.Builder point = point(timestamp, value)
+ .setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+ return sum(metricKey, AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+ }
+
+ /*
+ Helper methods to support metric construction.
+ */
+ private static SinglePointMetric sum(MetricKey metricKey, AggregationTemporality aggregationTemporality,
+ boolean monotonic, NumberDataPoint.Builder point) {
+ point.addAllAttributes(asAttributes(metricKey.tags()));
+
+ Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+ metric
+ .getSumBuilder()
+ .setAggregationTemporality(aggregationTemporality)
+ .setIsMonotonic(monotonic)
+ .addDataPoints(point);
+ return new SinglePointMetric(metricKey, metric);
+ }
+
+ private static SinglePointMetric gauge(MetricKey metricKey, NumberDataPoint.Builder point) {
+ point.addAllAttributes(asAttributes(metricKey.tags()));
+
+ Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+ metric.getGaugeBuilder().addDataPoints(point);
+ return new SinglePointMetric(metricKey, metric);
+ }
+
+ private static NumberDataPoint.Builder point(Instant timestamp, Number value) {
+ if (value instanceof Long || value instanceof Integer) {
+ return point(timestamp, value.longValue());
+ }
+
+ return point(timestamp, value.doubleValue());
+ }
+
+ private static NumberDataPoint.Builder point(Instant timestamp, long value) {
+ return NumberDataPoint.newBuilder()
+ .setTimeUnixNano(toTimeUnixNanos(timestamp))
+ .setAsInt(value);
+ }
+
+ private static NumberDataPoint.Builder point(Instant timestamp, double value) {
+ return NumberDataPoint.newBuilder()
+ .setTimeUnixNano(toTimeUnixNanos(timestamp))
+ .setAsDouble(value);
+ }
+
+ private static Iterable<KeyValue> asAttributes(Map<String, String> labels) {
+ return labels.entrySet().stream().map(
+ entry -> KeyValue.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(AnyValue.newBuilder().setStringValue(entry.getValue())).build()
+ )::iterator;
+ }
+
+ private static long toTimeUnixNanos(Instant t) {
+ return TimeUnit.SECONDS.toNanos(t.getEpochSecond()) + t.getNano();
+ }
}
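A quick usage sketch of the factory methods above (metric name, tags, and values are illustrative): toTimeUnixNanos() maps Instant.ofEpochSecond(61) to 61 * 10^9 = 61,000,000,000 ns, and the returned builder exposes the assembled OTLP fields.

    import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
    import io.opentelemetry.proto.metrics.v1.Metric;
    import io.opentelemetry.proto.metrics.v1.NumberDataPoint;

    import org.apache.kafka.common.telemetry.internals.MetricKey;
    import org.apache.kafka.common.telemetry.internals.SinglePointMetric;

    import java.time.Instant;
    import java.util.Collections;

    public class SinglePointMetricExample {
        public static void main(String[] args) {
            MetricKey key = new MetricKey("requests.total", Collections.singletonMap("client", "producer-1"));

            // Monotonic cumulative sum: started at t=1s, sampled at t=61s with value 2.0.
            SinglePointMetric sum = SinglePointMetric.sum(
                key, 2.0, true, Instant.ofEpochSecond(61), Instant.ofEpochSecond(1));

            Metric metric = sum.builder().build();
            NumberDataPoint point = metric.getSum().getDataPoints(0);
            System.out.println(metric.getName());                 // requests.total
            System.out.println(metric.getSum().getAggregationTemporality()
                == AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE); // true
            System.out.println(point.getAsDouble());              // 2.0
            System.out.println(point.getTimeUnixNano());          // 61000000000
            System.out.println(point.getStartTimeUnixNano());     // 1000000000
            System.out.println(point.getAttributes(0).getKey());  // client
        }
    }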
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollectorTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollectorTest.java
new file mode 100644
index 00000000000..bb4e0cdc2d3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollectorTest.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaMetricsCollectorTest {
+
+ private static final String DOMAIN = "test.domain";
+
+ private MetricName metricName;
+ private Map<String, String> tags;
+ private Metrics metrics;
+ private MetricNamingStrategy<MetricName> metricNamingStrategy;
+ private KafkaMetricsCollector collector;
+
+ private TestEmitter testEmitter;
+ private MockTime time;
+
+ @BeforeEach
+ public void setUp() {
+ metrics = new Metrics();
+ tags = Collections.singletonMap("tag", "value");
+ metricName = metrics.metricName("name1", "group1", tags);
+ time = new MockTime(0, 1000L, TimeUnit.MILLISECONDS.toNanos(1000L));
+ testEmitter = new TestEmitter();
+
+ // Define metric naming strategy.
+ metricNamingStrategy = TelemetryMetricNamingConvention.getClientTelemetryMetricNamingStrategy(DOMAIN);
+
+ // Define collector to test.
+ collector = new KafkaMetricsCollector(
+ metricNamingStrategy,
+ time
+ );
+
+ // Add reporter to metrics.
+ metrics.addReporter(getTestMetricsReporter());
+ }
+
+ @Test
+ public void testMeasurableCounter() {
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(metricName, new WindowedCount());
+
+ sensor.record();
+ sensor.record();
+
+ time.sleep(60 * 1000L);
+
+ // Collect metrics.
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
+ assertEquals(2, result.size());
+
+ Metric counter = result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
+
+ assertTrue(counter.hasSum());
+ assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList()));
+
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality());
+ assertTrue(counter.getSum().getIsMonotonic());
+ NumberDataPoint point = counter.getSum().getDataPoints(0);
+ assertEquals(2d, point.getAsDouble(), 0.0);
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) +
+ Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
+ Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
+ }
+
+ @Test
+ public void testMeasurableCounterDeltaMetrics() {
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(metricName, new WindowedCount());
+
+ sensor.record();
+ sensor.record();
+
+ time.sleep(60 * 1000L);
+
+ // Collect delta metrics.
+ testEmitter.onlyDeltaMetrics(true);
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
+ assertEquals(2, result.size());
+
+ Metric counter = result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
+
+ assertTrue(counter.hasSum());
+ assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList()));
+
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality());
+ assertTrue(counter.getSum().getIsMonotonic());
+ NumberDataPoint point = counter.getSum().getDataPoints(0);
+ assertEquals(2d, point.getAsDouble(), 0.0);
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) +
+ Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
+ Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
+ }
+
+ @Test
+ public void testMeasurableTotal() {
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(metricName, new CumulativeSum());
+
+ sensor.record(10L);
+ sensor.record(5L);
+
+ // Collect metrics.
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
+ assertEquals(2, result.size());
+
+ Metric counter = result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
+
+ assertTrue(counter.hasSum());
+ assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList()));
+ assertEquals(15, counter.getSum().getDataPoints(0).getAsDouble(), 0.0);
+ }
+
+ @Test
+ public void testMeasurableTotalDeltaMetrics() {
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(metricName, new CumulativeSum());
+
+ sensor.record(10L);
+ sensor.record(5L);
+
+ // Collect metrics.
+ testEmitter.onlyDeltaMetrics(true);
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
+ assertEquals(2, result.size());
+
+ Metric counter = result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
+
+ assertTrue(counter.hasSum());
+ assertEquals(tags, getTags(counter.getSum().getDataPoints(0).getAttributesList()));
+ assertEquals(15, counter.getSum().getDataPoints(0).getAsDouble(), 0.0);
+ }
+
+ @Test
+ public void testMeasurableGauge() {
+ metrics.addMetric(metricName, (config, now) -> 100.0);
+
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // Should get exactly 2 Kafka measurables since Metrics always includes a count measurable.
+ assertEquals(2, result.size());
+
+ Metric counter = result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
+
+ assertTrue(counter.hasGauge());
+ assertEquals(tags, getTags(counter.getGauge().getDataPoints(0).getAttributesList()));
+ assertEquals(100L, counter.getGauge().getDataPoints(0).getAsDouble(), 0.0);
+ }
+
+ @Test
+ public void testNonMeasurable() {
+ metrics.addMetric(metrics.metricName("float", "group1", tags), (Gauge) (config, now) -> 99f);
+ metrics.addMetric(metrics.metricName("double", "group1", tags), (Gauge) (config, now) -> 99d);
+ metrics.addMetric(metrics.metricName("int", "group1", tags), (Gauge) (config, now) -> 100);
+ metrics.addMetric(metrics.metricName("long", "group1", tags), (Gauge) (config, now) -> 100L);
+
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // Should get exactly 5 Kafka measurables since Metrics always includes a count measurable.
+ assertEquals(5, result.size());
+
+ result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.(float|double)")).forEach(
+ doubleGauge -> {
+ assertTrue(doubleGauge.hasGauge());
+ assertEquals(tags, getTags(doubleGauge.getGauge().getDataPoints(0).getAttributesList()));
+ assertEquals(99d, doubleGauge.getGauge().getDataPoints(0).getAsDouble(), 0.0);
+ });
+
+ result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.(int|long)")).forEach(
+ intGauge -> {
+ assertTrue(intGauge.hasGauge());
+ assertEquals(tags, getTags(intGauge.getGauge().getDataPoints(0).getAttributesList()));
+ assertEquals(100, intGauge.getGauge().getDataPoints(0).getAsDouble(), 0.0);
+ });
+ }
+
+ @Test
+ public void testMeasurableWithException() {
+ metrics.addMetric(metricName, null, (config, now) -> {
+ throw new RuntimeException();
+ });
+
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // Verify only the global count of metrics exists.
+ assertEquals(1, result.size());
+ // Group is registered as kafka-metrics-count.
+ assertEquals("test.domain.kafka.count.count", result.get(0).builder().build().getName());
+ // Verify that a metric whose measure() method throws an exception is not returned.
+ assertFalse(result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .anyMatch(metric -> metric.getName().equals("test.domain.group1.name1")));
+ }
+
+ @Test
+ public void testMetricRemoval() {
+ metrics.addMetric(metricName, (config, now) -> 100.0);
+
+ collector.collect(testEmitter);
+ assertEquals(2, testEmitter.emittedMetrics().size());
+
+ metrics.removeMetric(metricName);
+ assertFalse(collector.getTrackedMetrics().contains(metricNamingStrategy.metricKey(metricName)));
+
+ // verify that the metric was removed.
+ testEmitter.reset();
+ collector.collect(testEmitter);
+ List<SinglePointMetric> collected = testEmitter.emittedMetrics();
+ assertEquals(1, collected.size());
+ assertEquals("test.domain.kafka.count.count", collected.get(0).builder().build().getName());
+ }
+
+ @Test
+ public void testSecondCollectCumulative() {
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(metricName, new CumulativeSum());
+
+ sensor.record();
+ sensor.record();
+ time.sleep(60 * 1000L);
+
+ collector.collect(testEmitter);
+
+ // Update it again by 5 and advance time by another 60 seconds.
+ sensor.record();
+ sensor.record();
+ sensor.record();
+ sensor.record();
+ sensor.record();
+ time.sleep(60 * 1000L);
+
+ testEmitter.reset();
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ assertEquals(2, result.size());
+
+
+ Metric cumulative = result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
+
+ NumberDataPoint point = cumulative.getSum().getDataPoints(0);
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, cumulative.getSum().getAggregationTemporality());
+ assertTrue(cumulative.getSum().getIsMonotonic());
+ assertEquals(7d, point.getAsDouble(), 0.0);
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) +
+ Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
+ Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
+ }
+
+ @Test
+ public void testSecondDeltaCollectDouble() {
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(metricName, new CumulativeSum());
+
+ sensor.record();
+ sensor.record();
+ time.sleep(60 * 1000L);
+
+ testEmitter.onlyDeltaMetrics(true);
+ collector.collect(testEmitter);
+
+ // Update it again by 5 and advance time by another 60 seconds.
+ sensor.record();
+ sensor.record();
+ sensor.record();
+ sensor.record();
+ sensor.record();
+ time.sleep(60 * 1000L);
+
+ testEmitter.reset();
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ assertEquals(2, result.size());
+
+ Metric cumulative = result.stream()
+ .flatMap(metrics -> Stream.of(metrics.builder().build()))
+ .filter(metric -> metric.getName().equals("test.domain.group1.name1")).findFirst().get();
+
+ NumberDataPoint point = cumulative.getSum().getDataPoints(0);
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, cumulative.getSum().getAggregationTemporality());
+ assertTrue(cumulative.getSum().getIsMonotonic());
+ assertEquals(5d, point.getAsDouble(), 0.0);
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) +
+ Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) +
+ Instant.ofEpochSecond(61L).getNano(), point.getStartTimeUnixNano());
+ }
+
+ @Test
+ public void testCollectFilter() {
+ metrics.addMetric(metricName, (config, now) -> 100.0);
+
+ testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // Should get exactly 1 Kafka measurable because we excluded the count measurable.
+ assertEquals(1, result.size());
+
+ Metric counter = result.get(0).builder().build();
+
+ assertTrue(counter.hasGauge());
+ assertEquals(100L, counter.getGauge().getDataPoints(0).getAsDouble(), 0.0);
+ }
+
+ @Test
+ public void testCollectFilterWithCumulativeTemporality() {
+ MetricName name1 = metrics.metricName("nonMeasurable", "group1", tags);
+ MetricName name2 = metrics.metricName("windowed", "group1", tags);
+ MetricName name3 = metrics.metricName("cumulative", "group1", tags);
+
+ metrics.addMetric(name1, (Gauge<Double>) (config, now) -> 99d);
+
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(name2, new WindowedCount());
+ sensor.add(name3, new CumulativeSum());
+
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // no-filter shall result in all 4 data metrics.
+ assertEquals(4, result.size());
+
+ testEmitter.reset();
+ testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));
+ collector.collect(testEmitter);
+ result = testEmitter.emittedMetrics();
+
+ // Drop metrics for Count type.
+ assertEquals(3, result.size());
+
+ testEmitter.reset();
+ testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".nonmeasurable"));
+ collector.collect(testEmitter);
+ result = testEmitter.emittedMetrics();
+
+ // Drop non-measurable metric.
+ assertEquals(3, result.size());
+
+ testEmitter.reset();
+ testEmitter.reconfigurePredicate(key -> true);
+ collector.collect(testEmitter);
+ result = testEmitter.emittedMetrics();
+
+ // Again no filter.
+ assertEquals(4, result.size());
+ }
+
+ @Test
+ public void testCollectFilterWithDeltaTemporality() {
+ MetricName name1 = metrics.metricName("nonMeasurable", "group1", tags);
+ MetricName name2 = metrics.metricName("windowed", "group1", tags);
+ MetricName name3 = metrics.metricName("cumulative", "group1", tags);
+
+ metrics.addMetric(name1, (Gauge<Double>) (config, now) -> 99d);
+
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(name2, new WindowedCount());
+ sensor.add(name3, new CumulativeSum());
+
+ testEmitter.onlyDeltaMetrics(true);
+ collector.collect(testEmitter);
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+
+ // no-filter shall result in all 4 data metrics.
+ assertEquals(4, result.size());
+
+ testEmitter.reset();
+ testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));
+ collector.collect(testEmitter);
+ result = testEmitter.emittedMetrics();
+
+ // Drop metrics for Count type.
+ assertEquals(3, result.size());
+
+ testEmitter.reset();
+ testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".nonmeasurable"));
+ collector.collect(testEmitter);
+ result = testEmitter.emittedMetrics();
+
+ // Drop non-measurable metric.
+ assertEquals(3, result.size());
+
+ testEmitter.reset();
+ testEmitter.reconfigurePredicate(key -> true);
+ collector.collect(testEmitter);
+ result = testEmitter.emittedMetrics();
+
+ // Again no filter.
+ assertEquals(4, result.size());
+ }
+
+ @Test
+ public void testCollectMetricsWithTemporalityChange() {
+ Sensor sensor = metrics.sensor("test");
+ sensor.add(metricName, new WindowedCount());
+ testEmitter.reconfigurePredicate(k -> !k.key().name().endsWith(".count"));
+
+ // Emit metrics as cumulative, verify the current time is 60 seconds ahead of start time.
+ sensor.record();
+ time.sleep(60 * 1000L);
+ collector.collect(testEmitter);
+
+ List<SinglePointMetric> result = testEmitter.emittedMetrics();
+ Metric counter = result.get(0).builder().build();
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality());
+ NumberDataPoint point = counter.getSum().getDataPoints(0);
+ assertEquals(1d, point.getAsDouble());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(61L).getEpochSecond()) +
+ Instant.ofEpochSecond(61L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
+ Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
+
+ // Again emit metrics as cumulative, verify the start time is unchanged and current time is
+ // advanced by 60 seconds again.
+ time.sleep(60 * 1000L);
+ sensor.record();
+ testEmitter.reset();
+ collector.collect(testEmitter);
+
+ result = testEmitter.emittedMetrics();
+ counter = result.get(0).builder().build();
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality());
+ point = counter.getSum().getDataPoints(0);
+ assertEquals(2d, point.getAsDouble(), 0.0);
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(121L).getEpochSecond()) +
+ Instant.ofEpochSecond(121L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(1L).getEpochSecond()) +
+ Instant.ofEpochSecond(1L).getNano(), point.getStartTimeUnixNano());
+
+
+ // Change Temporality. Emit metrics as delta, verify the temporality changes to delta and start time is reset to
+ // current time.
+ time.sleep(60 * 1000L);
+ sensor.record();
+ testEmitter.reset();
+ testEmitter.onlyDeltaMetrics(true);
+ collector.metricsReset();
+ collector.collect(testEmitter);
+
+ result = testEmitter.emittedMetrics();
+ counter = result.get(0).builder().build();
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality());
+ point = counter.getSum().getDataPoints(0);
+ assertEquals(3d, point.getAsDouble(), 0.0);
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) +
+ Instant.ofEpochSecond(181L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) +
+ Instant.ofEpochSecond(181L).getNano(), point.getStartTimeUnixNano());
+
+ // Again emit metrics as delta, verify the start time is tracked properly and only delta value
+ // is present on response.
+ time.sleep(60 * 1000L);
+ sensor.record();
+ testEmitter.reset();
+ collector.collect(testEmitter);
+
+ result = testEmitter.emittedMetrics();
+ counter = result.get(0).builder().build();
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, counter.getSum().getAggregationTemporality());
+ point = counter.getSum().getDataPoints(0);
+ assertEquals(1d, point.getAsDouble(), 0.0);
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(241L).getEpochSecond()) +
+ Instant.ofEpochSecond(241L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(181L).getEpochSecond()) +
+ Instant.ofEpochSecond(181L).getNano(), point.getStartTimeUnixNano());
+
+ // Change Temporality. Emit metrics as cumulative, verify the temporality changes to cumulative
+ // and start time is reset to current time.
+ time.sleep(60 * 1000L);
+ sensor.record();
+ testEmitter.reset();
+ testEmitter.onlyDeltaMetrics(false);
+ collector.metricsReset();
+ collector.collect(testEmitter);
+
+ result = testEmitter.emittedMetrics();
+ counter = result.get(0).builder().build();
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, counter.getSum().getAggregationTemporality());
+ point = counter.getSum().getDataPoints(0);
+ assertEquals(5d, point.getAsDouble(), 0.0);
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(301L).getEpochSecond()) +
+ Instant.ofEpochSecond(301L).getNano(), point.getTimeUnixNano());
+ assertEquals(TimeUnit.SECONDS.toNanos(Instant.ofEpochSecond(301L).getEpochSecond()) +
+ Instant.ofEpochSecond(301L).getNano(), point.getStartTimeUnixNano());
+ }
+
+ private MetricsReporter getTestMetricsReporter() {
+ // Inline implementation of MetricsReporter for testing.
+ return new MetricsReporter() {
+ @Override
+ public void init(List<KafkaMetric> metrics) {
+ collector.init(metrics);
+ }
+
+ @Override
+ public void metricChange(KafkaMetric metric) {
+ collector.metricChange(metric);
+ }
+
+ @Override
+ public void metricRemoval(KafkaMetric metric) {
+ collector.metricRemoval(metric);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ // do nothing
+ }
+ };
+ }
+
+ public static Map<String, String> getTags(List<KeyValue> attributes) {
+ return attributes.stream()
+ .filter(attr -> attr.getValue().hasStringValue())
+ .collect(Collectors.toMap(
+ KeyValue::getKey,
+ attr -> attr.getValue().getStringValue()
+ ));
+ }
+}
\ No newline at end of file
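The timestamp assertions in the tests above all follow from the MockTime constructed in setUp(): the wall clock starts at 1,000 ms (epoch second 1), and each time.sleep(60 * 1000L) advances it by one minute. A small sketch of that arithmetic, using the same formula as SinglePointMetric.toTimeUnixNanos():

    import java.time.Instant;
    import java.util.concurrent.TimeUnit;

    public class TimeArithmeticSketch {
        public static void main(String[] args) {
            long startMs = 1000L;                   // MockTime wall clock in setUp(): epoch second 1
            long afterSleepMs = startMs + 60_000L;  // after time.sleep(60 * 1000L): epoch second 61

            Instant collected = Instant.ofEpochMilli(afterSleepMs);
            long timeUnixNano = TimeUnit.SECONDS.toNanos(collected.getEpochSecond()) + collected.getNano();

            System.out.println(collected.getEpochSecond()); // 61
            System.out.println(timeUnixNano);               // 61000000000
        }
    }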
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/LastValueTrackerTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/LastValueTrackerTest.java
new file mode 100644
index 00000000000..a5947ae58ad
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/LastValueTrackerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LastValueTrackerTest {
+
+ private static final MetricKey METRIC_NAME = new MetricKey("test-metric", Collections.emptyMap());
+
+ private final Instant instant1 = Instant.now();
+ private final Instant instant2 = instant1.plusMillis(1);
+
+ @Test
+ public void testGetAndSetDouble() {
+ LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
+ Optional<InstantAndValue<Double>> result = lastValueTracker.getAndSet(METRIC_NAME, instant1, 1d);
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testGetAndSetDoubleWithTrackedValue() {
+ LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
+ lastValueTracker.getAndSet(METRIC_NAME, instant1, 1d);
+
+ Optional<InstantAndValue<Double>> result = lastValueTracker
+ .getAndSet(METRIC_NAME, instant2, 1000d);
+
+ assertTrue(result.isPresent());
+ assertEquals(instant1, result.get().getIntervalStart());
+ assertEquals(1d, result.get().getValue(), 1e-6);
+ }
+
+ @Test
+ public void testGetAndSetLong() {
+ LastValueTracker<Long> lastValueTracker = new LastValueTracker<>();
+ Optional<InstantAndValue<Long>> result = lastValueTracker.getAndSet(METRIC_NAME, instant1, 1L);
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testGetAndSetLongWithTrackedValue() {
+ LastValueTracker<Long> lastValueTracker = new LastValueTracker<>();
+ lastValueTracker.getAndSet(METRIC_NAME, instant1, 2L);
+ Optional<InstantAndValue<Long>> result = lastValueTracker
+ .getAndSet(METRIC_NAME, instant2, 10000L);
+
+ assertTrue(result.isPresent());
+ assertEquals(instant1, result.get().getIntervalStart());
+ assertEquals(2L, result.get().getValue().longValue());
+ }
+
+ @Test
+ public void testRemove() {
+ LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
+ lastValueTracker.getAndSet(METRIC_NAME, instant1, 1d);
+
+ assertTrue(lastValueTracker.contains(METRIC_NAME));
+
+ InstantAndValue<Double> result = lastValueTracker.remove(METRIC_NAME);
+ assertNotNull(result);
+ assertEquals(instant1, result.getIntervalStart());
+ assertEquals(1d, result.getValue(), 1e-6);
+ }
+
+ @Test
+ public void testRemoveWithNullKey() {
+ LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
+ assertThrows(NullPointerException.class, () -> lastValueTracker.remove(null));
+ }
+
+ @Test
+ public void testRemoveWithInvalidKey() {
+ LastValueTracker<Double> lastValueTracker = new LastValueTracker<>();
+ assertFalse(lastValueTracker.contains(METRIC_NAME));
+
+ InstantAndValue<Double> result = lastValueTracker.remove(METRIC_NAME);
+ assertNull(result);
+ }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/SinglePointMetricTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/SinglePointMetricTest.java
new file mode 100644
index 00000000000..97801b1e069
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/SinglePointMetricTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SinglePointMetricTest {
+
+ private MetricKey metricKey;
+ private Instant now;
+
+ /*
+ Test compares the metric representation from returned builder to ensure that the metric is
+ constructed correctly.
+
+ For example: Gauge metric with name "name" and double value 1.0 at certain time is represented as:
+
+ name: "name"
+ gauge {
+ data_points {
+ time_unix_nano: 1698063981021420000
+ as_double: 1.0
+ }
+ }
+ */
+
+ @BeforeEach
+ public void setUp() {
+ metricKey = new MetricKey("name", Collections.emptyMap());
+ now = Instant.now();
+ }
+
+ @Test
+ public void testGaugeWithNumberValue() {
+ SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, Long.valueOf(1), now);
+ MetricKey metricKey = gaugeNumber.key();
+ assertEquals("name", metricKey.name());
+
+ Metric metric = gaugeNumber.builder().build();
+ assertEquals(1, metric.getGauge().getDataPointsCount());
+
+ NumberDataPoint point = metric.getGauge().getDataPoints(0);
+ assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
+ assertEquals(0, point.getStartTimeUnixNano());
+ assertEquals(1, point.getAsInt());
+ assertEquals(0, point.getAttributesCount());
+ }
+
+ @Test
+ public void testGaugeWithDoubleValue() {
+ SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now);
+ MetricKey metricKey = gaugeNumber.key();
+ assertEquals("name", metricKey.name());
+
+ Metric metric = gaugeNumber.builder().build();
+ assertEquals(1, metric.getGauge().getDataPointsCount());
+
+ NumberDataPoint point = metric.getGauge().getDataPoints(0);
+ assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
+ assertEquals(0, point.getStartTimeUnixNano());
+ assertEquals(1.0, point.getAsDouble());
+ assertEquals(0, point.getAttributesCount());
+ }
+
+ @Test
+ public void testGaugeWithMetricTags() {
+ MetricKey metricKey = new MetricKey("name", Collections.singletonMap("tag", "value"));
+ SinglePointMetric gaugeNumber = SinglePointMetric.gauge(metricKey, 1.0, now);
+
+ MetricKey key = gaugeNumber.key();
+ assertEquals("name", key.name());
+
+ Metric metric = gaugeNumber.builder().build();
+ assertEquals(1, metric.getGauge().getDataPointsCount());
+
+ NumberDataPoint point = metric.getGauge().getDataPoints(0);
+ assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
+ assertEquals(0, point.getStartTimeUnixNano());
+ assertEquals(1.0, point.getAsDouble());
+ assertEquals(1, point.getAttributesCount());
+ assertEquals("tag", point.getAttributes(0).getKey());
+ assertEquals("value", point.getAttributes(0).getValue().getStringValue());
+ }
+
+ @Test
+ public void testSum() {
+ SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, false, now);
+
+ MetricKey key = sum.key();
+ assertEquals("name", key.name());
+
+ Metric metric = sum.builder().build();
+ assertFalse(metric.getSum().getIsMonotonic());
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, metric.getSum().getAggregationTemporality());
+ assertEquals(1, metric.getSum().getDataPointsCount());
+
+ NumberDataPoint point = metric.getSum().getDataPoints(0);
+ assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
+ assertEquals(0, point.getStartTimeUnixNano());
+ assertEquals(1.0, point.getAsDouble());
+ assertEquals(0, point.getAttributesCount());
+ }
+
+ @Test
+ public void testSumWithStartTimeAndTags() {
+ MetricKey metricKey = new MetricKey("name", Collections.singletonMap("tag", "value"));
+ SinglePointMetric sum = SinglePointMetric.sum(metricKey, 1.0, true, now, now);
+
+ MetricKey key = sum.key();
+ assertEquals("name", key.name());
+
+ Metric metric = sum.builder().build();
+ assertTrue(metric.getSum().getIsMonotonic());
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, metric.getSum().getAggregationTemporality());
+ assertEquals(1, metric.getSum().getDataPointsCount());
+
+ NumberDataPoint point = metric.getSum().getDataPoints(0);
+ assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
+ assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getStartTimeUnixNano());
+ assertEquals(1.0, point.getAsDouble());
+ assertEquals(1, point.getAttributesCount());
+ assertEquals("tag", point.getAttributes(0).getKey());
+ assertEquals("value", point.getAttributes(0).getValue().getStringValue());
+ }
+
+ @Test
+ public void testDeltaSum() {
+ SinglePointMetric sum = SinglePointMetric.deltaSum(metricKey, 1.0, true, now, now);
+
+ MetricKey key = sum.key();
+ assertEquals("name", key.name());
+
+ Metric metric = sum.builder().build();
+ assertTrue(metric.getSum().getIsMonotonic());
+ assertEquals(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, metric.getSum().getAggregationTemporality());
+ assertEquals(1, metric.getSum().getDataPointsCount());
+
+ NumberDataPoint point = metric.getSum().getDataPoints(0);
+ assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getTimeUnixNano());
+ assertEquals(now.getEpochSecond() * Math.pow(10, 9) + now.getNano(), point.getStartTimeUnixNano());
+ assertEquals(1.0, point.getAsDouble());
+ assertEquals(0, point.getAttributesCount());
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TestEmitter.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TestEmitter.java
new file mode 100644
index 00000000000..bcc5a868558
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TestEmitter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+
+public class TestEmitter implements MetricsEmitter {
+
+ private final List<SinglePointMetric> emittedMetrics;
+ private Predicate<? super MetricKeyable> metricsPredicate = metricKeyable -> true;
+ private boolean onlyDeltaMetrics;
+
+ public TestEmitter() {
+ this(false);
+ }
+
+ public TestEmitter(boolean onlyDeltaMetrics) {
+ this.emittedMetrics = new ArrayList<>();
+ this.onlyDeltaMetrics = onlyDeltaMetrics;
+ }
+
+ @Override
+ public boolean shouldEmitMetric(MetricKeyable metricKeyable) {
+ return metricsPredicate.test(metricKeyable);
+ }
+
+ @Override
+ public boolean shouldEmitDeltaMetrics() {
+ return onlyDeltaMetrics;
+ }
+
+ @Override
+ public boolean emitMetric(SinglePointMetric metric) {
+ return emittedMetrics.add(metric);
+ }
+
+ @Override
+ public List<SinglePointMetric> emittedMetrics() {
+ return Collections.unmodifiableList(emittedMetrics);
+ }
+
+ public void reset() {
+ this.emittedMetrics.clear();
+ }
+
+ public void onlyDeltaMetrics(boolean onlyDeltaMetrics) {
+ this.onlyDeltaMetrics = onlyDeltaMetrics;
+ }
+
+ public void reconfigurePredicate(Predicate<? super MetricKeyable> metricsPredicate) {
+ this.metricsPredicate = metricsPredicate;
+ }
+}
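Putting the pieces together, a condensed wiring sketch adapted from the test setup above (seeding the collector directly via init() is a shortcut for the MetricsReporter registration the tests use; assumes the file sits in org.apache.kafka.common.telemetry.internals alongside the classes in this patch):

    import java.util.ArrayList;

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.WindowedCount;

    public class CollectorWiringExample {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor sensor = metrics.sensor("test");
            sensor.add(metrics.metricName("name1", "group1"), new WindowedCount());
            sensor.record();

            KafkaMetricsCollector collector = new KafkaMetricsCollector(
                TelemetryMetricNamingConvention.getClientTelemetryMetricNamingStrategy("test.domain"));
            // In production the collector is driven by MetricsReporter callbacks
            // (see getTestMetricsReporter() above); seeding it directly is enough here.
            collector.init(new ArrayList<>(metrics.metrics().values()));

            TestEmitter emitter = new TestEmitter(true); // delta temporality
            collector.collect(emitter);
            emitter.emittedMetrics().forEach(m -> System.out.println(m.builder().build()));
        }
    }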