KAFKA-19684 Move Gauge#value to MetricValueProvider (#20543)

KIP-188 introduced MetricValueProvider, adding Measurable and Gauge as
its sub interfaces. However, this left a legacy

[issue](https://github.com/apache/kafka/pull/3705#discussion_r140830112):
move the value method from Gauge to the super interface,
MetricValueProvider.

This PR moves the value method from Gauge to MetricValueProvider and
provides a default implementation in Measurable. This unifies the
methods used by Gauge and Measurable to obtain monitoring values.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
majialong 2025-10-04 23:30:39 +08:00 committed by GitHub
parent bc2f23b879
commit 5736d506f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 61 additions and 28 deletions

View File

@ -20,13 +20,4 @@ package org.apache.kafka.common.metrics;
* A gauge metric is an instantaneous reading of a particular value. * A gauge metric is an instantaneous reading of a particular value.
*/ */
@FunctionalInterface @FunctionalInterface
public interface Gauge<T> extends MetricValueProvider<T> { public interface Gauge<T> extends MetricValueProvider<T> { }
/**
* Returns the current value associated with this gauge.
* @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken
*/
T value(MetricConfig config, long now);
}

View File

@ -20,6 +20,8 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import java.util.Objects;
public final class KafkaMetric implements Metric { public final class KafkaMetric implements Metric {
private final MetricName metricName; private final MetricName metricName;
@ -41,9 +43,7 @@ public final class KafkaMetric implements Metric {
MetricConfig config, Time time) { MetricConfig config, Time time) {
this.metricName = metricName; this.metricName = metricName;
this.lock = lock; this.lock = lock;
if (!(valueProvider instanceof Measurable) && !(valueProvider instanceof Gauge)) this.metricValueProvider = Objects.requireNonNull(valueProvider, "valueProvider must not be null");
throw new IllegalArgumentException("Unsupported metric value provider of class " + valueProvider.getClass());
this.metricValueProvider = valueProvider;
this.config = config; this.config = config;
this.time = time; this.time = time;
} }
@ -67,20 +67,15 @@ public final class KafkaMetric implements Metric {
} }
/** /**
* Take the metric and return the value, which could be a {@link Measurable} or a {@link Gauge} * Take the metric and return the value via {@link MetricValueProvider#value(MetricConfig, long)}.
*
* @return Return the metric value * @return Return the metric value
* @throws IllegalStateException if the underlying metric is not a {@link Measurable} or a {@link Gauge}.
*/ */
@Override @Override
public Object metricValue() { public Object metricValue() {
long now = time.milliseconds(); long now = time.milliseconds();
synchronized (this.lock) { synchronized (this.lock) {
if (isMeasurable()) return metricValueProvider.value(config, now);
return ((Measurable) metricValueProvider).measure(config, now);
else if (this.metricValueProvider instanceof Gauge)
return ((Gauge<?>) metricValueProvider).value(config, now);
else
throw new IllegalStateException("Not a valid metric: " + this.metricValueProvider.getClass());
} }
} }

View File

@ -22,11 +22,26 @@ package org.apache.kafka.common.metrics;
public interface Measurable extends MetricValueProvider<Double> { public interface Measurable extends MetricValueProvider<Double> {
/** /**
* Measure this quantity and return the result as a double * Measure this quantity and return the result as a double.
*
* @param config The configuration for this metric * @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken * @param now The POSIX time in milliseconds the measurement is being taken
* @return The measured value * @return The measured value
*/ */
double measure(MetricConfig config, long now); double measure(MetricConfig config, long now);
/**
* Measure this quantity and return the result as a double.
*
* This default implementation delegates to {@link #measure(MetricConfig, long)}.
*
* @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken
* @return The measured value as a {@link Double}
*/
@Override
default Double value(MetricConfig config, long now) {
return measure(config, now);
}
} }

View File

@ -19,10 +19,17 @@ package org.apache.kafka.common.metrics;
/** /**
* Super-interface for {@link Measurable} or {@link Gauge} that provides * Super-interface for {@link Measurable} or {@link Gauge} that provides
* metric values. * metric values.
* <p>
* In the future for Java8 and above, {@link Gauge#value(MetricConfig, long)} will be
* moved to this interface with a default implementation in {@link Measurable} that returns
* {@link Measurable#measure(MetricConfig, long)}.
* </p>
*/ */
public interface MetricValueProvider<T> { } @FunctionalInterface
public interface MetricValueProvider<T> {
/**
* Returns the current value associated with this metric.
*
* @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken
* @return the current metric value
*/
T value(MetricConfig config, long now);
}

View File

@ -48,4 +48,29 @@ public class KafkaMetricTest {
assertThrows(IllegalStateException.class, metric::measurable); assertThrows(IllegalStateException.class, metric::measurable);
} }
@Test
public void testMeasurableValueReturnsZeroWhenNotMeasurable() {
MockTime time = new MockTime();
MetricConfig config = new MetricConfig();
Gauge<Integer> gauge = (c, now) -> 7;
KafkaMetric metric = new KafkaMetric(new Object(), METRIC_NAME, gauge, config, time);
assertEquals(0.0d, metric.measurableValue(time.milliseconds()), 0.0d);
}
@Test
public void testKafkaMetricAcceptsNonMeasurableNonGaugeProvider() {
MetricValueProvider<String> provider = (config, now) -> "metric value provider";
KafkaMetric metric = new KafkaMetric(new Object(), METRIC_NAME, provider, new MetricConfig(), new MockTime());
Object value = metric.metricValue();
assertEquals("metric value provider", value);
}
@Test
public void testConstructorWithNullProvider() {
assertThrows(NullPointerException.class, () ->
new KafkaMetric(new Object(), METRIC_NAME, null, new MetricConfig(), new MockTime())
);
}
} }