KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)

Reviewers: David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
vamossagar12 2022-06-13 23:06:39 +05:30 committed by GitHub
parent 4426b05e54
commit 5cab11cf52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 12 deletions

View File

@ -509,7 +509,10 @@ public class Metrics implements Closeable {
Objects.requireNonNull(metricValueProvider), Objects.requireNonNull(metricValueProvider),
config == null ? this.config : config, config == null ? this.config : config,
time); time);
registerMetric(m); KafkaMetric existingMetric = registerMetric(m);
if (existingMetric != null) {
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
}
} }
/** /**
@ -524,6 +527,26 @@ public class Metrics implements Closeable {
addMetric(metricName, null, metricValueProvider); addMetric(metricName, null, metricValueProvider);
} }
/**
* Create or get an existing metric to monitor an object that implements MetricValueProvider.
* This metric won't be associated with any sensor. This is a way to expose existing values as metrics.
* This method takes care of synchronisation while updating/accessing metrics by concurrent threads.
*
* @param metricName The name of the metric
* @param metricValueProvider The metric value provider associated with this metric
* @return Existing KafkaMetric if already registered or else a newly created one
*/
public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
KafkaMetric metric = new KafkaMetric(new Object(),
Objects.requireNonNull(metricName),
Objects.requireNonNull(metricValueProvider),
config == null ? this.config : config,
time);
KafkaMetric existingMetric = registerMetric(metric);
return existingMetric == null ? metric : existingMetric;
}
/** /**
* Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval` * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
* will be invoked for each reporter. * will be invoked for each reporter.
@ -563,10 +586,18 @@ public class Metrics implements Closeable {
} }
} }
synchronized void registerMetric(KafkaMetric metric) { /**
* Register a metric if not present or return an already existing metric otherwise.
* When a metric is newly registered, this method returns null
*
* @param metric The KafkaMetric to register
* @return KafkaMetric if the metric already exists, null otherwise
*/
synchronized KafkaMetric registerMetric(KafkaMetric metric) {
MetricName metricName = metric.metricName(); MetricName metricName = metric.metricName();
if (this.metrics.containsKey(metricName)) if (this.metrics.containsKey(metricName)) {
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); return this.metrics.get(metricName);
}
this.metrics.put(metricName, metric); this.metrics.put(metricName, metric);
for (MetricsReporter reporter : reporters) { for (MetricsReporter reporter : reporters) {
try { try {
@ -576,6 +607,7 @@ public class Metrics implements Closeable {
} }
} }
log.trace("Registered metric named {}", metricName); log.trace("Registered metric named {}", metricName);
return null;
} }
/** /**

View File

@ -297,7 +297,10 @@ public final class Sensor {
for (NamedMeasurable m : stat.stats()) { for (NamedMeasurable m : stat.stats()) {
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time); final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time);
if (!metrics.containsKey(metric.metricName())) { if (!metrics.containsKey(metric.metricName())) {
registry.registerMetric(metric); KafkaMetric existingMetric = registry.registerMetric(metric);
if (existingMetric != null) {
throw new IllegalArgumentException("A metric named '" + metric.metricName() + "' already exists, can't register another one.");
}
metrics.put(metric.metricName(), metric); metrics.put(metric.metricName(), metric);
} }
} }
@ -336,7 +339,10 @@ public final class Sensor {
statConfig, statConfig,
time time
); );
registry.registerMetric(metric); KafkaMetric existingMetric = registry.registerMetric(metric);
if (existingMetric != null) {
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
}
metrics.put(metric.metricName(), metric); metrics.put(metric.metricName(), metric);
stats.add(new StatAndConfig(Objects.requireNonNull(stat), metric::config)); stats.add(new StatAndConfig(Objects.requireNonNull(stat), metric::config));
return true; return true;

View File

@ -319,9 +319,7 @@ public class ConnectMetrics {
*/ */
public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) { public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) {
MetricName metricName = metricName(nameTemplate); MetricName metricName = metricName(nameTemplate);
if (metrics().metric(metricName) == null) { metrics().addMetricIfAbsent(metricName, null, (Gauge<T>) (config, now) -> supplier.metricValue(now));
metrics().addMetric(metricName, (Gauge<T>) (config, now) -> supplier.metricValue(now));
}
} }
/** /**
@ -333,9 +331,7 @@ public class ConnectMetrics {
*/ */
public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) { public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) {
MetricName metricName = metricName(nameTemplate); MetricName metricName = metricName(nameTemplate);
if (metrics().metric(metricName) == null) { metrics().addMetricIfAbsent(metricName, null, (Gauge<T>) (config, now) -> value);
metrics().addMetric(metricName, (Gauge<T>) (config, now) -> value);
}
} }
/** /**

View File

@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
@ -84,6 +85,8 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.powermock.api.easymock.PowerMock.createMock; import static org.powermock.api.easymock.PowerMock.createMock;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@ -497,6 +500,17 @@ public class StreamsMetricsImplTest {
verify(metrics); verify(metrics);
} }
@Test
public void shouldCreateNewStoreLevelMutableMetric() {
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final Metrics metrics = new Metrics(metricConfig);
assertNull(metrics.metric(metricName));
metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
assertNotNull(metrics.metric(metricName));
}
@Test @Test
public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() { public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() {
final Metrics metrics = mock(Metrics.class); final Metrics metrics = mock(Metrics.class);
@ -521,6 +535,38 @@ public class StreamsMetricsImplTest {
verify(metrics); verify(metrics);
} }
@Test
public void shouldReturnSameMetricIfAlreadyCreated() {
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final Metrics metrics = new Metrics(metricConfig);
assertNull(metrics.metric(metricName));
final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER));
}
@Test
public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws InterruptedException {
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final Metrics metrics = new Metrics(metricConfig);
assertNull(metrics.metric(metricName));
final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new AtomicReference<>();
final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new AtomicReference<>();
final Thread thread1 = new Thread(() -> metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
final Thread thread2 = new Thread(() -> metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
thread1.start();
thread2.start();
thread1.join();
thread2.join();
assertEquals(metricCreatedViaThread1.get(), metricCreatedViaThread2.get());
}
@Test @Test
public void shouldRemoveStateStoreLevelSensors() { public void shouldRemoveStateStoreLevelSensors() {
final Metrics metrics = niceMock(Metrics.class); final Metrics metrics = niceMock(Metrics.class);