Mirroring should use multiple producers; add producer retries to DefaultEventHandler; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-332

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1330083 13f79535-47bb-0310-9956-ffa450edef68
Author: Jun Rao
Date:   2012-04-25 02:24:47 +00:00
Commit: d6b1de35f6 (parent: d97c557202)

8 changed files with 84 additions and 15 deletions


@@ -32,6 +32,17 @@ class ProducerConfig(val props: Properties) extends ZKConfig(props)
   if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null)
     throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
 
+  /**
+   * If DefaultEventHandler is used, this specifies the number of times to
+   * retry if an error is encountered during send. Currently, it is only
+   * appropriate when broker.list points to a VIP. If the zk.connect option
+   * is used instead, this will not have any effect because with the zk-based
+   * producer, brokers are not re-selected upon retry. So retries would go to
+   * the same (potentially still down) broker. (KAFKA-253 will help address
+   * this.)
+   */
+  val numRetries = Utils.getInt(props, "num.retries", 0)
+
   /** If both broker.list and zk.connect options are specified, throw an exception */
   if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect))
     throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")


@@ -47,9 +47,25 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
   private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
     if(messagesPerTopic.size > 0) {
       val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
-      syncProducer.multiSend(requests)
-      trace("kafka producer sent messages for topics %s to broker %s:%d"
-        .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
+      val maxAttempts = config.numRetries + 1
+      var attemptsRemaining = maxAttempts
+      var sent = false
+
+      while (attemptsRemaining > 0 && !sent) {
+        attemptsRemaining -= 1
+        try {
+          syncProducer.multiSend(requests)
+          trace("kafka producer sent messages for topics %s to broker %s:%d (on attempt %d)"
+            .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port, maxAttempts - attemptsRemaining))
+          sent = true
+        }
+        catch {
+          case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining))
+            if (attemptsRemaining == 0)
+              throw e
+        }
+      }
     }
   }
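The loop above makes at most config.numRetries + 1 calls to multiSend, logs a warning per failure, and rethrows the last error once attempts are exhausted. A minimal standalone sketch of the same policy (illustrative names, not the patch's API):

    // Sketch of the retry policy: numRetries = N permits N + 1 attempts,
    // and the final failure is rethrown to the caller.
    def withRetries(numRetries: Int)(attempt: () => Unit) {
      var attemptsRemaining = numRetries + 1
      var done = false
      while (attemptsRemaining > 0 && !done) {
        attemptsRemaining -= 1
        try {
          attempt()
          done = true
        } catch {
          case e: Exception =>
            if (attemptsRemaining == 0) throw e
        }
      }
    }
    // e.g. withRetries(config.numRetries)(() => syncProducer.multiSend(requests))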


@@ -115,7 +115,7 @@ private[async] class ProducerSendThread[T](val threadName: String,
       if(events.size > 0)
         handler.handle(events, underlyingProducer, serializer)
     }catch {
-      case e: Exception => error("Error in handling batch of " + events.size + " events", e)
+      case e => error("Error in handling batch of " + events.size + " events", e)
     }
   }


@@ -33,20 +33,27 @@ object MirrorMaker extends Logging {
     info ("Starting mirror maker")
     val parser = new OptionParser
 
-    val consumerConfigOpt = parser.accepts("consumer-config",
+    val consumerConfigOpt = parser.accepts("consumer.config",
       "Consumer config to consume from a source cluster. " +
       "You may specify multiple of these.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(classOf[String])
 
-    val producerConfigOpt = parser.accepts("producer-config",
+    val producerConfigOpt = parser.accepts("producer.config",
       "Embedded producer config.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(classOf[String])
 
+    val numProducersOpt = parser.accepts("num.producers",
+      "Number of producer instances")
+      .withRequiredArg()
+      .describedAs("Number of producers")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
-    val numStreamsOpt = parser.accepts("num-streams",
+    val numStreamsOpt = parser.accepts("num.streams",
       "Number of consumption streams.")
       .withRequiredArg()
       .describedAs("Number of threads")
@@ -83,11 +90,11 @@ object MirrorMaker extends Logging {
     val numStreams = options.valueOf(numStreamsOpt)
 
-    val producer = {
+    val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
       val config = new ProducerConfig(
         Utils.loadProps(options.valueOf(producerConfigOpt)))
       new Producer[Null, Message](config)
-    }
+    })
 
     val threads = {
       val connectors = options.valuesOf(consumerConfigOpt).toList
@@ -97,7 +104,7 @@ object MirrorMaker extends Logging {
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
         connectors.foreach(_.shutdown())
-        producer.close()
+        producers.foreach(_.close())
       }
     })
@@ -110,7 +117,7 @@ object MirrorMaker extends Logging {
       connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
 
     streams.flatten.zipWithIndex.map(streamAndIndex => {
-      new MirrorMakerThread(streamAndIndex._1, producer, streamAndIndex._2)
+      new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
     })
   }
@@ -120,18 +127,20 @@ object MirrorMaker extends Logging {
   }
 
   class MirrorMakerThread(stream: KafkaStream[Message],
-                          producer: Producer[Null, Message],
+                          producers: Seq[Producer[Null, Message]],
                           threadId: Int)
     extends Thread with Logging {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-" + threadId
+    private val producerSelector = Utils.circularIterator(producers)
     this.setName(threadName)
 
     override def run() {
       try {
         for (msgAndMetadata <- stream) {
+          val producer = producerSelector.next()
           val pd = new ProducerData[Null, Message](
             msgAndMetadata.topic, msgAndMetadata.message)
           producer.send(pd)
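To illustrate the effect of this change: each MirrorMakerThread now draws the target producer from a circular iterator over the shared producer list (sized by the new --num.producers option, default 1), so consecutive messages from one stream are spread round-robin across the embedded producers. A toy sketch, with strings standing in for Producer[Null, Message] instances:

    // Toy model of the fan-out; not part of the patch.
    val producers = List("producer-0", "producer-1", "producer-2")
    // Same shape as Utils.circularIterator: an endless cycle over the collection.
    val selector = Stream.continually(producers).flatten.iterator
    val assignments = (1 to 6).map(n => ("message-" + n, selector.next()))
    // message-1 -> producer-0, message-2 -> producer-1, message-3 -> producer-2,
    // message-4 -> producer-0, ...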


@@ -668,6 +668,17 @@ object Utils extends Logging {
       }
     }
   }
+
+  /**
+   * Create a circular (looping) iterator over a collection.
+   * @param coll An iterable over the underlying collection.
+   * @return A circular iterator over the collection.
+   */
+  def circularIterator[T](coll: Iterable[T]) = {
+    val stream: Stream[T] =
+      for (forever <- Stream.continually(1); t <- coll) yield t
+    stream.iterator
+  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
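A quick illustrative use of the helper (not part of the patch):

    // Cycles forever over the underlying collection; hasNext is always true.
    val it = Utils.circularIterator(List("a", "b"))
    it.take(5).toList  // List(a, b, a, b, a)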


@@ -20,6 +20,7 @@ package kafka.utils
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
+import org.junit.Assert._
 
 class UtilsTest extends JUnitSuite {
@@ -31,4 +32,23 @@ class UtilsTest extends JUnitSuite {
     Utils.swallow(logger.info, throw new IllegalStateException("test"))
   }
 
+  @Test
+  def testCircularIterator() {
+    val l = List(1, 2)
+    val itl = Utils.circularIterator(l)
+    assertEquals(1, itl.next())
+    assertEquals(2, itl.next())
+    assertEquals(1, itl.next())
+    assertEquals(2, itl.next())
+    assertFalse(itl.hasDefiniteSize)
+
+    val s = Set(1, 2)
+    val its = Utils.circularIterator(s)
+    assertEquals(1, its.next())
+    assertEquals(2, its.next())
+    assertEquals(1, its.next())
+    assertEquals(2, its.next())
+    assertEquals(1, its.next())
+  }
 }


@@ -241,9 +241,9 @@ test_whitelists() {
     sleep 4
 
     info "starting mirror makers"
-    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
     pid_mirrormaker_1=$!
-    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log &
+    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log &
     pid_mirrormaker_2=$!
 
     begin_timer
@@ -298,7 +298,7 @@ test_blacklists() {
     sleep 4
 
     info "starting mirror maker"
-    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/blacklisttest.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/blacklisttest.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
     pid_mirrormaker_1=$!
 
     start_producer blacktopic01 localhost:2181


@@ -26,3 +26,5 @@ producer.type=async
 # to avoid dropping events if the queue is full, wait indefinitely
 queue.enqueueTimeout.ms=-1
+
+num.producers.per.broker=2