KAFKA-2047; Move the stream creation into concurrent mirror maker threads; reviewed by Guozhang Wang

This commit is contained in:
Jiangjie Qin 2015-03-25 14:01:19 -07:00 committed by Guozhang Wang
parent a74688de46
commit 5b42b538eb
1 changed files with 19 additions and 30 deletions

View File

@ -22,18 +22,18 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Collections, Properties}
import scala.collection.JavaConversions._
import com.yammer.metrics.core.Gauge
import joptsimple.OptionParser
import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector}
import kafka.consumer.{KafkaStream, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
import kafka.javaapi.consumer.ConsumerRebalanceListener
import kafka.message.MessageAndMetadata
import kafka.metrics.KafkaMetricsGroup
import kafka.serializer.DefaultDecoder
import kafka.utils.{CommandLineUtils, Logging, Utils}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import scala.collection.JavaConversions._
/**
* The mirror maker has the following architecture:
@ -226,26 +226,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
else
new Blacklist(options.valueOf(blacklistOpt))
// create a (connector->stream) sequence
val connectorStream = (0 until numStreams) map {
i => {
var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
try {
// Creating just on stream per each connector instance
stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
require(stream.size == 1)
} catch {
case t: Throwable =>
fatal("Unable to create stream - shutting down mirror maker.", t)
connectors(i).shutdown()
}
connectors(i) -> stream(0)
}
}
// Create mirror maker threads
mirrorMakerThreads = (0 until numStreams) map ( i =>
new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i)
new MirrorMakerThread(connectors(i), filterSpec, i)
)
// Create and initialize message handler
@ -295,13 +278,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) {
properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue))
val propertyValue = properties.getProperty(propertyName)
properties.setProperty(propertyName, Option(propertyValue).getOrElse(defaultValue))
if (properties.getProperty(propertyName) != defaultValue)
info("Property %s is overridden to %s - data loss or message reordering is possible.")
info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue))
}
class MirrorMakerThread(connector: ZookeeperConsumerConnector,
stream: KafkaStream[Array[Byte], Array[Byte]],
filterSpec: TopicFilter,
val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
private val threadName = "mirrormaker-thread-" + threadId
private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
@ -313,8 +297,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
override def run() {
info("Starting mirror maker thread " + threadName)
val iter = stream.iterator()
try {
// Creating one stream per each connector instance
val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
require(streams.size == 1)
val stream = streams(0)
val iter = stream.iterator()
// TODO: Need to be changed after KAFKA-1660 is available.
while (!exitingOnSendFailure && !shuttingDown) {
try {
@ -333,10 +322,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
} catch {
case t: Throwable =>
fatal("Producer thread failure due to ", t)
fatal("Mirror maker thread failure due to ", t)
} finally {
shutdownLatch.countDown()
info("Producer thread stopped")
info("Mirror maker thread stopped")
// if it exits accidentally, stop the entire mirror maker
if (!isShuttingdown.get()) {
fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
@ -360,7 +349,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
catch {
case ie: InterruptedException =>
warn("Interrupt during shutdown of ProducerThread")
warn("Interrupt during shutdown of the mirror maker thread")
}
}
@ -370,7 +359,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
info("Mirror maker thread shutdown complete")
} catch {
case ie: InterruptedException =>
warn("Shutdown of the producer thread interrupted")
warn("Shutdown of the mirror maker thread interrupted")
}
}
}