mirror of https://github.com/apache/kafka.git
KAFKA-7560; PushHttpMetricsReporter should not convert metric value to double
Currently PushHttpMetricsReporter will convert value from KafkaMetric.metricValue() to double. This will not work for non-numerical metrics such as version in AppInfoParser whose value can be string. This has caused issue for PushHttpMetricsReporter which in turn caused system test kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail. Since we allow metric value to be object, PushHttpMetricsReporter should also read metric value as object and pass it to the http server. Author: Dong Lin <lindong28@gmail.com> Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io> Closes #5886 from lindong28/KAFKA-7560
This commit is contained in:
parent
7b5ffa0a07
commit
df0faee097
|
@ -174,8 +174,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
|
|||
samples = new ArrayList<>(metrics.size());
|
||||
for (KafkaMetric metric : metrics.values()) {
|
||||
MetricName name = metric.metricName();
|
||||
double value = (Double) metric.metricValue();
|
||||
samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
|
||||
samples.add(new MetricValue(name.name(), name.group(), name.tags(), metric.metricValue()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -212,9 +211,9 @@ public class PushHttpMetricsReporter implements MetricsReporter {
|
|||
} else {
|
||||
log.info("Finished reporting metrics with response code {}", responseCode);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error reporting metrics", e);
|
||||
throw new KafkaException("Failed to report current metrics", e);
|
||||
} catch (Throwable t) {
|
||||
log.error("Error reporting metrics", t);
|
||||
throw new KafkaException("Failed to report current metrics", t);
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
connection.disconnect();
|
||||
|
|
|
@ -18,10 +18,11 @@ package org.apache.kafka.tools;
|
|||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import java.util.List;
|
||||
import org.apache.kafka.common.MetricName;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.metrics.Gauge;
|
||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||
import org.apache.kafka.common.metrics.Measurable;
|
||||
import org.apache.kafka.common.metrics.MetricConfig;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
|
@ -184,32 +185,40 @@ public class PushHttpMetricsReporterTest {
|
|||
KafkaMetric metric1 = new KafkaMetric(
|
||||
new Object(),
|
||||
new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
|
||||
new ImmutableValue(1.0),
|
||||
new ImmutableValue<>(1.0),
|
||||
null,
|
||||
time
|
||||
);
|
||||
KafkaMetric newMetric1 = new KafkaMetric(
|
||||
new Object(),
|
||||
new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
|
||||
new ImmutableValue(-1.0),
|
||||
new ImmutableValue<>(-1.0),
|
||||
null,
|
||||
time
|
||||
);
|
||||
KafkaMetric metric2 = new KafkaMetric(
|
||||
new Object(),
|
||||
new MetricName("name2", "group2", "desc2", Collections.singletonMap("key2", "value2")),
|
||||
new ImmutableValue(2.0),
|
||||
new ImmutableValue<>(2.0),
|
||||
null,
|
||||
time
|
||||
);
|
||||
KafkaMetric metric3 = new KafkaMetric(
|
||||
new Object(),
|
||||
new MetricName("name3", "group3", "desc3", Collections.singletonMap("key3", "value3")),
|
||||
new ImmutableValue(3.0),
|
||||
new ImmutableValue<>(3.0),
|
||||
null,
|
||||
time
|
||||
);
|
||||
reporter.init(Arrays.asList(metric1, metric2));
|
||||
KafkaMetric metric4 = new KafkaMetric(
|
||||
new Object(),
|
||||
new MetricName("name4", "group4", "desc4", Collections.singletonMap("key4", "value4")),
|
||||
new ImmutableValue<>("value4"),
|
||||
null,
|
||||
time
|
||||
);
|
||||
|
||||
reporter.init(Arrays.asList(metric1, metric2, metric4));
|
||||
reporter.metricChange(newMetric1); // added in init, modified
|
||||
reporter.metricChange(metric3); // added by change
|
||||
reporter.metricRemoval(metric2); // added in init, deleted by removal
|
||||
|
@ -222,9 +231,12 @@ public class PushHttpMetricsReporterTest {
|
|||
// We should be left with the modified version of metric1 and metric3
|
||||
JsonNode metrics = payload.get("metrics");
|
||||
assertTrue(metrics.isArray());
|
||||
assertEquals(2, metrics.size());
|
||||
assertEquals(3, metrics.size());
|
||||
List<JsonNode> metricsList = Arrays.asList(metrics.get(0), metrics.get(1), metrics.get(2));
|
||||
// Sort metrics based on name so that we can verify the value for each metric below
|
||||
metricsList.sort((m1, m2) -> m1.get("name").textValue().compareTo(m2.get("name").textValue()));
|
||||
|
||||
JsonNode m1 = metrics.get(0);
|
||||
JsonNode m1 = metricsList.get(0);
|
||||
assertEquals("name1", m1.get("name").textValue());
|
||||
assertEquals("group1", m1.get("group").textValue());
|
||||
JsonNode m1Tags = m1.get("tags");
|
||||
|
@ -233,7 +245,7 @@ public class PushHttpMetricsReporterTest {
|
|||
assertEquals("value1", m1Tags.get("key1").textValue());
|
||||
assertEquals(-1.0, m1.get("value").doubleValue(), 0.0);
|
||||
|
||||
JsonNode m3 = metrics.get(1);
|
||||
JsonNode m3 = metricsList.get(1);
|
||||
assertEquals("name3", m3.get("name").textValue());
|
||||
assertEquals("group3", m3.get("group").textValue());
|
||||
JsonNode m3Tags = m3.get("tags");
|
||||
|
@ -242,6 +254,15 @@ public class PushHttpMetricsReporterTest {
|
|||
assertEquals("value3", m3Tags.get("key3").textValue());
|
||||
assertEquals(3.0, m3.get("value").doubleValue(), 0.0);
|
||||
|
||||
JsonNode m4 = metricsList.get(2);
|
||||
assertEquals("name4", m4.get("name").textValue());
|
||||
assertEquals("group4", m4.get("group").textValue());
|
||||
JsonNode m4Tags = m4.get("tags");
|
||||
assertTrue(m4Tags.isObject());
|
||||
assertEquals(1, m4Tags.size());
|
||||
assertEquals("value4", m4Tags.get("key4").textValue());
|
||||
assertEquals("value4", m4.get("value").textValue());
|
||||
|
||||
reporter.close();
|
||||
|
||||
verifyAll();
|
||||
|
@ -318,15 +339,15 @@ public class PushHttpMetricsReporterTest {
|
|||
EasyMock.expect(executor.awaitTermination(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))).andReturn(true);
|
||||
}
|
||||
|
||||
private static class ImmutableValue implements Measurable {
|
||||
private final double value;
|
||||
static class ImmutableValue<T> implements Gauge<T> {
|
||||
private final T value;
|
||||
|
||||
public ImmutableValue(double value) {
|
||||
public ImmutableValue(T value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double measure(MetricConfig config, long now) {
|
||||
public T value(MetricConfig config, long now) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue