From 010ab19b724ae011e85686ce47320f4f85d9a11f Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Wed, 31 Jul 2024 03:07:09 +0800 Subject: [PATCH] KAFKA-16972 Move BrokerTopicMetrics to org.apache.kafka.storage.log.metrics (#16387) Reviewers: Chia-Ping Tsai --- checkstyle/import-control-storage.xml | 5 + .../src/main/scala/kafka/log/UnifiedLog.scala | 3 +- .../kafka/server/KafkaRequestHandler.scala | 310 +------------- .../server/KafkaRequestHandlerTest.scala | 19 +- .../unit/kafka/log/LogValidatorTest.scala | 13 +- .../scala/unit/kafka/log/UnifiedLogTest.scala | 5 +- .../unit/kafka/metrics/MetricsTest.scala | 11 +- .../server/AbstractFetcherThreadTest.scala | 2 +- ...FetchRequestDownConversionConfigTest.scala | 7 +- .../kafka/server/ProduceRequestTest.scala | 5 +- .../server/metrics/KafkaMetricsGroup.java | 19 +- .../server/metrics/KafkaMetricsGroupTest.java | 37 ++ .../log/metrics/BrokerTopicMetrics.java | 405 ++++++++++++++++++ 13 files changed, 515 insertions(+), 326 deletions(-) create mode 100644 server-common/src/test/java/org/apache/kafka/server/metrics/KafkaMetricsGroupTest.java create mode 100644 storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml index 445bed20162..18fd7938ad6 100644 --- a/checkstyle/import-control-storage.xml +++ b/checkstyle/import-control-storage.xml @@ -83,6 +83,11 @@ + + + + + diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index bef18806b0d..4a89ae4d56c 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.LocalLog.nextOption import kafka.log.remote.RemoteLogManager -import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, RequestLocal} +import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils._ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic @@ -42,6 +42,7 @@ import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard} +import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import java.io.{File, IOException} import java.nio.file.{Files, Path} diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 209db531718..6ef9f93180f 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -21,17 +21,16 @@ import kafka.network.RequestChannel import kafka.utils.{Exit, Logging, Pool} import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel} -import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger import com.yammer.metrics.core.Meter import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.utils.{KafkaThread, Time} import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics import org.apache.kafka.server.metrics.KafkaMetricsGroup +import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics -import java.util.Collections import scala.collection.mutable -import scala.jdk.CollectionConverters._ trait ApiRequestHandler { def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit @@ -246,284 +245,11 @@ class KafkaRequestHandlerPool( } } -class BrokerTopicMetrics(name: Option[String], remoteStorageEnabled: Boolean = false) { - private val metricsGroup = new KafkaMetricsGroup(this.getClass) - - val tags: java.util.Map[String, String] = name match { - case None => Collections.emptyMap() - case Some(topic) => Map("topic" -> topic).asJava - } - - case class MeterWrapper(metricType: String, eventType: String) { - @volatile private var lazyMeter: Meter = _ - private val meterLock = new Object - - def meter(): Meter = { - var meter = lazyMeter - if (meter == null) { - meterLock synchronized { - meter = lazyMeter - if (meter == null) { - meter = metricsGroup.newMeter(metricType, eventType, TimeUnit.SECONDS, tags) - lazyMeter = meter - } - } - } - meter - } - - def close(): Unit = meterLock synchronized { - if (lazyMeter != null) { - metricsGroup.removeMetric(metricType, tags) - lazyMeter = null - } - } - - if (tags.isEmpty) // greedily initialize the general topic metrics - meter() - } - - case class GaugeWrapper(metricType: String) { - // The map to store: - // - per-partition value for topic-level metrics. The key will be the partition number - // - per-topic value for broker-level metrics. The key will be the topic name - private val metricValues = new ConcurrentHashMap[String, Long]() - - def setValue(key: String, value: Long): Unit = { - newGaugeIfNeed() - metricValues.put(key, value) - } - - def removeKey(key: String): Unit = { - newGaugeIfNeed() - metricValues.remove(key) - } - - // metricsGroup uses ConcurrentMap to store gauges, so we don't need to use synchronized block here - def close(): Unit = { - metricsGroup.removeMetric(metricType, tags) - metricValues.clear() - } - - def value(): Long = metricValues.values().stream().mapToLong(v => v).sum() - - // metricsGroup uses ConcurrentMap to store gauges, so we don't need to use synchronized block here - private def newGaugeIfNeed(): Unit = { - metricsGroup.newGauge(metricType, () => value(), tags) - } - - newGaugeIfNeed() - } - - // an internal map for "lazy initialization" of certain metrics - private val metricTypeMap = new Pool[String, MeterWrapper]() - private val metricGaugeTypeMap = new Pool[String, GaugeWrapper]() - metricTypeMap.putAll(Map( - BrokerTopicStats.MessagesInPerSec -> MeterWrapper(BrokerTopicStats.MessagesInPerSec, "messages"), - BrokerTopicStats.BytesInPerSec -> MeterWrapper(BrokerTopicStats.BytesInPerSec, "bytes"), - BrokerTopicStats.BytesOutPerSec -> MeterWrapper(BrokerTopicStats.BytesOutPerSec, "bytes"), - BrokerTopicStats.BytesRejectedPerSec -> MeterWrapper(BrokerTopicStats.BytesRejectedPerSec, "bytes"), - BrokerTopicStats.FailedProduceRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedProduceRequestsPerSec, "requests"), - BrokerTopicStats.FailedFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedFetchRequestsPerSec, "requests"), - BrokerTopicStats.TotalProduceRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalProduceRequestsPerSec, "requests"), - BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"), - BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"), - BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"), - BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"), - BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"), - BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"), - BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec, "requests") - ).asJava) - - if (name.isEmpty) { - metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes")) - metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes")) - metricTypeMap.put(BrokerTopicStats.ReassignmentBytesInPerSec, MeterWrapper(BrokerTopicStats.ReassignmentBytesInPerSec, "bytes")) - metricTypeMap.put(BrokerTopicStats.ReassignmentBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReassignmentBytesOutPerSec, "bytes")) - } - - if (remoteStorageEnabled) { - metricTypeMap.putAll(Map( - RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName, "bytes"), - RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName, "bytes"), - RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName, "requests"), - RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName, "requests"), - RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName, "requests"), - RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName, "requests"), - RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName, "requests"), - RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, "requests"), - RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName, "requests"), - RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName, "requests") - ).asJava) - - metricGaugeTypeMap.putAll(Map( - RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName), - RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName), - RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName), - RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName), - RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName), - RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName), - RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName) - ).asJava) - } - - // used for testing only - def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap - - def metricGaugeMap: Map[String, GaugeWrapper] = metricGaugeTypeMap.toMap - - def messagesInRate: Meter = metricTypeMap.get(BrokerTopicStats.MessagesInPerSec).meter() - - def bytesInRate: Meter = metricTypeMap.get(BrokerTopicStats.BytesInPerSec).meter() - - def bytesOutRate: Meter = metricTypeMap.get(BrokerTopicStats.BytesOutPerSec).meter() - - def bytesRejectedRate: Meter = metricTypeMap.get(BrokerTopicStats.BytesRejectedPerSec).meter() - - private[server] def replicationBytesInRate: Option[Meter] = - if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReplicationBytesInPerSec).meter()) - else None - - private[server] def replicationBytesOutRate: Option[Meter] = - if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReplicationBytesOutPerSec).meter()) - else None - - private[server] def reassignmentBytesInPerSec: Option[Meter] = - if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReassignmentBytesInPerSec).meter()) - else None - - private[server] def reassignmentBytesOutPerSec: Option[Meter] = - if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReassignmentBytesOutPerSec).meter()) - else None - - def failedProduceRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedProduceRequestsPerSec).meter() - - def failedFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedFetchRequestsPerSec).meter() - - def totalProduceRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.TotalProduceRequestsPerSec).meter() - - def totalFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.TotalFetchRequestsPerSec).meter() - - def fetchMessageConversionsRate: Meter = metricTypeMap.get(BrokerTopicStats.FetchMessageConversionsPerSec).meter() - - def produceMessageConversionsRate: Meter = metricTypeMap.get(BrokerTopicStats.ProduceMessageConversionsPerSec).meter() - - def noKeyCompactedTopicRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec).meter() - - def invalidMagicNumberRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidMagicNumberRecordsPerSec).meter() - - def invalidMessageCrcRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidMessageCrcRecordsPerSec).meter() - - def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter() - - def remoteCopyLagBytesAggrMetric(): GaugeWrapper = { - metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName) - } - - // Visible for testing - def remoteCopyLagBytes: Long = remoteCopyLagBytesAggrMetric().value() - - def remoteCopyLagSegmentsAggrMetric(): GaugeWrapper = { - metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName) - } - - // Visible for testing - def remoteCopyLagSegments: Long = remoteCopyLagSegmentsAggrMetric().value() - - def remoteLogMetadataCountAggrMetric(): GaugeWrapper = { - metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName) - } - - def remoteLogMetadataCount: Long = remoteLogMetadataCountAggrMetric().value() - - def remoteLogSizeBytesAggrMetric(): GaugeWrapper = { - metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName) - } - - def remoteLogSizeBytes: Long = remoteLogSizeBytesAggrMetric().value() - - def remoteLogSizeComputationTimeAggrMetric(): GaugeWrapper = { - metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName) - } - - def remoteLogSizeComputationTime: Long = remoteLogSizeComputationTimeAggrMetric().value() - - def remoteDeleteLagBytesAggrMetric(): GaugeWrapper = { - metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName) - } - - // Visible for testing - def remoteDeleteLagBytes: Long = remoteDeleteLagBytesAggrMetric().value() - - def remoteDeleteLagSegmentsAggrMetric(): GaugeWrapper = { - metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName) - } - - // Visible for testing - def remoteDeleteLagSegments: Long = remoteDeleteLagSegmentsAggrMetric().value() - - def remoteCopyBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter() - - def remoteFetchBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter() - - def remoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName).meter() - - def remoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName).meter() - - def remoteDeleteRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName).meter() - - def buildRemoteLogAuxStateRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName).meter() - - def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName).meter() - - def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName).meter() - - def failedRemoteDeleteRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName).meter() - - def failedBuildRemoteLogAuxStateRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName).meter() - - def closeMetric(metricType: String): Unit = { - val meter = metricTypeMap.get(metricType) - if (meter != null) - meter.close() - val gauge = metricGaugeTypeMap.get(metricType) - if (gauge != null) - gauge.close() - } - - def close(): Unit = { - metricTypeMap.values.foreach(_.close()) - metricGaugeTypeMap.values.foreach(_.close()) - } -} - -object BrokerTopicStats { - val MessagesInPerSec = "MessagesInPerSec" - val BytesInPerSec = "BytesInPerSec" - val BytesOutPerSec = "BytesOutPerSec" - val BytesRejectedPerSec = "BytesRejectedPerSec" - val ReplicationBytesInPerSec = "ReplicationBytesInPerSec" - val ReplicationBytesOutPerSec = "ReplicationBytesOutPerSec" - val FailedProduceRequestsPerSec = "FailedProduceRequestsPerSec" - val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec" - val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec" - val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec" - val FetchMessageConversionsPerSec = "FetchMessageConversionsPerSec" - val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec" - val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec" - val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec" - // These following topics are for LogValidator for better debugging on failed records - val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec" - val InvalidMagicNumberRecordsPerSec = "InvalidMagicNumberRecordsPerSec" - val InvalidMessageCrcRecordsPerSec = "InvalidMessageCrcRecordsPerSec" - val InvalidOffsetOrSequenceRecordsPerSec = "InvalidOffsetOrSequenceRecordsPerSec" -} - class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging { - private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k), remoteStorageEnabled) + private val valueFactory = (k: String) => new BrokerTopicMetrics(k, remoteStorageEnabled) private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) - val allTopicsStats = new BrokerTopicMetrics(None, remoteStorageEnabled) + val allTopicsStats = new BrokerTopicMetrics(remoteStorageEnabled) def isTopicStatsExisted(topic: String): Boolean = stats.contains(topic) @@ -532,25 +258,25 @@ class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging { stats.getAndMaybePut(topic) def updateReplicationBytesIn(value: Long): Unit = { - allTopicsStats.replicationBytesInRate.foreach { metric => + allTopicsStats.replicationBytesInRate.ifPresent { metric => metric.mark(value) } } private def updateReplicationBytesOut(value: Long): Unit = { - allTopicsStats.replicationBytesOutRate.foreach { metric => + allTopicsStats.replicationBytesOutRate.ifPresent { metric => metric.mark(value) } } def updateReassignmentBytesIn(value: Long): Unit = { - allTopicsStats.reassignmentBytesInPerSec.foreach { metric => + allTopicsStats.reassignmentBytesInPerSec.ifPresent { metric => metric.mark(value) } } private def updateReassignmentBytesOut(value: Long): Unit = { - allTopicsStats.reassignmentBytesOutPerSec.foreach { metric => + allTopicsStats.reassignmentBytesOutPerSec.ifPresent { metric => metric.mark(value) } } @@ -559,14 +285,14 @@ class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging { def removeOldLeaderMetrics(topic: String): Unit = { val topicMetrics = topicStats(topic) if (topicMetrics != null) { - topicMetrics.closeMetric(BrokerTopicStats.MessagesInPerSec) - topicMetrics.closeMetric(BrokerTopicStats.BytesInPerSec) - topicMetrics.closeMetric(BrokerTopicStats.BytesRejectedPerSec) - topicMetrics.closeMetric(BrokerTopicStats.FailedProduceRequestsPerSec) - topicMetrics.closeMetric(BrokerTopicStats.TotalProduceRequestsPerSec) - topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec) - topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec) - topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec) + topicMetrics.closeMetric(BrokerTopicMetrics.MESSAGE_IN_PER_SEC) + topicMetrics.closeMetric(BrokerTopicMetrics.BYTES_IN_PER_SEC) + topicMetrics.closeMetric(BrokerTopicMetrics.BYTES_REJECTED_PER_SEC) + topicMetrics.closeMetric(BrokerTopicMetrics.FAILED_PRODUCE_REQUESTS_PER_SEC) + topicMetrics.closeMetric(BrokerTopicMetrics.TOTAL_PRODUCE_REQUESTS_PER_SEC) + topicMetrics.closeMetric(BrokerTopicMetrics.PRODUCE_MESSAGE_CONVERSIONS_PER_SEC) + topicMetrics.closeMetric(BrokerTopicMetrics.REPLICATION_BYTES_OUT_PER_SEC) + topicMetrics.closeMetric(BrokerTopicMetrics.REASSIGNMENT_BYTES_OUT_PER_SEC) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName) @@ -592,8 +318,8 @@ class BrokerTopicStats(remoteStorageEnabled: Boolean = false) extends Logging { def removeOldFollowerMetrics(topic: String): Unit = { val topicMetrics = topicStats(topic) if (topicMetrics != null) { - topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesInPerSec) - topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesInPerSec) + topicMetrics.closeMetric(BrokerTopicMetrics.REPLICATION_BYTES_IN_PER_SEC) + topicMetrics.closeMetric(BrokerTopicMetrics.REASSIGNMENT_BYTES_IN_PER_SEC) } } diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala index a71e8cc6a06..adcd8a13ef3 100644 --- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala +++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.{BufferSupplier, MockTime, Time} import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -210,19 +211,19 @@ class KafkaRequestHandlerTest { RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => { if (systemRemoteStorageEnabled) { if (!gaugeMetrics.contains(metric.getName)) { - assertTrue(brokerTopicMetrics.metricMap.contains(metric.getName), "the metric is missing: " + metric.getName) + assertTrue(brokerTopicMetrics.metricMapKeySet().contains(metric.getName), "the metric is missing: " + metric.getName) } else { - assertFalse(brokerTopicMetrics.metricMap.contains(metric.getName), "the metric should not appear: " + metric.getName) + assertFalse(brokerTopicMetrics.metricMapKeySet().contains(metric.getName), "the metric should not appear: " + metric.getName) } } else { - assertFalse(brokerTopicMetrics.metricMap.contains(metric.getName)) + assertFalse(brokerTopicMetrics.metricMapKeySet().contains(metric.getName)) } }) gaugeMetrics.foreach(metricName => { if (systemRemoteStorageEnabled) { - assertTrue(brokerTopicMetrics.metricGaugeMap.contains(metricName), "The metric is missing:" + metricName) + assertTrue(brokerTopicMetrics.metricGaugeMap.containsKey(metricName), "The metric is missing:" + metricName) } else { - assertFalse(brokerTopicMetrics.metricGaugeMap.contains(metricName), "The metric should appear:" + metricName) + assertFalse(brokerTopicMetrics.metricGaugeMap.containsKey(metricName), "The metric should appear:" + metricName) } }) } @@ -241,7 +242,7 @@ class KafkaRequestHandlerTest { def setupBrokerTopicMetrics(systemRemoteStorageEnabled: Boolean = true): BrokerTopicMetrics = { val topic = "topic" - new BrokerTopicMetrics(Option.apply(topic), systemRemoteStorageEnabled) + new BrokerTopicMetrics(topic, systemRemoteStorageEnabled) } @ParameterizedTest @@ -259,8 +260,8 @@ class KafkaRequestHandlerTest { brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 100) assertEquals(600, brokerTopicStats.allTopicsStats.remoteCopyLagBytes) } else { - assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)) - assertEquals(None, brokerTopicStats.allTopicsStats.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)) + assertEquals(null, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)) + assertEquals(null, brokerTopicStats.allTopicsStats.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)) } } @@ -599,7 +600,7 @@ class KafkaRequestHandlerTest { brokerTopicStats.recordRemoteLogSizeBytes(topic2, 0, 100) assertEquals(600, brokerTopicStats.allTopicsStats.remoteLogSizeBytes) } else { - assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)) + assertEquals(null, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)) } } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 61f2f2feb1e..ef80d5167a3 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{AppendOrigin, LogValidator, RecordValidationException} +import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -110,8 +111,8 @@ class LogValidatorTest { assertThrows(classOf[RecordValidationException], () => validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.`type`(), compression) ) - assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}")), 1) - assertTrue(meterCount(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}") > 0) + assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MAGIC_NUMBER_RECORDS_PER_SEC}")), 1) + assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_MAGIC_NUMBER_RECORDS_PER_SEC}") > 0) } private def validateMessages(records: MemoryRecords, @@ -732,8 +733,8 @@ class LogValidatorTest { ) ) - assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")), 1) - assertTrue(meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0) + assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}")), 1) + assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}") > 0) } @@ -1421,8 +1422,8 @@ class LogValidatorTest { ).validateMessagesAndAssignOffsets( PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier )) - assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec}")), 1) - assertTrue(meterCount(s"${BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec}") > 0) + assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC}")), 1) + assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC}") > 0) } @Test diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index c18371854fe..bea15ad4883 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -40,6 +40,7 @@ import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} import org.apache.kafka.storage.internals.utils.Throttler +import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -1859,8 +1860,8 @@ class UnifiedLogTest { assertTrue(e.recordErrors.get(0).message.startsWith(errorMsgPrefix)) // check if metric for NoKeyCompactedTopicRecordsPerSec is logged - assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec}")), 1) - assertTrue(TestUtils.meterCount(s"${BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec}") > 0) + assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC}")), 1) + assertTrue(TestUtils.meterCount(s"${BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC}") > 0) // the following should succeed without any InvalidMessageException log.appendAsLeader(messageSetWithKeyedMessage, leaderEpoch = 0) diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index e2355dfe119..3f6c5f0b4ee 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.migration.ZkMigrationState import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} +import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.junit.jupiter.api.Timeout import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -147,7 +148,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { // The broker metrics for all topics should be greedily registered assertTrue(topicMetrics(None).nonEmpty, "General topic metrics don't exist") - assertEquals(brokers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size) + assertEquals(brokers.head.brokerTopicStats.allTopicsStats.metricMapKeySet().size, topicMetrics(None).size) assertEquals(0, brokers.head.brokerTopicStats.allTopicsStats.metricGaugeMap.size) // topic metrics should be lazily registered assertTrue(topicMetricGroups(topic).isEmpty, "Topic metrics aren't lazily registered") @@ -169,10 +170,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @ValueSource(strings = Array("zk", "kraft")) def testBrokerTopicMetricsBytesInOut(quorum: String): Unit = { val topic = "test-bytes-in-out" - val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec - val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec - val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic" - val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic" + val replicationBytesIn = BrokerTopicMetrics.REPLICATION_BYTES_IN_PER_SEC + val replicationBytesOut = BrokerTopicMetrics.REPLICATION_BYTES_OUT_PER_SEC + val bytesIn = s"${BrokerTopicMetrics.BYTES_IN_PER_SEC},topic=$topic" + val bytesOut = s"${BrokerTopicMetrics.BYTES_OUT_PER_SEC},topic=$topic" val topicConfig = new Properties topicConfig.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 2f5116c4245..b98c1ddfd03 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -68,7 +68,7 @@ class AbstractFetcherThreadTest { fetcher.start() - val brokerTopicStatsMetrics = fetcher.brokerTopicStats.allTopicsStats.metricMap.keySet + val brokerTopicStatsMetrics = fetcher.brokerTopicStats.allTopicsStats.metricMapKeySet().asScala val fetcherMetrics = Set(FetcherMetrics.BytesPerSec, FetcherMetrics.RequestsPerSec, FetcherMetrics.ConsumerLag) // wait until all fetcher metrics are present diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala index 84d744479d5..a05a7c3c71d 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.server.config.ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG +import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} import org.junit.jupiter.params.ParameterizedTest @@ -168,7 +169,7 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest { val fetchRequest = "request=Fetch" val fetchTemporaryMemoryBytesMetricName = s"$TemporaryMemoryBytes,$fetchRequest" val fetchMessageConversionsTimeMsMetricName = s"$MessageConversionsTimeMs,$fetchRequest" - val initialFetchMessageConversionsPerSec = TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) + val initialFetchMessageConversionsPerSec = TestUtils.metersCount(BrokerTopicMetrics.FETCH_MESSAGE_CONVERSIONS_PER_SEC) val initialFetchMessageConversionsTimeMs = TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) val initialFetchTemporaryMemoryBytes = TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName) val topicWithDownConversionEnabled = "foo" @@ -224,9 +225,9 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest { } def verifyMetrics(): Unit = { - TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec, + TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicMetrics.FETCH_MESSAGE_CONVERSIONS_PER_SEC) > initialFetchMessageConversionsPerSec, s"The `FetchMessageConversionsPerSec` metric count is not incremented after 5 seconds. " + - s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000) + s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicMetrics.FETCH_MESSAGE_CONVERSIONS_PER_SEC)}", 5000) TestUtils.waitUntilTrue(() => TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) > initialFetchMessageConversionsTimeMs, s"The `MessageConversionsTimeMs` in fetch request metric count is not incremented after 5 seconds. " + diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index fea41b76112..086d0d0b294 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.record.BrokerCompressionType +import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} @@ -204,8 +205,8 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(Errors.CORRUPT_MESSAGE, Errors.forCode(partitionProduceResponse.errorCode)) assertEquals(-1, partitionProduceResponse.baseOffset) assertEquals(-1, partitionProduceResponse.logAppendTimeMs) - assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")), 1) - assertTrue(TestUtils.meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0) + assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}")), 1) + assertTrue(TestUtils.meterCount(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}") > 0) } @ParameterizedTest diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java index f876ea76c19..5c5607db737 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java @@ -33,10 +33,21 @@ import java.util.function.Supplier; import java.util.stream.Collectors; public class KafkaMetricsGroup { - private final Class klass; + private final String pkg; + private final String simpleName; public KafkaMetricsGroup(Class klass) { - this.klass = klass; + this(klass.getPackage() == null ? "" : klass.getPackage().getName(), klass.getSimpleName().replaceAll("\\$$", "")); + } + + /** + * This constructor allows caller to build metrics name with custom package and class name. This is useful to keep metrics + * compatibility in migrating scala code, since the difference of either package or class name will impact the mbean name and + * that will break the backward compatibility. + */ + public KafkaMetricsGroup(String packageName, String simpleName) { + this.pkg = packageName; + this.simpleName = simpleName; } /** @@ -47,9 +58,7 @@ public class KafkaMetricsGroup { * @return Sanitized metric name object. */ public MetricName metricName(String name, Map tags) { - String pkg = klass.getPackage() == null ? "" : klass.getPackage().getName(); - String simpleName = klass.getSimpleName().replaceAll("\\$$", ""); - return explicitMetricName(pkg, simpleName, name, tags); + return explicitMetricName(this.pkg, this.simpleName, name, tags); } public static MetricName explicitMetricName(String group, String typeName, diff --git a/server-common/src/test/java/org/apache/kafka/server/metrics/KafkaMetricsGroupTest.java b/server-common/src/test/java/org/apache/kafka/server/metrics/KafkaMetricsGroupTest.java new file mode 100644 index 00000000000..08b5b00fb26 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/metrics/KafkaMetricsGroupTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import com.yammer.metrics.core.MetricName; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class KafkaMetricsGroupTest { + @Test + public void testConstructorWithPackageAndSimpleName() { + String packageName = "testPackage"; + String simpleName = "testSimple"; + KafkaMetricsGroup group = new KafkaMetricsGroup(packageName, simpleName); + MetricName metricName = group.metricName("metric-name", Collections.emptyMap()); + assertEquals(packageName, metricName.getGroup()); + assertEquals(simpleName, metricName.getType()); + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java b/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java new file mode 100644 index 00000000000..c638beb1e9a --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.log.metrics; + +import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; + +import com.yammer.metrics.core.Meter; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class BrokerTopicMetrics { + public static final String MESSAGE_IN_PER_SEC = "MessagesInPerSec"; + public static final String BYTES_IN_PER_SEC = "BytesInPerSec"; + public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec"; + public static final String BYTES_REJECTED_PER_SEC = "BytesRejectedPerSec"; + public static final String REPLICATION_BYTES_IN_PER_SEC = "ReplicationBytesInPerSec"; + public static final String REPLICATION_BYTES_OUT_PER_SEC = "ReplicationBytesOutPerSec"; + public static final String FAILED_PRODUCE_REQUESTS_PER_SEC = "FailedProduceRequestsPerSec"; + public static final String FAILED_FETCH_REQUESTS_PER_SEC = "FailedFetchRequestsPerSec"; + public static final String TOTAL_PRODUCE_REQUESTS_PER_SEC = "TotalProduceRequestsPerSec"; + public static final String TOTAL_FETCH_REQUESTS_PER_SEC = "TotalFetchRequestsPerSec"; + public static final String FETCH_MESSAGE_CONVERSIONS_PER_SEC = "FetchMessageConversionsPerSec"; + public static final String PRODUCE_MESSAGE_CONVERSIONS_PER_SEC = "ProduceMessageConversionsPerSec"; + public static final String REASSIGNMENT_BYTES_IN_PER_SEC = "ReassignmentBytesInPerSec"; + public static final String REASSIGNMENT_BYTES_OUT_PER_SEC = "ReassignmentBytesOutPerSec"; + // These following topics are for LogValidator for better debugging on failed records + public static final String NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC = "NoKeyCompactedTopicRecordsPerSec"; + public static final String INVALID_MAGIC_NUMBER_RECORDS_PER_SEC = "InvalidMagicNumberRecordsPerSec"; + public static final String INVALID_MESSAGE_CRC_RECORDS_PER_SEC = "InvalidMessageCrcRecordsPerSec"; + public static final String INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC = "InvalidOffsetOrSequenceRecordsPerSec"; + + // KAFKA-16972: BrokerTopicMetrics is migrated from "kafka.server" package. + // For backward compatibility, we keep the old package name as metric group name. + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "BrokerTopicMetrics"); + private final Map tags; + private final Map metricTypeMap = new java.util.HashMap<>(); + private final Map metricGaugeTypeMap = new java.util.HashMap<>(); + + public BrokerTopicMetrics(boolean remoteStorageEnabled) { + this(Optional.empty(), remoteStorageEnabled); + } + + public BrokerTopicMetrics(String name, boolean remoteStorageEnabled) { + this(Optional.of(name), remoteStorageEnabled); + } + + private BrokerTopicMetrics(Optional name, boolean remoteStorageEnabled) { + this.tags = name.map(s -> Collections.singletonMap("topic", s)).orElse(Collections.emptyMap()); + + metricTypeMap.put(MESSAGE_IN_PER_SEC, new MeterWrapper(MESSAGE_IN_PER_SEC, "messages")); + metricTypeMap.put(BYTES_IN_PER_SEC, new MeterWrapper(BYTES_IN_PER_SEC, "bytes")); + metricTypeMap.put(BYTES_OUT_PER_SEC, new MeterWrapper(BYTES_OUT_PER_SEC, "bytes")); + metricTypeMap.put(BYTES_REJECTED_PER_SEC, new MeterWrapper(BYTES_REJECTED_PER_SEC, "bytes")); + metricTypeMap.put(FAILED_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(FAILED_PRODUCE_REQUESTS_PER_SEC, "requests")); + metricTypeMap.put(FAILED_FETCH_REQUESTS_PER_SEC, new MeterWrapper(FAILED_FETCH_REQUESTS_PER_SEC, "requests")); + metricTypeMap.put(TOTAL_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_PRODUCE_REQUESTS_PER_SEC, "requests")); + metricTypeMap.put(TOTAL_FETCH_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_FETCH_REQUESTS_PER_SEC, "requests")); + metricTypeMap.put(FETCH_MESSAGE_CONVERSIONS_PER_SEC, new MeterWrapper(FETCH_MESSAGE_CONVERSIONS_PER_SEC, "requests")); + metricTypeMap.put(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC, new MeterWrapper(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC, "requests")); + metricTypeMap.put(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC, new MeterWrapper(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC, "requests")); + metricTypeMap.put(INVALID_MAGIC_NUMBER_RECORDS_PER_SEC, new MeterWrapper(INVALID_MAGIC_NUMBER_RECORDS_PER_SEC, "requests")); + metricTypeMap.put(INVALID_MESSAGE_CRC_RECORDS_PER_SEC, new MeterWrapper(INVALID_MESSAGE_CRC_RECORDS_PER_SEC, "requests")); + metricTypeMap.put(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC, new MeterWrapper(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC, "requests")); + + if (!name.isPresent()) { + metricTypeMap.put(REPLICATION_BYTES_IN_PER_SEC, new MeterWrapper(REPLICATION_BYTES_IN_PER_SEC, "bytes")); + metricTypeMap.put(REPLICATION_BYTES_OUT_PER_SEC, new MeterWrapper(REPLICATION_BYTES_OUT_PER_SEC, "bytes")); + metricTypeMap.put(REASSIGNMENT_BYTES_IN_PER_SEC, new MeterWrapper(REASSIGNMENT_BYTES_IN_PER_SEC, "bytes")); + metricTypeMap.put(REASSIGNMENT_BYTES_OUT_PER_SEC, new MeterWrapper(REASSIGNMENT_BYTES_OUT_PER_SEC, "bytes")); + } + + if (remoteStorageEnabled) { + metricTypeMap.put(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName(), "bytes")); + metricTypeMap.put(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName(), "bytes")); + metricTypeMap.put(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName(), "requests")); + metricTypeMap.put(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName(), "requests")); + metricTypeMap.put(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName(), "requests")); + metricTypeMap.put(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName(), "requests")); + metricTypeMap.put(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName(), "requests")); + metricTypeMap.put(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName(), "requests")); + metricTypeMap.put(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName(), "requests")); + metricTypeMap.put(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName(), new MeterWrapper(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName(), "requests")); + + metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName())); + metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName())); + metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName())); + metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName())); + metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName())); + metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName())); + metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName())); + } + } + + public void closeMetric(String metricName) { + MeterWrapper mw = metricTypeMap.get(metricName); + if (mw != null) mw.close(); + GaugeWrapper mg = metricGaugeTypeMap.get(metricName); + if (mg != null) mg.close(); + } + + public void close() { + metricTypeMap.values().forEach(MeterWrapper::close); + metricGaugeTypeMap.values().forEach(GaugeWrapper::close); + } + + // used for testing only + public Set metricMapKeySet() { + return metricTypeMap.keySet(); + } + + public Map metricGaugeMap() { + return metricGaugeTypeMap; + } + + public Meter messagesInRate() { + return metricTypeMap.get(MESSAGE_IN_PER_SEC).meter(); + } + + public Meter bytesInRate() { + return metricTypeMap.get(BYTES_IN_PER_SEC).meter(); + } + + public Meter bytesOutRate() { + return metricTypeMap.get(BYTES_OUT_PER_SEC).meter(); + } + + public Meter bytesRejectedRate() { + return metricTypeMap.get(BYTES_REJECTED_PER_SEC).meter(); + } + + public Optional replicationBytesInRate() { + if (tags.isEmpty()) { + return Optional.of(metricTypeMap.get(REPLICATION_BYTES_IN_PER_SEC).meter()); + } else { + return Optional.empty(); + } + } + + public Optional replicationBytesOutRate() { + if (tags.isEmpty()) { + return Optional.of(metricTypeMap.get(REPLICATION_BYTES_OUT_PER_SEC).meter()); + } else { + return Optional.empty(); + } + } + + public Optional reassignmentBytesInPerSec() { + if (tags.isEmpty()) { + return Optional.of(metricTypeMap.get(REASSIGNMENT_BYTES_IN_PER_SEC).meter()); + } else { + return Optional.empty(); + } + } + + public Optional reassignmentBytesOutPerSec() { + if (tags.isEmpty()) { + return Optional.of(metricTypeMap.get(REASSIGNMENT_BYTES_OUT_PER_SEC).meter()); + } else { + return Optional.empty(); + } + } + + public Meter failedProduceRequestRate() { + return metricTypeMap.get(FAILED_PRODUCE_REQUESTS_PER_SEC).meter(); + } + + public Meter failedFetchRequestRate() { + return metricTypeMap.get(FAILED_FETCH_REQUESTS_PER_SEC).meter(); + } + + public Meter totalProduceRequestRate() { + return metricTypeMap.get(TOTAL_PRODUCE_REQUESTS_PER_SEC).meter(); + } + + public Meter totalFetchRequestRate() { + return metricTypeMap.get(TOTAL_FETCH_REQUESTS_PER_SEC).meter(); + } + + public Meter fetchMessageConversionsRate() { + return metricTypeMap.get(FETCH_MESSAGE_CONVERSIONS_PER_SEC).meter(); + } + + public Meter produceMessageConversionsRate() { + return metricTypeMap.get(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC).meter(); + } + + public Meter noKeyCompactedTopicRecordsPerSec() { + return metricTypeMap.get(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC).meter(); + } + + public Meter invalidMagicNumberRecordsPerSec() { + return metricTypeMap.get(INVALID_MAGIC_NUMBER_RECORDS_PER_SEC).meter(); + } + + public Meter invalidMessageCrcRecordsPerSec() { + return metricTypeMap.get(INVALID_MESSAGE_CRC_RECORDS_PER_SEC).meter(); + } + + public Meter invalidOffsetOrSequenceRecordsPerSec() { + return metricTypeMap.get(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC).meter(); + } + + public GaugeWrapper remoteCopyLagBytesAggrMetric() { + return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName()); + } + + // Visible for testing + public long remoteCopyLagBytes() { + return remoteCopyLagBytesAggrMetric().value(); + } + + public GaugeWrapper remoteCopyLagSegmentsAggrMetric() { + return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName()); + } + + // Visible for testing + public long remoteCopyLagSegments() { + return remoteCopyLagSegmentsAggrMetric().value(); + } + + public GaugeWrapper remoteLogMetadataCountAggrMetric() { + return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName()); + } + + public long remoteLogMetadataCount() { + return remoteLogMetadataCountAggrMetric().value(); + } + + public GaugeWrapper remoteLogSizeBytesAggrMetric() { + return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName()); + } + + public long remoteLogSizeBytes() { + return remoteLogSizeBytesAggrMetric().value(); + } + + public GaugeWrapper remoteLogSizeComputationTimeAggrMetric() { + return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName()); + } + + public long remoteLogSizeComputationTime() { + return remoteLogSizeComputationTimeAggrMetric().value(); + } + + public GaugeWrapper remoteDeleteLagBytesAggrMetric() { + return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName()); + } + + // Visible for testing + public long remoteDeleteLagBytes() { + return remoteDeleteLagBytesAggrMetric().value(); + } + + public GaugeWrapper remoteDeleteLagSegmentsAggrMetric() { + return metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName()); + } + + // Visible for testing + public long remoteDeleteLagSegments() { + return remoteDeleteLagSegmentsAggrMetric().value(); + } + + public Meter remoteCopyBytesRate() { + return metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName()).meter(); + } + + public Meter remoteFetchBytesRate() { + return metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName()).meter(); + } + + public Meter remoteFetchRequestRate() { + return metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName()).meter(); + } + + public Meter remoteCopyRequestRate() { + return metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName()).meter(); + } + + public Meter remoteDeleteRequestRate() { + return metricTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_REQUESTS_PER_SEC_METRIC.getName()).meter(); + } + + public Meter buildRemoteLogAuxStateRequestRate() { + return metricTypeMap.get(RemoteStorageMetrics.BUILD_REMOTE_LOG_AUX_STATE_REQUESTS_PER_SEC_METRIC.getName()).meter(); + } + + public Meter failedRemoteFetchRequestRate() { + return metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName()).meter(); + } + + public Meter failedRemoteCopyRequestRate() { + return metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName()).meter(); + } + + public Meter failedRemoteDeleteRequestRate() { + return metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName()).meter(); + } + + public Meter failedBuildRemoteLogAuxStateRate() { + return metricTypeMap.get(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName()).meter(); + } + + private class MeterWrapper { + private final String metricType; + private final String eventType; + private volatile Meter lazyMeter; + private final Lock meterLock = new ReentrantLock(); + + public MeterWrapper(String metricType, String eventType) { + this.metricType = metricType; + this.eventType = eventType; + if (tags.isEmpty()) { + meter(); // greedily initialize the general topic metrics + } + } + + public Meter meter() { + Meter meter = lazyMeter; + if (meter == null) { + meterLock.lock(); + try { + meter = lazyMeter; + if (meter == null) { + meter = metricsGroup.newMeter(metricType, eventType, TimeUnit.SECONDS, tags); + lazyMeter = meter; + } + } finally { + meterLock.unlock(); + } + } + return meter; + } + + public void close() { + meterLock.lock(); + try { + if (lazyMeter != null) { + metricsGroup.removeMetric(metricType, tags); + lazyMeter = null; + } + } finally { + meterLock.unlock(); + } + } + } + + public class GaugeWrapper { + // The map to store: + // - per-partition value for topic-level metrics. The key will be the partition number + // - per-topic value for broker-level metrics. The key will be the topic name + private final ConcurrentHashMap metricValues = new ConcurrentHashMap<>(); + private final String metricType; + + public GaugeWrapper(String metricType) { + this.metricType = metricType; + newGaugeIfNeed(); + } + + public void setValue(String key, long value) { + newGaugeIfNeed(); + metricValues.put(key, value); + } + + public void removeKey(String key) { + newGaugeIfNeed(); + metricValues.remove(key); + } + + public void close() { + metricsGroup.removeMetric(metricType, tags); + metricValues.clear(); + } + + public long value() { + return metricValues.values().stream().mapToLong(v -> v).sum(); + } + + // metricsGroup uses ConcurrentMap to store gauges, so we don't need to use synchronized block here + private void newGaugeIfNeed() { + metricsGroup.newGauge(metricType, () -> metricValues.values().stream().mapToLong(v -> v).sum(), tags); + } + } +}