From d6b1de35f6b9cd5370c7812790fea8e61618f461 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Wed, 25 Apr 2012 02:24:47 +0000 Subject: [PATCH] Mirroring should use multiple producers; add producer retries to DefaultEventHandler; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-332 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1330083 13f79535-47bb-0310-9956-ffa450edef68 --- .../scala/kafka/producer/ProducerConfig.scala | 11 ++++++++ .../producer/async/DefaultEventHandler.scala | 22 +++++++++++++--- .../producer/async/ProducerSendThread.scala | 2 +- .../main/scala/kafka/tools/MirrorMaker.scala | 25 +++++++++++++------ core/src/main/scala/kafka/utils/Utils.scala | 11 ++++++++ .../scala/unit/kafka/utils/UtilsTest.scala | 20 +++++++++++++++ system_test/mirror_maker/bin/run-test.sh | 6 ++--- .../config/mirror_producer.properties | 2 ++ 8 files changed, 84 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index fa989c88d89..8a5b53cb0ac 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -32,6 +32,17 @@ class ProducerConfig(val props: Properties) extends ZKConfig(props) if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null) throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set") + /** + * If DefaultEventHandler is used, this specifies the number of times to + * retry if an error is encountered during send. Currently, it is only + * appropriate when broker.list points to a VIP. If the zk.connect option + * is used instead, this will not have any effect because with the zk-based + * producer, brokers are not re-selected upon retry. So retries would go to + * the same (potentially still down) broker. (KAFKA-253 will help address + * this.) + */ + val numRetries = Utils.getInt(props, "num.retries", 0) + /** If both broker.list and zk.connect options are specified, throw an exception */ if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect)) throw new InvalidConfigException("only one of broker.list and zk.connect can be specified") diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 0bf3c3c1e30..f72eed18147 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -47,9 +47,25 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig, private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) { if(messagesPerTopic.size > 0) { val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray - syncProducer.multiSend(requests) - trace("kafka producer sent messages for topics %s to broker %s:%d" - .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port)) + + val maxAttempts = config.numRetries + 1 + var attemptsRemaining = maxAttempts + var sent = false + + while (attemptsRemaining > 0 && !sent) { + attemptsRemaining -= 1 + try { + syncProducer.multiSend(requests) + trace("kafka producer sent messages for topics %s to broker %s:%d (on attempt %d)" + .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port, maxAttempts - attemptsRemaining)) + sent = true + } + catch { + case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining)) + if (attemptsRemaining == 0) + throw e + } + } } } diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 0c4a4edbc16..91c2fad2bcb 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -115,7 +115,7 @@ private[async] class ProducerSendThread[T](val threadName: String, if(events.size > 0) handler.handle(events, underlyingProducer, serializer) }catch { - case e: Exception => error("Error in handling batch of " + events.size + " events", e) + case e => error("Error in handling batch of " + events.size + " events", e) } } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 98dd65df5e7..3438f2c910b 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -33,20 +33,27 @@ object MirrorMaker extends Logging { info ("Starting mirror maker") val parser = new OptionParser - val consumerConfigOpt = parser.accepts("consumer-config", + val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config to consume from a source cluster. " + "You may specify multiple of these.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) - val producerConfigOpt = parser.accepts("producer-config", + val producerConfigOpt = parser.accepts("producer.config", "Embedded producer config.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) + + val numProducersOpt = parser.accepts("num.producers", + "Number of producer instances") + .withRequiredArg() + .describedAs("Number of producers") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) - val numStreamsOpt = parser.accepts("num-streams", + val numStreamsOpt = parser.accepts("num.streams", "Number of consumption streams.") .withRequiredArg() .describedAs("Number of threads") @@ -83,11 +90,11 @@ object MirrorMaker extends Logging { val numStreams = options.valueOf(numStreamsOpt) - val producer = { + val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => { val config = new ProducerConfig( Utils.loadProps(options.valueOf(producerConfigOpt))) new Producer[Null, Message](config) - } + }) val threads = { val connectors = options.valuesOf(consumerConfigOpt).toList @@ -97,7 +104,7 @@ object MirrorMaker extends Logging { Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connectors.foreach(_.shutdown()) - producer.close() + producers.foreach(_.close()) } }) @@ -110,7 +117,7 @@ object MirrorMaker extends Logging { connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue())) streams.flatten.zipWithIndex.map(streamAndIndex => { - new MirrorMakerThread(streamAndIndex._1, producer, streamAndIndex._2) + new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2) }) } @@ -120,18 +127,20 @@ object MirrorMaker extends Logging { } class MirrorMakerThread(stream: KafkaStream[Message], - producer: Producer[Null, Message], + producers: Seq[Producer[Null, Message]], threadId: Int) extends Thread with Logging { private val shutdownLatch = new CountDownLatch(1) private val threadName = "mirrormaker-" + threadId + private val producerSelector = Utils.circularIterator(producers) this.setName(threadName) override def run() { try { for (msgAndMetadata <- stream) { + val producer = producerSelector.next() val pd = new ProducerData[Null, Message]( msgAndMetadata.topic, msgAndMetadata.message) producer.send(pd) diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index a3c27010be3..eb72b0fb1dd 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -668,6 +668,17 @@ object Utils extends Logging { } } } + + /** + * Create a circular (looping) iterator over a collection. + * @param coll An iterable over the underlying collection. + * @return A circular iterator over the collection. + */ + def circularIterator[T](coll: Iterable[T]) = { + val stream: Stream[T] = + for (forever <- Stream.continually(1); t <- coll) yield t + stream.iterator + } } class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) { diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index 218e2298536..771432e1990 100644 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -20,6 +20,7 @@ package kafka.utils import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Test +import org.junit.Assert._ class UtilsTest extends JUnitSuite { @@ -31,4 +32,23 @@ class UtilsTest extends JUnitSuite { Utils.swallow(logger.info, throw new IllegalStateException("test")) } + @Test + def testCircularIterator() { + val l = List(1, 2) + val itl = Utils.circularIterator(l) + assertEquals(1, itl.next()) + assertEquals(2, itl.next()) + assertEquals(1, itl.next()) + assertEquals(2, itl.next()) + assertFalse(itl.hasDefiniteSize) + + val s = Set(1, 2) + val its = Utils.circularIterator(s) + assertEquals(1, its.next()) + assertEquals(2, its.next()) + assertEquals(1, its.next()) + assertEquals(2, its.next()) + assertEquals(1, its.next()) + } + } diff --git a/system_test/mirror_maker/bin/run-test.sh b/system_test/mirror_maker/bin/run-test.sh index bdc3f37d8e8..e4bbd816192 100644 --- a/system_test/mirror_maker/bin/run-test.sh +++ b/system_test/mirror_maker/bin/run-test.sh @@ -241,9 +241,9 @@ test_whitelists() { sleep 4 info "starting mirror makers" - JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log & + JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log & pid_mirrormaker_1=$! - JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log & + JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log & pid_mirrormaker_2=$! begin_timer @@ -298,7 +298,7 @@ test_blacklists() { sleep 4 info "starting mirror maker" - $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/blacklisttest.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log & + $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/blacklisttest.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log & pid_mirrormaker_1=$! start_producer blacktopic01 localhost:2181 diff --git a/system_test/mirror_maker/config/mirror_producer.properties b/system_test/mirror_maker/config/mirror_producer.properties index 5940c242d3f..b74c631d1d2 100644 --- a/system_test/mirror_maker/config/mirror_producer.properties +++ b/system_test/mirror_maker/config/mirror_producer.properties @@ -26,3 +26,5 @@ producer.type=async # to avoid dropping events if the queue is full, wait indefinitely queue.enqueueTimeout.ms=-1 +num.producers.per.broker=2 +