auto-discovery of topics for mirroring; patched by Joel; reviewed by Jun; KAFKA-74

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1156393 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2011-08-10 22:32:23 +00:00
parent d0c980dab2
commit 49f3b40179
20 changed files with 712 additions and 151 deletions

View File

@ -19,6 +19,7 @@ package kafka
import consumer.ConsumerConfig import consumer.ConsumerConfig
import org.apache.log4j.Logger import org.apache.log4j.Logger
import producer.ProducerConfig
import server.{KafkaConfig, KafkaServerStartable, KafkaServer} import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
import utils.Utils import utils.Utils
import org.apache.log4j.jmx.LoggerDynamicMBean import org.apache.log4j.jmx.LoggerDynamicMBean
@ -30,21 +31,23 @@ object Kafka {
val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j" val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)) Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName))
if(args.length != 1 && args.length != 2) { if (!List(1, 3).contains(args.length)) {
println("USAGE: java [options] " + classOf[KafkaServer].getSimpleName() + " server.properties [consumer.properties") println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName()))
System.exit(1) System.exit(1)
} }
try { try {
var kafkaServerStartble: KafkaServerStartable = null
val props = Utils.loadProps(args(0)) val props = Utils.loadProps(args(0))
val serverConfig = new KafkaConfig(props) val serverConfig = new KafkaConfig(props)
if (args.length == 2) {
val kafkaServerStartble = args.length match {
case 3 =>
val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1))) val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
kafkaServerStartble = new KafkaServerStartable(serverConfig, consumerConfig) val producerConfig = new ProducerConfig(Utils.loadProps(args(2)))
new KafkaServerStartable(serverConfig, consumerConfig, producerConfig)
case 1 =>
new KafkaServerStartable(serverConfig)
} }
else
kafkaServerStartble = new KafkaServerStartable(serverConfig)
// attach shutdown handler to catch control-c // attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {

View File

@ -20,7 +20,7 @@ package kafka.consumer
import java.util.Properties import java.util.Properties
import kafka.utils.{ZKConfig, Utils} import kafka.utils.{ZKConfig, Utils}
import kafka.api.OffsetRequest import kafka.api.OffsetRequest
import kafka.common.InvalidConfigException
object ConsumerConfig { object ConsumerConfig {
val SocketTimeout = 30 * 1000 val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024 val SocketBufferSize = 64*1024
@ -32,7 +32,11 @@ object ConsumerConfig {
val MaxQueuedChunks = 100 val MaxQueuedChunks = 100
val AutoOffsetReset = OffsetRequest.SmallestTimeString val AutoOffsetReset = OffsetRequest.SmallestTimeString
val ConsumerTimeoutMs = -1 val ConsumerTimeoutMs = -1
val EmbeddedConsumerTopics = "" val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
} }
class ConsumerConfig(props: Properties) extends ZKConfig(props) { class ConsumerConfig(props: Properties) extends ZKConfig(props) {
@ -80,7 +84,18 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */ /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs) val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
/* embed a consumer in the broker. e.g., topic1:1,topic2:1 */ /** Whitelist of topics for this mirror's embedded consumer to consume. At
val embeddedConsumerTopicMap = Utils.getConsumerTopicMap(Utils.getString(props, "embeddedconsumer.topics", * most one of whitelist/blacklist may be specified.
EmbeddedConsumerTopics)) * e.g., topic1:1,topic2:1 */
val mirrorTopicsWhitelistMap = Utils.getConsumerTopicMap(Utils.getString(
props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist))
/** Topics to skip mirroring. At most one of whitelist/blacklist may be
* specified */
val mirrorTopicsBlackList = Utils.getString(
props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
if (mirrorTopicsWhitelistMap.nonEmpty && mirrorTopicsBlackList.nonEmpty)
throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
} }

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
trait TopicEventHandler[T] {
def handleTopicEvent(allTopics: Seq[T])
}

View File

@ -28,6 +28,7 @@ import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState import org.apache.zookeeper.Watcher.Event.KeeperState
import kafka.api.OffsetRequest import kafka.api.OffsetRequest
import java.util.UUID
/** /**
* This class handles the consumers interaction with zookeeper * This class handles the consumers interaction with zookeeper
@ -156,7 +157,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
case Some(consumerId) // for testing only case Some(consumerId) // for testing only
=> consumerUuid = consumerId => consumerUuid = consumerId
case None // generate unique consumerId automatically case None // generate unique consumerId automatically
=> consumerUuid = InetAddress.getLocalHost.getHostName + "-" + System.currentTimeMillis => val uuid = UUID.randomUUID()
consumerUuid = "%s-%d-%s".format(
InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8))
} }
val consumerIdString = config.groupId + "_" + consumerUuid val consumerIdString = config.groupId + "_" + consumerUuid
val topicCount = new TopicCount(consumerIdString, topicCountMap) val topicCount = new TopicCount(consumerIdString, topicCountMap)
@ -164,6 +168,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// listener to consumer and partition changes // listener to consumer and partition changes
val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString) val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString)
registerConsumerInZK(dirs, consumerIdString, topicCount) registerConsumerInZK(dirs, consumerIdString, topicCount)
// register listener for session expired event
zkClient.subscribeStateChanges(
new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
// create a queue per topic per consumer thread // create a queue per topic per consumer thread
@ -184,10 +193,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
} }
// register listener for session expired event
zkClient.subscribeStateChanges(
new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
// explicitly trigger load balancing for this consumer // explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance() loadBalancerListener.syncedRebalance()
ret ret

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import org.apache.log4j.Logger
import scala.collection.JavaConversions._
import kafka.utils.{Utils, ZkUtils, ZKStringSerializer}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
val eventHandler: TopicEventHandler[String]) {
private val logger = Logger.getLogger(getClass)
val lock = new Object()
private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, ZKStringSerializer)
startWatchingTopicEvents()
private def startWatchingTopicEvents() {
val topicEventListener = new ZkTopicEventListener
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
zkClient.subscribeStateChanges(
new ZkSessionExpireListener(topicEventListener))
val topics = zkClient.subscribeChildChanges(
ZkUtils.BrokerTopicsPath, topicEventListener).toList
// call to bootstrap topic list
topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics)
}
private def stopWatchingTopicEvents() { zkClient.unsubscribeAll() }
def shutdown() {
lock.synchronized {
try {
if (zkClient != null) {
stopWatchingTopicEvents()
zkClient.close()
zkClient = null
}
else
logger.warn("Cannot shutdown already shutdown topic event watcher.")
}
catch {
case e =>
logger.fatal(e)
logger.fatal(Utils.stackTrace(e))
}
}
}
class ZkTopicEventListener() extends IZkChildListener {
@throws(classOf[Exception])
def handleChildChange(parent: String, children: java.util.List[String]) {
lock.synchronized {
try {
if (zkClient != null) {
val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
logger.debug("all topics: %s".format(latestTopics))
eventHandler.handleTopicEvent(latestTopics)
}
}
catch {
case e =>
logger.fatal(e)
logger.fatal(Utils.stackTrace(e))
}
}
}
}
class ZkSessionExpireListener(val topicEventListener: ZkTopicEventListener)
extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) { }
@throws(classOf[Exception])
def handleNewSession() {
lock.synchronized {
if (zkClient != null) {
logger.info(
"ZK expired: resubscribing topic event listener to topic registry")
zkClient.subscribeChildChanges(
ZkUtils.BrokerTopicsPath, topicEventListener)
}
}
}
}
}

View File

@ -17,7 +17,7 @@
package kafka.producer.async package kafka.producer.async
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.utils.Utils import kafka.utils.Utils
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
@ -90,7 +90,28 @@ private[kafka] class AsyncProducer[T](config: AsyncProducerConfig,
if(cbkHandler != null) if(cbkHandler != null)
data = cbkHandler.beforeEnqueue(data) data = cbkHandler.beforeEnqueue(data)
val added = queue.offer(data) val added = if (config.enqueueTimeoutMs != 0) {
try {
if (config.enqueueTimeoutMs < 0) {
queue.put(data)
true
}
else {
queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
}
}
catch {
case e: InterruptedException =>
val msg = "%s interrupted during enqueue of event %s.".format(
getClass.getSimpleName, event.toString)
logger.error(msg)
throw new AsyncProducerInterruptedException(msg)
}
}
else {
queue.offer(data)
}
if(cbkHandler != null) if(cbkHandler != null)
cbkHandler.afterEnqueue(data, added) cbkHandler.afterEnqueue(data, added)

View File

@ -33,6 +33,14 @@ trait AsyncProducerConfigShared {
/** the maximum size of the blocking queue for buffering on the producer */ /** the maximum size of the blocking queue for buffering on the producer */
val queueSize = Utils.getInt(props, "queue.size", 10000) val queueSize = Utils.getInt(props, "queue.size", 10000)
/**
* Timeout for event enqueue:
* 0: events will be enqueued immediately or dropped if the queue is full
* -ve: enqueue will block indefinitely if the queue is full
* +ve: enqueue will block up to this many milliseconds if the queue is full
*/
val enqueueTimeoutMs = Utils.getInt(props, "queue.enqueueTimeout.ms", 0)
/** the number of messages batched at the producer */ /** the number of messages batched at the producer */
val batchSize = Utils.getInt(props, "batch.size", 200) val batchSize = Utils.getInt(props, "batch.size", 200)

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.producer.async
class AsyncProducerInterruptedException(message: String) extends RuntimeException(message) {
def this() = this(null)
}

View File

@ -17,24 +17,29 @@
package kafka.server package kafka.server
import kafka.utils.Utils
import kafka.consumer._
import kafka.producer.{ProducerData, ProducerConfig, Producer}
import kafka.message.Message
import org.apache.log4j.Logger import org.apache.log4j.Logger
import kafka.consumer.{Consumer, ConsumerConnector, ConsumerConfig}
import kafka.utils.{SystemTime, Utils}
import kafka.api.RequestKeys
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
class KafkaServerStartable(val serverConfig: KafkaConfig, val consumerConfig: ConsumerConfig) { import scala.collection.Map
class KafkaServerStartable(val serverConfig: KafkaConfig,
val consumerConfig: ConsumerConfig,
val producerConfig: ProducerConfig) {
private var server : KafkaServer = null private var server : KafkaServer = null
private var embeddedConsumer : EmbeddedConsumer = null private var embeddedConsumer : EmbeddedConsumer = null
init init
def this(serverConfig: KafkaConfig) = this(serverConfig, null) def this(serverConfig: KafkaConfig) = this(serverConfig, null, null)
private def init() { private def init() {
server = new KafkaServer(serverConfig) server = new KafkaServer(serverConfig)
if (consumerConfig != null) if (consumerConfig != null)
embeddedConsumer = new EmbeddedConsumer(consumerConfig, server) embeddedConsumer =
new EmbeddedConsumer(consumerConfig, producerConfig, server)
} }
def startup() { def startup() {
@ -55,28 +60,75 @@ class KafkaServerStartable(val serverConfig: KafkaConfig, val consumerConfig: Co
} }
class EmbeddedConsumer(private val consumerConfig: ConsumerConfig, class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
private val kafkaServer: KafkaServer) { private val producerConfig: ProducerConfig,
private val logger = Logger.getLogger(getClass()) private val kafkaServer: KafkaServer) extends TopicEventHandler[String] {
private val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
private val topicMessageStreams = consumerConnector.createMessageStreams(consumerConfig.embeddedConsumerTopicMap)
def startup() = { private val logger = Logger.getLogger(getClass)
private val blackListTopics =
consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim)
// mirrorTopics should be accessed by handleTopicEvent only
private var mirrorTopics:Seq[String] = List()
private var consumerConnector: ConsumerConnector = null
private var topicEventWatcher:ZookeeperTopicEventWatcher = null
private val producer = new Producer[Null, Message](producerConfig)
private def isTopicAllowed(topic: String) = {
if (consumerConfig.mirrorTopicsWhitelistMap.nonEmpty)
consumerConfig.mirrorTopicsWhitelistMap.contains(topic)
else
!blackListTopics.contains(topic)
}
// TopicEventHandler call-back only
@Override
def handleTopicEvent(allTopics: Seq[String]) {
val newMirrorTopics = allTopics.filter(isTopicAllowed)
val addedTopics = newMirrorTopics filterNot (mirrorTopics contains)
if (addedTopics.nonEmpty)
logger.info("topic event: added topics = %s".format(addedTopics))
val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains)
if (deletedTopics.nonEmpty)
logger.info("topic event: deleted topics = %s".format(deletedTopics))
mirrorTopics = newMirrorTopics
if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
logger.info("mirror topics = %s".format(mirrorTopics))
startNewConsumerThreads(makeTopicMap(mirrorTopics))
}
}
private def makeTopicMap(mirrorTopics: Seq[String]) = {
if (mirrorTopics.nonEmpty)
Utils.getConsumerTopicMap(mirrorTopics.mkString("", ":1,", ":1"))
else
Utils.getConsumerTopicMap("")
}
private def startNewConsumerThreads(topicMap: Map[String, Int]) {
if (topicMap.nonEmpty) {
if (consumerConnector != null)
consumerConnector.shutdown()
consumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(topicMap)
var threadList = List[Thread]() var threadList = List[Thread]()
for ((topic, streamList) <- topicMessageStreams) for ((topic, streamList) <- topicMessageStreams)
for (i <- 0 until streamList.length) for (i <- 0 until streamList.length)
threadList ::= Utils.newThread("kafka-embedded-consumer-" + topic + "-" + i, new Runnable() { threadList ::= Utils.newThread("kafka-embedded-consumer-%s-%d".format(topic, i), new Runnable() {
def run() { def run() {
logger.info("starting consumer thread " + i + " for topic " + topic) logger.info("Starting consumer thread %d for topic %s".format(i, topic))
val logManager = kafkaServer.getLogManager
val stats = kafkaServer.getStats
try { try {
for (message <- streamList(i)) { for (message <- streamList(i)) {
val partition = logManager.chooseRandomPartition(topic) val pd = new ProducerData[Null, Message](topic, message)
val start = SystemTime.nanoseconds producer.send(pd)
logManager.getOrCreateLog(topic, partition).append(
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = message))
stats.recordRequest(RequestKeys.Produce, SystemTime.nanoseconds - start)
} }
} }
catch { catch {
@ -88,10 +140,26 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
}, false) }, false)
for (thread <- threadList) for (thread <- threadList)
thread.start thread.start()
}
else
logger.info("Not starting consumer threads (mirror topic list is empty)")
} }
def shutdown() = { def startup() {
consumerConnector.shutdown topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this)
/*
* consumer threads are (re-)started upon topic events (which includes an
* initial startup event which lists the current topics)
*/
}
def shutdown() {
producer.close()
if (consumerConnector != null)
consumerConnector.shutdown()
if (topicEventWatcher != null)
topicEventWatcher.shutdown()
} }
} }

View File

@ -103,6 +103,8 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
override def javaCompileOptions = super.javaCompileOptions ++ override def javaCompileOptions = super.javaCompileOptions ++
List(JavaCompileOption("-source"), JavaCompileOption("1.5")) List(JavaCompileOption("-source"), JavaCompileOption("1.5"))
override def packageAction = super.packageAction dependsOn (testCompileAction)
} }
class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info) class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)

View File

@ -1,8 +1,27 @@
This test replicates messages from 3 kafka brokers to 2 other kafka brokers using the embedded consumer. This test replicates messages from 3 kafka brokers to 2 other kafka brokers
At the end, the messages produced at the source brokers should match that at the target brokers. using the embedded consumer. At the end, the messages produced at the source
brokers should match that at the target brokers.
To run this test, do To run this test, do
bin/run-test.sh bin/run-test.sh
The expected output is given in bin/expected.out. There is only 1 thing that's important. The expected output is given in bin/expected.out. There is only 1 thing that's
important.
1. The output should have a line "test passed". 1. The output should have a line "test passed".
In the event of failure, by default the brokers and zookeepers remain running
to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
change this behavior by setting the action_on_fail flag in the script to "exit"
or "proceed", in which case a snapshot of all the logs and directories is
placed in the test's base directory.
If you are making any changes that may affect the embedded consumer, it is a
good idea to run the test in a loop. E.g.:
:>/tmp/embeddedconsumer_test.log
for i in {1..10}; do echo "run $i"; ./bin/run-test.sh 2>1 >> /tmp/embeddedconsumer_test.log; done
tail -F /tmp/embeddedconsumer_test.log
grep -ic passed /tmp/embeddedconsumer_test.log
grep -ic failed /tmp/embeddedconsumer_test.log

View File

@ -1,79 +1,314 @@
#!/bin/bash #!/bin/bash
num_messages=400000 readonly num_messages=400000
message_size=400 readonly message_size=400
readonly action_on_fail="proceed"
base_dir=$(dirname $0)/.. readonly test_start_time="$(date +%s)"
rm -rf /tmp/zookeeper_source readonly base_dir=$(dirname $0)/..
rm -rf /tmp/zookeeper_target
rm -rf /tmp/kafka-source1-logs
mkdir /tmp/kafka-source1-logs
mkdir /tmp/kafka-source1-logs/test01-0
touch /tmp/kafka-source1-logs/test01-0/00000000000000000000.kafka
rm -rf /tmp/kafka-source2-logs
mkdir /tmp/kafka-source2-logs
mkdir /tmp/kafka-source2-logs/test01-0
touch /tmp/kafka-source2-logs/test01-0/00000000000000000000.kafka
rm -rf /tmp/kafka-source3-logs
mkdir /tmp/kafka-source3-logs
mkdir /tmp/kafka-source3-logs/test01-0
touch /tmp/kafka-source3-logs/test01-0/00000000000000000000.kafka
rm -rf /tmp/kafka-target1-logs
rm -rf /tmp/kafka-target2-logs
echo "start the servers ..." info() {
$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log & echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log & }
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target1.log &
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target2.log &
sleep 4 kill_child_processes() {
echo "start producing messages ..." isTopmost=$1
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval 400000 num_messages --async --delay-btw-batch-ms 10 & curPid=$2
childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
for childPid in $childPids
do
kill_child_processes 0 $childPid
done
if [ $isTopmost -eq 0 ]; then
kill -15 $curPid 2> /dev/null
fi
}
echo "wait for consumer to finish consuming ..." cleanup() {
cur1_offset="-1" info "cleaning up"
cur2_offset="-1"
quit1=0 pid_zk_source=
quit2=0 pid_zk_target=
while [ $quit1 -eq 0 ] && [ $quit2 -eq 0 ] pid_kafka_source1=
do pid_kafka_source2=
pid_kafka_source3=
pid_kafka_target1=
pid_kafka_target2=
pid_producer=
rm -rf /tmp/zookeeper_source
rm -rf /tmp/zookeeper_target
rm -rf /tmp/kafka-source{1..3}-logs
# mkdir -p /tmp/kafka-source{1..3}-logs/test0{1..3}-0
# touch /tmp/kafka-source{1..3}-logs/test0{1..3}-0/00000000000000000000.kafka
rm -rf /tmp/kafka-target{1..2}-logs
}
begin_timer() {
t_begin=$(date +%s)
}
end_timer() {
t_end=$(date +%s)
}
start_zk() {
info "starting zookeepers"
$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
pid_zk_source=$!
$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
pid_zk_target=$!
}
start_source_servers() {
info "starting source cluster"
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
pid_kafka_source1=$!
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
pid_kafka_source2=$!
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
pid_kafka_source3=$!
}
start_target_servers_for_whitelist_test() {
echo "starting mirror cluster"
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log &
pid_kafka_target1=$!
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log &
pid_kafka_target2=$!
}
start_target_servers_for_blacklist_test() {
echo "starting mirror cluster"
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log &
pid_kafka_target1=$!
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log &
pid_kafka_target2=$!
}
shutdown_servers() {
info "stopping producer"
if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
info "shutting down target servers"
if [ "x${pid_kafka_target1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target1}; fi
if [ "x${pid_kafka_target2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target2}; fi
sleep 2 sleep 2
target1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
if [ $target1_size -eq $cur1_offset ] info "shutting down source servers"
then if [ "x${pid_kafka_source1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source1}; fi
quit1=1 if [ "x${pid_kafka_source2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source2}; fi
if [ "x${pid_kafka_source3}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source3}; fi
info "shutting down zookeeper servers"
if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi
}
start_producer() {
topic=$1
info "start producing messages for topic $topic ..."
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async --delay-btw-batch-ms 10 2>&1 > $base_dir/producer_performance.log &
pid_producer=$!
}
# In case the consumer does not consume, the test may exit prematurely (i.e.,
# shut down the kafka brokers, and ProducerPerformance will start throwing ugly
# exceptions. So, wait for the producer to finish before shutting down. If it
# takes too long, the user can just hit Ctrl-c which is trapped to kill child
# processes.
# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
wait_partition_done() {
n_tuples=$(($# / 3))
i=1
while (($#)); do
kafka_server[i]=$1
topic[i]=$2
partitionid[i]=$3
prev_offset[i]=0
info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
i=$((i+1))
shift 3
done
all_done=0
# set -x
while [[ $all_done != 1 ]]; do
sleep 4
i=$n_tuples
all_done=1
for ((i=1; i <= $n_tuples; i++)); do
cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
all_done=0
prev_offset[i]=$cur_size
fi fi
cur1_offset=$target1_size done
target2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1` done
if [ $target2_size -eq $cur2_offset ]
}
cmp_logs() {
topic=$1
info "comparing source and target logs for topic $topic"
source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size))
actual_size=$(($target_part0_size + $target_part1_size))
if [ "x$expected_size" != "x$actual_size" ]
then then
quit2=1 info "source size: $expected_size target size: $actual_size"
return 1
else
return 0
fi fi
cur2_offset=$target2_size }
done
take_fail_snapshot() {
snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
mkdir $snapshot_dir
for dir in /tmp/zookeeper_source /tmp/zookeeper_target /tmp/kafka-source{1..3}-logs /tmp/kafka-target{1..2}-logs; do
if [ -d $dir ]; then
cp -r $dir $snapshot_dir
fi
done
}
# Usage: process_test_result <result> <action_on_fail>
# result: last test result
# action_on_fail: (exit|wait|proceed)
# ("wait" is useful if you want to troubleshoot using zookeeper)
process_test_result() {
result=$1
if [ $1 -eq 0 ]; then
info "test passed"
else
info "test failed"
case "$2" in
"wait") info "waiting: hit Ctrl-c to quit"
wait
;;
"exit") shutdown_servers
take_fail_snapshot
exit $result
;;
*) shutdown_servers
take_fail_snapshot
info "proceeding"
;;
esac
fi
}
test_whitelists() {
info "### Testing whitelists"
snapshot_prefix="whitelist-test"
cleanup
start_zk
start_source_servers
start_target_servers_for_whitelist_test
sleep 4
begin_timer
start_producer test01
info "waiting for producer to finish producing ..."
wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0
info "waiting for consumer to finish consuming ..."
wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0
end_timer
info "embedded consumer took $((t_end - t_begin)) seconds"
sleep 2
cmp_logs test01
result=$?
return $result
}
test_blacklists() {
info "### Testing blacklists"
snapshot_prefix="blacklist-test"
cleanup
start_zk
start_source_servers
start_target_servers_for_blacklist_test
sleep 4
start_producer test02
info "waiting for producer to finish producing test02 ..."
wait_partition_done kafka://localhost:9090 test02 0 kafka://localhost:9091 test02 0 kafka://localhost:9092 test02 0
# start_producer test03
# info "waiting for producer to finish producing test03 ..."
# wait_partition_done kafka://localhost:9090 test03 0 kafka://localhost:9091 test03 0 kafka://localhost:9092 test03 0
begin_timer
start_producer test01
info "waiting for producer to finish producing ..."
wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0
info "waiting for consumer to finish consuming ..."
wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0
end_timer
info "embedded consumer took $((t_end - t_begin)) seconds"
sleep 2
cmp_logs test02
result1=$?
# cmp_logs test03
# result2=$?
# if [[ "x$result1" == "x0" || "x$result2" == "x0" ]]; then
if [[ "x$result1" == "x0" ]]; then
result=1
else
cmp_logs test01
result=$?
fi
return $result
}
# main test begins
echo "Test-$test_start_time"
# Ctrl-c trap. Catches INT signal
trap "shutdown_servers; exit 0" INT
test_whitelists
result=$?
process_test_result $result $action_on_fail
shutdown_servers
sleep 2 sleep 2
source_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
source_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
source_part2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
target_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
target_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
expected_size=`expr $source_part0_size + $source_part1_size + $source_part2_size` test_blacklists
actual_size=`expr $target_part0_size + $target_part1_size` result=$?
if [ $expected_size != $actual_size ]
then process_test_result $result $action_on_fail
echo "source size: $expected_size target size: $actual_size test failed!!! look at it!!!"
else shutdown_servers
echo "test passed"
fi exit $result
echo "stopping the servers"
ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
sleep 2
ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null

View File

@ -0,0 +1,15 @@
# see kafka.consumer.ConsumerConfig for more details
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
#consumer group id
groupid=group1
mirror.topics.blacklist=test02,test03

View File

@ -1,14 +0,0 @@
# see kafka.consumer.ConsumerConfig for more details
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
#consumer group id
groupid=group1
embeddedconsumer.topics=test01:1

View File

@ -0,0 +1,13 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
producer.type=async
# to avoid dropping events if the queue is full, wait indefinitely
queue.enqueueTimeout.ms=-1

View File

@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms # time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000 log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms # time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000 log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms # time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000 log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,15 @@
# see kafka.consumer.ConsumerConfig for more details
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
#consumer group id
groupid=group1
mirror.topics.whitelist=test01:1