mirror of https://github.com/apache/kafka.git
Remove custom metrics jar and replace with latest from metrics HEAD; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-585.
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1409296 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1cc8c3beab
commit
6c97767b88
|
@ -117,6 +117,7 @@ zk.connectiontimeout.ms=1000000
|
|||
# metrics reporter properties
|
||||
kafka.metrics.polling.interval.secs=5
|
||||
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
|
||||
kafka.csv.metrics.dir=kafka_metrics
|
||||
kafka.csv.metrics.reporter.enabled=true
|
||||
kafka.csv.metrics.dir=/tmp/kafka_metrics
|
||||
# Disable csv reporting by default.
|
||||
kafka.csv.metrics.reporter.enabled=false
|
||||
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -18,7 +18,7 @@
|
|||
package kafka
|
||||
|
||||
|
||||
import metrics.KafkaCSVMetricsReporter
|
||||
import metrics.KafkaMetricsReporter
|
||||
import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
|
||||
import utils.{Utils, Logging}
|
||||
|
||||
|
@ -33,7 +33,7 @@ object Kafka extends Logging {
|
|||
try {
|
||||
val props = Utils.loadProps(args(0))
|
||||
val serverConfig = new KafkaConfig(props)
|
||||
KafkaCSVMetricsReporter.startCSVMetricReporter(serverConfig.props)
|
||||
KafkaMetricsReporter.startReporters(serverConfig.props)
|
||||
val kafkaServerStartble = new KafkaServerStartable(serverConfig)
|
||||
|
||||
// attach shutdown handler to catch control-c
|
||||
|
@ -41,7 +41,7 @@ object Kafka extends Logging {
|
|||
override def run() = {
|
||||
kafkaServerStartble.shutdown
|
||||
}
|
||||
});
|
||||
})
|
||||
|
||||
kafkaServerStartble.startup
|
||||
kafkaServerStartble.awaitShutdown
|
||||
|
|
|
@ -26,7 +26,7 @@ import java.io.PrintStream
|
|||
import kafka.message._
|
||||
import kafka.serializer.StringDecoder
|
||||
import kafka.utils._
|
||||
import kafka.metrics.KafkaCSVMetricsReporter
|
||||
import kafka.metrics.KafkaMetricsReporter
|
||||
|
||||
|
||||
/**
|
||||
|
@ -140,7 +140,7 @@ object ConsoleConsumer extends Logging {
|
|||
csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics")
|
||||
csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true")
|
||||
val verifiableProps = new VerifiableProperties(csvReporterProps)
|
||||
KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
|
||||
KafkaMetricsReporter.startReporters(verifiableProps)
|
||||
}
|
||||
|
||||
val props = new Properties()
|
||||
|
|
|
@ -117,7 +117,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
|
|||
config.autoCommitIntervalMs, false)
|
||||
}
|
||||
|
||||
KafkaCSVMetricsReporter.startCSVMetricReporter(config.props)
|
||||
KafkaMetricsReporter.startReporters(config.props)
|
||||
|
||||
def this(config: ConsumerConfig) = this(config, true)
|
||||
|
||||
|
|
|
@ -30,26 +30,6 @@ import kafka.utils.{Utils, VerifiableProperties, Logging}
|
|||
|
||||
private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
|
||||
|
||||
object KafkaCSVMetricsReporter {
|
||||
val CSVReporterStarted: AtomicBoolean = new AtomicBoolean(false)
|
||||
|
||||
def startCSVMetricReporter (verifiableProps: VerifiableProperties) {
|
||||
CSVReporterStarted synchronized {
|
||||
if (CSVReporterStarted.get() == false) {
|
||||
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
|
||||
if(metricsConfig.reporters.size > 0) {
|
||||
metricsConfig.reporters.foreach(reporterType => {
|
||||
val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
|
||||
reporter.init(verifiableProps)
|
||||
if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
|
||||
Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
|
||||
})
|
||||
CSVReporterStarted.set(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
|
||||
with KafkaCSVMetricsReporterMBean
|
||||
|
@ -69,8 +49,8 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
|
|||
if (!initialized) {
|
||||
val metricsConfig = new KafkaMetricsConfig(props)
|
||||
csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
|
||||
if (!csvDir.exists())
|
||||
csvDir.mkdirs()
|
||||
Utils.rm(csvDir)
|
||||
csvDir.mkdirs()
|
||||
underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
|
||||
if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
|
||||
initialized = true
|
||||
|
|
|
@ -20,7 +20,9 @@
|
|||
|
||||
package kafka.metrics
|
||||
|
||||
import kafka.utils.VerifiableProperties
|
||||
import kafka.utils.{Utils, VerifiableProperties}
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
|
||||
/**
|
||||
* Base trait for reporter MBeans. If a client wants to expose these JMX
|
||||
|
@ -45,3 +47,24 @@ trait KafkaMetricsReporter {
|
|||
def init(props: VerifiableProperties)
|
||||
}
|
||||
|
||||
object KafkaMetricsReporter {
|
||||
val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
|
||||
|
||||
def startReporters (verifiableProps: VerifiableProperties) {
|
||||
ReporterStarted synchronized {
|
||||
if (ReporterStarted.get() == false) {
|
||||
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
|
||||
if(metricsConfig.reporters.size > 0) {
|
||||
metricsConfig.reporters.foreach(reporterType => {
|
||||
val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
|
||||
reporter.init(verifiableProps)
|
||||
if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
|
||||
Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
|
||||
})
|
||||
ReporterStarted.set(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ extends Logging {
|
|||
case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
|
||||
}
|
||||
|
||||
KafkaCSVMetricsReporter.startCSVMetricReporter(config.props)
|
||||
KafkaMetricsReporter.startReporters(config.props)
|
||||
|
||||
def this(config: ProducerConfig) =
|
||||
this(config,
|
||||
|
|
|
@ -26,7 +26,7 @@ import java.text.SimpleDateFormat
|
|||
import java.util._
|
||||
import collection.immutable.List
|
||||
import kafka.utils.{VerifiableProperties, Logging}
|
||||
import kafka.metrics.KafkaCSVMetricsReporter
|
||||
import kafka.metrics.KafkaMetricsReporter
|
||||
|
||||
|
||||
/**
|
||||
|
@ -175,7 +175,7 @@ object ProducerPerformance extends Logging {
|
|||
props.put("kafka.csv.metrics.dir", "kafka_metrics")
|
||||
props.put("kafka.csv.metrics.reporter.enabled", "true")
|
||||
val verifiableProps = new VerifiableProperties(props)
|
||||
KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
|
||||
KafkaMetricsReporter.startReporters(verifiableProps)
|
||||
}
|
||||
|
||||
val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
|
||||
|
|
|
@ -71,13 +71,13 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
|
|||
<dependency>
|
||||
<groupId>com.yammer.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<version>3.0.0-c0c8be71</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yammer.metrics</groupId>
|
||||
<artifactId>metrics-annotations</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<version>3.0.0-c0c8be71</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
{
|
||||
"ReplicaBasicTest" : [
|
||||
"testcase_0001",
|
||||
"ReplicaBasicTest" : [
|
||||
"testcase_1"
|
||||
]
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue