mirror of https://github.com/apache/kafka.git
KAFKA-17491: Move BrokerServerMetrics to server module (#17114)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
parent
beacf488d1
commit
af8d058d8e
|
@ -19,7 +19,6 @@ package kafka.server
|
|||
|
||||
import kafka.raft.KafkaRaftManager
|
||||
import kafka.server.Server.MetricsPrefix
|
||||
import kafka.server.metadata.BrokerServerMetrics
|
||||
import kafka.utils.{CoreUtils, Logging}
|
||||
import org.apache.kafka.common.metrics.Metrics
|
||||
import org.apache.kafka.common.network.ListenerName
|
||||
|
@ -37,7 +36,7 @@ import org.apache.kafka.raft.Endpoints
|
|||
import org.apache.kafka.server.ProcessRole
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion
|
||||
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
|
||||
import org.apache.kafka.server.metrics.KafkaYammerMetrics
|
||||
import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics}
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.Arrays
|
||||
|
@ -267,7 +266,7 @@ class SharedServer(
|
|||
sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
|
||||
|
||||
if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
|
||||
brokerMetrics = BrokerServerMetrics(metrics)
|
||||
brokerMetrics = new BrokerServerMetrics(metrics)
|
||||
}
|
||||
if (sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole)) {
|
||||
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
|
||||
|
|
|
@ -1,150 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.server.metadata
|
||||
|
||||
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
|
||||
import org.apache.kafka.common.MetricName
|
||||
import org.apache.kafka.common.metrics.Gauge
|
||||
import org.apache.kafka.common.metrics.Metrics
|
||||
import org.apache.kafka.common.metrics.MetricConfig
|
||||
import org.apache.kafka.image.MetadataProvenance
|
||||
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
|
||||
|
||||
import java.util.Collections
|
||||
import java.util.concurrent.TimeUnit.NANOSECONDS
|
||||
|
||||
final class BrokerServerMetrics private (
|
||||
metrics: Metrics
|
||||
) extends AutoCloseable {
|
||||
import BrokerServerMetrics._
|
||||
|
||||
private val metricsGroup = new KafkaMetricsGroup("kafka.server","BrokerMetadataListener")
|
||||
private val batchProcessingTimeHistName = metricsGroup.metricName("MetadataBatchProcessingTimeUs", Collections.emptyMap())
|
||||
|
||||
/**
|
||||
* A histogram tracking the time in microseconds it took to process batches of events.
|
||||
*/
|
||||
private val batchProcessingTimeHist =
|
||||
KafkaYammerMetrics.defaultRegistry().newHistogram(batchProcessingTimeHistName, true)
|
||||
|
||||
private val batchSizeHistName = metricsGroup.metricName("MetadataBatchSizes", Collections.emptyMap())
|
||||
|
||||
/**
|
||||
* A histogram tracking the sizes of batches that we have processed.
|
||||
*/
|
||||
private val batchSizeHist =
|
||||
KafkaYammerMetrics.defaultRegistry().newHistogram(batchSizeHistName, true)
|
||||
|
||||
val lastAppliedImageProvenance: AtomicReference[MetadataProvenance] =
|
||||
new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY)
|
||||
val metadataLoadErrorCount: AtomicLong = new AtomicLong(0)
|
||||
val metadataApplyErrorCount: AtomicLong = new AtomicLong(0)
|
||||
|
||||
val lastAppliedRecordOffsetName: MetricName = metrics.metricName(
|
||||
"last-applied-record-offset",
|
||||
metricGroupName,
|
||||
"The offset of the last record from the cluster metadata partition that was applied by the broker"
|
||||
)
|
||||
|
||||
val lastAppliedRecordTimestampName: MetricName = metrics.metricName(
|
||||
"last-applied-record-timestamp",
|
||||
metricGroupName,
|
||||
"The timestamp of the last record from the cluster metadata partition that was applied by the broker"
|
||||
)
|
||||
|
||||
val lastAppliedRecordLagMsName: MetricName = metrics.metricName(
|
||||
"last-applied-record-lag-ms",
|
||||
metricGroupName,
|
||||
"The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the broker"
|
||||
)
|
||||
|
||||
val metadataLoadErrorCountName: MetricName = metrics.metricName(
|
||||
"metadata-load-error-count",
|
||||
metricGroupName,
|
||||
"The number of errors encountered by the BrokerMetadataListener while loading the metadata log and generating a new MetadataDelta based on it."
|
||||
)
|
||||
|
||||
val metadataApplyErrorCountName: MetricName = metrics.metricName(
|
||||
"metadata-apply-error-count",
|
||||
metricGroupName,
|
||||
"The number of errors encountered by the BrokerMetadataPublisher while applying a new MetadataImage based on the latest MetadataDelta."
|
||||
)
|
||||
|
||||
addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
|
||||
lastAppliedImageProvenance.get.lastContainedOffset()
|
||||
}
|
||||
|
||||
addMetric(metrics, lastAppliedRecordTimestampName) { _ =>
|
||||
lastAppliedImageProvenance.get.lastContainedLogTimeMs()
|
||||
}
|
||||
|
||||
addMetric(metrics, lastAppliedRecordLagMsName) { now =>
|
||||
now - lastAppliedImageProvenance.get.lastContainedLogTimeMs()
|
||||
}
|
||||
|
||||
addMetric(metrics, metadataLoadErrorCountName) { _ =>
|
||||
metadataLoadErrorCount.get
|
||||
}
|
||||
|
||||
addMetric(metrics, metadataApplyErrorCountName) { _ =>
|
||||
metadataApplyErrorCount.get
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
KafkaYammerMetrics.defaultRegistry().removeMetric(batchProcessingTimeHistName)
|
||||
KafkaYammerMetrics.defaultRegistry().removeMetric(batchSizeHistName)
|
||||
List(
|
||||
lastAppliedRecordOffsetName,
|
||||
lastAppliedRecordTimestampName,
|
||||
lastAppliedRecordLagMsName,
|
||||
metadataLoadErrorCountName,
|
||||
metadataApplyErrorCountName
|
||||
).foreach(metrics.removeMetric)
|
||||
}
|
||||
|
||||
def updateBatchProcessingTime(elapsedNs: Long): Unit =
|
||||
batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs))
|
||||
|
||||
def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
|
||||
|
||||
def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
|
||||
lastAppliedImageProvenance.set(provenance)
|
||||
|
||||
def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
|
||||
|
||||
def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
|
||||
}
|
||||
|
||||
|
||||
object BrokerServerMetrics {
|
||||
private val metricGroupName = "broker-metadata-metrics"
|
||||
|
||||
private def addMetric[T](metrics: Metrics, name: MetricName)(func: Long => T): Unit = {
|
||||
metrics.addMetric(name, new FuncGauge(func))
|
||||
}
|
||||
|
||||
private final class FuncGauge[T](func: Long => T) extends Gauge[T] {
|
||||
override def value(config: MetricConfig, now: Long): T = {
|
||||
func(now)
|
||||
}
|
||||
}
|
||||
|
||||
def apply(metrics: Metrics): BrokerServerMetrics = {
|
||||
new BrokerServerMetrics(metrics)
|
||||
}
|
||||
}
|
|
@ -1,131 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.server.metadata
|
||||
|
||||
import java.util.Collections
|
||||
import org.apache.kafka.common.MetricName
|
||||
import org.apache.kafka.common.metrics.Metrics
|
||||
import org.apache.kafka.common.utils.MockTime
|
||||
import org.apache.kafka.image.MetadataProvenance
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.Assertions.assertTrue
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.util.Using
|
||||
|
||||
final class BrokerServerMetricsTest {
|
||||
@Test
|
||||
def testMetricsExported(): Unit = {
|
||||
val metrics = new Metrics()
|
||||
val expectedGroup = "broker-metadata-metrics"
|
||||
|
||||
// Metric description is not use for metric name equality
|
||||
val expectedMetrics = Set(
|
||||
new MetricName("last-applied-record-offset", expectedGroup, "", Collections.emptyMap()),
|
||||
new MetricName("last-applied-record-timestamp", expectedGroup, "", Collections.emptyMap()),
|
||||
new MetricName("last-applied-record-lag-ms", expectedGroup, "", Collections.emptyMap()),
|
||||
new MetricName("metadata-load-error-count", expectedGroup, "", Collections.emptyMap()),
|
||||
new MetricName("metadata-apply-error-count", expectedGroup, "", Collections.emptyMap())
|
||||
)
|
||||
|
||||
Using(BrokerServerMetrics(metrics)) { brokerMetrics =>
|
||||
val metricsMap = metrics.metrics().asScala.filter{ case (name, _) => name.group == expectedGroup }
|
||||
assertEquals(expectedMetrics.size, metricsMap.size)
|
||||
metricsMap.foreach { case (name, metric) =>
|
||||
assertTrue(expectedMetrics.contains(name))
|
||||
}
|
||||
}
|
||||
|
||||
val metricsMap = metrics.metrics().asScala.filter{ case (name, _) => name.group == expectedGroup }
|
||||
assertEquals(0, metricsMap.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testLastAppliedRecordOffset(): Unit = {
|
||||
val metrics = new Metrics()
|
||||
Using(BrokerServerMetrics(metrics)) { brokerMetrics =>
|
||||
val offsetMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordOffsetName)
|
||||
assertEquals(-1L, offsetMetric.metricValue.asInstanceOf[Long])
|
||||
|
||||
// Update metric value and check
|
||||
val expectedValue = 1000
|
||||
brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
|
||||
expectedValue,
|
||||
brokerMetrics.lastAppliedImageProvenance.get().lastContainedEpoch(),
|
||||
brokerMetrics.lastAppliedTimestamp()))
|
||||
assertEquals(expectedValue, offsetMetric.metricValue.asInstanceOf[Long])
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testLastAppliedRecordTimestamp(): Unit = {
|
||||
val time = new MockTime()
|
||||
val metrics = new Metrics(time)
|
||||
Using(BrokerServerMetrics(metrics)) { brokerMetrics =>
|
||||
time.sleep(1000)
|
||||
val timestampMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordTimestampName)
|
||||
val lagMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordLagMsName)
|
||||
|
||||
assertEquals(-1L, timestampMetric.metricValue.asInstanceOf[Long])
|
||||
assertEquals(time.milliseconds + 1, lagMetric.metricValue.asInstanceOf[Long])
|
||||
|
||||
// Update metric value and check
|
||||
val timestamp = 500L
|
||||
|
||||
brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
|
||||
brokerMetrics.lastAppliedOffset(),
|
||||
brokerMetrics.lastAppliedImageProvenance.get().lastContainedEpoch(),
|
||||
timestamp))
|
||||
assertEquals(timestamp, timestampMetric.metricValue.asInstanceOf[Long])
|
||||
assertEquals(time.milliseconds - timestamp, lagMetric.metricValue.asInstanceOf[Long])
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMetadataLoadErrorCount(): Unit = {
|
||||
val time = new MockTime()
|
||||
val metrics = new Metrics(time)
|
||||
Using(BrokerServerMetrics(metrics)) { brokerMetrics =>
|
||||
val metadataLoadErrorCountMetric = metrics.metrics().get(brokerMetrics.metadataLoadErrorCountName)
|
||||
|
||||
assertEquals(0L, metadataLoadErrorCountMetric.metricValue.asInstanceOf[Long])
|
||||
|
||||
// Update metric value and check
|
||||
val errorCount = 100
|
||||
brokerMetrics.metadataLoadErrorCount.set(errorCount)
|
||||
assertEquals(errorCount, metadataLoadErrorCountMetric.metricValue.asInstanceOf[Long])
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMetadataApplyErrorCount(): Unit = {
|
||||
val time = new MockTime()
|
||||
val metrics = new Metrics(time)
|
||||
Using(BrokerServerMetrics(metrics)) { brokerMetrics =>
|
||||
val metadataApplyErrorCountMetric = metrics.metrics().get(brokerMetrics.metadataApplyErrorCountName)
|
||||
|
||||
assertEquals(0L, metadataApplyErrorCountMetric.metricValue.asInstanceOf[Long])
|
||||
|
||||
// Update metric value and check
|
||||
val errorCount = 100
|
||||
brokerMetrics.metadataApplyErrorCount.set(errorCount)
|
||||
assertEquals(errorCount, metadataApplyErrorCountMetric.metricValue.asInstanceOf[Long])
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.metrics;
|
||||
|
||||
import org.apache.kafka.common.MetricName;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.image.MetadataProvenance;
|
||||
|
||||
import com.yammer.metrics.core.Histogram;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||
|
||||
public final class BrokerServerMetrics implements AutoCloseable {
|
||||
private static final String METRIC_GROUP_NAME = "broker-metadata-metrics";
|
||||
|
||||
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "BrokerMetadataListener");
|
||||
private final com.yammer.metrics.core.MetricName batchProcessingTimeHistName =
|
||||
metricsGroup.metricName("MetadataBatchProcessingTimeUs", Collections.emptyMap());
|
||||
|
||||
/**
|
||||
* A histogram tracking the time in microseconds it took to process batches of events.
|
||||
*/
|
||||
private final Histogram batchProcessingTimeHist = KafkaYammerMetrics.defaultRegistry()
|
||||
.newHistogram(batchProcessingTimeHistName, true);
|
||||
|
||||
private final com.yammer.metrics.core.MetricName batchSizeHistName =
|
||||
metricsGroup.metricName("MetadataBatchSizes", Collections.emptyMap());
|
||||
|
||||
/**
|
||||
* A histogram tracking the sizes of batches that we have processed.
|
||||
*/
|
||||
private final Histogram batchSizeHist = KafkaYammerMetrics.defaultRegistry()
|
||||
.newHistogram(batchSizeHistName, true);
|
||||
|
||||
private final AtomicReference<MetadataProvenance> lastAppliedImageProvenance = new AtomicReference<>(MetadataProvenance.EMPTY);
|
||||
private final AtomicLong metadataLoadErrorCount = new AtomicLong(0);
|
||||
private final AtomicLong metadataApplyErrorCount = new AtomicLong(0);
|
||||
|
||||
private final Metrics metrics;
|
||||
private final MetricName lastAppliedRecordOffsetName;
|
||||
private final MetricName lastAppliedRecordTimestampName;
|
||||
private final MetricName lastAppliedRecordLagMsName;
|
||||
private final MetricName metadataLoadErrorCountName;
|
||||
private final MetricName metadataApplyErrorCountName;
|
||||
|
||||
public BrokerServerMetrics(Metrics metrics) {
|
||||
this.metrics = metrics;
|
||||
lastAppliedRecordOffsetName = metrics.metricName(
|
||||
"last-applied-record-offset",
|
||||
METRIC_GROUP_NAME,
|
||||
"The offset of the last record from the cluster metadata partition that was applied by the broker"
|
||||
);
|
||||
lastAppliedRecordTimestampName = metrics.metricName(
|
||||
"last-applied-record-timestamp",
|
||||
METRIC_GROUP_NAME,
|
||||
"The timestamp of the last record from the cluster metadata partition that was applied by the broker"
|
||||
);
|
||||
lastAppliedRecordLagMsName = metrics.metricName(
|
||||
"last-applied-record-lag-ms",
|
||||
METRIC_GROUP_NAME,
|
||||
"The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the broker"
|
||||
);
|
||||
metadataLoadErrorCountName = metrics.metricName(
|
||||
"metadata-load-error-count",
|
||||
METRIC_GROUP_NAME,
|
||||
"The number of errors encountered by the BrokerMetadataListener while loading the metadata log and generating a new MetadataDelta based on it."
|
||||
);
|
||||
metadataApplyErrorCountName = metrics.metricName(
|
||||
"metadata-apply-error-count",
|
||||
METRIC_GROUP_NAME,
|
||||
"The number of errors encountered by the BrokerMetadataPublisher while applying a new MetadataImage based on the latest MetadataDelta."
|
||||
);
|
||||
|
||||
metrics.addMetric(lastAppliedRecordOffsetName, (config, now) -> lastAppliedImageProvenance.get().lastContainedOffset());
|
||||
metrics.addMetric(lastAppliedRecordTimestampName, (config, now) -> lastAppliedImageProvenance.get().lastContainedLogTimeMs());
|
||||
metrics.addMetric(lastAppliedRecordLagMsName, (config, now) -> now - lastAppliedImageProvenance.get().lastContainedLogTimeMs());
|
||||
metrics.addMetric(metadataLoadErrorCountName, (config, now) -> metadataLoadErrorCount.get());
|
||||
metrics.addMetric(metadataApplyErrorCountName, (config, now) -> metadataApplyErrorCount.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
KafkaYammerMetrics.defaultRegistry().removeMetric(batchProcessingTimeHistName);
|
||||
KafkaYammerMetrics.defaultRegistry().removeMetric(batchSizeHistName);
|
||||
Stream.of(
|
||||
lastAppliedRecordOffsetName,
|
||||
lastAppliedRecordTimestampName,
|
||||
lastAppliedRecordLagMsName,
|
||||
metadataLoadErrorCountName,
|
||||
metadataApplyErrorCountName
|
||||
).forEach(metrics::removeMetric);
|
||||
}
|
||||
|
||||
public MetricName lastAppliedRecordOffsetName() {
|
||||
return lastAppliedRecordOffsetName;
|
||||
}
|
||||
|
||||
public MetricName lastAppliedRecordTimestampName() {
|
||||
return lastAppliedRecordTimestampName;
|
||||
}
|
||||
|
||||
public MetricName lastAppliedRecordLagMsName() {
|
||||
return lastAppliedRecordLagMsName;
|
||||
}
|
||||
|
||||
public MetricName metadataLoadErrorCountName() {
|
||||
return metadataLoadErrorCountName;
|
||||
}
|
||||
|
||||
public MetricName metadataApplyErrorCountName() {
|
||||
return metadataApplyErrorCountName;
|
||||
}
|
||||
|
||||
public AtomicReference<MetadataProvenance> lastAppliedImageProvenance() {
|
||||
return lastAppliedImageProvenance;
|
||||
}
|
||||
|
||||
public AtomicLong metadataLoadErrorCount() {
|
||||
return metadataLoadErrorCount;
|
||||
}
|
||||
|
||||
public AtomicLong metadataApplyErrorCount() {
|
||||
return metadataApplyErrorCount;
|
||||
}
|
||||
|
||||
public void updateBatchProcessingTime(long elapsedNs) {
|
||||
batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs));
|
||||
}
|
||||
|
||||
public void updateBatchSize(int size) {
|
||||
batchSizeHist.update(size);
|
||||
}
|
||||
|
||||
void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
|
||||
lastAppliedImageProvenance.set(provenance);
|
||||
}
|
||||
|
||||
long lastAppliedOffset() {
|
||||
return lastAppliedImageProvenance.get().lastContainedOffset();
|
||||
}
|
||||
|
||||
long lastAppliedTimestamp() {
|
||||
return lastAppliedImageProvenance.get().lastContainedLogTimeMs();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.server.metrics;
|
||||
|
||||
import org.apache.kafka.common.MetricName;
|
||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.image.MetadataProvenance;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public final class BrokerServerMetricsTest {
|
||||
@Test
|
||||
public void testMetricsExported() throws Exception {
|
||||
Metrics metrics = new Metrics();
|
||||
String expectedGroup = "broker-metadata-metrics";
|
||||
|
||||
// Metric description is not use for metric name equality
|
||||
Set<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
|
||||
new MetricName("last-applied-record-offset", expectedGroup, "", Collections.emptyMap()),
|
||||
new MetricName("last-applied-record-timestamp", expectedGroup, "", Collections.emptyMap()),
|
||||
new MetricName("last-applied-record-lag-ms", expectedGroup, "", Collections.emptyMap()),
|
||||
new MetricName("metadata-load-error-count", expectedGroup, "", Collections.emptyMap()),
|
||||
new MetricName("metadata-apply-error-count", expectedGroup, "", Collections.emptyMap())
|
||||
));
|
||||
|
||||
try (BrokerServerMetrics ignored = new BrokerServerMetrics(metrics)) {
|
||||
Map<MetricName, KafkaMetric> metricsMap = metrics.metrics().entrySet().stream()
|
||||
.filter(entry -> Objects.equals(entry.getKey().group(), expectedGroup))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
assertEquals(expectedMetrics.size(), metricsMap.size());
|
||||
metricsMap.forEach((name, metric) -> assertTrue(expectedMetrics.contains(name)));
|
||||
}
|
||||
|
||||
Map<MetricName, KafkaMetric> metricsMap = metrics.metrics().entrySet().stream()
|
||||
.filter(entry -> Objects.equals(entry.getKey().group(), expectedGroup))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
assertEquals(0, metricsMap.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastAppliedRecordOffset() throws Exception {
|
||||
Metrics metrics = new Metrics();
|
||||
try (BrokerServerMetrics brokerMetrics = new BrokerServerMetrics(metrics)) {
|
||||
KafkaMetric offsetMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordOffsetName());
|
||||
assertEquals((double) -1L, offsetMetric.metricValue());
|
||||
|
||||
// Update metric value and check
|
||||
long expectedValue = 1000;
|
||||
brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
|
||||
expectedValue,
|
||||
brokerMetrics.lastAppliedImageProvenance().get().lastContainedEpoch(),
|
||||
brokerMetrics.lastAppliedTimestamp()));
|
||||
assertEquals((double) expectedValue, offsetMetric.metricValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastAppliedRecordTimestamp() throws Exception {
|
||||
MockTime time = new MockTime();
|
||||
Metrics metrics = new Metrics(time);
|
||||
try (BrokerServerMetrics brokerMetrics = new BrokerServerMetrics(metrics)) {
|
||||
time.sleep(1000);
|
||||
KafkaMetric timestampMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordTimestampName());
|
||||
KafkaMetric lagMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordLagMsName());
|
||||
|
||||
assertEquals((double) -1L, timestampMetric.metricValue());
|
||||
assertEquals((double) time.milliseconds() + 1, lagMetric.metricValue());
|
||||
|
||||
// Update metric value and check
|
||||
long timestamp = 500L;
|
||||
|
||||
brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
|
||||
brokerMetrics.lastAppliedOffset(),
|
||||
brokerMetrics.lastAppliedImageProvenance().get().lastContainedEpoch(),
|
||||
timestamp));
|
||||
assertEquals((double) timestamp, timestampMetric.metricValue());
|
||||
assertEquals((double) time.milliseconds() - timestamp, lagMetric.metricValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetadataLoadErrorCount() throws Exception {
|
||||
MockTime time = new MockTime();
|
||||
Metrics metrics = new Metrics(time);
|
||||
try (BrokerServerMetrics brokerMetrics = new BrokerServerMetrics(metrics)) {
|
||||
KafkaMetric metadataLoadErrorCountMetric = metrics.metrics().get(brokerMetrics.metadataLoadErrorCountName());
|
||||
|
||||
assertEquals((double) 0L, metadataLoadErrorCountMetric.metricValue());
|
||||
|
||||
// Update metric value and check
|
||||
long errorCount = 100;
|
||||
brokerMetrics.metadataLoadErrorCount().set(errorCount);
|
||||
assertEquals((double) errorCount, metadataLoadErrorCountMetric.metricValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetadataApplyErrorCount() throws Exception {
|
||||
MockTime time = new MockTime();
|
||||
Metrics metrics = new Metrics(time);
|
||||
try (BrokerServerMetrics brokerMetrics = new BrokerServerMetrics(metrics)) {
|
||||
KafkaMetric metadataApplyErrorCountMetric = metrics.metrics().get(brokerMetrics.metadataApplyErrorCountName());
|
||||
|
||||
assertEquals((double) 0L, metadataApplyErrorCountMetric.metricValue());
|
||||
|
||||
// Update metric value and check
|
||||
long errorCount = 100;
|
||||
brokerMetrics.metadataApplyErrorCount().set(errorCount);
|
||||
assertEquals((double) errorCount, metadataApplyErrorCountMetric.metricValue());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue