mirror of https://github.com/apache/kafka.git
KAFKA-826 Make Kafka 0.8 depend on metrics 2.2.0 instead of 3.x; reviewed by Swapnil Ghike, Neha Narkhede, Matt Christiansen, Scott Carey
parent ae362b0864
commit 6dbf9212ae

@@ -22,42 +22,15 @@ fi

base_dir=$(dirname $0)/..

SCALA_VERSION=2.8.0

USER_HOME=$(eval echo ~${USER})
ivyPath=$(echo "$USER_HOME/.ivy2/cache")

snappy=$(echo "$ivyPath/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar")
CLASSPATH=$CLASSPATH:$snappy

library=$(echo "$ivyPath/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar")
CLASSPATH=$CLASSPATH:$library

compiler=$(echo "$ivyPath/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar")
CLASSPATH=$CLASSPATH:$compiler

log4j=$(echo "$ivyPath/log4j/log4j/jars/log4j-1.2.15.jar")
CLASSPATH=$CLASSPATH:$log4j

slf=$(echo "$ivyPath/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar")
CLASSPATH=$CLASSPATH:$slf

zookeeper=$(echo "$ivyPath/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar")
CLASSPATH=$CLASSPATH:$zookeeper

jopt=$(echo "$ivyPath/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar")
CLASSPATH=$CLASSPATH:$jopt

# assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
for file in $base_dir/core/target/scala-2.8.0/*.jar;
do
  CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/core/lib/*.jar;
do
  CLASSPATH=$CLASSPATH:$file
done

-for file in $base_dir/perf/target/scala-2.8.0/kafka*.jar;
+for file in $base_dir/perf/target/scala-${SCALA_VERSION}/kafka*.jar;
do
  CLASSPATH=$CLASSPATH:$file
done
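
Note: the script above now leans on the single dependency jar built by sbt-assembly rather than enumerating jars out of the local ivy cache. A minimal sketch of the sbt side that makes this work (it simply mirrors the build.sbt and plugins.sbt changes further down in this commit; the output location is an assumption inferred from the for-loop above):

// build.sbt (sketch) -- sbt-assembly 0.8.x wiring
import AssemblyKeys._   // provided by the sbt-assembly plugin declared in project/plugins.sbt

assemblySettings        // adds the plugin's tasks, including assembly-package-dependency

// Running `./sbt assembly-package-dependency` bundles the dependencies into one jar under
// core/target/scala-2.8.0/, which the classpath loop above then picks up.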

@@ -1,5 +1,6 @@
import sbt._
import Keys._
+import AssemblyKeys._

name := "kafka"

@@ -11,8 +12,10 @@ libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _ )

libraryDependencies ++= Seq(
  "org.apache.zookeeper" % "zookeeper" % "3.3.4",
-  "com.github.sgroschupf" % "zkclient" % "0.1",
+  "com.101tec" % "zkclient" % "0.2",
  "org.xerial.snappy" % "snappy-java" % "1.0.4.1",
+  "com.yammer.metrics" % "metrics-core" % "2.2.0",
+  "com.yammer.metrics" % "metrics-annotation" % "2.2.0",
  "org.easymock" % "easymock" % "3.0" % "test",
  "junit" % "junit" % "4.1" % "test"
)

@@ -24,4 +27,5 @@ libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
  })
}

+assemblySettings
Binary file not shown.
Binary file not shown.
Binary file not shown.

@@ -60,7 +60,7 @@ class Partition(val topic: String,
  newGauge(
    topic + "-" + partitionId + "-UnderReplicated",
    new Gauge[Int] {
-      def getValue = {
+      def value = {
        if (isUnderReplicated) 1 else 0
      }
    }
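
The getValue -> value rename above repeats through the rest of this commit: in metrics-core 2.2.0 a Gauge subclass implements value(), whereas the 3.x snapshot Kafka previously compiled against exposed getValue(). A self-contained sketch of a 2.2.0-style registration, using the raw metrics API rather than Kafka's newGauge helper (the class, metric name, and queue below are made up for illustration):

import java.util.concurrent.LinkedBlockingQueue
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge

// Sketch only, not Kafka code: register a gauge directly with metrics-core 2.2.0.
class QueueHolder {
  val queue = new LinkedBlockingQueue[String]()

  Metrics.newGauge(classOf[QueueHolder], "ExampleQueueSize",
    new Gauge[Int] {
      def value(): Int = queue.size   // 2.x reads a gauge via value(); the 3.x snapshot used getValue()
    })
}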

@@ -650,7 +650,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
  newGauge(
    config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
    new Gauge[Int] {
-      def getValue = q.size
+      def value = q.size
    }
  )
})

@@ -97,14 +97,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
  newGauge(
    "ActiveControllerCount",
    new Gauge[Int] {
-      def getValue() = if (isActive) 1 else 0
+      def value() = if (isActive) 1 else 0
    }
  )

  newGauge(
    "OfflinePartitionsCount",
    new Gauge[Int] {
-      def getValue: Int = {
+      def value(): Int = {
        controllerContext.controllerLock synchronized {
          controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader))
        }

@@ -130,10 +130,10 @@ private[kafka] class Log(val dir: File,
  info("Completed load of log %s with log end offset %d".format(name, logEndOffset))

  newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def getValue = numberOfSegments })
+           new Gauge[Int] { def value = numberOfSegments })

  newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def getValue = logEndOffset })
+           new Gauge[Long] { def value = logEndOffset })

  /* The name of this log */
  def name = dir.getName()

@@ -99,7 +99,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
  newGauge(
    "RequestQueueSize",
    new Gauge[Int] {
-      def getValue = requestQueue.size
+      def value = requestQueue.size
    }
  )

@@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val threadName: String,

  newGauge(clientId + "-ProducerQueueSize",
    new Gauge[Int] {
-      def getValue = queue.size
+      def value = queue.size
    })

  override def run {

@@ -201,7 +201,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
  newGauge(
    metricId + "-ConsumerLag",
    new Gauge[Long] {
-      def getValue = lagVal.get
+      def value = lagVal.get
    }
  )

@@ -57,7 +57,7 @@ class ReplicaManager(val config: KafkaConfig,
  newGauge(
    "LeaderCount",
    new Gauge[Int] {
-      def getValue = {
+      def value = {
        leaderPartitionsLock synchronized {
          leaderPartitions.size
        }

@@ -67,13 +67,13 @@ class ReplicaManager(val config: KafkaConfig,
  newGauge(
    "PartitionCount",
    new Gauge[Int] {
-      def getValue = allPartitions.size
+      def value = allPartitions.size
    }
  )
  newGauge(
    "UnderReplicatedPartitions",
    new Gauge[Int] {
-      def getValue = {
+      def value = {
        leaderPartitionsLock synchronized {
          leaderPartitions.count(_.isUnderReplicated)
        }

@@ -72,14 +72,14 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
  newGauge(
    "PurgatorySize",
    new Gauge[Int] {
-      def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+      def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
    }
  )

  newGauge(
    "NumDelayedRequests",
    new Gauge[Int] {
-      def getValue = expiredRequestReaper.unsatisfied.get()
+      def value = expiredRequestReaper.unsatisfied.get()
    }
  )

@@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite {
    timer.time {
      clock.addMillis(1000)
    }
-    assertEquals(1, metric.getCount())
-    assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
-    assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
+    assertEquals(1, metric.count())
+    assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
+    assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
  }

  private class ManualClock extends Clock {

    private var ticksInNanos = 0L

-    override def getTick() = {
+    override def tick() = {
      ticksInNanos
    }

-    override def getTime() = {
+    override def time() = {
      TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
    }
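
The same story in the test helpers: metrics-core 2.2.0 exposes count()/max()/min() on Timer and tick()/time() on Clock, where the 3.x snapshot used the get-prefixed names. A standalone sketch of such a manual clock against the 2.2.0 API (class and method bodies here are illustrative, mirroring the ManualClock above):

import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Clock

// Sketch only: a controllable clock for timer tests, written against metrics-core 2.2.0.
class ManualTestClock extends Clock {
  private var ticksInNanos = 0L

  def addMillis(millis: Long) {
    ticksInNanos += TimeUnit.MILLISECONDS.toNanos(millis)
  }

  // 2.x Clock: tick() returns nanoseconds, time() returns milliseconds
  // (the 3.x snapshot named these getTick() and getTime()).
  override def tick() = ticksInNanos
  override def time() = TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
}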

@@ -17,7 +17,6 @@

import sbt._
import Keys._
import java.io.File

import scala.xml.{Node, Elem}
import scala.xml.transform.{RewriteRule, RuleTransformer}
@ -78,17 +77,13 @@ object KafkaBuild extends Build {
|
|||
</dependencies>
|
||||
)
|
||||
|
||||
val coreSettings = Seq(
|
||||
pomPostProcess := { (pom: Node) => MetricsDepAdder(ZkClientDepAdder(pom)) }
|
||||
)
|
||||
|
||||
val runRat = TaskKey[Unit]("run-rat-task", "Runs Apache rat on Kafka")
|
||||
val runRatTask = runRat := {
|
||||
"bin/run-rat.sh" !
|
||||
}
|
||||
|
||||
lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++ runRatTask): _*)
|
||||
lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*).settings(coreSettings: _*)
|
||||
lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*)
|
||||
lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
|
||||
lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)
|
||||
|
||||
|
@ -96,48 +91,4 @@ object KafkaBuild extends Build {
|
|||
lazy val hadoopProducer = Project(id = "hadoop-producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
|
||||
lazy val hadoopConsumer = Project(id = "hadoop-consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
|
||||
|
||||
|
||||
// POM Tweaking for core:
|
||||
def zkClientDep =
|
||||
<dependency>
|
||||
<groupId>zkclient</groupId>
|
||||
<artifactId>zkclient</artifactId>
|
||||
<version>20120522</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
def metricsDeps =
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.yammer.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<version>3.0.0-c0c8be71</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yammer.metrics</groupId>
|
||||
<artifactId>metrics-annotations</artifactId>
|
||||
<version>3.0.0-c0c8be71</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
|
||||
override def transform(node: Node): Seq[Node] = node match {
|
||||
case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
|
||||
Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*)
|
||||
}
|
||||
case other => other
|
||||
}
|
||||
})
|
||||
|
||||
object MetricsDepAdder extends RuleTransformer(new RewriteRule() {
|
||||
override def transform(node: Node): Seq[Node] = node match {
|
||||
case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
|
||||
Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDeps:_*)
|
||||
}
|
||||
case other => other
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
|
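
Why this whole section can go: the POM post-processing appears to have existed only because the metrics 3.0.0 snapshot and the repackaged zkclient were not published under usable public coordinates, so the generated POM had to be patched with hand-built <dependency> entries. With metrics-core 2.2.0 and the com.101tec zkclient 0.2 resolvable from public repositories, the plain declarations flow into the published POM on their own. A minimal sketch (versions taken from the build.sbt change above; make-pom is the stock sbt task):

// build.sbt (sketch) -- ordinary declarations are now sufficient;
// `./sbt make-pom` emits matching <dependency> entries with no RuleTransformer involved.
libraryDependencies ++= Seq(
  "com.yammer.metrics" % "metrics-core" % "2.2.0",
  "com.101tec" % "zkclient" % "0.2"
)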

@@ -62,52 +62,6 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
      </dependency>
    </dependencies>

-  def zkClientDep =
-    <dependency>
-      <groupId>com.101tec</groupId>
-      <artifactId>zkclient</artifactId>
-      <version>0.2</version>
-      <scope>compile</scope>
-    </dependency>

-  def metricsDepsCore =
-    <dependency>
-      <groupId>com.yammer.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-      <version>3.0.0-SNAPSHOT</version>
-      <scope>compile</scope>
-    </dependency>

-  def metricsDepsAnnotations =
-    <dependency>
-      <groupId>com.yammer.metrics</groupId>
-      <artifactId>metrics-annotation</artifactId>
-      <version>3.0.0-SNAPSHOT</version>
-      <scope>compile</scope>
-    </dependency>

-  object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
-    override def transform(node: Node): Seq[Node] = node match {
-      case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
-        Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*)
-      }
-      case other => other
-    }
-  })

-  object MetricsDepAdder extends RuleTransformer(new RewriteRule() {
-    override def transform(node: Node): Seq[Node] = node match {
-      case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
-        Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDepsCore ++ metricsDepsAnnotations:_*)
-      }
-      case other => other
-    }
-  })

-  override def pomPostProcess(pom: Node): Node = {
-    MetricsDepAdder(ZkClientDepAdder(pom))
-  }

  override def organization = "org.apache"
  override def filterScalaJars = false

@@ -1,5 +1,5 @@
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.8")

addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")