KAFKA-6870 Concurrency conflicts in SampledStat (#4985)

Make `KafkaMetric.measurableValue` thread-safe

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Chia-Ping Tsai 2018-05-10 19:27:45 +08:00 committed by Rajini Sivaram
parent 9679c44d2b
commit 4f7c11a1df
2 changed files with 69 additions and 7 deletions

View File

@ -55,9 +55,7 @@ public final class KafkaMetric implements Metric {
@Override
@Deprecated
public double value() {
synchronized (this.lock) {
return measurableValue(time.milliseconds());
}
return measurableValue(time.milliseconds());
}
@Override
@ -81,10 +79,12 @@ public final class KafkaMetric implements Metric {
}
double measurableValue(long timeMs) {
if (this.metricValueProvider instanceof Measurable)
return ((Measurable) metricValueProvider).measure(config, timeMs);
else
return 0;
synchronized (this.lock) {
if (this.metricValueProvider instanceof Measurable)
return ((Measurable) metricValueProvider).measure(config, timeMs);
else
return 0;
}
}
public void config(MetricConfig config) {

View File

@ -19,19 +19,29 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -124,4 +134,56 @@ public class SensorTest {
assertEquals(1, sensor.metrics().size());
assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, sensor.metrics().get(0).measurable().getClass());
}
/**
* The Sensor#checkQuotas should be thread-safe since the method may be used by many ReplicaFetcherThreads.
*/
@Test
public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException {
final Metrics metrics = new Metrics(new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
// decreasing the value of time window make SampledStat always record the given value
.timeWindow(1, TimeUnit.MILLISECONDS)
// increasing the value of samples make SampledStat store more samples
.samples(100));
final Sensor sensor = metrics.sensor("sensor");
assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Rate()));
final int threadCount = 10;
final CountDownLatch latch = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(threadCount);
List<Future<Throwable>> workers = new ArrayList<>(threadCount);
boolean needShutdown = true;
try {
for (int i = 0; i != threadCount; ++i) {
final int index = i;
workers.add(service.submit(new Callable<Throwable>() {
@Override
public Throwable call() {
try {
assertTrue(latch.await(5, TimeUnit.SECONDS));
for (int j = 0; j != 20; ++j) {
sensor.record(j * index, System.currentTimeMillis() + j, false);
sensor.checkQuotas();
}
return null;
} catch (Throwable e) {
return e;
}
}
}));
}
latch.countDown();
service.shutdown();
assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
needShutdown = false;
for (Future<Throwable> callable : workers) {
assertTrue("If this failure happen frequently, we can try to increase the wait time", callable.isDone());
assertNull("Sensor#checkQuotas SHOULD be thread-safe!", callable.get());
}
} finally {
if (needShutdown) {
service.shutdownNow();
}
}
}
}