KAFKA-16972 Move BrokerTopicMetrics to org.apache.kafka.storage.log.metrics (#16387)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-07-31 03:07:09 +08:00 committed by GitHub
parent 7c0a96d08d
commit 010ab19b72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 515 additions and 326 deletions

View File

@ -83,6 +83,11 @@
<allow pkg="org.apache.kafka.coordinator.transaction"/> <allow pkg="org.apache.kafka.coordinator.transaction"/>
</subpackage> </subpackage>
<subpackage name="storage.log">
<allow pkg="org.apache.kafka.server" />
<allow pkg="com.yammer.metrics" />
</subpackage>
<!-- START OF TIERED STORAGE INTEGRATION TEST IMPORT DEPENDENCIES --> <!-- START OF TIERED STORAGE INTEGRATION TEST IMPORT DEPENDENCIES -->
<subpackage name="tiered.storage"> <subpackage name="tiered.storage">
<allow pkg="scala" /> <allow pkg="scala" />

View File

@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.LocalLog.nextOption import kafka.log.LocalLog.nextOption
import kafka.log.remote.RemoteLogManager import kafka.log.remote.RemoteLogManager
import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, RequestLocal} import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.errors._ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
@ -42,6 +42,7 @@ import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard} import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import java.io.{File, IOException} import java.io.{File, IOException}
import java.nio.file.{Files, Path} import java.nio.file.{Files, Path}

View File

@ -21,17 +21,16 @@ import kafka.network.RequestChannel
import kafka.utils.{Exit, Logging, Pool} import kafka.utils.{Exit, Logging, Pool}
import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel} import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.core.Meter import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time} import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import java.util.Collections
import scala.collection.mutable import scala.collection.mutable
import scala.jdk.CollectionConverters._
trait ApiRequestHandler { trait ApiRequestHandler {
def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
@ -246,284 +245,11 @@ class KafkaRequestHandlerPool(
} }
} }
class BrokerTopicMetrics(name: Option[String], remoteStorageEnabled: Boolean = false) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val tags: java.util.Map[String, String] = name match {
case None => Collections.emptyMap()
case Some(topic) => Map("topic" -> topic).asJava
}
case class MeterWrapper(metricType: String, eventType: String) {
@volatile private var lazyMeter: Meter = _
private val meterLock = new Object
def meter(): Meter = {
var meter = lazyMeter
if (meter == null) {
meterLock synchronized {
meter = lazyMeter
if (meter == null) {
meter = metricsGroup.newMeter(metricType, eventType, TimeUnit.SECONDS, tags)
lazyMeter = meter
}
}
}
meter
}
def close(): Unit = meterLock synchronized {
if (lazyMeter != null) {
metricsGroup.removeMetric(metricType, tags)
lazyMeter = null
}
}
if (tags.isEmpty) // greedily initialize the general topic metrics
meter()
}
case class GaugeWrapper(metricType: String) {
// The map to store:
// - per-partition value for topic-level metrics. The key will be the partition number
// - per-topic value for broker-level metrics. The key will be the topic name
private val metricValues = new ConcurrentHashMap[String, Long]()
def setValue(key: String, value: Long): Unit = {
newGaugeIfNeed()
metricValues.put(key, value)
}
def removeKey(key: String): Unit = {
newGaugeIfNeed()
metricValues.remove(key)
}
// metricsGroup uses ConcurrentMap to store gauges, so we don't need to use synchronized block here
def close(): Unit = {
metricsGroup.removeMetric(metricType, tags)
metricValues.clear()
}
def value(): Long = metricValues.values().stream().mapToLong(v => v).sum()
// metricsGroup uses ConcurrentMap to store gauges, so we don't need to use synchronized block here
private def newGaugeIfNeed(): Unit = {
metricsGroup.newGauge(metricType, () => value(), tags)
}
newGaugeIfNeed()
}
// an internal map for "lazy initialization" of certain metrics
private val metricTypeMap = new Pool[String, MeterWrapper]()
private val metricGaugeTypeMap = new Pool[String, GaugeWrapper]()
metricTypeMap.putAll(Map(
BrokerTopicStats.MessagesInPerSec -> MeterWrapper(BrokerTopicStats.MessagesInPerSec, "messages"),
BrokerTopicStats.BytesInPerSec -> MeterWrapper(BrokerTopicStats.BytesInPerSec, "bytes"),
BrokerTopicStats.BytesOutPerSec -> MeterWrapper(BrokerTopicStats.BytesOutPerSec, "bytes"),
BrokerTopicStats.BytesRejectedPerSec -> MeterWrapper(BrokerTopicStats.BytesRejectedPerSec, "bytes"),
BrokerTopicStats.FailedProduceRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedProduceRequestsPerSec, "requests"),
BrokerTopicStats.FailedFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedFetchRequestsPerSec, "requests"),
BrokerTopicStats.TotalProduceRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalProduceRequestsPerSec, "requests"),
BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec, "requests")
).asJava)
if (name.isEmpty) {
metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReassignmentBytesInPerSec, MeterWrapper(BrokerTopicStats.ReassignmentBytesInPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReassignmentBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReassignmentBytesOutPerSec, "bytes"))
}
if (remoteStorageEnabled) {
metricTypeMap.putAll(Map(
RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName, "bytes"),
RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName, "bytes"),
RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName, "requests")
).asJava)
metricGaugeTypeMap.putAll(Map(
RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName),
RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName),
RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName),
RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName),
RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName),
RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName),
RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)
).asJava)
}
// used for testing only
def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap
def metricGaugeMap: Map[String, GaugeWrapper] = metricGaugeTypeMap.toMap
def messagesInRate: Meter = metricTypeMap.get(BrokerTopicStats.MessagesInPerSec).meter()
def bytesInRate: Meter = metricTypeMap.get(BrokerTopicStats.BytesInPerSec).meter()
def bytesOutRate: Meter = metricTypeMap.get(BrokerTopicStats.BytesOutPerSec).meter()
def bytesRejectedRate: Meter = metricTypeMap.get(BrokerTopicStats.BytesRejectedPerSec).meter()
private[server] def replicationBytesInRate: Option[Meter] =
if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReplicationBytesInPerSec).meter())
else None
private[server] def replicationBytesOutRate: Option[Meter] =
if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReplicationBytesOutPerSec).meter())
else None
private[server] def reassignmentBytesInPerSec: Option[Meter] =
if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReassignmentBytesInPerSec).meter())
else None
private[server] def reassignmentBytesOutPerSec: Option[Meter] =
if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReassignmentBytesOutPerSec).meter())
else None
def failedProduceRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedProduceRequestsPerSec).meter()
def failedFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedFetchRequestsPerSec).meter()
def totalProduceRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.TotalProduceRequestsPerSec).meter()
def totalFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.TotalFetchRequestsPerSec).meter()
def fetchMessageConversionsRate: Meter = metricTypeMap.get(BrokerTopicStats.FetchMessageConversionsPerSec).meter()
def produceMessageConversionsRate: Meter = metricTypeMap.get(BrokerTopicStats.ProduceMessageConversionsPerSec).meter()
def noKeyCompactedTopicRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec).meter()
def invalidMagicNumberRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidMagicNumberRecordsPerSec).meter()
def invalidMessageCrcRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidMessageCrcRecordsPerSec).meter()
def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
def remoteCopyLagBytesAggrMetric(): GaugeWrapper = {
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)
}
// Visible for testing
def remoteCopyLagBytes: Long = remoteCopyLagBytesAggrMetric().value()
def remoteCopyLagSegmentsAggrMetric(): GaugeWrapper = {
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName)
}
// Visible for testing
def remoteCopyLagSegments: Long = remoteCopyLagSegmentsAggrMetric().value()
def remoteLogMetadataCountAggrMetric(): GaugeWrapper = {
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName)
}
def remoteLogMetadataCount: Long = remoteLogMetadataCountAggrMetric().value()
def remoteLogSizeBytesAggrMetric(): GaugeWrapper = {
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)
}
def remoteLogSizeBytes: Long = remoteLogSizeBytesAggrMetric().value()
def remoteLogSizeComputationTimeAggrMetric(): GaugeWrapper = {
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName)
}
def remoteLogSizeComputationTime: Long = remoteLogSizeComputationTimeAggrMetric().value()
def remoteDeleteLagBytesAggrMetric(): GaugeWrapper = {
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName)
}
// Visible for testing
def remoteDeleteLagBytes: Long = remoteDeleteLagBytesAggrMetric().value()
def remoteDeleteLagSegmentsAggrMetric(): GaugeWrapper = {
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName)
}
// Visible for testing
def remoteDeleteLagSegments: Long = remoteDeleteLagSegmentsAggrMetric().value()
def remoteCopyBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()
def remoteFetchBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()
def remoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName).meter()
def remoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName).meter()
def remoteDeleteRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName).meter()
def buildRemoteLogAuxStateRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName).meter()
def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName).meter()
def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName).meter()
def failedRemoteDeleteRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName).meter()
def failedBuildRemoteLogAuxStateRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName).meter()
def closeMetric(metricType: String): Unit = {
val meter = metricTypeMap.get(metricType)
if (meter != null)
meter.close()
val gauge = metricGaugeTypeMap.get(metricType)
if (gauge != null)
gauge.close()
}
def close(): Unit = {
metricTypeMap.values.foreach(_.close())
metricGaugeTypeMap.values.foreach(_.close())
}
}
object BrokerTopicStats {
val MessagesInPerSec = "MessagesInPerSec"
val BytesInPerSec = "BytesInPerSec"
val BytesOutPerSec = "BytesOutPerSec"
val BytesRejectedPerSec = "BytesRejectedPerSec"
val ReplicationBytesInPerSec = "ReplicationBytesInPerSec"
val ReplicationBytesOutPerSec = "ReplicationBytesOutPerSec"
val FailedProduceRequestsPerSec = "FailedProduceRequestsPerSec"
val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec"
val FetchMessageConversionsPerSec = "FetchMessageConversionsPerSec"
val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
// These following topics are for LogValidator for better debugging on failed records
val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
val InvalidMagicNumberRecordsPerSec = "InvalidMagicNumberRecordsPerSec"
val InvalidMessageCrcRecordsPerSec = "InvalidMessageCrcRecordsPerSec"
val InvalidOffsetOrSequenceRecordsPerSec = "InvalidOffsetOrSequenceRecordsPerSec"
}
class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging { class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging {
private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k), remoteStorageEnabled) private val valueFactory = (k: String) => new BrokerTopicMetrics(k, remoteStorageEnabled)
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
val allTopicsStats = new BrokerTopicMetrics(None, remoteStorageEnabled) val allTopicsStats = new BrokerTopicMetrics(remoteStorageEnabled)
def isTopicStatsExisted(topic: String): Boolean = def isTopicStatsExisted(topic: String): Boolean =
stats.contains(topic) stats.contains(topic)
@ -532,25 +258,25 @@ class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging {
stats.getAndMaybePut(topic) stats.getAndMaybePut(topic)
def updateReplicationBytesIn(value: Long): Unit = { def updateReplicationBytesIn(value: Long): Unit = {
allTopicsStats.replicationBytesInRate.foreach { metric => allTopicsStats.replicationBytesInRate.ifPresent { metric =>
metric.mark(value) metric.mark(value)
} }
} }
private def updateReplicationBytesOut(value: Long): Unit = { private def updateReplicationBytesOut(value: Long): Unit = {
allTopicsStats.replicationBytesOutRate.foreach { metric => allTopicsStats.replicationBytesOutRate.ifPresent { metric =>
metric.mark(value) metric.mark(value)
} }
} }
def updateReassignmentBytesIn(value: Long): Unit = { def updateReassignmentBytesIn(value: Long): Unit = {
allTopicsStats.reassignmentBytesInPerSec.foreach { metric => allTopicsStats.reassignmentBytesInPerSec.ifPresent { metric =>
metric.mark(value) metric.mark(value)
} }
} }
private def updateReassignmentBytesOut(value: Long): Unit = { private def updateReassignmentBytesOut(value: Long): Unit = {
allTopicsStats.reassignmentBytesOutPerSec.foreach { metric => allTopicsStats.reassignmentBytesOutPerSec.ifPresent { metric =>
metric.mark(value) metric.mark(value)
} }
} }
@ -559,14 +285,14 @@ class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging {
def removeOldLeaderMetrics(topic: String): Unit = { def removeOldLeaderMetrics(topic: String): Unit = {
val topicMetrics = topicStats(topic) val topicMetrics = topicStats(topic)
if (topicMetrics != null) { if (topicMetrics != null) {
topicMetrics.closeMetric(BrokerTopicStats.MessagesInPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.MESSAGE_IN_PER_SEC)
topicMetrics.closeMetric(BrokerTopicStats.BytesInPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.BYTES_IN_PER_SEC)
topicMetrics.closeMetric(BrokerTopicStats.BytesRejectedPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.BYTES_REJECTED_PER_SEC)
topicMetrics.closeMetric(BrokerTopicStats.FailedProduceRequestsPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.FAILED_PRODUCE_REQUESTS_PER_SEC)
topicMetrics.closeMetric(BrokerTopicStats.TotalProduceRequestsPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.TOTAL_PRODUCE_REQUESTS_PER_SEC)
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.PRODUCE_MESSAGE_CONVERSIONS_PER_SEC)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.REPLICATION_BYTES_OUT_PER_SEC)
topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.REASSIGNMENT_BYTES_OUT_PER_SEC)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName)
@ -592,8 +318,8 @@ class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging {
def removeOldFollowerMetrics(topic: String): Unit = { def removeOldFollowerMetrics(topic: String): Unit = {
val topicMetrics = topicStats(topic) val topicMetrics = topicStats(topic)
if (topicMetrics != null) { if (topicMetrics != null) {
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesInPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.REPLICATION_BYTES_IN_PER_SEC)
topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesInPerSec) topicMetrics.closeMetric(BrokerTopicMetrics.REASSIGNMENT_BYTES_IN_PER_SEC)
} }
} }

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.{BufferSupplier, MockTime, Time} import org.apache.kafka.common.utils.{BufferSupplier, MockTime, Time}
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
@ -210,19 +211,19 @@ class KafkaRequestHandlerTest {
RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => { RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
if (systemRemoteStorageEnabled) { if (systemRemoteStorageEnabled) {
if (!gaugeMetrics.contains(metric.getName)) { if (!gaugeMetrics.contains(metric.getName)) {
assertTrue(brokerTopicMetrics.metricMap.contains(metric.getName), "the metric is missing: " + metric.getName) assertTrue(brokerTopicMetrics.metricMapKeySet().contains(metric.getName), "the metric is missing: " + metric.getName)
} else { } else {
assertFalse(brokerTopicMetrics.metricMap.contains(metric.getName), "the metric should not appear: " + metric.getName) assertFalse(brokerTopicMetrics.metricMapKeySet().contains(metric.getName), "the metric should not appear: " + metric.getName)
} }
} else { } else {
assertFalse(brokerTopicMetrics.metricMap.contains(metric.getName)) assertFalse(brokerTopicMetrics.metricMapKeySet().contains(metric.getName))
} }
}) })
gaugeMetrics.foreach(metricName => { gaugeMetrics.foreach(metricName => {
if (systemRemoteStorageEnabled) { if (systemRemoteStorageEnabled) {
assertTrue(brokerTopicMetrics.metricGaugeMap.contains(metricName), "The metric is missing:" + metricName) assertTrue(brokerTopicMetrics.metricGaugeMap.containsKey(metricName), "The metric is missing:" + metricName)
} else { } else {
assertFalse(brokerTopicMetrics.metricGaugeMap.contains(metricName), "The metric should appear:" + metricName) assertFalse(brokerTopicMetrics.metricGaugeMap.containsKey(metricName), "The metric should appear:" + metricName)
} }
}) })
} }
@ -241,7 +242,7 @@ class KafkaRequestHandlerTest {
def setupBrokerTopicMetrics(systemRemoteStorageEnabled: Boolean = true): BrokerTopicMetrics = { def setupBrokerTopicMetrics(systemRemoteStorageEnabled: Boolean = true): BrokerTopicMetrics = {
val topic = "topic" val topic = "topic"
new BrokerTopicMetrics(Option.apply(topic), systemRemoteStorageEnabled) new BrokerTopicMetrics(topic, systemRemoteStorageEnabled)
} }
@ParameterizedTest @ParameterizedTest
@ -259,8 +260,8 @@ class KafkaRequestHandlerTest {
brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 100) brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 100)
assertEquals(600, brokerTopicStats.allTopicsStats.remoteCopyLagBytes) assertEquals(600, brokerTopicStats.allTopicsStats.remoteCopyLagBytes)
} else { } else {
assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)) assertEquals(null, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName))
assertEquals(None, brokerTopicStats.allTopicsStats.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)) assertEquals(null, brokerTopicStats.allTopicsStats.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName))
} }
} }
@ -599,7 +600,7 @@ class KafkaRequestHandlerTest {
brokerTopicStats.recordRemoteLogSizeBytes(topic2, 0, 100) brokerTopicStats.recordRemoteLogSizeBytes(topic2, 0, 100)
assertEquals(600, brokerTopicStats.allTopicsStats.remoteLogSizeBytes) assertEquals(600, brokerTopicStats.allTopicsStats.remoteLogSizeBytes)
} else { } else {
assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)) assertEquals(null, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName))
} }
} }

View File

@ -30,6 +30,7 @@ import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogValidator, RecordValidationException} import org.apache.kafka.storage.internals.log.{AppendOrigin, LogValidator, RecordValidationException}
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.apache.kafka.test.TestUtils import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@ -110,8 +111,8 @@ class LogValidatorTest {
assertThrows(classOf[RecordValidationException], assertThrows(classOf[RecordValidationException],
() => validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.`type`(), compression) () => validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.`type`(), compression)
) )
assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}")), 1) assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MAGIC_NUMBER_RECORDS_PER_SEC}")), 1)
assertTrue(meterCount(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}") > 0) assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_MAGIC_NUMBER_RECORDS_PER_SEC}") > 0)
} }
private def validateMessages(records: MemoryRecords, private def validateMessages(records: MemoryRecords,
@ -732,8 +733,8 @@ class LogValidatorTest {
) )
) )
assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")), 1) assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}")), 1)
assertTrue(meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0) assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}") > 0)
} }
@ -1421,8 +1422,8 @@ class LogValidatorTest {
).validateMessagesAndAssignOffsets( ).validateMessagesAndAssignOffsets(
PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
)) ))
assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec}")), 1) assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC}")), 1)
assertTrue(meterCount(s"${BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec}") > 0) assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC}") > 0)
} }
@Test @Test

View File

@ -40,6 +40,7 @@ import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile,
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
@ -1859,8 +1860,8 @@ class UnifiedLogTest {
assertTrue(e.recordErrors.get(0).message.startsWith(errorMsgPrefix)) assertTrue(e.recordErrors.get(0).message.startsWith(errorMsgPrefix))
// check if metric for NoKeyCompactedTopicRecordsPerSec is logged // check if metric for NoKeyCompactedTopicRecordsPerSec is logged
assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec}")), 1) assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC}")), 1)
assertTrue(TestUtils.meterCount(s"${BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec}") > 0) assertTrue(TestUtils.meterCount(s"${BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC}") > 0)
// the following should succeed without any InvalidMessageException // the following should succeed without any InvalidMessageException
log.appendAsLeader(messageSetWithKeyedMessage, leaderEpoch = 0) log.appendAsLeader(messageSetWithKeyedMessage, leaderEpoch = 0)

View File

@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.migration.ZkMigrationState import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource import org.junit.jupiter.params.provider.ValueSource
@ -147,7 +148,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
// The broker metrics for all topics should be greedily registered // The broker metrics for all topics should be greedily registered
assertTrue(topicMetrics(None).nonEmpty, "General topic metrics don't exist") assertTrue(topicMetrics(None).nonEmpty, "General topic metrics don't exist")
assertEquals(brokers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size) assertEquals(brokers.head.brokerTopicStats.allTopicsStats.metricMapKeySet().size, topicMetrics(None).size)
assertEquals(0, brokers.head.brokerTopicStats.allTopicsStats.metricGaugeMap.size) assertEquals(0, brokers.head.brokerTopicStats.allTopicsStats.metricGaugeMap.size)
// topic metrics should be lazily registered // topic metrics should be lazily registered
assertTrue(topicMetricGroups(topic).isEmpty, "Topic metrics aren't lazily registered") assertTrue(topicMetricGroups(topic).isEmpty, "Topic metrics aren't lazily registered")
@ -169,10 +170,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
@ValueSource(strings = Array("zk", "kraft")) @ValueSource(strings = Array("zk", "kraft"))
def testBrokerTopicMetricsBytesInOut(quorum: String): Unit = { def testBrokerTopicMetricsBytesInOut(quorum: String): Unit = {
val topic = "test-bytes-in-out" val topic = "test-bytes-in-out"
val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec val replicationBytesIn = BrokerTopicMetrics.REPLICATION_BYTES_IN_PER_SEC
val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec val replicationBytesOut = BrokerTopicMetrics.REPLICATION_BYTES_OUT_PER_SEC
val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic" val bytesIn = s"${BrokerTopicMetrics.BYTES_IN_PER_SEC},topic=$topic"
val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic" val bytesOut = s"${BrokerTopicMetrics.BYTES_OUT_PER_SEC},topic=$topic"
val topicConfig = new Properties val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") topicConfig.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")

View File

@ -68,7 +68,7 @@ class AbstractFetcherThreadTest {
fetcher.start() fetcher.start()
val brokerTopicStatsMetrics = fetcher.brokerTopicStats.allTopicsStats.metricMap.keySet val brokerTopicStatsMetrics = fetcher.brokerTopicStats.allTopicsStats.metricMapKeySet().asScala
val fetcherMetrics = Set(FetcherMetrics.BytesPerSec, FetcherMetrics.RequestsPerSec, FetcherMetrics.ConsumerLag) val fetcherMetrics = Set(FetcherMetrics.BytesPerSec, FetcherMetrics.RequestsPerSec, FetcherMetrics.ConsumerLag)
// wait until all fetcher metrics are present // wait until all fetcher metrics are present

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.server.config.ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG import org.apache.kafka.server.config.ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
@ -168,7 +169,7 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
val fetchRequest = "request=Fetch" val fetchRequest = "request=Fetch"
val fetchTemporaryMemoryBytesMetricName = s"$TemporaryMemoryBytes,$fetchRequest" val fetchTemporaryMemoryBytesMetricName = s"$TemporaryMemoryBytes,$fetchRequest"
val fetchMessageConversionsTimeMsMetricName = s"$MessageConversionsTimeMs,$fetchRequest" val fetchMessageConversionsTimeMsMetricName = s"$MessageConversionsTimeMs,$fetchRequest"
val initialFetchMessageConversionsPerSec = TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) val initialFetchMessageConversionsPerSec = TestUtils.metersCount(BrokerTopicMetrics.FETCH_MESSAGE_CONVERSIONS_PER_SEC)
val initialFetchMessageConversionsTimeMs = TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) val initialFetchMessageConversionsTimeMs = TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)
val initialFetchTemporaryMemoryBytes = TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName) val initialFetchTemporaryMemoryBytes = TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)
val topicWithDownConversionEnabled = "foo" val topicWithDownConversionEnabled = "foo"
@ -224,9 +225,9 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
} }
def verifyMetrics(): Unit = { def verifyMetrics(): Unit = {
TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec, TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicMetrics.FETCH_MESSAGE_CONVERSIONS_PER_SEC) > initialFetchMessageConversionsPerSec,
s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " + s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " +
s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000) s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicMetrics.FETCH_MESSAGE_CONVERSIONS_PER_SEC)}", 5000)
TestUtils.waitUntilTrue(() => TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) > initialFetchMessageConversionsTimeMs, TestUtils.waitUntilTrue(() => TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) > initialFetchMessageConversionsTimeMs,
s"The `MessageConversionsTimeMs` in fetch request metric count is not incremented after 5 seconds. " + s"The `MessageConversionsTimeMs` in fetch request metric count is not incremented after 5 seconds. " +

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource} import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@ -204,8 +205,8 @@ class ProduceRequestTest extends BaseRequestTest {
assertEquals(Errors.CORRUPT_MESSAGE, Errors.forCode(partitionProduceResponse.errorCode)) assertEquals(Errors.CORRUPT_MESSAGE, Errors.forCode(partitionProduceResponse.errorCode))
assertEquals(-1, partitionProduceResponse.baseOffset) assertEquals(-1, partitionProduceResponse.baseOffset)
assertEquals(-1, partitionProduceResponse.logAppendTimeMs) assertEquals(-1, partitionProduceResponse.logAppendTimeMs)
assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")), 1) assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}")), 1)
assertTrue(TestUtils.meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0) assertTrue(TestUtils.meterCount(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}") > 0)
} }
@ParameterizedTest @ParameterizedTest

View File

@ -33,10 +33,21 @@ import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class KafkaMetricsGroup { public class KafkaMetricsGroup {
private final Class<?> klass; private final String pkg;
private final String simpleName;
public KafkaMetricsGroup(Class<?> klass) { public KafkaMetricsGroup(Class<?> klass) {
this.klass = klass; this(klass.getPackage() == null ? "" : klass.getPackage().getName(), klass.getSimpleName().replaceAll("\\$$", ""));
}
/**
* This constructor allows caller to build metrics name with custom package and class name. This is useful to keep metrics
* compatibility in migrating scala code, since the difference of either package or class name will impact the mbean name and
* that will break the backward compatibility.
*/
public KafkaMetricsGroup(String packageName, String simpleName) {
this.pkg = packageName;
this.simpleName = simpleName;
} }
/** /**
@ -47,9 +58,7 @@ public class KafkaMetricsGroup {
* @return Sanitized metric name object. * @return Sanitized metric name object.
*/ */
public MetricName metricName(String name, Map<String, String> tags) { public MetricName metricName(String name, Map<String, String> tags) {
String pkg = klass.getPackage() == null ? "" : klass.getPackage().getName(); return explicitMetricName(this.pkg, this.simpleName, name, tags);
String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
return explicitMetricName(pkg, simpleName, name, tags);
} }
public static MetricName explicitMetricName(String group, String typeName, public static MetricName explicitMetricName(String group, String typeName,

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.metrics;
import com.yammer.metrics.core.MetricName;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class KafkaMetricsGroupTest {
@Test
public void testConstructorWithPackageAndSimpleName() {
String packageName = "testPackage";
String simpleName = "testSimple";
KafkaMetricsGroup group = new KafkaMetricsGroup(packageName, simpleName);
MetricName metricName = group.metricName("metric-name", Collections.emptyMap());
assertEquals(packageName, metricName.getGroup());
assertEquals(simpleName, metricName.getType());
}
}

View File

@ -0,0 +1,405 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.log.metrics;
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import com.yammer.metrics.core.Meter;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BrokerTopicMetrics {
public static final String MESSAGE_IN_PER_SEC = "MessagesInPerSec";
public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
public static final String BYTES_REJECTED_PER_SEC = "BytesRejectedPerSec";
public static final String REPLICATION_BYTES_IN_PER_SEC = "ReplicationBytesInPerSec";
public static final String REPLICATION_BYTES_OUT_PER_SEC = "ReplicationBytesOutPerSec";
public static final String FAILED_PRODUCE_REQUESTS_PER_SEC = "FailedProduceRequestsPerSec";
public static final String FAILED_FETCH_REQUESTS_PER_SEC = "FailedFetchRequestsPerSec";
public static final String TOTAL_PRODUCE_REQUESTS_PER_SEC = "TotalProduceRequestsPerSec";
public static final String TOTAL_FETCH_REQUESTS_PER_SEC = "TotalFetchRequestsPerSec";
public static final String FETCH_MESSAGE_CONVERSIONS_PER_SEC = "FetchMessageConversionsPerSec";
public static final String PRODUCE_MESSAGE_CONVERSIONS_PER_SEC = "ProduceMessageConversionsPerSec";
public static final String REASSIGNMENT_BYTES_IN_PER_SEC = "ReassignmentBytesInPerSec";
public static final String REASSIGNMENT_BYTES_OUT_PER_SEC = "ReassignmentBytesOutPerSec";
// These following topics are for LogValidator for better debugging on failed records
public static final String NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC = "NoKeyCompactedTopicRecordsPerSec";
public static final String INVALID_MAGIC_NUMBER_RECORDS_PER_SEC = "InvalidMagicNumberRecordsPerSec";
public static final String INVALID_MESSAGE_CRC_RECORDS_PER_SEC = "InvalidMessageCrcRecordsPerSec";
public static final String INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC = "InvalidOffsetOrSequenceRecordsPerSec";
// KAFKA-16972: BrokerTopicMetrics is migrated from "kafka.server" package.
// For backward compatibility, we keep the old package name as metric group name.
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "BrokerTopicMetrics");
private final Map<String, String> tags;
private final Map<String, MeterWrapper> metricTypeMap = new java.util.HashMap<>();
private final Map<String, GaugeWrapper> metricGaugeTypeMap = new java.util.HashMap<>();
public BrokerTopicMetrics(boolean remoteStorageEnabled) {
this(Optional.empty(), remoteStorageEnabled);
}
public BrokerTopicMetrics(String name, boolean remoteStorageEnabled) {
this(Optional.of(name), remoteStorageEnabled);
}
private BrokerTopicMetrics(Optional<String> name, boolean remoteStorageEnabled) {
this.tags = name.map(s -> Collections.singletonMap("topic", s)).orElse(Collections.emptyMap());
metricTypeMap.put(MESSAGE_IN_PER_SEC, new MeterWrapper(MESSAGE_IN_PER_SEC, "messages"));
metricTypeMap.put(BYTES_IN_PER_SEC, new MeterWrapper(BYTES_IN_PER_SEC, "bytes"));
metricTypeMap.put(BYTES_OUT_PER_SEC, new MeterWrapper(BYTES_OUT_PER_SEC, "bytes"));
metricTypeMap.put(BYTES_REJECTED_PER_SEC, new MeterWrapper(BYTES_REJECTED_PER_SEC, "bytes"));
metricTypeMap.put(FAILED_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(FAILED_PRODUCE_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(FAILED_FETCH_REQUESTS_PER_SEC, new MeterWrapper(FAILED_FETCH_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_PRODUCE_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_FETCH_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_FETCH_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(FETCH_MESSAGE_CONVERSIONS_PER_SEC, new MeterWrapper(FETCH_MESSAGE_CONVERSIONS_PER_SEC, "requests"));
metricTypeMap.put(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC, new MeterWrapper(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC, "requests"));
metricTypeMap.put(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC, new MeterWrapper(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC, "requests"));
metricTypeMap.put(INVALID_MAGIC_NUMBER_RECORDS_PER_SEC, new MeterWrapper(INVALID_MAGIC_NUMBER_RECORDS_PER_SEC, "requests"));
metricTypeMap.put(INVALID_MESSAGE_CRC_RECORDS_PER_SEC, new MeterWrapper(INVALID_MESSAGE_CRC_RECORDS_PER_SEC, "requests"));
metricTypeMap.put(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC, new MeterWrapper(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC, "requests"));
if (!name.isPresent()) {
metricTypeMap.put(REPLICATION_BYTES_IN_PER_SEC, new MeterWrapper(REPLICATION_BYTES_IN_PER_SEC, "bytes"));
metricTypeMap.put(REPLICATION_BYTES_OUT_PER_SEC, new MeterWrapper(REPLICATION_BYTES_OUT_PER_SEC, "bytes"));
metricTypeMap.put(REASSIGNMENT_BYTES_IN_PER_SEC, new MeterWrapper(REASSIGNMENT_BYTES_IN_PER_SEC, "bytes"));
metricTypeMap.put(REASSIGNMENT_BYTES_OUT_PER_SEC, new MeterWrapper(REASSIGNMENT_BYTES_OUT_PER_SEC, "bytes"));
}
if (remoteStorageEnabled) {
metricTypeMap.put(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName(), "bytes"));
metricTypeMap.put(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName(), "bytes"));
metricTypeMap.put(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName(), "requests"));
metricTypeMap.put(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName(), "requests"));
metricTypeMap.put(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName(), "requests"));
metricTypeMap.put(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName(), "requests"));
metricTypeMap.put(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName(), "requests"));
metricTypeMap.put(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName(), "requests"));
metricTypeMap.put(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName(), "requests"));
metricTypeMap.put(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName(), "requests"));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName()));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName()));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName()));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName()));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName()));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName()));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName()));
}
}
public void closeMetric(String metricName) {
MeterWrapper mw = metricTypeMap.get(metricName);
if (mw != null) mw.close();
GaugeWrapper mg = metricGaugeTypeMap.get(metricName);
if (mg != null) mg.close();
}
public void close() {
metricTypeMap.values().forEach(MeterWrapper::close);
metricGaugeTypeMap.values().forEach(GaugeWrapper::close);
}
// used for testing only
public Set<String> metricMapKeySet() {
return metricTypeMap.keySet();
}
public Map<String, GaugeWrapper> metricGaugeMap() {
return metricGaugeTypeMap;
}
public Meter messagesInRate() {
return metricTypeMap.get(MESSAGE_IN_PER_SEC).meter();
}
public Meter bytesInRate() {
return metricTypeMap.get(BYTES_IN_PER_SEC).meter();
}
public Meter bytesOutRate() {
return metricTypeMap.get(BYTES_OUT_PER_SEC).meter();
}
public Meter bytesRejectedRate() {
return metricTypeMap.get(BYTES_REJECTED_PER_SEC).meter();
}
public Optional<Meter> replicationBytesInRate() {
if (tags.isEmpty()) {
return Optional.of(metricTypeMap.get(REPLICATION_BYTES_IN_PER_SEC).meter());
} else {
return Optional.empty();
}
}
public Optional<Meter> replicationBytesOutRate() {
if (tags.isEmpty()) {
return Optional.of(metricTypeMap.get(REPLICATION_BYTES_OUT_PER_SEC).meter());
} else {
return Optional.empty();
}
}
public Optional<Meter> reassignmentBytesInPerSec() {
if (tags.isEmpty()) {
return Optional.of(metricTypeMap.get(REASSIGNMENT_BYTES_IN_PER_SEC).meter());
} else {
return Optional.empty();
}
}
public Optional<Meter> reassignmentBytesOutPerSec() {
if (tags.isEmpty()) {
return Optional.of(metricTypeMap.get(REASSIGNMENT_BYTES_OUT_PER_SEC).meter());
} else {
return Optional.empty();
}
}
public Meter failedProduceRequestRate() {
return metricTypeMap.get(FAILED_PRODUCE_REQUESTS_PER_SEC).meter();
}
public Meter failedFetchRequestRate() {
return metricTypeMap.get(FAILED_FETCH_REQUESTS_PER_SEC).meter();
}
public Meter totalProduceRequestRate() {
return metricTypeMap.get(TOTAL_PRODUCE_REQUESTS_PER_SEC).meter();
}
public Meter totalFetchRequestRate() {
return metricTypeMap.get(TOTAL_FETCH_REQUESTS_PER_SEC).meter();
}
public Meter fetchMessageConversionsRate() {
return metricTypeMap.get(FETCH_MESSAGE_CONVERSIONS_PER_SEC).meter();
}
public Meter produceMessageConversionsRate() {
return metricTypeMap.get(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC).meter();
}
public Meter noKeyCompactedTopicRecordsPerSec() {
return metricTypeMap.get(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC).meter();
}
public Meter invalidMagicNumberRecordsPerSec() {
return metricTypeMap.get(INVALID_MAGIC_NUMBER_RECORDS_PER_SEC).meter();
}
public Meter invalidMessageCrcRecordsPerSec() {
return metricTypeMap.get(INVALID_MESSAGE_CRC_RECORDS_PER_SEC).meter();
}
public Meter invalidOffsetOrSequenceRecordsPerSec() {
return metricTypeMap.get(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC).meter();
}
public GaugeWrapper remoteCopyLagBytesAggrMetric() {
return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName());
}
// Visible for testing
public long remoteCopyLagBytes() {
return remoteCopyLagBytesAggrMetric().value();
}
public GaugeWrapper remoteCopyLagSegmentsAggrMetric() {
return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName());
}
// Visible for testing
public long remoteCopyLagSegments() {
return remoteCopyLagSegmentsAggrMetric().value();
}
public GaugeWrapper remoteLogMetadataCountAggrMetric() {
return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName());
}
public long remoteLogMetadataCount() {
return remoteLogMetadataCountAggrMetric().value();
}
public GaugeWrapper remoteLogSizeBytesAggrMetric() {
return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName());
}
public long remoteLogSizeBytes() {
return remoteLogSizeBytesAggrMetric().value();
}
public GaugeWrapper remoteLogSizeComputationTimeAggrMetric() {
return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName());
}
public long remoteLogSizeComputationTime() {
return remoteLogSizeComputationTimeAggrMetric().value();
}
public GaugeWrapper remoteDeleteLagBytesAggrMetric() {
return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName());
}
// Visible for testing
public long remoteDeleteLagBytes() {
return remoteDeleteLagBytesAggrMetric().value();
}
public GaugeWrapper remoteDeleteLagSegmentsAggrMetric() {
return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName());
}
// Visible for testing
public long remoteDeleteLagSegments() {
return remoteDeleteLagSegmentsAggrMetric().value();
}
public Meter remoteCopyBytesRate() {
return metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName()).meter();
}
public Meter remoteFetchBytesRate() {
return metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName()).meter();
}
public Meter remoteFetchRequestRate() {
return metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName()).meter();
}
public Meter remoteCopyRequestRate() {
return metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName()).meter();
}
public Meter remoteDeleteRequestRate() {
return metricTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName()).meter();
}
public Meter buildRemoteLogAuxStateRequestRate() {
return metricTypeMap.get(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName()).meter();
}
public Meter failedRemoteFetchRequestRate() {
return metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName()).meter();
}
public Meter failedRemoteCopyRequestRate() {
return metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName()).meter();
}
public Meter failedRemoteDeleteRequestRate() {
return metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName()).meter();
}
public Meter failedBuildRemoteLogAuxStateRate() {
return metricTypeMap.get(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName()).meter();
}
private class MeterWrapper {
private final String metricType;
private final String eventType;
private volatile Meter lazyMeter;
private final Lock meterLock = new ReentrantLock();
public MeterWrapper(String metricType, String eventType) {
this.metricType = metricType;
this.eventType = eventType;
if (tags.isEmpty()) {
meter(); // greedily initialize the general topic metrics
}
}
public Meter meter() {
Meter meter = lazyMeter;
if (meter == null) {
meterLock.lock();
try {
meter = lazyMeter;
if (meter == null) {
meter = metricsGroup.newMeter(metricType, eventType, TimeUnit.SECONDS, tags);
lazyMeter = meter;
}
} finally {
meterLock.unlock();
}
}
return meter;
}
public void close() {
meterLock.lock();
try {
if (lazyMeter != null) {
metricsGroup.removeMetric(metricType, tags);
lazyMeter = null;
}
} finally {
meterLock.unlock();
}
}
}
public class GaugeWrapper {
// The map to store:
// - per-partition value for topic-level metrics. The key will be the partition number
// - per-topic value for broker-level metrics. The key will be the topic name
private final ConcurrentHashMap<String, Long> metricValues = new ConcurrentHashMap<>();
private final String metricType;
public GaugeWrapper(String metricType) {
this.metricType = metricType;
newGaugeIfNeed();
}
public void setValue(String key, long value) {
newGaugeIfNeed();
metricValues.put(key, value);
}
public void removeKey(String key) {
newGaugeIfNeed();
metricValues.remove(key);
}
public void close() {
metricsGroup.removeMetric(metricType, tags);
metricValues.clear();
}
public long value() {
return metricValues.values().stream().mapToLong(v -> v).sum();
}
// metricsGroup uses ConcurrentMap to store gauges, so we don't need to use synchronized block here
private void newGaugeIfNeed() {
metricsGroup.newGauge(metricType, () -> metricValues.values().stream().mapToLong(v -> v).sum(), tags);
}
}
}