KAFKA-17222 Remove the subclass of KafkaMetricsGroup (#16752)

The method overrides of metricName in KafkaMetricsGroup are no longer required since there's a new constructor that implement this with the same behavior.

Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
bboyleonp666 2024-08-07 01:26:49 +08:00 committed by GitHub
parent 46f1f0268b
commit 5fac905749
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 22 additions and 71 deletions

View File

@ -17,7 +17,6 @@
package kafka.log
import com.yammer.metrics.core.MetricName
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.LocalLog.nextOption
import kafka.log.remote.RemoteLogManager
@ -111,12 +110,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
import kafka.log.UnifiedLog._
private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
// For compatibility, metrics are defined to be under `Log` class
override def metricName(name: String, tags: util.Map[String, String]): MetricName = {
KafkaMetricsGroup.explicitMetricName(getClass.getPackage.getName, "Log", name, tags)
}
}
// For compatibility, metrics are defined to be under `Log` class
private val metricsGroup = new KafkaMetricsGroup(getClass.getPackage.getName, "Log")
this.logIdent = s"[UnifiedLog partition=$topicPartition, dir=$parentDir] "

View File

@ -608,9 +608,8 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private[network] val processors = new ArrayBuffer[Processor]()
// Build the metric name explicitly in order to keep the existing name for compatibility
private val blockedPercentMeterMetricName = KafkaMetricsGroup.explicitMetricName(
"kafka.network",
"Acceptor",
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
s"${metricPrefix()}AcceptorBlockedPercent",
Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)

View File

@ -17,7 +17,6 @@
package kafka.server
import com.yammer.metrics.core.MetricName
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.SocketServer
@ -37,7 +36,6 @@ import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, L
import org.apache.kafka.server.util.Scheduler
import java.time.Duration
import java.util
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@ -103,13 +101,9 @@ trait KafkaBroker extends Logging {
def clientToControllerChannelManager: NodeToControllerChannelManager
def tokenCache: DelegationTokenCache
private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
// For backwards compatibility, we need to keep older metrics tied
// to their original name when this class was named `KafkaServer`
override def metricName(name: String, tags: util.Map[String, String]): MetricName = {
KafkaMetricsGroup.explicitMetricName(Server.MetricsPrefix, KafkaBroker.MetricsTypeName, name, tags)
}
}
// For backwards compatibility, we need to keep older metrics tied
// to their original name when this class was named `KafkaServer`
private val metricsGroup = new KafkaMetricsGroup(Server.MetricsPrefix, KafkaBroker.MetricsTypeName)
metricsGroup.newGauge("BrokerState", () => brokerState.value)
metricsGroup.newGauge("ClusterId", () => clusterId)

View File

@ -33,10 +33,8 @@ final class BrokerServerMetrics private (
) extends AutoCloseable {
import BrokerServerMetrics._
private val batchProcessingTimeHistName = KafkaMetricsGroup.explicitMetricName("kafka.server",
"BrokerMetadataListener",
"MetadataBatchProcessingTimeUs",
Collections.emptyMap())
private val metricsGroup = new KafkaMetricsGroup("kafka.server","BrokerMetadataListener")
private val batchProcessingTimeHistName = metricsGroup.metricName("MetadataBatchProcessingTimeUs", Collections.emptyMap())
/**
* A histogram tracking the time in microseconds it took to process batches of events.
@ -44,10 +42,7 @@ final class BrokerServerMetrics private (
private val batchProcessingTimeHist =
KafkaYammerMetrics.defaultRegistry().newHistogram(batchProcessingTimeHistName, true)
private val batchSizeHistName = KafkaMetricsGroup.explicitMetricName("kafka.server",
"BrokerMetadataListener",
"MetadataBatchSizes",
Collections.emptyMap())
private val batchSizeHistName = metricsGroup.metricName("MetadataBatchSizes", Collections.emptyMap())
/**
* A histogram tracking the sizes of batches that we have processed.

View File

@ -17,7 +17,6 @@
package kafka.zk
import java.util.Properties
import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReplicaAssignment}
@ -44,7 +43,6 @@ import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper}
import java.util
import java.lang.{Long => JLong}
import scala.collection.{Map, Seq, mutable}
@ -67,11 +65,7 @@ class KafkaZkClient private[zk] (
enableEntityConfigControllerCheck: Boolean
) extends AutoCloseable with Logging {
private val metricsGroup: KafkaMetricsGroup = new KafkaMetricsGroup(this.getClass) {
override def metricName(name: String, metricTags: util.Map[String, String]): MetricName = {
KafkaMetricsGroup.explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
}
}
private val metricsGroup: KafkaMetricsGroup = new KafkaMetricsGroup("kafka.server", "ZooKeeperClientMetrics")
private val latencyMetric = metricsGroup.newHistogram("ZooKeeperRequestLatencyMs")

View File

@ -21,7 +21,6 @@ import java.util.Locale
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
import java.util.concurrent._
import java.util.{List => JList}
import com.yammer.metrics.core.MetricName
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.Logging
import kafka.zookeeper.ZooKeeperClient._
@ -36,7 +35,6 @@ import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper._
import org.apache.zookeeper.client.ZKClientConfig
import java.util
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}
@ -64,12 +62,7 @@ class ZooKeeperClient(connectString: String,
private[zookeeper] val clientConfig: ZKClientConfig,
name: String) extends Logging {
private val metricsGroup: KafkaMetricsGroup = new KafkaMetricsGroup(this.getClass) {
override def metricName(name: String, metricTags: util.Map[String, String]): MetricName = {
KafkaMetricsGroup.explicitMetricName(metricGroup, metricType, name, metricTags)
}
}
private val metricsGroup: KafkaMetricsGroup = new KafkaMetricsGroup(metricGroup, metricType)
this.logIdent = s"[ZooKeeperClient $name] "
private val initializationLock = new ReentrantReadWriteLock()

View File

@ -29,12 +29,8 @@ class KafkaMetricsGroupTest {
@Test
def testUntaggedMetricName(): Unit = {
val metricName = KafkaMetricsGroup.explicitMetricName(
"kafka.metrics",
"TestMetrics",
"TaggedMetric",
Collections.emptyMap()
)
val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
val metricName = metricsGroup.metricName("TaggedMetric", Collections.emptyMap())
assertEquals("kafka.metrics", metricName.getGroup)
assertEquals("TestMetrics", metricName.getType)
@ -47,12 +43,8 @@ class KafkaMetricsGroupTest {
@Test
def testTaggedMetricName(): Unit = {
val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz").asJava
val metricName = KafkaMetricsGroup.explicitMetricName(
"kafka.metrics",
"TestMetrics",
"TaggedMetric",
tags
)
val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
val metricName = metricsGroup.metricName("TaggedMetric", tags)
assertEquals("kafka.metrics", metricName.getGroup)
assertEquals("TestMetrics", metricName.getType)
@ -65,12 +57,8 @@ class KafkaMetricsGroupTest {
@Test
def testTaggedMetricNameWithEmptyValue(): Unit = {
val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz").asJava
val metricName = KafkaMetricsGroup.explicitMetricName(
"kafka.metrics",
"TestMetrics",
"TaggedMetric",
tags
)
val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
val metricName = metricsGroup.metricName("TaggedMetric", tags)
assertEquals("kafka.metrics", metricName.getGroup)
assertEquals("TestMetrics", metricName.getType)

View File

@ -61,7 +61,7 @@ public class KafkaMetricsGroup {
return explicitMetricName(this.pkg, this.simpleName, name, tags);
}
public static MetricName explicitMetricName(String group, String typeName,
private static MetricName explicitMetricName(String group, String typeName,
String name, Map<String, String> tags) {
StringBuilder nameBuilder = new StringBuilder(100);
nameBuilder.append(group);

View File

@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import org.slf4j.Logger;
@ -44,7 +43,6 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.attribute.FileTime;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Callable;
@ -72,14 +70,9 @@ public class LogSegment implements Closeable {
private static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
static {
KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) {
@Override
public MetricName metricName(String name, Map<String, String> tags) {
// Override the group and type names for compatibility - this metrics group was previously defined within
// a Scala object named `kafka.log.LogFlushStats`
return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags);
}
};
// For compatibility - this metrics group was previously defined within
// a Scala object named `kafka.log.LogFlushStats`
KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup("kafka.log", "LogFlushStats");
LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
}