From 49f3b4017918c4e254e9bc09e2ef5178d62e0248 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Wed, 10 Aug 2011 22:32:23 +0000 Subject: [PATCH] auto-discovery of topics for mirroring; patched by Joel; reviewed by Jun; KAFKA-74 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1156393 13f79535-47bb-0310-9956-ffa450edef68 --- core/src/main/scala/kafka/Kafka.scala | 19 +- .../scala/kafka/consumer/ConsumerConfig.scala | 25 +- .../kafka/consumer/TopicEventHandler.scala | 24 ++ .../consumer/ZookeeperConsumerConnector.scala | 15 +- .../consumer/ZookeeperTopicEventWatcher.scala | 115 ++++++ .../scala/kafka/producer/ProducerConfig.scala | 2 +- .../kafka/producer/async/AsyncProducer.scala | 25 +- .../producer/async/AsyncProducerConfig.scala | 8 + .../AsyncProducerInterruptedException.scala | 23 ++ .../kafka/server/KafkaServerStartable.scala | 146 +++++-- project/build/KafkaProject.scala | 2 + system_test/embedded_consumer/README | 25 +- system_test/embedded_consumer/bin/run-test.sh | 371 ++++++++++++++---- .../config/blacklisttest.consumer.properties | 15 + .../config/consumer.properties | 14 - .../config/mirror_producer.properties | 13 + .../config/server_source1.properties | 2 - .../config/server_source2.properties | 2 - .../config/server_source3.properties | 2 - .../config/whitelisttest.consumer.properties | 15 + 20 files changed, 712 insertions(+), 151 deletions(-) create mode 100644 core/src/main/scala/kafka/consumer/TopicEventHandler.scala create mode 100644 core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala create mode 100644 core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala create mode 100644 system_test/embedded_consumer/config/blacklisttest.consumer.properties create mode 100644 system_test/embedded_consumer/config/mirror_producer.properties create mode 100644 system_test/embedded_consumer/config/whitelisttest.consumer.properties diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 88d840e94dd..fb96232cad7 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -19,6 +19,7 @@ package kafka import consumer.ConsumerConfig import org.apache.log4j.Logger +import producer.ProducerConfig import server.{KafkaConfig, KafkaServerStartable, KafkaServer} import utils.Utils import org.apache.log4j.jmx.LoggerDynamicMBean @@ -30,21 +31,23 @@ object Kafka { val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j" Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)) - if(args.length != 1 && args.length != 2) { - println("USAGE: java [options] " + classOf[KafkaServer].getSimpleName() + " server.properties [consumer.properties") + if (!List(1, 3).contains(args.length)) { + println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName())) System.exit(1) } try { - var kafkaServerStartble: KafkaServerStartable = null val props = Utils.loadProps(args(0)) val serverConfig = new KafkaConfig(props) - if (args.length == 2) { - val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1))) - kafkaServerStartble = new KafkaServerStartable(serverConfig, consumerConfig) + + val kafkaServerStartble = args.length match { + case 3 => + val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1))) + val producerConfig = new ProducerConfig(Utils.loadProps(args(2))) + new KafkaServerStartable(serverConfig, consumerConfig, producerConfig) + case 1 => + new KafkaServerStartable(serverConfig) } - else - kafkaServerStartble = new KafkaServerStartable(serverConfig) // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread() { diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index e47eeaacba0..3475382ab31 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -20,7 +20,7 @@ package kafka.consumer import java.util.Properties import kafka.utils.{ZKConfig, Utils} import kafka.api.OffsetRequest - +import kafka.common.InvalidConfigException object ConsumerConfig { val SocketTimeout = 30 * 1000 val SocketBufferSize = 64*1024 @@ -32,7 +32,11 @@ object ConsumerConfig { val MaxQueuedChunks = 100 val AutoOffsetReset = OffsetRequest.SmallestTimeString val ConsumerTimeoutMs = -1 - val EmbeddedConsumerTopics = "" + val MirrorTopicsWhitelist = "" + val MirrorTopicsBlacklist = "" + + val MirrorTopicsWhitelistProp = "mirror.topics.whitelist" + val MirrorTopicsBlacklistProp = "mirror.topics.blacklist" } class ConsumerConfig(props: Properties) extends ZKConfig(props) { @@ -80,7 +84,18 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) { /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */ val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs) - /* embed a consumer in the broker. e.g., topic1:1,topic2:1 */ - val embeddedConsumerTopicMap = Utils.getConsumerTopicMap(Utils.getString(props, "embeddedconsumer.topics", - EmbeddedConsumerTopics)) + /** Whitelist of topics for this mirror's embedded consumer to consume. At + * most one of whitelist/blacklist may be specified. + * e.g., topic1:1,topic2:1 */ + val mirrorTopicsWhitelistMap = Utils.getConsumerTopicMap(Utils.getString( + props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)) + + /** Topics to skip mirroring. At most one of whitelist/blacklist may be + * specified */ + val mirrorTopicsBlackList = Utils.getString( + props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist) + + if (mirrorTopicsWhitelistMap.nonEmpty && mirrorTopicsBlackList.nonEmpty) + throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist") } + diff --git a/core/src/main/scala/kafka/consumer/TopicEventHandler.scala b/core/src/main/scala/kafka/consumer/TopicEventHandler.scala new file mode 100644 index 00000000000..2423f0a4230 --- /dev/null +++ b/core/src/main/scala/kafka/consumer/TopicEventHandler.scala @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer + +trait TopicEventHandler[T] { + + def handleTopicEvent(allTopics: Seq[T]) + +} diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 40743c78a6f..6e4f8acaa66 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -28,6 +28,7 @@ import java.net.InetAddress import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState import kafka.api.OffsetRequest +import java.util.UUID /** * This class handles the consumers interaction with zookeeper @@ -156,7 +157,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, case Some(consumerId) // for testing only => consumerUuid = consumerId case None // generate unique consumerId automatically - => consumerUuid = InetAddress.getLocalHost.getHostName + "-" + System.currentTimeMillis + => val uuid = UUID.randomUUID() + consumerUuid = "%s-%d-%s".format( + InetAddress.getLocalHost.getHostName, System.currentTimeMillis, + uuid.getMostSignificantBits().toHexString.substring(0,8)) } val consumerIdString = config.groupId + "_" + consumerUuid val topicCount = new TopicCount(consumerIdString, topicCountMap) @@ -164,6 +168,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // listener to consumer and partition changes val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString) registerConsumerInZK(dirs, consumerIdString, topicCount) + + // register listener for session expired event + zkClient.subscribeStateChanges( + new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener)) + zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) // create a queue per topic per consumer thread @@ -184,10 +193,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) } - // register listener for session expired event - zkClient.subscribeStateChanges( - new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener)) - // explicitly trigger load balancing for this consumer loadBalancerListener.syncedRebalance() ret diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala new file mode 100644 index 00000000000..ae821eaca41 --- /dev/null +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer + +import org.apache.log4j.Logger +import scala.collection.JavaConversions._ +import kafka.utils.{Utils, ZkUtils, ZKStringSerializer} +import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import org.apache.zookeeper.Watcher.Event.KeeperState + +class ZookeeperTopicEventWatcher(val config:ConsumerConfig, + val eventHandler: TopicEventHandler[String]) { + + private val logger = Logger.getLogger(getClass) + + val lock = new Object() + + private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, ZKStringSerializer) + + startWatchingTopicEvents() + + private def startWatchingTopicEvents() { + val topicEventListener = new ZkTopicEventListener + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath) + + zkClient.subscribeStateChanges( + new ZkSessionExpireListener(topicEventListener)) + + val topics = zkClient.subscribeChildChanges( + ZkUtils.BrokerTopicsPath, topicEventListener).toList + + // call to bootstrap topic list + topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics) + } + + private def stopWatchingTopicEvents() { zkClient.unsubscribeAll() } + + def shutdown() { + lock.synchronized { + try { + if (zkClient != null) { + stopWatchingTopicEvents() + zkClient.close() + zkClient = null + } + else + logger.warn("Cannot shutdown already shutdown topic event watcher.") + } + catch { + case e => + logger.fatal(e) + logger.fatal(Utils.stackTrace(e)) + } + } + } + + class ZkTopicEventListener() extends IZkChildListener { + + @throws(classOf[Exception]) + def handleChildChange(parent: String, children: java.util.List[String]) { + lock.synchronized { + try { + if (zkClient != null) { + val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList + logger.debug("all topics: %s".format(latestTopics)) + + eventHandler.handleTopicEvent(latestTopics) + } + } + catch { + case e => + logger.fatal(e) + logger.fatal(Utils.stackTrace(e)) + } + } + } + + } + + class ZkSessionExpireListener(val topicEventListener: ZkTopicEventListener) + extends IZkStateListener { + + @throws(classOf[Exception]) + def handleStateChanged(state: KeeperState) { } + + @throws(classOf[Exception]) + def handleNewSession() { + lock.synchronized { + if (zkClient != null) { + logger.info( + "ZK expired: resubscribing topic event listener to topic registry") + zkClient.subscribeChildChanges( + ZkUtils.BrokerTopicsPath, topicEventListener) + } + } + } + } +} + diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 8a56bd03e83..4548f33bcec 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -22,7 +22,7 @@ import java.util.Properties import kafka.utils.{ZKConfig, Utils} import kafka.common.InvalidConfigException -class ProducerConfig(val props: Properties) extends ZKConfig(props) +class ProducerConfig(val props: Properties) extends ZKConfig(props) with AsyncProducerConfigShared with SyncProducerConfigShared{ /** For bypassing zookeeper based auto partition discovery, use this config * diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducer.scala b/core/src/main/scala/kafka/producer/async/AsyncProducer.scala index 43faa903d65..1a6200079ab 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducer.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducer.scala @@ -17,7 +17,7 @@ package kafka.producer.async -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import kafka.utils.Utils import java.util.concurrent.atomic.AtomicBoolean import org.apache.log4j.{Level, Logger} @@ -90,7 +90,28 @@ private[kafka] class AsyncProducer[T](config: AsyncProducerConfig, if(cbkHandler != null) data = cbkHandler.beforeEnqueue(data) - val added = queue.offer(data) + val added = if (config.enqueueTimeoutMs != 0) { + try { + if (config.enqueueTimeoutMs < 0) { + queue.put(data) + true + } + else { + queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS) + } + } + catch { + case e: InterruptedException => + val msg = "%s interrupted during enqueue of event %s.".format( + getClass.getSimpleName, event.toString) + logger.error(msg) + throw new AsyncProducerInterruptedException(msg) + } + } + else { + queue.offer(data) + } + if(cbkHandler != null) cbkHandler.afterEnqueue(data, added) diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala index 7a2b185e3f5..ca3e65e2561 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala @@ -33,6 +33,14 @@ trait AsyncProducerConfigShared { /** the maximum size of the blocking queue for buffering on the producer */ val queueSize = Utils.getInt(props, "queue.size", 10000) + /** + * Timeout for event enqueue: + * 0: events will be enqueued immediately or dropped if the queue is full + * -ve: enqueue will block indefinitely if the queue is full + * +ve: enqueue will block up to this many milliseconds if the queue is full + */ + val enqueueTimeoutMs = Utils.getInt(props, "queue.enqueueTimeout.ms", 0) + /** the number of messages batched at the producer */ val batchSize = Utils.getInt(props, "batch.size", 200) diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala new file mode 100644 index 00000000000..42944f4670e --- /dev/null +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.producer.async + +class AsyncProducerInterruptedException(message: String) extends RuntimeException(message) { + def this() = this(null) +} + diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index 6b2148645a8..41411173bae 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -17,24 +17,29 @@ package kafka.server +import kafka.utils.Utils +import kafka.consumer._ +import kafka.producer.{ProducerData, ProducerConfig, Producer} +import kafka.message.Message import org.apache.log4j.Logger -import kafka.consumer.{Consumer, ConsumerConnector, ConsumerConfig} -import kafka.utils.{SystemTime, Utils} -import kafka.api.RequestKeys -import kafka.message.{NoCompressionCodec, ByteBufferMessageSet} -class KafkaServerStartable(val serverConfig: KafkaConfig, val consumerConfig: ConsumerConfig) { +import scala.collection.Map + +class KafkaServerStartable(val serverConfig: KafkaConfig, + val consumerConfig: ConsumerConfig, + val producerConfig: ProducerConfig) { private var server : KafkaServer = null private var embeddedConsumer : EmbeddedConsumer = null init - def this(serverConfig: KafkaConfig) = this(serverConfig, null) + def this(serverConfig: KafkaConfig) = this(serverConfig, null, null) private def init() { server = new KafkaServer(serverConfig) if (consumerConfig != null) - embeddedConsumer = new EmbeddedConsumer(consumerConfig, server) + embeddedConsumer = + new EmbeddedConsumer(consumerConfig, producerConfig, server) } def startup() { @@ -55,43 +60,106 @@ class KafkaServerStartable(val serverConfig: KafkaConfig, val consumerConfig: Co } class EmbeddedConsumer(private val consumerConfig: ConsumerConfig, - private val kafkaServer: KafkaServer) { - private val logger = Logger.getLogger(getClass()) - private val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) - private val topicMessageStreams = consumerConnector.createMessageStreams(consumerConfig.embeddedConsumerTopicMap) + private val producerConfig: ProducerConfig, + private val kafkaServer: KafkaServer) extends TopicEventHandler[String] { - def startup() = { - var threadList = List[Thread]() - for ((topic, streamList) <- topicMessageStreams) - for (i <- 0 until streamList.length) - threadList ::= Utils.newThread("kafka-embedded-consumer-" + topic + "-" + i, new Runnable() { - def run() { - logger.info("starting consumer thread " + i + " for topic " + topic) - val logManager = kafkaServer.getLogManager - val stats = kafkaServer.getStats - try { - for (message <- streamList(i)) { - val partition = logManager.chooseRandomPartition(topic) - val start = SystemTime.nanoseconds - logManager.getOrCreateLog(topic, partition).append( - new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = message)) - stats.recordRequest(RequestKeys.Produce, SystemTime.nanoseconds - start) + private val logger = Logger.getLogger(getClass) + + private val blackListTopics = + consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim) + + // mirrorTopics should be accessed by handleTopicEvent only + private var mirrorTopics:Seq[String] = List() + + private var consumerConnector: ConsumerConnector = null + private var topicEventWatcher:ZookeeperTopicEventWatcher = null + + private val producer = new Producer[Null, Message](producerConfig) + + + private def isTopicAllowed(topic: String) = { + if (consumerConfig.mirrorTopicsWhitelistMap.nonEmpty) + consumerConfig.mirrorTopicsWhitelistMap.contains(topic) + else + !blackListTopics.contains(topic) + } + + // TopicEventHandler call-back only + @Override + def handleTopicEvent(allTopics: Seq[String]) { + val newMirrorTopics = allTopics.filter(isTopicAllowed) + + val addedTopics = newMirrorTopics filterNot (mirrorTopics contains) + if (addedTopics.nonEmpty) + logger.info("topic event: added topics = %s".format(addedTopics)) + + val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains) + if (deletedTopics.nonEmpty) + logger.info("topic event: deleted topics = %s".format(deletedTopics)) + + mirrorTopics = newMirrorTopics + + if (addedTopics.nonEmpty || deletedTopics.nonEmpty) { + logger.info("mirror topics = %s".format(mirrorTopics)) + startNewConsumerThreads(makeTopicMap(mirrorTopics)) + } + } + + private def makeTopicMap(mirrorTopics: Seq[String]) = { + if (mirrorTopics.nonEmpty) + Utils.getConsumerTopicMap(mirrorTopics.mkString("", ":1,", ":1")) + else + Utils.getConsumerTopicMap("") + } + + private def startNewConsumerThreads(topicMap: Map[String, Int]) { + if (topicMap.nonEmpty) { + if (consumerConnector != null) + consumerConnector.shutdown() + consumerConnector = Consumer.create(consumerConfig) + val topicMessageStreams = consumerConnector.createMessageStreams(topicMap) + var threadList = List[Thread]() + for ((topic, streamList) <- topicMessageStreams) + for (i <- 0 until streamList.length) + threadList ::= Utils.newThread("kafka-embedded-consumer-%s-%d".format(topic, i), new Runnable() { + def run() { + logger.info("Starting consumer thread %d for topic %s".format(i, topic)) + + try { + for (message <- streamList(i)) { + val pd = new ProducerData[Null, Message](topic, message) + producer.send(pd) + } + } + catch { + case e => + logger.fatal(e + Utils.stackTrace(e)) + logger.fatal(topic + " stream " + i + " unexpectedly exited") } } - catch { - case e => - logger.fatal(e + Utils.stackTrace(e)) - logger.fatal(topic + " stream " + i + " unexpectedly exited") - } - } - }, false) + }, false) - for (thread <- threadList) - thread.start + for (thread <- threadList) + thread.start() + } + else + logger.info("Not starting consumer threads (mirror topic list is empty)") } - def shutdown() = { - consumerConnector.shutdown + def startup() { + topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this) + /* + * consumer threads are (re-)started upon topic events (which includes an + * initial startup event which lists the current topics) + */ + } + + def shutdown() { + producer.close() + if (consumerConnector != null) + consumerConnector.shutdown() + if (topicEventWatcher != null) + topicEventWatcher.shutdown() } } + diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala index a64bc6854e3..1db33ca52e8 100644 --- a/project/build/KafkaProject.scala +++ b/project/build/KafkaProject.scala @@ -103,6 +103,8 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje override def javaCompileOptions = super.javaCompileOptions ++ List(JavaCompileOption("-source"), JavaCompileOption("1.5")) + + override def packageAction = super.packageAction dependsOn (testCompileAction) } class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info) diff --git a/system_test/embedded_consumer/README b/system_test/embedded_consumer/README index 7b2cbb83859..8cd6b88f8c7 100644 --- a/system_test/embedded_consumer/README +++ b/system_test/embedded_consumer/README @@ -1,8 +1,27 @@ -This test replicates messages from 3 kafka brokers to 2 other kafka brokers using the embedded consumer. -At the end, the messages produced at the source brokers should match that at the target brokers. +This test replicates messages from 3 kafka brokers to 2 other kafka brokers +using the embedded consumer. At the end, the messages produced at the source +brokers should match that at the target brokers. To run this test, do bin/run-test.sh -The expected output is given in bin/expected.out. There is only 1 thing that's important. +The expected output is given in bin/expected.out. There is only 1 thing that's +important. 1. The output should have a line "test passed". + +In the event of failure, by default the brokers and zookeepers remain running +to make it easier to debug the issue - hit Ctrl-C to shut them down. You can +change this behavior by setting the action_on_fail flag in the script to "exit" +or "proceed", in which case a snapshot of all the logs and directories is +placed in the test's base directory. + +If you are making any changes that may affect the embedded consumer, it is a +good idea to run the test in a loop. E.g.: + +:>/tmp/embeddedconsumer_test.log +for i in {1..10}; do echo "run $i"; ./bin/run-test.sh 2>1 >> /tmp/embeddedconsumer_test.log; done +tail -F /tmp/embeddedconsumer_test.log + +grep -ic passed /tmp/embeddedconsumer_test.log +grep -ic failed /tmp/embeddedconsumer_test.log + diff --git a/system_test/embedded_consumer/bin/run-test.sh b/system_test/embedded_consumer/bin/run-test.sh index 492db2cd19c..6c6c97bf519 100755 --- a/system_test/embedded_consumer/bin/run-test.sh +++ b/system_test/embedded_consumer/bin/run-test.sh @@ -1,79 +1,314 @@ #!/bin/bash -num_messages=400000 -message_size=400 +readonly num_messages=400000 +readonly message_size=400 +readonly action_on_fail="proceed" -base_dir=$(dirname $0)/.. +readonly test_start_time="$(date +%s)" -rm -rf /tmp/zookeeper_source -rm -rf /tmp/zookeeper_target -rm -rf /tmp/kafka-source1-logs -mkdir /tmp/kafka-source1-logs -mkdir /tmp/kafka-source1-logs/test01-0 -touch /tmp/kafka-source1-logs/test01-0/00000000000000000000.kafka -rm -rf /tmp/kafka-source2-logs -mkdir /tmp/kafka-source2-logs -mkdir /tmp/kafka-source2-logs/test01-0 -touch /tmp/kafka-source2-logs/test01-0/00000000000000000000.kafka -rm -rf /tmp/kafka-source3-logs -mkdir /tmp/kafka-source3-logs -mkdir /tmp/kafka-source3-logs/test01-0 -touch /tmp/kafka-source3-logs/test01-0/00000000000000000000.kafka -rm -rf /tmp/kafka-target1-logs -rm -rf /tmp/kafka-target2-logs +readonly base_dir=$(dirname $0)/.. -echo "start the servers ..." -$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log & -$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log & -$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log & -$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log & -$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log & -$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target1.log & -$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target2.log & +info() { + echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*" +} -sleep 4 -echo "start producing messages ..." -$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval 400000 num_messages --async --delay-btw-batch-ms 10 & +kill_child_processes() { + isTopmost=$1 + curPid=$2 + childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}') + for childPid in $childPids + do + kill_child_processes 0 $childPid + done + if [ $isTopmost -eq 0 ]; then + kill -15 $curPid 2> /dev/null + fi +} -echo "wait for consumer to finish consuming ..." -cur1_offset="-1" -cur2_offset="-1" -quit1=0 -quit2=0 -while [ $quit1 -eq 0 ] && [ $quit2 -eq 0 ] -do - sleep 2 - target1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1` - if [ $target1_size -eq $cur1_offset ] - then - quit1=1 - fi - cur1_offset=$target1_size - target2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1` - if [ $target2_size -eq $cur2_offset ] - then - quit2=1 - fi - cur2_offset=$target2_size -done +cleanup() { + info "cleaning up" + + pid_zk_source= + pid_zk_target= + pid_kafka_source1= + pid_kafka_source2= + pid_kafka_source3= + pid_kafka_target1= + pid_kafka_target2= + pid_producer= + + rm -rf /tmp/zookeeper_source + rm -rf /tmp/zookeeper_target + + rm -rf /tmp/kafka-source{1..3}-logs + # mkdir -p /tmp/kafka-source{1..3}-logs/test0{1..3}-0 + # touch /tmp/kafka-source{1..3}-logs/test0{1..3}-0/00000000000000000000.kafka + + rm -rf /tmp/kafka-target{1..2}-logs +} + +begin_timer() { + t_begin=$(date +%s) +} + +end_timer() { + t_end=$(date +%s) +} + +start_zk() { + info "starting zookeepers" + $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log & + pid_zk_source=$! + $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log & + pid_zk_target=$! +} + +start_source_servers() { + info "starting source cluster" + $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log & + pid_kafka_source1=$! + $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log & + pid_kafka_source2=$! + $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log & + pid_kafka_source3=$! +} + +start_target_servers_for_whitelist_test() { + echo "starting mirror cluster" + $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log & + pid_kafka_target1=$! + $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log & + pid_kafka_target2=$! +} + +start_target_servers_for_blacklist_test() { + echo "starting mirror cluster" + $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log & + pid_kafka_target1=$! + $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log & + pid_kafka_target2=$! +} + +shutdown_servers() { + info "stopping producer" + if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi + + info "shutting down target servers" + if [ "x${pid_kafka_target1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target1}; fi + if [ "x${pid_kafka_target2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target2}; fi + sleep 2 + + info "shutting down source servers" + if [ "x${pid_kafka_source1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source1}; fi + if [ "x${pid_kafka_source2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source2}; fi + if [ "x${pid_kafka_source3}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source3}; fi + + info "shutting down zookeeper servers" + if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi + if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi +} + +start_producer() { + topic=$1 + info "start producing messages for topic $topic ..." + $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async --delay-btw-batch-ms 10 2>&1 > $base_dir/producer_performance.log & + pid_producer=$! +} + +# In case the consumer does not consume, the test may exit prematurely (i.e., +# shut down the kafka brokers, and ProducerPerformance will start throwing ugly +# exceptions. So, wait for the producer to finish before shutting down. If it +# takes too long, the user can just hit Ctrl-c which is trapped to kill child +# processes. +# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+ +wait_partition_done() { + n_tuples=$(($# / 3)) + + i=1 + while (($#)); do + kafka_server[i]=$1 + topic[i]=$2 + partitionid[i]=$3 + prev_offset[i]=0 + info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}" + i=$((i+1)) + shift 3 + done + + all_done=0 + + # set -x + while [[ $all_done != 1 ]]; do + sleep 4 + i=$n_tuples + all_done=1 + for ((i=1; i <= $n_tuples; i++)); do + cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1) + if [ "x$cur_size" != "x${prev_offset[i]}" ]; then + all_done=0 + prev_offset[i]=$cur_size + fi + done + done + +} + +cmp_logs() { + topic=$1 + info "comparing source and target logs for topic $topic" + source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) + source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) + source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) + target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) + target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) + if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi + if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi + expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size)) + actual_size=$(($target_part0_size + $target_part1_size)) + if [ "x$expected_size" != "x$actual_size" ] + then + info "source size: $expected_size target size: $actual_size" + return 1 + else + return 0 + fi +} + +take_fail_snapshot() { + snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}" + mkdir $snapshot_dir + for dir in /tmp/zookeeper_source /tmp/zookeeper_target /tmp/kafka-source{1..3}-logs /tmp/kafka-target{1..2}-logs; do + if [ -d $dir ]; then + cp -r $dir $snapshot_dir + fi + done +} + +# Usage: process_test_result +# result: last test result +# action_on_fail: (exit|wait|proceed) +# ("wait" is useful if you want to troubleshoot using zookeeper) +process_test_result() { + result=$1 + if [ $1 -eq 0 ]; then + info "test passed" + else + info "test failed" + case "$2" in + "wait") info "waiting: hit Ctrl-c to quit" + wait + ;; + "exit") shutdown_servers + take_fail_snapshot + exit $result + ;; + *) shutdown_servers + take_fail_snapshot + info "proceeding" + ;; + esac + fi +} + +test_whitelists() { + info "### Testing whitelists" + snapshot_prefix="whitelist-test" + + cleanup + start_zk + start_source_servers + start_target_servers_for_whitelist_test + sleep 4 + + begin_timer + + start_producer test01 + info "waiting for producer to finish producing ..." + wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0 + + info "waiting for consumer to finish consuming ..." + wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0 + + end_timer + info "embedded consumer took $((t_end - t_begin)) seconds" + + sleep 2 + + cmp_logs test01 + result=$? + + return $result +} + +test_blacklists() { + info "### Testing blacklists" + snapshot_prefix="blacklist-test" + cleanup + start_zk + start_source_servers + start_target_servers_for_blacklist_test + sleep 4 + + start_producer test02 + info "waiting for producer to finish producing test02 ..." + wait_partition_done kafka://localhost:9090 test02 0 kafka://localhost:9091 test02 0 kafka://localhost:9092 test02 0 + + # start_producer test03 + # info "waiting for producer to finish producing test03 ..." + # wait_partition_done kafka://localhost:9090 test03 0 kafka://localhost:9091 test03 0 kafka://localhost:9092 test03 0 + + begin_timer + + start_producer test01 + info "waiting for producer to finish producing ..." + wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0 + + info "waiting for consumer to finish consuming ..." + wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0 + + end_timer + + info "embedded consumer took $((t_end - t_begin)) seconds" + + sleep 2 + + cmp_logs test02 + result1=$? + # cmp_logs test03 + # result2=$? + # if [[ "x$result1" == "x0" || "x$result2" == "x0" ]]; then + if [[ "x$result1" == "x0" ]]; then + result=1 + else + cmp_logs test01 + result=$? + fi + + return $result +} + +# main test begins + +echo "Test-$test_start_time" + +# Ctrl-c trap. Catches INT signal +trap "shutdown_servers; exit 0" INT + +test_whitelists +result=$? + +process_test_result $result $action_on_fail + +shutdown_servers sleep 2 -source_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1` -source_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1` -source_part2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1` -target_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1` -target_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1` -expected_size=`expr $source_part0_size + $source_part1_size + $source_part2_size` -actual_size=`expr $target_part0_size + $target_part1_size` -if [ $expected_size != $actual_size ] -then - echo "source size: $expected_size target size: $actual_size test failed!!! look at it!!!" -else - echo "test passed" -fi +test_blacklists +result=$? + +process_test_result $result $action_on_fail + +shutdown_servers + +exit $result -echo "stopping the servers" -ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null -sleep 2 -ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null diff --git a/system_test/embedded_consumer/config/blacklisttest.consumer.properties b/system_test/embedded_consumer/config/blacklisttest.consumer.properties new file mode 100644 index 00000000000..f6f6153c58f --- /dev/null +++ b/system_test/embedded_consumer/config/blacklisttest.consumer.properties @@ -0,0 +1,15 @@ +# see kafka.consumer.ConsumerConfig for more details + +# zk connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zk.connect=localhost:2181 + +# timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 + +#consumer group id +groupid=group1 + +mirror.topics.blacklist=test02,test03 + diff --git a/system_test/embedded_consumer/config/consumer.properties b/system_test/embedded_consumer/config/consumer.properties index da3aa554e71..e69de29bb2d 100644 --- a/system_test/embedded_consumer/config/consumer.properties +++ b/system_test/embedded_consumer/config/consumer.properties @@ -1,14 +0,0 @@ -# see kafka.consumer.ConsumerConfig for more details - -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2181 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -#consumer group id -groupid=group1 - -embeddedconsumer.topics=test01:1 diff --git a/system_test/embedded_consumer/config/mirror_producer.properties b/system_test/embedded_consumer/config/mirror_producer.properties new file mode 100644 index 00000000000..ee99bcb5a98 --- /dev/null +++ b/system_test/embedded_consumer/config/mirror_producer.properties @@ -0,0 +1,13 @@ +# zk connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zk.connect=localhost:2182 + +# timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 + +producer.type=async + +# to avoid dropping events if the queue is full, wait indefinitely +queue.enqueueTimeout.ms=-1 + diff --git a/system_test/embedded_consumer/config/server_source1.properties b/system_test/embedded_consumer/config/server_source1.properties index 6ebcafaa5e3..418fdfe9c8f 100644 --- a/system_test/embedded_consumer/config/server_source1.properties +++ b/system_test/embedded_consumer/config/server_source1.properties @@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000 # time based topic flasher time rate in ms log.default.flush.scheduler.interval.ms=1000 -# topic partition count map -# topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/embedded_consumer/config/server_source2.properties b/system_test/embedded_consumer/config/server_source2.properties index 3fadcae4e35..b2c59290da0 100644 --- a/system_test/embedded_consumer/config/server_source2.properties +++ b/system_test/embedded_consumer/config/server_source2.properties @@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000 # time based topic flasher time rate in ms log.default.flush.scheduler.interval.ms=1000 -# topic partition count map -# topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/embedded_consumer/config/server_source3.properties b/system_test/embedded_consumer/config/server_source3.properties index 7ff75a63b6b..dc0b00ebbc1 100644 --- a/system_test/embedded_consumer/config/server_source3.properties +++ b/system_test/embedded_consumer/config/server_source3.properties @@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000 # time based topic flasher time rate in ms log.default.flush.scheduler.interval.ms=1000 -# topic partition count map -# topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/embedded_consumer/config/whitelisttest.consumer.properties b/system_test/embedded_consumer/config/whitelisttest.consumer.properties new file mode 100644 index 00000000000..0c34a517654 --- /dev/null +++ b/system_test/embedded_consumer/config/whitelisttest.consumer.properties @@ -0,0 +1,15 @@ +# see kafka.consumer.ConsumerConfig for more details + +# zk connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zk.connect=localhost:2181 + +# timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 + +#consumer group id +groupid=group1 + +mirror.topics.whitelist=test01:1 +