diff --git a/config/producer.properties b/config/producer.properties
index a1c8cb21889..cc8f5f6d1b7 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -31,7 +31,7 @@ producer.type=sync
 compression.codec=none
 
 # message encoder
-serializer.class=kafka.serializer.StringEncoder
+serializer.class=kafka.serializer.DefaultEncoder
 
 # allow topic level compression
 #compressed.topics=
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 424ef39c3d9..72653285607 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -26,7 +26,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
   /**
    * Sends the data to a single topic, partitioned by key, using either the
    * synchronous or the asynchronous producer
-   * @param producerData the producer data object that encapsulates the topic, key and message data
+   * @param message the producer data object that encapsulates the topic, key and message data
    */
   def send(message: KeyedMessage[K,V]) {
     underlying.send(message)
@@ -34,7 +34,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
 
   /**
    * Use this API to send data to multiple topics
-   * @param producerData list of producer data objects that encapsulate the topic, key and message data
+   * @param messages list of producer data objects that encapsulate the topic, key and message data
    */
   def send(messages: java.util.List[KeyedMessage[K,V]]) {
     import collection.JavaConversions._
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index eb63d75617a..a15b3503272 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -220,7 +220,7 @@ public class KafkaMigrationTool {
     kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
     // create a producer channel instead
     int queueSize = options.valueOf(queueSizeOpt);
-    ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(queueSize);
+    ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
     int threadId = 0;
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -279,8 +279,7 @@ public class KafkaMigrationTool {
     }
   }
 
-
-  private static class ProducerDataChannel<T> {
+  static class ProducerDataChannel<T> {
     private final int producerQueueSize;
     private final BlockingQueue<T> producerRequestQueue;
 
@@ -300,14 +299,14 @@ public class KafkaMigrationTool {
 
   private static class MigrationThread extends Thread {
     private final Object stream;
-    private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+    private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
    private final int threadId;
     private final String threadName;
     private final org.apache.log4j.Logger logger;
     private CountDownLatch shutdownComplete = new CountDownLatch(1);
     private final AtomicBoolean isRunning = new AtomicBoolean(true);
 
-    MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, int _threadId) {
+    MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) {
       stream = _stream;
       producerDataChannel = _producerDataChannel;
       threadId = _threadId;
@@ -336,7 +335,7 @@ public class KafkaMigrationTool {
             ((ByteBuffer)payload_07).get(bytes);
             if(logger.isDebugEnabled())
               logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
-            KeyedMessage<String, byte[]> producerData = new KeyedMessage<String, byte[]>((String)topic, null, bytes);
+            KeyedMessage<byte[], byte[]> producerData = new KeyedMessage<byte[], byte[]>((String)topic, null, bytes);
             producerDataChannel.sendRequest(producerData);
           }
           logger.info("Migration thread " + threadName + " finished running");
@@ -362,17 +361,17 @@ public class KafkaMigrationTool {
     }
   }
 
-  private static class ProducerThread extends Thread {
-    private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
-    private final Producer<String, byte[]> producer;
+  static class ProducerThread extends Thread {
+    private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
+    private final Producer<byte[], byte[]> producer;
     private final int threadId;
     private String threadName;
     private org.apache.log4j.Logger logger;
     private CountDownLatch shutdownComplete = new CountDownLatch(1);
-    private KeyedMessage<String, byte[]> shutdownMessage = new KeyedMessage<String, byte[]>("shutdown", null, null);
+    private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage<byte[], byte[]>("shutdown", null, null);
 
-    public ProducerThread(ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel,
-                          Producer<String, byte[]> _producer,
+    public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel,
+                          Producer<byte[], byte[]> _producer,
                           int _threadId) {
       producerDataChannel = _producerDataChannel;
       producer = _producer;
@@ -385,7 +384,7 @@ public class KafkaMigrationTool {
     public void run() {
       try{
         while(true) {
-          KeyedMessage<String, byte[]> data = producerDataChannel.receiveRequest();
+          KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
           if(!data.equals(shutdownMessage))
             producer.send(data);
           else
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 5c4b3d29d61..3d22dc7448e 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -24,6 +24,9 @@ import scala.collection.JavaConversions._
 import java.util.concurrent.CountDownLatch
 import kafka.consumer._
 import kafka.serializer._
+import collection.mutable.ListBuffer
+import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel}
+import kafka.javaapi
 
 object MirrorMaker extends Logging {
 
@@ -59,7 +62,13 @@ object MirrorMaker extends Logging {
       .describedAs("Number of threads")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
-
+
+    val bufferSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer")
+      .withRequiredArg()
+      .describedAs("Queue size in terms of number of messages")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(10000);
+
 
     val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.")
       .withRequiredArg()
@@ -88,6 +97,7 @@ object MirrorMaker extends Logging {
     }
 
     val numStreams = options.valueOf(numStreamsOpt)
+    val bufferSize = options.valueOf(bufferSizeOpt).intValue()
 
     val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
       val config = new ProducerConfig(
@@ -95,52 +105,63 @@ object MirrorMaker extends Logging {
       new Producer[Array[Byte], Array[Byte]](config)
     })
 
-    val threads = {
-      val connectors = options.valuesOf(consumerConfigOpt).toList
-        .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
-        .map(new ZookeeperConsumerConnector(_))
+    val connectors = options.valuesOf(consumerConfigOpt).toList
+      .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
+      .map(new ZookeeperConsumerConnector(_))
 
-      Runtime.getRuntime.addShutdownHook(new Thread() {
-        override def run() {
-          connectors.foreach(_.shutdown())
-          producers.foreach(_.close())
-        }
-      })
+    val filterSpec = if (options.has(whitelistOpt))
+      new Whitelist(options.valueOf(whitelistOpt))
+    else
+      new Blacklist(options.valueOf(blacklistOpt))
 
-      val filterSpec = if (options.has(whitelistOpt))
-        new Whitelist(options.valueOf(whitelistOpt))
-      else
-        new Blacklist(options.valueOf(blacklistOpt))
+    val streams =
+      connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
 
-      val streams =
-        connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
+    val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
-      streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2))
+    val consumerThreads =
+      streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+
+    val producerThreads = new ListBuffer[ProducerThread]()
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        connectors.foreach(_.shutdown)
+        consumerThreads.foreach(_.awaitShutdown)
+        producerThreads.foreach(_.shutdown)
+        producerThreads.foreach(_.awaitShutdown)
+        logger.info("Kafka migration tool shutdown successfully");
+      }
+    })
+
+    // create producer threads
+    var i: Int = 1
+    for(producer <- producers) {
+      val producerThread: KafkaMigrationTool.ProducerThread = new KafkaMigrationTool.ProducerThread(producerDataChannel,
+        new javaapi.producer.Producer[Array[Byte], Array[Byte]](producer), i)
+      producerThreads += producerThread
+      i += 1
     }
 
-    threads.foreach(_.start())
-
-    threads.foreach(_.awaitShutdown())
+    consumerThreads.foreach(_.start)
+    producerThreads.foreach(_.start)
   }
 
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
-                          producers: Seq[Producer[Array[Byte], Array[Byte]]],
+                          producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
                           threadId: Int)
           extends Thread with Logging {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-" + threadId
-    private val producerSelector = Utils.circularIterator(producers)
 
     this.setName(threadName)
 
     override def run() {
       try {
         for (msgAndMetadata <- stream) {
-          val producer = producerSelector.next()
-          val pd = new KeyedMessage[Array[Byte], Array[Byte]](
-            msgAndMetadata.topic, msgAndMetadata.message)
-          producer.send(pd)
+          val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
+          producerDataChannel.sendRequest(pd)
         }
       } catch {
         case e =>
@@ -155,9 +176,7 @@ object MirrorMaker extends Logging {
       try {
        shutdownLatch.await()
       } catch {
-        case e: InterruptedException => fatal(
-          "Shutdown of thread %s interrupted. This might leak data!"
-            .format(threadName))
+        case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName))
       }
     }
   }
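
What the MirrorMaker change amounts to: consumer threads no longer pick a producer via Utils.circularIterator and send inline; each MirrorMakerThread now enqueues its KeyedMessage into a single bounded ProducerDataChannel (sized by the new --queue.size option, default 10000), and the KafkaMigrationTool.ProducerThread instances drain that queue and call send. A minimal sketch of that hand-off in plain Scala; BoundedChannel, shutdownSentinel, and the println are illustrative stand-ins for ProducerDataChannel, shutdownMessage, and producer.send, not the Kafka classes:

    import java.util.concurrent.ArrayBlockingQueue

    // Bounded buffer between consumer and producer threads: put() blocks when
    // the queue is full (backpressure on the consumer side), take() blocks
    // when the queue is empty (producer side waits for work).
    class BoundedChannel[T](capacity: Int) {
      private val queue = new ArrayBlockingQueue[T](capacity)
      def sendRequest(data: T) { queue.put(data) }
      def receiveRequest(): T = queue.take()
    }

    object ChannelSketch {
      def main(args: Array[String]) {
        val channel = new BoundedChannel[String](10000) // plays the role of --queue.size
        val shutdownSentinel = "shutdown"               // plays the role of shutdownMessage

        // Drains the channel until it sees the sentinel, like ProducerThread.run().
        val producerThread = new Thread(new Runnable {
          def run() {
            var msg = channel.receiveRequest()
            while (msg != shutdownSentinel) {
              println("send: " + msg)                   // stands in for producer.send(data)
              msg = channel.receiveRequest()
            }
          }
        })
        producerThread.start()

        // The consumer side just enqueues, like MirrorMakerThread.run().
        (1 to 5).foreach(i => channel.sendRequest("message-" + i))
        channel.sendRequest(shutdownSentinel)
        producerThread.join()
      }
    }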
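The bounded queue is also what provides backpressure: once queue.size messages are buffered, sendRequest (a blocking put) stalls the consumer threads rather than letting the buffer grow without bound, and the shutdown hook stops the pipeline in order, shutting down the connectors and waiting for the consumer threads before signalling the producer threads to drain and exit.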