Upgrade to metrics jar to 3.x to pick up csv reporter fixes; KAFKA-542; patched by Joel Koshy; reviewed by Neha Narkhede.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1396336 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Jacob Koshy 2012-10-09 21:28:20 +00:00
parent 3a525e0bfa
commit 542ac86313
15 changed files with 53 additions and 21 deletions

View File

@ -113,3 +113,10 @@ zk.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# metrics reporter properties
# kafka.metrics.polling.interval.secs=5
# kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
# kafka.csv.metrics.dir=kafka_metrics
# kafka.csv.metrics.reporter.enabled=true

Binary file not shown.

Binary file not shown.

View File

@ -50,7 +50,7 @@ class Partition(val topic: String,
newGauge(
topic + "-" + partitionId + "UnderReplicated",
new Gauge[Int] {
def value() = {
def getValue = {
if (isUnderReplicated) 1 else 0
}
}

View File

@ -686,7 +686,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
newGauge(
config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
new Gauge[Int] {
def value() = q.size
def getValue = q.size
}
)
})

View File

@ -50,7 +50,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
newGauge(
"ActiveControllerCount",
new Gauge[Int] {
def value() = if (isActive) 1 else 0
def getValue = if (isActive) 1 else 0
}
)

View File

@ -128,10 +128,10 @@ private[kafka] class Log(val dir: File,
private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
newGauge(name + "-" + "NumLogSegments",
new Gauge[Int] { def value() = numberOfSegments })
new Gauge[Int] { def getValue = numberOfSegments })
newGauge(name + "-" + "LogEndOffset",
new Gauge[Long] { def value() = logEndOffset })
new Gauge[Long] { def getValue = logEndOffset })
/* The name of this log */
def name = dir.getName()

View File

@ -50,9 +50,10 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
if (!csvDir.exists())
csvDir.mkdirs()
underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
if (props.getBoolean("kafka.csv.metrics.reporter.enabled", false))
if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
initialized = true
startReporter(metricsConfig.pollingIntervalSecs)
initialized = true
}
}
}
}

View File

@ -92,7 +92,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
newGauge(
"RequestQueueSize",
new Gauge[Int] {
def value() = requestQueue.size
def getValue = requestQueue.size
}
)

View File

@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val threadName: String,
newGauge(
"ProducerQueueSize-" + getId,
new Gauge[Int] {
def value() = queue.size
def getValue = queue.size
}
)

View File

@ -165,7 +165,7 @@ class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
newGauge(
name._1 + "-" + name._2 + "-ConsumerLag",
new Gauge[Long] {
def value() = lagVal.get
def getValue = lagVal.get
}
)

View File

@ -47,13 +47,13 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
newGauge(
"LeaderCount",
new Gauge[Int] {
def value() = leaderPartitions.size
def getValue = leaderPartitions.size
}
)
newGauge(
"UnderReplicatedPartitions",
new Gauge[Int] {
def value() = {
def getValue = {
leaderPartitionsLock synchronized {
leaderPartitions.count(_.isUnderReplicated)
}

View File

@ -69,7 +69,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
newGauge(
"NumDelayedRequests",
new Gauge[Int] {
def value() = expiredRequestReaper.unsatisfied.get()
def getValue = expiredRequestReaper.unsatisfied.get()
}
)

View File

@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite {
timer.time {
clock.addMillis(1000)
}
assertEquals(1, metric.count())
assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
assertEquals(1, metric.getCount())
assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
}
private class ManualClock extends Clock {
private var ticksInNanos = 0L
override def tick() = {
override def getTick() = {
ticksInNanos
}
override def time() = {
override def getTime() = {
TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
}

View File

@ -66,17 +66,42 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
<scope>compile</scope>
</dependency>
def metricsDeps =
<dependencies>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.0-10ccc80c</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-annotations</artifactId>
<version>3.0.0-10ccc80c</version>
<scope>compile</scope>
</dependency>
</dependencies>
object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
override def transform(node: Node): Seq[Node] = node match {
case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep :_*)
Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*)
}
case other => other
}
})
object MetricsDepAdder extends RuleTransformer(new RewriteRule() {
override def transform(node: Node): Seq[Node] = node match {
case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDeps:_*)
}
case other => other
}
})
override def pomPostProcess(pom: Node): Node = {
ZkClientDepAdder(pom)
MetricsDepAdder(ZkClientDepAdder(pom))
}
override def artifactID = "kafka"
@ -251,7 +276,6 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
trait CoreDependencies {
val log4j = "log4j" % "log4j" % "1.2.15"
val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
val metricsCore = "com.yammer.metrics" % "metrics-core" % "latest.release"
val slf4jSimple = "org.slf4j" % "slf4j-simple" % "latest.release"
}