KAFKA-829 Mirror maker needs to share the migration tool request channel; reviewed by Jun Rao

This commit is contained in:
Neha Narkhede 2013-03-28 09:54:02 -07:00
parent be3ce14721
commit 66b1038957
4 changed files with 64 additions and 46 deletions

View File

@ -31,7 +31,7 @@ producer.type=sync
compression.codec=none
# message encoder
serializer.class=kafka.serializer.StringEncoder
serializer.class=kafka.serializer.DefaultEncoder
# allow topic level compression
#compressed.topics=

View File

@ -26,7 +26,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
/**
* Sends the data to a single topic, partitioned by key, using either the
* synchronous or the asynchronous producer
* @param producerData the producer data object that encapsulates the topic, key and message data
* @param message the producer data object that encapsulates the topic, key and message data
*/
def send(message: KeyedMessage[K,V]) {
underlying.send(message)
@ -34,7 +34,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
/**
* Use this API to send data to multiple topics
* @param producerData list of producer data objects that encapsulate the topic, key and message data
* @param messages list of producer data objects that encapsulate the topic, key and message data
*/
def send(messages: java.util.List[KeyedMessage[K,V]]) {
import collection.JavaConversions._

View File

@ -220,7 +220,7 @@ public class KafkaMigrationTool {
kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
// create a producer channel instead
int queueSize = options.valueOf(queueSizeOpt);
ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(queueSize);
ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
int threadId = 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
@ -279,8 +279,7 @@ public class KafkaMigrationTool {
}
}
private static class ProducerDataChannel<T> {
static class ProducerDataChannel<T> {
private final int producerQueueSize;
private final BlockingQueue<T> producerRequestQueue;
@ -300,14 +299,14 @@ public class KafkaMigrationTool {
private static class MigrationThread extends Thread {
private final Object stream;
private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
private final int threadId;
private final String threadName;
private final org.apache.log4j.Logger logger;
private CountDownLatch shutdownComplete = new CountDownLatch(1);
private final AtomicBoolean isRunning = new AtomicBoolean(true);
MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, int _threadId) {
MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) {
stream = _stream;
producerDataChannel = _producerDataChannel;
threadId = _threadId;
@ -336,7 +335,7 @@ public class KafkaMigrationTool {
((ByteBuffer)payload_07).get(bytes);
if(logger.isDebugEnabled())
logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
producerDataChannel.sendRequest(producerData);
}
logger.info("Migration thread " + threadName + " finished running");
@ -362,17 +361,17 @@ public class KafkaMigrationTool {
}
}
private static class ProducerThread extends Thread {
private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
private final Producer<String, byte[]> producer;
static class ProducerThread extends Thread {
private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
private final Producer<byte[], byte[]> producer;
private final int threadId;
private String threadName;
private org.apache.log4j.Logger logger;
private CountDownLatch shutdownComplete = new CountDownLatch(1);
private KeyedMessage<String, byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
public ProducerThread(ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel,
Producer<String, byte[]> _producer,
public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel,
Producer<byte[], byte[]> _producer,
int _threadId) {
producerDataChannel = _producerDataChannel;
producer = _producer;
@ -385,7 +384,7 @@ public class KafkaMigrationTool {
public void run() {
try{
while(true) {
KeyedMessage<String, byte[]> data = producerDataChannel.receiveRequest();
KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
if(!data.equals(shutdownMessage))
producer.send(data);
else

View File

@ -24,6 +24,9 @@ import scala.collection.JavaConversions._
import java.util.concurrent.CountDownLatch
import kafka.consumer._
import kafka.serializer._
import collection.mutable.ListBuffer
import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel}
import kafka.javaapi
object MirrorMaker extends Logging {
@ -59,7 +62,13 @@ object MirrorMaker extends Logging {
.describedAs("Number of threads")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val bufferSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer")
.withRequiredArg()
.describedAs("Queue size in terms of number of messages")
.ofType(classOf[java.lang.Integer])
.defaultsTo(10000);
val whitelistOpt = parser.accepts("whitelist",
"Whitelist of topics to mirror.")
.withRequiredArg()
@ -88,6 +97,7 @@ object MirrorMaker extends Logging {
}
val numStreams = options.valueOf(numStreamsOpt)
val bufferSize = options.valueOf(bufferSizeOpt).intValue()
val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
val config = new ProducerConfig(
@ -95,52 +105,63 @@ object MirrorMaker extends Logging {
new Producer[Array[Byte], Array[Byte]](config)
})
val threads = {
val connectors = options.valuesOf(consumerConfigOpt).toList
.map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
.map(new ZookeeperConsumerConnector(_))
val connectors = options.valuesOf(consumerConfigOpt).toList
.map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
.map(new ZookeeperConsumerConnector(_))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
connectors.foreach(_.shutdown())
producers.foreach(_.close())
}
})
val filterSpec = if (options.has(whitelistOpt))
new Whitelist(options.valueOf(whitelistOpt))
else
new Blacklist(options.valueOf(blacklistOpt))
val filterSpec = if (options.has(whitelistOpt))
new Whitelist(options.valueOf(whitelistOpt))
else
new Blacklist(options.valueOf(blacklistOpt))
val streams =
connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
val streams =
connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2))
val consumerThreads =
streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
val producerThreads = new ListBuffer[ProducerThread]()
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
connectors.foreach(_.shutdown)
consumerThreads.foreach(_.awaitShutdown)
producerThreads.foreach(_.shutdown)
producerThreads.foreach(_.awaitShutdown)
logger.info("Kafka migration tool shutdown successfully");
}
})
// create producer threads
var i: Int = 1
for(producer <- producers) {
val producerThread: KafkaMigrationTool.ProducerThread = new KafkaMigrationTool.ProducerThread(producerDataChannel,
new javaapi.producer.Producer[Array[Byte], Array[Byte]](producer), i)
producerThreads += producerThread
i += 1
}
threads.foreach(_.start())
threads.foreach(_.awaitShutdown())
consumerThreads.foreach(_.start)
producerThreads.foreach(_.start)
}
class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
producers: Seq[Producer[Array[Byte], Array[Byte]]],
producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
threadId: Int)
extends Thread with Logging {
private val shutdownLatch = new CountDownLatch(1)
private val threadName = "mirrormaker-" + threadId
private val producerSelector = Utils.circularIterator(producers)
this.setName(threadName)
override def run() {
try {
for (msgAndMetadata <- stream) {
val producer = producerSelector.next()
val pd = new KeyedMessage[Array[Byte], Array[Byte]](
msgAndMetadata.topic, msgAndMetadata.message)
producer.send(pd)
val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
producerDataChannel.sendRequest(pd)
}
} catch {
case e =>
@ -155,9 +176,7 @@ object MirrorMaker extends Logging {
try {
shutdownLatch.await()
} catch {
case e: InterruptedException => fatal(
"Shutdown of thread %s interrupted. This might leak data!"
.format(threadName))
case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName))
}
}
}