KAFKA-1997; Refactor MirrorMaker based on KIP-3; reviewed by Joel Koshy and Guozhang Wang

Jiangjie Qin 2015-03-13 15:06:10 -07:00 committed by Guozhang Wang
parent b7439c8081
commit c41c7b40b6
7 changed files with 369 additions and 532 deletions

File: org.apache.kafka.clients.producer.internals.RecordAccumulator

@ -12,6 +12,25 @@
*/ */
package org.apache.kafka.clients.producer.internals; package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
@ -25,25 +44,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
* instances to be sent to the server. * instances to be sent to the server.

File: kafka.consumer.PartitionAssignor

@ -19,16 +19,18 @@ package kafka.consumer
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
import kafka.common.TopicAndPartition import kafka.common.TopicAndPartition
import kafka.utils.{Utils, ZkUtils, Logging} import kafka.utils.{Pool, Utils, ZkUtils, Logging}
import scala.collection.mutable
trait PartitionAssignor { trait PartitionAssignor {
/** /**
* Assigns partitions to consumer instances in a group. * Assigns partitions to consumer instances in a group.
* @return An assignment map of partition to consumer thread. This only includes assignments for threads that belong * @return The global partition assignment for the consumer group, keyed by consumer id. Each entry maps
* to the given assignment-context's consumer. * topic-partitions to the consumer thread that owns them.
*/ */
def assign(ctx: AssignmentContext): scala.collection.Map[TopicAndPartition, ConsumerThreadId] def assign(ctx: AssignmentContext): Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]]
} }
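With the new return type, assign() no longer hands a consumer just its own partitions; callers index into the global Pool by consumer id. A minimal sketch of that lookup (assuming an assignor and an AssignmentContext are already constructed), mirroring what ZookeeperConsumerConnector does later in this commit:

    import scala.collection.mutable
    import kafka.common.TopicAndPartition
    import kafka.consumer.{AssignmentContext, ConsumerThreadId, PartitionAssignor}
    import kafka.utils.Pool

    // Sketch only: pull this consumer's own slice out of the global assignment.
    def myAssignment(assignor: PartitionAssignor, ctx: AssignmentContext): mutable.Map[TopicAndPartition, ConsumerThreadId] = {
      val global: Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]] = assignor.assign(ctx)
      // Pool.get returns null when the consumer owns nothing, so fall back to an empty map.
      Option(global.get(ctx.consumerId)).getOrElse(mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId])
    }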
@ -69,7 +71,10 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo
class RoundRobinAssignor() extends PartitionAssignor with Logging { class RoundRobinAssignor() extends PartitionAssignor with Logging {
def assign(ctx: AssignmentContext) = { def assign(ctx: AssignmentContext) = {
val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
val partitionAssignment =
new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
if (ctx.consumersForTopic.size > 0) { if (ctx.consumersForTopic.size > 0) {
// check conditions (a) and (b) // check conditions (a) and (b)
@ -102,12 +107,12 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
allTopicPartitions.foreach(topicPartition => { allTopicPartitions.foreach(topicPartition => {
val threadId = threadAssignor.next() val threadId = threadAssignor.next()
if (threadId.consumer == ctx.consumerId) // record the partition ownership decision
partitionOwnershipDecision += (topicPartition -> threadId) val assignmentForConsumer = partitionAssignment.getAndMaybePut(threadId.consumer)
assignmentForConsumer += (topicPartition -> threadId)
}) })
} }
partitionAssignment
partitionOwnershipDecision
} }
} }
@ -123,9 +128,10 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
class RangeAssignor() extends PartitionAssignor with Logging { class RangeAssignor() extends PartitionAssignor with Logging {
def assign(ctx: AssignmentContext) = { def assign(ctx: AssignmentContext) = {
val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
val partitionAssignment =
for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) { new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
for (topic <- ctx.myTopicThreadIds.keySet) {
val curConsumers = ctx.consumersForTopic(topic) val curConsumers = ctx.consumersForTopic(topic)
val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic) val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)
@ -135,7 +141,7 @@ class RangeAssignor() extends PartitionAssignor with Logging {
info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions + info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
" for topic " + topic + " with consumers: " + curConsumers) " for topic " + topic + " with consumers: " + curConsumers)
for (consumerThreadId <- consumerThreadIdSet) { for (consumerThreadId <- curConsumers) {
val myConsumerPosition = curConsumers.indexOf(consumerThreadId) val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
assert(myConsumerPosition >= 0) assert(myConsumerPosition >= 0)
val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
@ -152,12 +158,12 @@ class RangeAssignor() extends PartitionAssignor with Logging {
val partition = curPartitions(i) val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition) info(consumerThreadId + " attempting to claim partition " + partition)
// record the partition ownership decision // record the partition ownership decision
partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
} }
} }
} }
} }
partitionAssignment
partitionOwnershipDecision
} }
} }
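To make the startPart arithmetic above concrete, a small worked example; the per-consumer partition count and remainder (nPartsPerConsumer, nConsumersWithExtraPart) are computed by surrounding code that is not shown in this hunk:

    // Illustrative numbers: one topic with 5 partitions and 2 consumer threads.
    // nPartsPerConsumer       = 5 / 2 = 2
    // nConsumersWithExtraPart = 5 % 2 = 1
    // thread at position 0: startPart = 2 * 0 + 0.min(1) = 0 -> claims partitions 0, 1, 2
    // thread at position 1: startPart = 2 * 1 + 1.min(1) = 3 -> claims partitions 3, 4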

File: kafka.consumer.ZookeeperConsumerConnector

@ -670,7 +670,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
*/ */
closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
if (consumerRebalanceListener != null) { if (consumerRebalanceListener != null) {
info("Calling beforeReleasingPartitions() from rebalance listener.") info("Invoking rebalance listener before relasing partition ownerships.")
consumerRebalanceListener.beforeReleasingPartitions( consumerRebalanceListener.beforeReleasingPartitions(
if (topicRegistry.size == 0) if (topicRegistry.size == 0)
new java.util.HashMap[String, java.util.Set[java.lang.Integer]] new java.util.HashMap[String, java.util.Set[java.lang.Integer]]
@ -682,12 +682,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
} }
releasePartitionOwnership(topicRegistry) releasePartitionOwnership(topicRegistry)
val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
val partitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse(
mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId]
)
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
// fetch current offsets for all topic-partitions // fetch current offsets for all topic-partitions
val topicPartitions = partitionOwnershipDecision.keySet.toSeq val topicPartitions = partitionAssignment.keySet.toSeq
val offsetFetchResponseOpt = fetchOffsets(topicPartitions) val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
@ -698,7 +701,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicPartitions.foreach(topicAndPartition => { topicPartitions.foreach(topicAndPartition => {
val (topic, partition) = topicAndPartition.asTuple val (topic, partition) = topicAndPartition.asTuple
val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
val threadId = partitionOwnershipDecision(topicAndPartition) val threadId = partitionAssignment(topicAndPartition)
addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId) addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
}) })
@ -706,10 +709,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
* move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
* A rebalancing attempt is completed successfully only after the fetchers have been started correctly * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
*/ */
if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) { if(reflectPartitionOwnershipDecision(partitionAssignment)) {
allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size allTopicsOwnedPartitionsCount = partitionAssignment.size
partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic } partitionAssignment.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
.foreach { case (topic, partitionThreadPairs) => .foreach { case (topic, partitionThreadPairs) =>
newGauge("OwnedPartitionsCount", newGauge("OwnedPartitionsCount",
new Gauge[Int] { new Gauge[Int] {
@ -719,6 +722,30 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
} }
topicRegistry = currentTopicRegistry topicRegistry = currentTopicRegistry
// Invoke beforeStartingFetchers callback if the consumerRebalanceListener is set.
if (consumerRebalanceListener != null) {
info("Invoking rebalance listener before starting fetchers.")
// Partition assignor returns the global partition assignment organized as a map of [TopicPartition, ThreadId]
// per consumer, and we need to re-organize it to a map of [Partition, ThreadId] per topic before passing
// to the rebalance callback.
val partitionAssignmentGroupedByTopic = globalPartitionAssignment.values.flatten.groupBy[String] {
case (topicPartition, _) => topicPartition.topic
}
val partitionAssignmentMapForCallback = partitionAssignmentGroupedByTopic.map({
case (topic, partitionOwnerships) =>
val partitionOwnershipForTopicScalaMap = partitionOwnerships.map({
case (topicAndPartition, consumerThreadId) =>
topicAndPartition.partition -> consumerThreadId
})
topic -> mapAsJavaMap(collection.mutable.Map(partitionOwnershipForTopicScalaMap.toSeq:_*))
.asInstanceOf[java.util.Map[java.lang.Integer, ConsumerThreadId]]
})
consumerRebalanceListener.beforeStartingFetchers(
consumerIdString,
mapAsJavaMap(collection.mutable.Map(partitionAssignmentMapForCallback.toSeq:_*))
)
}
updateFetcher(cluster) updateFetcher(cluster)
true true
} else { } else {
@ -792,9 +819,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
} }
} }
private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, ConsumerThreadId]): Boolean = { private def reflectPartitionOwnershipDecision(partitionAssignment: Map[TopicAndPartition, ConsumerThreadId]): Boolean = {
var successfullyOwnedPartitions : List[(String, Int)] = Nil var successfullyOwnedPartitions : List[(String, Int)] = Nil
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner => val partitionOwnershipSuccessful = partitionAssignment.map { partitionOwner =>
val topic = partitionOwner._1.topic val topic = partitionOwner._1.topic
val partition = partitionOwner._1.partition val partition = partitionOwner._1.partition
val consumerThreadId = partitionOwner._2 val consumerThreadId = partitionOwner._2

File: kafka.javaapi.consumer.ConsumerRebalanceListener

@ -17,6 +17,8 @@
package kafka.javaapi.consumer; package kafka.javaapi.consumer;
import kafka.consumer.ConsumerThreadId;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -33,7 +35,17 @@ public interface ConsumerRebalanceListener {
* This listener is initially added to prevent duplicate messages on consumer rebalance * This listener is initially added to prevent duplicate messages on consumer rebalance
* in mirror maker, where offset auto commit is disabled to prevent data loss. It could * in mirror maker, where offset auto commit is disabled to prevent data loss. It could
* also be used in more general cases. * also be used in more general cases.
* @param partitionOwnership The partitions this consumer currently owns.
*/ */
public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership); public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);
/**
* This method is called after the new partition assignment is finished but before fetcher
* threads start. The new global partition assignment is passed in as a parameter.
* @param consumerId The consumer Id string of the consumer invoking this callback.
* @param globalPartitionAssignment A Map[topic, Map[Partition, ConsumerThreadId]]. It is the global partition
* assignment of this consumer group.
*/
public void beforeStartingFetchers(String consumerId, Map<String, Map<Integer, ConsumerThreadId>> globalPartitionAssignment);
} }
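For reference, a minimal Scala implementation of the extended interface, of the kind that can be plugged into mirror maker through --consumer.rebalance.listener; the class name, constructor argument, and logging below are illustrative only:

    import java.{lang, util}
    import kafka.consumer.ConsumerThreadId
    import kafka.javaapi.consumer.ConsumerRebalanceListener

    class LoggingRebalanceListener(listenerArgs: String) extends ConsumerRebalanceListener {
      // Called before partition ownership is released; flush any external state here.
      override def beforeReleasingPartitions(partitionOwnership: util.Map[String, util.Set[lang.Integer]]): Unit =
        println(s"[$listenerArgs] releasing ownership of: $partitionOwnership")

      // Called after the new assignment is decided but before the fetchers are started.
      override def beforeStartingFetchers(consumerId: String,
                                          globalPartitionAssignment: util.Map[String, util.Map[lang.Integer, ConsumerThreadId]]): Unit =
        println(s"[$listenerArgs] $consumerId sees global assignment: $globalPartitionAssignment")
    }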

File: kafka.tools.MirrorMaker

@ -17,87 +17,64 @@
package kafka.tools package kafka.tools
import com.yammer.metrics.core._ import java.util
import kafka.common.{TopicAndPartition, OffsetAndMetadata} import java.util.concurrent.CountDownLatch
import kafka.javaapi.consumer.ConsumerRebalanceListener import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import kafka.utils._ import java.util.{Collections, Properties}
import kafka.consumer._
import kafka.serializer._
import kafka.producer.{KeyedMessage, ProducerConfig}
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata, ProducerRecord}
import scala.collection.JavaConversions._
import com.yammer.metrics.core.Gauge
import joptsimple.OptionParser import joptsimple.OptionParser
import java.util.Properties import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector}
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} import kafka.javaapi.consumer.ConsumerRebalanceListener
import java.util.concurrent._ import kafka.message.MessageAndMetadata
import kafka.metrics.KafkaMetricsGroup
import kafka.serializer.DefaultDecoder
import kafka.utils.{CommandLineUtils, Logging, Utils}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
/** /**
* The mirror maker consists of three major modules: * The mirror maker has the following architecture:
* Consumer Threads - The consumer threads consume messages from source Kafka cluster through * - There are N mirror maker threads; each owns one ZookeeperConsumerConnector and a single Kafka stream.
* ZookeeperConsumerConnector and put them into corresponding data channel queue based on hash value * - All the mirror maker threads share one producer.
* of source topic-partitionId string. This guarantees the message order in source partition is * - Each mirror maker thread periodically flushes the producer and then commits all offsets.
* preserved.
* Producer Threads - Producer threads take messages out of data channel queues and send them to target cluster. Each
* producer thread is bound to one data channel queue, so that the message order is preserved.
* Data Channel - The data channel has multiple queues. The number of queue is same as number of producer threads.
* *
* If new producer is used, the offset will be committed based on the new producer's callback. An offset map is * @note For mirror maker, the following settings are set by default to make sure there is no data loss:
* maintained and updated on each send() callback. A separate offset commit thread will commit the offset periodically.
* @note For mirror maker, the following settings are required to make sure there is no data loss:
* 1. use new producer with following settings * 1. use new producer with following settings
* acks=all * acks=all
* retries=max integer * retries=max integer
* block.on.buffer.full=true * block.on.buffer.full=true
* 2. Consumer Settings * 2. Consumer Settings
* auto.commit.enable=false * auto.commit.enable=false
* If --no.data.loss flag is set in option, then those settings are automatically applied. * 3. Mirror Maker Setting:
* abort.on.send.failure=true
*/ */
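Putting the options parsed below together, a typical invocation might look like the following; the bin/kafka-mirror-maker.sh wrapper script and the property file names are assumptions for illustration, and only flags defined in this file are used:

    bin/kafka-mirror-maker.sh --consumer.config source-cluster-consumer.properties \
        --producer.config target-cluster-producer.properties \
        --whitelist ".*" --num.streams 2 --abort.on.send.failure true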
object MirrorMaker extends Logging with KafkaMetricsGroup { object MirrorMaker extends Logging with KafkaMetricsGroup {
private var connector: ZookeeperConsumerConnector = null private var connectors: Seq[ZookeeperConsumerConnector] = null
private var consumerThreads: Seq[ConsumerThread] = null private var producer: MirrorMakerProducer = null
private var producerThreads: Seq[ProducerThread] = null private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
private val scheduler: KafkaScheduler = new KafkaScheduler(threads = 1)
private val unackedOffsetsMap: Pool[TopicAndPartition, UnackedOffsets] =
new Pool[TopicAndPartition, UnackedOffsets](valueFactory = Some((k: TopicAndPartition) => new UnackedOffsets))
// Track the messages unacked for consumer rebalance // Track the messages unacked for consumer rebalance
private var numUnackedMessages: AtomicInteger = new AtomicInteger(0) private var numDroppedMessages: AtomicInteger = new AtomicInteger(0)
private var numSkippedUnackedMessages: AtomicInteger = new AtomicInteger(0) private var messageHandler: MirrorMakerMessageHandler = null
private var consumerRebalanceListener: ConsumerRebalanceListener = null private var offsetCommitIntervalMs = 0
// This is to indicate whether the rebalance is going on so the producer callback knows if private var abortOnSendFailure: Boolean = true
// the flag indicates internal consumer rebalance callback is waiting for all the messages sent to be acked. @volatile private var exitingOnSendFailure: Boolean = false
private var waitingForMessageAcks: Boolean = false
private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes)
newGauge("MirrorMaker-NumUnackedMessages",
new Gauge[Int] {
def value = numUnackedMessages.get()
})
// The number of unacked offsets in the unackedOffsetsMap
newGauge("MirrorMaker-UnackedOffsetListsSize",
new Gauge[Int] {
def value = unackedOffsetsMap.iterator.map{
case(_, unackedOffsets) => unackedOffsets.size
}.sum
})
// If a message send failed after retries are exhausted. The offset of the messages will also be removed from // If a message send failed after retries are exhausted. The offset of the messages will also be removed from
// the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
// message was not really acked, but was skipped. This metric records the number of skipped offsets. // message was not really acked, but was skipped. This metric records the number of skipped offsets.
newGauge("MirrorMaker-NumSkippedOffsets", newGauge("MirrorMaker-numDroppedMessages",
new Gauge[Int] { new Gauge[Int] {
def value = numSkippedUnackedMessages.get() def value = numDroppedMessages.get()
}) })
def main(args: Array[String]) { def main(args: Array[String]) {
info ("Starting mirror maker") info("Starting mirror maker")
val parser = new OptionParser val parser = new OptionParser
val consumerConfigOpt = parser.accepts("consumer.config", val consumerConfigOpt = parser.accepts("consumer.config",
@ -113,19 +90,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
.describedAs("config file") .describedAs("config file")
.ofType(classOf[String]) .ofType(classOf[String])
val useNewProducerOpt = parser.accepts("new.producer",
"Use the new producer implementation.")
val noDataLossOpt = parser.accepts("no.data.loss",
"Configure the mirror maker to have no data loss.")
val numProducersOpt = parser.accepts("num.producers",
"Number of producer instances")
.withRequiredArg()
.describedAs("Number of producers")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val numStreamsOpt = parser.accepts("num.streams", val numStreamsOpt = parser.accepts("num.streams",
"Number of consumption streams.") "Number of consumption streams.")
.withRequiredArg() .withRequiredArg()
@ -133,20 +97,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
.ofType(classOf[java.lang.Integer]) .ofType(classOf[java.lang.Integer])
.defaultsTo(1) .defaultsTo(1)
val bufferSizeOpt = parser.accepts("queue.size",
"Number of messages that are buffered between the consumer and producer")
.withRequiredArg()
.describedAs("Queue size in terms of number of messages")
.ofType(classOf[java.lang.Integer])
.defaultsTo(10000)
val bufferByteSizeOpt = parser.accepts("queue.byte.size",
"Maximum bytes that can be buffered in each data channel queue")
.withRequiredArg()
.describedAs("Data channel queue size in terms of number of bytes")
.ofType(classOf[java.lang.Integer])
.defaultsTo(100000000)
val whitelistOpt = parser.accepts("whitelist", val whitelistOpt = parser.accepts("whitelist",
"Whitelist of topics to mirror.") "Whitelist of topics to mirror.")
.withRequiredArg() .withRequiredArg()
@ -160,7 +110,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
.ofType(classOf[String]) .ofType(classOf[String])
val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms", val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
"Offset commit interval in ms") "Offset commit interval in ms")
.withRequiredArg() .withRequiredArg()
.describedAs("offset commit interval in millisecond") .describedAs("offset commit interval in millisecond")
.ofType(classOf[java.lang.Integer]) .ofType(classOf[java.lang.Integer])
@ -172,12 +122,38 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
.describedAs("A custom rebalance listener of type ConsumerRebalanceListener") .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
.ofType(classOf[String]) .ofType(classOf[String])
val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
"Arguments used by custom rebalance listener for mirror maker consumer")
.withRequiredArg()
.describedAs("Arguments passed to custom rebalance listener constructor as a string.")
.ofType(classOf[String])
val messageHandlerOpt = parser.accepts("message.handler",
"The consumer rebalance listener to use for mirror maker consumer.")
.withRequiredArg()
.describedAs("A custom rebalance listener of type MirrorMakerMessageHandler")
.ofType(classOf[String])
val messageHandlerArgsOpt = parser.accepts("message.handler.args",
"Arguments used by custom rebalance listener for mirror maker consumer")
.withRequiredArg()
.describedAs("Arguments passed to message handler constructor.")
.ofType(classOf[String])
val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
"Configure the mirror maker to exit on a failed send.")
.withRequiredArg()
.describedAs("Stop the entire mirror maker when a send failure occurs")
.ofType(classOf[String])
.defaultsTo("true")
val helpOpt = parser.accepts("help", "Print this message.") val helpOpt = parser.accepts("help", "Print this message.")
if(args.length == 0) if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.") CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
val options = parser.parse(args : _*)
val options = parser.parse(args: _*)
if (options.has(helpOpt)) { if (options.has(helpOpt)) {
parser.printHelpOn(System.out) parser.printHelpOn(System.out)
@ -190,109 +166,112 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
System.exit(1) System.exit(1)
} }
val numProducers = options.valueOf(numProducersOpt).intValue() abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
val numStreams = options.valueOf(numStreamsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue()
val bufferSize = options.valueOf(bufferSizeOpt).intValue()
val bufferByteSize = options.valueOf(bufferByteSizeOpt).intValue()
val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
// create consumer connector // create producer
val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
val noDataLoss = options.has(noDataLossOpt)
// disable consumer auto commit because offset will be committed by offset commit thread.
if (noDataLoss)
consumerConfigProps.setProperty("auto.commit.enable","false")
val consumerConfig = new ConsumerConfig(consumerConfigProps)
connector = new ZookeeperConsumerConnector(consumerConfig)
// create a data channel btw the consumers and the producers
val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers)
// set consumer rebalance listener
// custom rebalance listener will be invoked after internal listener finishes its work.
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
val customRebalanceListener = {
if (customRebalanceListenerClass != null)
Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
else
None
}
consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListener)
connector.setConsumerRebalanceListener(consumerRebalanceListener)
// create producer threads
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
val useNewProducer = { // Defaults to no data loss settings.
// Override producer settings if no.data.loss is set maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString)
if (noDataLoss) { maybeSetDefaultProperty(producerProps, "block.on.buffer.full", "true")
producerProps.setProperty("retries",Int.MaxValue.toString) maybeSetDefaultProperty(producerProps, "acks", "all")
producerProps.setProperty("block.on.buffer.full", "true") producer = new MirrorMakerProducer(producerProps)
producerProps.setProperty("acks","all")
true // Create consumer connector
val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
// Disable consumer auto offsets commit to prevent data loss.
maybeSetDefaultProperty(consumerConfigProps, "auto.commit.enable", "false")
// Set the consumer timeout so that we do not block indefinitely on a low-volume pipeline. The timeout is
// necessary to make sure offsets are still committed for those low-volume pipelines.
maybeSetDefaultProperty(consumerConfigProps, "consumer.timeout.ms", "10000")
// The default client id is the group id; we explicitly set the client id to groupId-index to avoid metric collisions.
val groupIdString = consumerConfigProps.getProperty("group.id")
connectors = (0 until numStreams) map { i =>
consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
val consumerConfig = new ConsumerConfig(consumerConfigProps)
new ZookeeperConsumerConnector(consumerConfig)
}
// Set consumer rebalance listener.
// Custom rebalance listener will be invoked after internal listener finishes its work.
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
val customRebalanceListener = {
if (customRebalanceListenerClass != null) {
if (rebalanceListenerArgs != null)
Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
else
Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
} else { } else {
options.has(useNewProducerOpt) None
} }
} }
val clientId = producerProps.getProperty("client.id", "") connectors.foreach {
producerThreads = (0 until numProducers).map(i => { connector =>
producerProps.setProperty("client.id", clientId + "-" + i) val consumerRebalanceListener = new InternalRebalanceListener(connector, customRebalanceListener)
val producer = connector.setConsumerRebalanceListener(consumerRebalanceListener)
if (useNewProducer) {
producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
new MirrorMakerNewProducer(producerProps)
}
else
new MirrorMakerOldProducer(producerProps)
new ProducerThread(mirrorDataChannel, producer, i)
})
// start offset commit thread
if (noDataLoss) {
/**
* The offset commit thread periodically commit consumed offsets. With the new producer,
* the offsets are updated upon the returned future metadata of the send() call; with the old producer,
* the offsets are updated upon the consumer's iterator advances. By doing this, it is guaranteed no data
* loss even when mirror maker is uncleanly shutdown with the new producer, while with the old producer
* messages inside the data channel could be lost upon mirror maker unclean shutdown.
*/
scheduler.startup()
scheduler.schedule("offset-commit", commitOffsets, 0, offsetCommitIntervalMs, TimeUnit.MILLISECONDS)
} }
// create consumer threads // create Kafka streams
val filterSpec = if (options.has(whitelistOpt)) val filterSpec = if (options.has(whitelistOpt))
new Whitelist(options.valueOf(whitelistOpt)) new Whitelist(options.valueOf(whitelistOpt))
else else
new Blacklist(options.valueOf(blacklistOpt)) new Blacklist(options.valueOf(blacklistOpt))
var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil // create a (connector->stream) sequence
try { val connectorStream = (0 until numStreams) map {
streams = connector.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder()) i => {
} catch { var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
case t: Throwable => try {
fatal("Unable to create stream - shutting down mirror maker.", t) // Creating just on stream per each connector instance
connector.shutdown() stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
require(stream.size == 1)
} catch {
case t: Throwable =>
fatal("Unable to create stream - shutting down mirror maker.", t)
connectors(i).shutdown()
}
connectors(i) -> stream(0)
}
} }
consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2))
assert(consumerThreads.size == numStreams)
Runtime.getRuntime.addShutdownHook(new Thread() { // Create mirror maker threads
mirrorMakerThreads = (0 until numStreams) map ( i =>
new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i)
)
// Create and initialize message handler
val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
messageHandler = {
if (customMessageHandlerClass != null) {
if (messageHandlerArgs != null)
Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
else
Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
} else {
defaultMirrorMakerMessageHandler
}
}
Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
override def run() { override def run() {
cleanShutdown() cleanShutdown()
} }
}) })
consumerThreads.foreach(_.start) mirrorMakerThreads.foreach(_.start())
producerThreads.foreach(_.start) mirrorMakerThreads.foreach(_.awaitShutdown())
}
// we wait on producer's shutdown latch instead of consumers def commitOffsets(connector: ZookeeperConsumerConnector) {
// since the consumer threads can hit a timeout/other exception; if (!exitingOnSendFailure) {
// but in this case the producer should still be able to shutdown trace("Committing offsets.")
// based on the shutdown message in the channel connector.commitOffsets
producerThreads.foreach(_.awaitShutdown()) } else {
info("Exiting on send failure, skip committing offsets.")
}
} }
def cleanShutdown() { def cleanShutdown() {
@ -300,167 +279,56 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
info("Start clean shutdown.") info("Start clean shutdown.")
// Shutdown consumer threads. // Shutdown consumer threads.
info("Shutting down consumer threads.") info("Shutting down consumer threads.")
if (consumerThreads != null) { if (mirrorMakerThreads != null) {
consumerThreads.foreach(_.shutdown()) mirrorMakerThreads.foreach(_.shutdown())
consumerThreads.foreach(_.awaitShutdown()) mirrorMakerThreads.foreach(_.awaitShutdown())
} }
// After consumer threads exit, shutdown producer. info("Closing producer.")
info("Shutting down producer threads.") producer.close()
if (producerThreads != null) { connectors.foreach(commitOffsets)
producerThreads.foreach(_.shutdown()) // Connector should only be shutdown after offsets are committed.
producerThreads.foreach(_.awaitShutdown())
}
// offset commit thread should only be shutdown after producer threads are shutdown, so we don't lose offsets.
scheduler.shutdown()
swallow(commitOffsets())
// connector should only be shutdown after offsets are committed.
info("Shutting down consumer connectors.") info("Shutting down consumer connectors.")
if (connector != null) connectors.foreach(_.shutdown())
connector.shutdown()
info("Kafka mirror maker shutdown successfully") info("Kafka mirror maker shutdown successfully")
} }
} }
class DataChannel(messageCapacity: Int, byteCapacity: Int, numInputs: Int, numOutputs: Int) private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) {
extends KafkaMetricsGroup { properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue))
if (properties.getProperty(propertyName) != defaultValue)
val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numOutputs) info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, properties.getProperty(propertyName)))
val channelSizeHists = new Array[Histogram](numOutputs)
val channelByteSizeHists = new Array[Histogram](numOutputs)
val sizeFunction = (record: MirrorMakerRecord) => record.size
for (i <- 0 until numOutputs) {
queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](messageCapacity, byteCapacity, Some(sizeFunction))
channelSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-NumMessages".format(i))
channelByteSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Bytes".format(i))
}
private val channelRecordSizeHist = newHistogram("MirrorMaker-DataChannel-Record-Size")
// We use a single meter for aggregated wait percentage for the data channel.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded wait
// time should be discounted by # threads.
private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
def put(record: MirrorMakerRecord) {
// Use hash of source topic-partition to decide which queue to put the message in. The benefit is that
// we can maintain the message order for both keyed and non-keyed messages.
val queueId =
Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numOutputs
put(record, queueId)
}
def put(record: MirrorMakerRecord, queueId: Int) {
val queue = queues(queueId)
var putSucceed = false
while (!putSucceed) {
val startPutTime = SystemTime.nanoseconds
putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
waitPut.mark((SystemTime.nanoseconds - startPutTime) / numInputs)
}
channelSizeHists(queueId).update(queue.size())
channelByteSizeHists(queueId).update(queue.byteSize())
channelRecordSizeHist.update(sizeFunction(record))
}
def take(queueId: Int): MirrorMakerRecord = {
val queue = queues(queueId)
var data: MirrorMakerRecord = null
while (data == null) {
val startTakeTime = SystemTime.nanoseconds
data = queue.poll(500, TimeUnit.MILLISECONDS)
waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numOutputs)
}
channelSizeHists(queueId).update(queue.size())
channelByteSizeHists(queueId).update(queue.byteSize())
data
}
def clear() {
queues.foreach(queue => queue.clear())
}
} }
class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]], class MirrorMakerThread(connector: ZookeeperConsumerConnector,
mirrorDataChannel: DataChannel, stream: KafkaStream[Array[Byte], Array[Byte]],
threadId: Int) val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-thread-" + threadId
private val shutdownLatch = new CountDownLatch(1)
private val threadName = "mirrormaker-consumer-" + threadId
this.logIdent = "[%s] ".format(threadName)
private var shutdownFlag: Boolean = false
this.setName(threadName)
override def run() {
info("Starting mirror maker consumer thread " + threadName)
try {
val iter = stream.iterator()
while (!shutdownFlag && iter.hasNext()) {
val msgAndMetadata = iter.next()
val data = new MirrorMakerRecord(msgAndMetadata.topic,
msgAndMetadata.partition,
msgAndMetadata.offset,
msgAndMetadata.key(),
msgAndMetadata.message())
mirrorDataChannel.put(data)
}
} catch {
case e: Throwable => {
fatal("Stream unexpectedly exited.", e)
}
} finally {
shutdownLatch.countDown()
info("Consumer thread stopped")
// If it exits accidentally, stop the entire mirror maker.
if (!isShuttingdown.get()) {
fatal("Consumer thread exited abnormally, stopping the whole mirror maker.")
System.exit(-1)
}
}
}
def shutdown() {
shutdownFlag = true
}
def awaitShutdown() {
try {
shutdownLatch.await()
info("Consumer thread shutdown complete")
} catch {
case e: InterruptedException => fatal("Shutdown of the consumer thread interrupted. This might leak data!")
}
}
}
class ProducerThread (val dataChannel: DataChannel,
val producer: MirrorMakerBaseProducer,
val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
private val threadName = "mirrormaker-producer-" + threadId
private val shutdownLatch: CountDownLatch = new CountDownLatch(1) private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
private var lastOffsetCommitMs = System.currentTimeMillis()
@volatile private var shuttingDown: Boolean = false
this.logIdent = "[%s] ".format(threadName) this.logIdent = "[%s] ".format(threadName)
setName(threadName) setName(threadName)
override def run() { override def run() {
info("Starting mirror maker producer thread " + threadName) info("Starting mirror maker thread " + threadName)
val iter = stream.iterator()
try { try {
while (true) { // TODO: Need to be changed after KAFKA-1660 is available.
val data: MirrorMakerRecord = dataChannel.take(threadId) while (!exitingOnSendFailure && !shuttingDown) {
trace("Sending message with value size %d".format(data.value.size)) try {
if(data eq shutdownMessage) { while (!exitingOnSendFailure && !shuttingDown && iter.hasNext()) {
info("Received shutdown message") val data = iter.next()
return trace("Sending message with value size %d".format(data.message().size))
val records = messageHandler.handle(data)
records.foreach(producer.send)
maybeFlushAndCommitOffsets()
}
} catch {
case e: ConsumerTimeoutException =>
trace("Caught ConsumerTimeoutException, continue iteration.")
} }
producer.send(new TopicAndPartition(data.sourceTopic, data.sourcePartition), maybeFlushAndCommitOffsets()
data.sourceOffset,
data.key,
data.value)
} }
} catch { } catch {
case t: Throwable => case t: Throwable =>
@ -470,215 +338,112 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
info("Producer thread stopped") info("Producer thread stopped")
// if it exits accidentally, stop the entire mirror maker // if it exits accidentally, stop the entire mirror maker
if (!isShuttingdown.get()) { if (!isShuttingdown.get()) {
fatal("Producer thread exited abnormally, stopping the whole mirror maker.") fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
System.exit(-1) System.exit(-1)
} }
} }
} }
def maybeFlushAndCommitOffsets() {
if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
producer.flush()
commitOffsets(connector)
lastOffsetCommitMs = System.currentTimeMillis()
}
}
def shutdown() { def shutdown() {
try { try {
info("Producer thread " + threadName + " shutting down") info(threadName + " shutting down")
dataChannel.put(shutdownMessage, threadId) shuttingDown = true
} }
catch { catch {
case ie: InterruptedException => { case ie: InterruptedException =>
warn("Interrupt during shutdown of ProducerThread") warn("Interrupt during shutdown of ProducerThread")
}
} }
} }
def awaitShutdown() { def awaitShutdown() {
try { try {
shutdownLatch.await() shutdownLatch.await()
producer.close() info("Mirror maker thread shutdown complete")
info("Producer thread shutdown complete")
} catch { } catch {
case ie: InterruptedException => { case ie: InterruptedException =>
warn("Shutdown of the producer thread interrupted") warn("Shutdown of the producer thread interrupted")
}
} }
} }
} }
private def commitOffsets() { private class MirrorMakerProducer(val producerProps: Properties) {
try {
info("Committing offsets")
val offsetsToCommit = collection.immutable.Map(unackedOffsetsMap.map {
case (topicPartition, unackedOffsets) =>
topicPartition -> OffsetAndMetadata(unackedOffsets.getOffsetToCommit, null)
}.toSeq: _*)
if (connector == null) {
warn("No consumer connector available to commit offset.")
} else {
connector.commitOffsets(offsetsToCommit, isAutoCommit = false)
}
} catch {
case e: OutOfMemoryError =>
fatal("Shutting down mirror maker due to error when committing offsets.", e)
System.exit(-1)
case t: Throwable =>
warn("Offsets commit failed due to ", t)
}
}
private[kafka] trait MirrorMakerBaseProducer {
def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte])
def close()
}
private class MirrorMakerNewProducer (val producerProps: Properties) extends MirrorMakerBaseProducer {
val sync = producerProps.getProperty("producer.type", "async").equals("sync") val sync = producerProps.getProperty("producer.type", "async").equals("sync")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps) val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
override def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) { def send(record: ProducerRecord[Array[Byte], Array[Byte]]) {
val record = new ProducerRecord[Array[Byte], Array[Byte]](sourceTopicPartition.topic, key, value) if (sync) {
if(sync) {
this.producer.send(record).get() this.producer.send(record).get()
unackedOffsetsMap.getAndMaybePut(sourceTopicPartition).maybeUpdateMaxOffsetSeen(sourceOffset)
} else { } else {
val unackedOffsets = unackedOffsetsMap.getAndMaybePut(sourceTopicPartition)
// synchronize to ensure that addOffset precedes removeOffset
unackedOffsets synchronized {
val unackedOffset = new UnackedOffset(sourceOffset)
this.producer.send(record, this.producer.send(record,
new MirrorMakerProducerCallback(sourceTopicPartition, unackedOffset, key, value)) new MirrorMakerProducerCallback(record.topic(), record.key(), record.value()))
// add offset to unackedOffsets
unackedOffsets.addOffset(unackedOffset)
numUnackedMessages.incrementAndGet()
}
} }
} }
override def close() { def flush() {
this.producer.flush()
}
def close() {
this.producer.close() this.producer.close()
} }
} }
private class MirrorMakerOldProducer (val producerProps: Properties) extends MirrorMakerBaseProducer { private class MirrorMakerProducerCallback (topic: String, key: Array[Byte], value: Array[Byte])
extends ErrorLoggingCallback(topic, key, value, false) {
// default to byte array partitioner
if (producerProps.getProperty("partitioner.class") == null)
producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topicPartition.topic, key, value))
}
override def close() {
this.producer.close()
}
}
private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition,
val offset: UnackedOffset,
val key: Array[Byte],
val value: Array[Byte])
extends ErrorLoggingCallback(topicPartition.topic, key, value, false) {
override def onCompletion(metadata: RecordMetadata, exception: Exception) { override def onCompletion(metadata: RecordMetadata, exception: Exception) {
if (exception != null) { if (exception != null) {
// Use default call back to log error. This means the max retries of producer has reached and message // Use default call back to log error. This means the max retries of producer has reached and message
// still could not be sent. In this case we have to remove the offsets from list to let the mirror maker // still could not be sent.
// move on. The message failed to be sent will be lost in target cluster.
warn("Not be able to send message, offset of "+ topicPartition + " will not advance. Total number" +
"of skipped unacked messages is" + numSkippedUnackedMessages.incrementAndGet())
super.onCompletion(metadata, exception) super.onCompletion(metadata, exception)
} else { // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on.
trace("Updating offset for %s to %d".format(topicPartition, offset.element)) if (abortOnSendFailure)
} exitingOnSendFailure = true
// remove the offset from the unackedOffsets
val unackedOffsets = unackedOffsetsMap.get(topicPartition)
unackedOffsets.removeOffset(offset)
// Notify the rebalance callback only when all the messages handed to producer are acked.
// There is a very slight chance that one message is held by producer thread and not handed to producer.
// That message might have duplicate. We are not handling that here.
numUnackedMessages synchronized {
if (numUnackedMessages.decrementAndGet() == 0 && waitingForMessageAcks) {
numUnackedMessages.notify()
}
} }
} }
} }
class InternalRebalanceListener (dataChannel: DataChannel, customRebalanceListener: Option[ConsumerRebalanceListener])
extends ConsumerRebalanceListener { private class InternalRebalanceListener(connector: ZookeeperConsumerConnector,
customRebalanceListener: Option[ConsumerRebalanceListener])
extends ConsumerRebalanceListener {
override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
info("Clearing data channel.") producer.flush()
dataChannel.clear() commitOffsets(connector)
info("Waiting until all the messages are acked.")
numUnackedMessages synchronized {
waitingForMessageAcks = true
while (numUnackedMessages.get() > 0) {
try {
numUnackedMessages.wait()
} catch {
case e: InterruptedException => info("Ignoring interrupt while waiting.")
}
}
waitingForMessageAcks = false
}
info("Committing offsets.")
commitOffsets()
// invoke custom consumer rebalance listener // invoke custom consumer rebalance listener
if (customRebalanceListener.isDefined) if (customRebalanceListener.isDefined)
customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership)
} }
override def beforeStartingFetchers(consumerId: String,
partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
if (customRebalanceListener.isDefined)
customRebalanceListener.get.beforeStartingFetchers(consumerId, partitionAssignment)
}
} }
private[kafka] class MirrorMakerRecord (val sourceTopic: String, /**
val sourcePartition: Int, * If message.handler.args is specified, a constructor that takes a single String argument must exist.
val sourceOffset: Long, */
val key: Array[Byte], trait MirrorMakerMessageHandler {
val value: Array[Byte]) { def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
def size = {if (value == null) 0 else value.length} + {if (key == null) 0 else key.length}
} }
private class UnackedOffset(offset: Long) extends DoublyLinkedListNode[Long](offset) { private object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
override def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key(), record.message()))
}
} }
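As an illustration of the handler hook above, a hypothetical handler that mirrors every record into a prefixed topic; the class name and prefix are examples only, and the single-String constructor satisfies the message.handler.args contract:

    import java.util
    import java.util.Collections
    import kafka.message.MessageAndMetadata
    import kafka.tools.MirrorMaker
    import org.apache.kafka.clients.producer.ProducerRecord

    class TopicPrefixingHandler(prefix: String) extends MirrorMaker.MirrorMakerMessageHandler {
      override def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]] =
        // Re-target the record to "<prefix><sourceTopic>" in the destination cluster.
        Collections.singletonList(
          new ProducerRecord[Array[Byte], Array[Byte]](prefix + record.topic, record.key(), record.message()))
    }

It would be wired in with --message.handler <fully.qualified.ClassName> --message.handler.args <prefix>.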
private class UnackedOffsets {
val offsetList = new DoublyLinkedList[Long]
var maxOffsetSeen: Long = -1L
def maybeUpdateMaxOffsetSeen(offset: Long) {
this synchronized {
maxOffsetSeen = math.max(maxOffsetSeen, offset)
}
}
def addOffset(offset: DoublyLinkedListNode[Long]) {
this synchronized {
offsetList.add(offset)
maybeUpdateMaxOffsetSeen(offset.element)
}
}
def removeOffset(offset: DoublyLinkedListNode[Long]) {
this synchronized {
offsetList.remove(offset)
}
}
def getOffsetToCommit: Long = {
this synchronized {
val smallestUnackedOffset = offsetList.peek()
if (smallestUnackedOffset == null)
// list is empty, commit maxOffsetSeen + 1
maxOffsetSeen + 1
else
// commit the smallest unacked offset
smallestUnackedOffset.element
}
}
def size: Int = offsetList.size
}
} }

File: kafka.consumer.PartitionAssignorTest

@ -164,7 +164,7 @@ private object PartitionAssignorTest extends Logging {
verifyAssignmentIsUniform: Boolean = false) { verifyAssignmentIsUniform: Boolean = false) {
val assignments = scenario.subscriptions.map{ case(consumer, subscription) => val assignments = scenario.subscriptions.map{ case(consumer, subscription) =>
val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient) val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient)
assignor.assign(ctx) assignor.assign(ctx).get(consumer)
} }
// check for uniqueness (i.e., any partition should be assigned to exactly one consumer stream) // check for uniqueness (i.e., any partition should be assigned to exactly one consumer stream)

File: kafka.consumer.ZookeeperConsumerConnectorTest

@ -17,22 +17,22 @@
package kafka.consumer package kafka.consumer
import java.util.{Collections, Properties}
import junit.framework.Assert._ import junit.framework.Assert._
import kafka.common.MessageStreamsExistException
import kafka.integration.KafkaServerTestHarness import kafka.integration.KafkaServerTestHarness
import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.javaapi.consumer.ConsumerRebalanceListener
import kafka.server._
import scala.collection._
import scala.collection.JavaConversions._
import org.scalatest.junit.JUnit3Suite
import kafka.message._ import kafka.message._
import kafka.serializer._ import kafka.serializer._
import org.I0Itec.zkclient.ZkClient import kafka.server._
import kafka.utils._
import kafka.producer.{KeyedMessage, Producer}
import java.util.{Collections, Properties}
import org.apache.log4j.{Logger, Level}
import kafka.utils.TestUtils._ import kafka.utils.TestUtils._
import kafka.common.{TopicAndPartition, MessageStreamsExistException} import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import scala.collection._
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
@ -362,10 +362,18 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// Check if rebalance listener is fired // Check if rebalance listener is fired
assertEquals(true, rebalanceListener1.listenerCalled) assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled)
assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled)
assertEquals(null, rebalanceListener1.partitionOwnership.get(topic)) assertEquals(null, rebalanceListener1.partitionOwnership.get(topic))
// Check if partition assignment in rebalance listener is correct
assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(0).consumer)
assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(1).consumer)
assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(0).threadId)
assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(1).threadId)
assertEquals("group1_consumer1", rebalanceListener1.consumerId)
// reset the flag // reset the flag
rebalanceListener1.listenerCalled = false rebalanceListener1.beforeReleasingPartitionsCalled = false
rebalanceListener1.beforeStartingFetchersCalled = false
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_1 = List(("0", "group1_consumer1-0"), val expected_1 = List(("0", "group1_consumer1-0"),
@ -379,16 +387,26 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2) zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// Consume messages from consumer 1 to make sure it has finished rebalance
getMessages(nMessages, topicMessageStreams1)
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_2 = List(("0", "group1_consumer1-0"), val expected_2 = List(("0", "group1_consumer1-0"),
("1", "group1_consumer2-0")) ("1", "group1_consumer2-0"))
assertEquals(expected_2, actual_2) assertEquals(expected_2, actual_2)
// Check if rebalance listener is fired // Check if rebalance listener is fired
assertEquals(true, rebalanceListener1.listenerCalled) assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled)
assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled)
assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic)) assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic))
assertEquals(true, rebalanceListener2.listenerCalled) // Check if global partition ownership in rebalance listener is correct
assertEquals(null, rebalanceListener2.partitionOwnership.get(topic)) assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(0).consumer)
assertEquals("group1_consumer2", rebalanceListener1.globalPartitionOwnership.get(topic).get(1).consumer)
assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(0).threadId)
assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(1).threadId)
assertEquals("group1_consumer1", rebalanceListener1.consumerId)
assertEquals("group1_consumer2", rebalanceListener2.consumerId)
assertEquals(rebalanceListener1.globalPartitionOwnership, rebalanceListener2.globalPartitionOwnership)
zkConsumerConnector1.shutdown() zkConsumerConnector1.shutdown()
zkConsumerConnector2.shutdown() zkConsumerConnector2.shutdown()
} }
@ -397,7 +415,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val children = zkClient.getChildren(path) val children = zkClient.getChildren(path)
Collections.sort(children) Collections.sort(children)
val childrenAsSeq : Seq[java.lang.String] = { val childrenAsSeq : Seq[java.lang.String] = {
import JavaConversions._ import scala.collection.JavaConversions._
children.toSeq children.toSeq
} }
childrenAsSeq.map(partition => childrenAsSeq.map(partition =>
@ -405,13 +423,22 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
} }
private class TestConsumerRebalanceListener extends ConsumerRebalanceListener { private class TestConsumerRebalanceListener extends ConsumerRebalanceListener {
var listenerCalled: Boolean = false var beforeReleasingPartitionsCalled: Boolean = false
var beforeStartingFetchersCalled: Boolean = false
var consumerId: String = "";
var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null
var globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]] = null
override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
listenerCalled = true beforeReleasingPartitionsCalled = true
this.partitionOwnership = partitionOwnership this.partitionOwnership = partitionOwnership
} }
override def beforeStartingFetchers(consumerId: String, globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
beforeStartingFetchersCalled = true
this.consumerId = consumerId
this.globalPartitionOwnership = globalPartitionOwnership
}
} }
} }