diff --git a/config/server.properties b/config/server.properties index 9ca0f8d8cf0..e92f5990ee1 100644 --- a/config/server.properties +++ b/config/server.properties @@ -117,6 +117,7 @@ zk.connectiontimeout.ms=1000000 # metrics reporter properties kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter -kafka.csv.metrics.dir=kafka_metrics -kafka.csv.metrics.reporter.enabled=true +kafka.csv.metrics.dir=/tmp/kafka_metrics +# Disable csv reporting by default. +kafka.csv.metrics.reporter.enabled=false diff --git a/core/lib/metrics-annotation-3.0.0-c0c8be71.jar b/core/lib/metrics-annotation-3.0.0-c0c8be71.jar new file mode 100644 index 00000000000..dba9d2b932f Binary files /dev/null and b/core/lib/metrics-annotation-3.0.0-c0c8be71.jar differ diff --git a/core/lib/metrics-annotation-3.0.1.jar b/core/lib/metrics-annotation-3.0.1.jar deleted file mode 100644 index 7609fbbb8cc..00000000000 Binary files a/core/lib/metrics-annotation-3.0.1.jar and /dev/null differ diff --git a/core/lib/metrics-core-3.0.1.jar b/core/lib/metrics-core-3.0.0-c0c8be71.jar similarity index 83% rename from core/lib/metrics-core-3.0.1.jar rename to core/lib/metrics-core-3.0.0-c0c8be71.jar index 22d43d54457..529a69baf19 100644 Binary files a/core/lib/metrics-core-3.0.1.jar and b/core/lib/metrics-core-3.0.0-c0c8be71.jar differ diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 830995b54d1..dafb1ee78f2 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -18,7 +18,7 @@ package kafka -import metrics.KafkaCSVMetricsReporter +import metrics.KafkaMetricsReporter import server.{KafkaConfig, KafkaServerStartable, KafkaServer} import utils.{Utils, Logging} @@ -33,7 +33,7 @@ object Kafka extends Logging { try { val props = Utils.loadProps(args(0)) val serverConfig = new KafkaConfig(props) - KafkaCSVMetricsReporter.startCSVMetricReporter(serverConfig.props) + KafkaMetricsReporter.startReporters(serverConfig.props) val kafkaServerStartble = new KafkaServerStartable(serverConfig) // attach shutdown handler to catch control-c @@ -41,7 +41,7 @@ object Kafka extends Logging { override def run() = { kafkaServerStartble.shutdown } - }); + }) kafkaServerStartble.startup kafkaServerStartble.awaitShutdown diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 2d21a2d7f1f..79a5cdc0bbe 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -26,7 +26,7 @@ import java.io.PrintStream import kafka.message._ import kafka.serializer.StringDecoder import kafka.utils._ -import kafka.metrics.KafkaCSVMetricsReporter +import kafka.metrics.KafkaMetricsReporter /** @@ -140,7 +140,7 @@ object ConsoleConsumer extends Logging { csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics") csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true") val verifiableProps = new VerifiableProperties(csvReporterProps) - KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps) + KafkaMetricsReporter.startReporters(verifiableProps) } val props = new Properties() diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 8e6ce17bab8..bea258afa56 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -117,7 +117,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.autoCommitIntervalMs, false) } - KafkaCSVMetricsReporter.startCSVMetricReporter(config.props) + KafkaMetricsReporter.startReporters(config.props) def this(config: ConsumerConfig) = this(config, true) diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala index 70fb9a53d91..fda0b240334 100644 --- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala @@ -30,26 +30,6 @@ import kafka.utils.{Utils, VerifiableProperties, Logging} private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean -object KafkaCSVMetricsReporter { - val CSVReporterStarted: AtomicBoolean = new AtomicBoolean(false) - - def startCSVMetricReporter (verifiableProps: VerifiableProperties) { - CSVReporterStarted synchronized { - if (CSVReporterStarted.get() == false) { - val metricsConfig = new KafkaMetricsConfig(verifiableProps) - if(metricsConfig.reporters.size > 0) { - metricsConfig.reporters.foreach(reporterType => { - val reporter = Utils.createObject[KafkaMetricsReporter](reporterType) - reporter.init(verifiableProps) - if (reporter.isInstanceOf[KafkaMetricsReporterMBean]) - Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName) - }) - CSVReporterStarted.set(true) - } - } - } - } -} private class KafkaCSVMetricsReporter extends KafkaMetricsReporter with KafkaCSVMetricsReporterMBean @@ -69,8 +49,8 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter if (!initialized) { val metricsConfig = new KafkaMetricsConfig(props) csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics")) - if (!csvDir.exists()) - csvDir.mkdirs() + Utils.rm(csvDir) + csvDir.mkdirs() underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir) if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) { initialized = true diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala index 57f27897eac..14e46244ee2 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala @@ -20,7 +20,9 @@ package kafka.metrics -import kafka.utils.VerifiableProperties +import kafka.utils.{Utils, VerifiableProperties} +import java.util.concurrent.atomic.AtomicBoolean + /** * Base trait for reporter MBeans. If a client wants to expose these JMX @@ -45,3 +47,24 @@ trait KafkaMetricsReporter { def init(props: VerifiableProperties) } +object KafkaMetricsReporter { + val ReporterStarted: AtomicBoolean = new AtomicBoolean(false) + + def startReporters (verifiableProps: VerifiableProperties) { + ReporterStarted synchronized { + if (ReporterStarted.get() == false) { + val metricsConfig = new KafkaMetricsConfig(verifiableProps) + if(metricsConfig.reporters.size > 0) { + metricsConfig.reporters.foreach(reporterType => { + val reporter = Utils.createObject[KafkaMetricsReporter](reporterType) + reporter.init(verifiableProps) + if (reporter.isInstanceOf[KafkaMetricsReporterMBean]) + Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName) + }) + ReporterStarted.set(true) + } + } + } + } +} + diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 172e108a643..fbbf07d2404 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -49,7 +49,7 @@ extends Logging { case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async") } - KafkaCSVMetricsReporter.startCSVMetricReporter(config.props) + KafkaMetricsReporter.startReporters(config.props) def this(config: ProducerConfig) = this(config, diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index c343d84d531..9c4e9a04329 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -26,7 +26,7 @@ import java.text.SimpleDateFormat import java.util._ import collection.immutable.List import kafka.utils.{VerifiableProperties, Logging} -import kafka.metrics.KafkaCSVMetricsReporter +import kafka.metrics.KafkaMetricsReporter /** @@ -175,7 +175,7 @@ object ProducerPerformance extends Logging { props.put("kafka.csv.metrics.dir", "kafka_metrics") props.put("kafka.csv.metrics.reporter.enabled", "true") val verifiableProps = new VerifiableProperties(props) - KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps) + KafkaMetricsReporter.startReporters(verifiableProps) } val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala index 2e8526fb388..2dd010ea792 100644 --- a/project/build/KafkaProject.scala +++ b/project/build/KafkaProject.scala @@ -71,13 +71,13 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje com.yammer.metrics metrics-core - 3.0.1 + 3.0.0-c0c8be71 compile com.yammer.metrics metrics-annotations - 3.0.1 + 3.0.0-c0c8be71 compile diff --git a/system_test/testcase_to_run.json b/system_test/testcase_to_run.json index d89916ae8df..c6cf17ea690 100644 --- a/system_test/testcase_to_run.json +++ b/system_test/testcase_to_run.json @@ -1,6 +1,5 @@ { - "ReplicaBasicTest" : [ - "testcase_0001", + "ReplicaBasicTest" : [ "testcase_1" ] }