mirror of https://github.com/apache/kafka.git
				
				
				
			Merge remote branch 'origin/0.8' into trunk
This commit is contained in:
		
						commit
						bfc4ba4995
					
				|  | @ -1,17 +0,0 @@ | |||
| #!/bin/bash | ||||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||||
| # contributor license agreements.  See the NOTICE file distributed with | ||||
| # this work for additional information regarding copyright ownership. | ||||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
| # (the "License"); you may not use this file except in compliance with | ||||
| # the License.  You may obtain a copy of the License at | ||||
| # | ||||
| #    http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| $(dirname $0)/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@ | ||||
|  | @ -0,0 +1,68 @@ | |||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||||
| # contributor license agreements.  See the NOTICE file distributed with | ||||
| # this work for additional information regarding copyright ownership. | ||||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
| # (the "License"); you may not use this file except in compliance with | ||||
| # the License.  You may obtain a copy of the License at | ||||
| # | ||||
| #    http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| log4j.rootLogger=INFO, stdout  | ||||
| 
 | ||||
| log4j.appender.stdout=org.apache.log4j.ConsoleAppender | ||||
| log4j.appender.stdout.layout=org.apache.log4j.PatternLayout | ||||
| log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n | ||||
| 
 | ||||
| log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender | ||||
| log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH | ||||
| log4j.appender.kafkaAppender.File=logs/server.log | ||||
| log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout | ||||
| log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n | ||||
| 
 | ||||
| log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender | ||||
| log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH | ||||
| log4j.appender.stateChangeAppender.File=logs/state-change.log | ||||
| log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout | ||||
| log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n | ||||
| 
 | ||||
| log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender | ||||
| log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH | ||||
| log4j.appender.requestAppender.File=logs/kafka-request.log | ||||
| log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout | ||||
| log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n | ||||
| 
 | ||||
| log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender | ||||
| log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH | ||||
| log4j.appender.controllerAppender.File=logs/controller.log | ||||
| log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout | ||||
| log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n | ||||
| 
 | ||||
| # Turn on all our debugging info | ||||
| #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender | ||||
| #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender | ||||
| log4j.logger.kafka.perf=DEBUG, kafkaAppender | ||||
| log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender | ||||
| #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG | ||||
| log4j.logger.kafka=INFO, kafkaAppender | ||||
| 
 | ||||
| log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender | ||||
| log4j.additivity.kafka.network.RequestChannel$=false | ||||
| 
 | ||||
| #log4j.logger.kafka.network.Processor=TRACE, requestAppender | ||||
| #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender | ||||
| #log4j.additivity.kafka.server.KafkaApis=false | ||||
| log4j.logger.kafka.request.logger=TRACE, requestAppender | ||||
| log4j.additivity.kafka.request.logger=false | ||||
| 
 | ||||
| log4j.logger.kafka.controller=TRACE, controllerAppender | ||||
| log4j.additivity.kafka.controller=false | ||||
| 
 | ||||
| log4j.logger.state.change.logger=TRACE, stateChangeAppender | ||||
| log4j.additivity.state.change.logger=false | ||||
| 
 | ||||
| 
 | ||||
|  | @ -196,7 +196,7 @@ public class KafkaETLContext { | |||
|         if (_messageIt != null && _messageIt.hasNext()) { | ||||
|             MessageAndOffset messageAndOffset = _messageIt.next(); | ||||
|              | ||||
|             ByteBuffer buf = messageAndOffset.message().payload(); | ||||
|             ByteBuffer buf = messageAndOffset.message().buffer(); | ||||
|             int origSize = buf.remaining(); | ||||
|             byte[] bytes = new byte[origSize]; | ||||
|           buf.get(bytes, buf.position(), origSize); | ||||
|  |  | |||
|  | @ -16,8 +16,6 @@ | |||
|  */ | ||||
| package kafka.etl.impl; | ||||
| 
 | ||||
| import java.io.IOException; | ||||
| import java.nio.ByteBuffer; | ||||
| import kafka.etl.KafkaETLKey; | ||||
| import kafka.etl.KafkaETLUtils; | ||||
| import kafka.message.Message; | ||||
|  | @ -29,6 +27,9 @@ import org.apache.hadoop.mapred.Mapper; | |||
| import org.apache.hadoop.mapred.OutputCollector; | ||||
| import org.apache.hadoop.mapred.Reporter; | ||||
| 
 | ||||
| import java.io.IOException; | ||||
| import java.nio.ByteBuffer; | ||||
| 
 | ||||
| /** | ||||
|  * Simple implementation of KafkaETLMapper. It assumes that  | ||||
|  * input data are text timestamp (long). | ||||
|  | @ -61,7 +62,7 @@ Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> { | |||
|         byte[] bytes = KafkaETLUtils.getBytes(val); | ||||
|          | ||||
|         //check the checksum of message | ||||
|         Message message = new Message(bytes); | ||||
|         Message message = new Message(ByteBuffer.wrap(bytes)); | ||||
|         long checksum = key.getChecksum(); | ||||
|         if (checksum != message.checksum())  | ||||
|             throw new IOException ("Invalid message checksum "  | ||||
|  |  | |||
|  | @ -1,110 +0,0 @@ | |||
| /** | ||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||
|  * contributor license agreements.  See the NOTICE file distributed with | ||||
|  * this work for additional information regarding copyright ownership. | ||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
|  * (the "License"); you may not use this file except in compliance with | ||||
|  * the License.  You may obtain a copy of the License at | ||||
|  *  | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package kafka.admin | ||||
| 
 | ||||
| import joptsimple.OptionParser | ||||
| import org.I0Itec.zkclient.ZkClient | ||||
| import kafka.utils._ | ||||
| import scala.collection.Map | ||||
| import kafka.common.TopicAndPartition | ||||
| 
 | ||||
| object CheckReassignmentStatus extends Logging { | ||||
| 
 | ||||
|   def main(args: Array[String]): Unit = { | ||||
|     val parser = new OptionParser | ||||
|     val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " + | ||||
|       "new replicas they should be reassigned to") | ||||
|       .withRequiredArg | ||||
|       .describedAs("partition reassignment json file path") | ||||
|       .ofType(classOf[String]) | ||||
|     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + | ||||
|       "form host:port. Multiple URLS can be given to allow fail-over.") | ||||
|       .withRequiredArg | ||||
|       .describedAs("urls") | ||||
|       .ofType(classOf[String]) | ||||
| 
 | ||||
|     val options = parser.parse(args : _*) | ||||
| 
 | ||||
|     for(arg <- List(jsonFileOpt, zkConnectOpt)) { | ||||
|       if(!options.has(arg)) { | ||||
|         System.err.println("Missing required argument \"" + arg + "\"") | ||||
|         parser.printHelpOn(System.err) | ||||
|         System.exit(1) | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     val jsonFile = options.valueOf(jsonFileOpt) | ||||
|     val zkConnect = options.valueOf(zkConnectOpt) | ||||
|     val jsonString = Utils.readFileAsString(jsonFile) | ||||
|     val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) | ||||
| 
 | ||||
|     try { | ||||
|       // read the json file into a string | ||||
|       val partitionsToBeReassigned = Json.parseFull(jsonString) match { | ||||
|         case Some(reassignedPartitions) => | ||||
|           val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] | ||||
|           partitions.map { m => | ||||
|             val topic = m.asInstanceOf[Map[String, String]].get("topic").get | ||||
|             val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt | ||||
|             val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get | ||||
|             val newReplicas = replicasList.split(",").map(_.toInt) | ||||
|             (TopicAndPartition(topic, partition), newReplicas.toSeq) | ||||
|           }.toMap | ||||
|         case None => Map.empty[TopicAndPartition, Seq[Int]] | ||||
|       } | ||||
| 
 | ||||
|       val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) | ||||
|       reassignedPartitionsStatus.foreach { partition => | ||||
|         partition._2 match { | ||||
|           case ReassignmentCompleted => | ||||
|             println("Partition %s reassignment completed successfully".format(partition._1)) | ||||
|           case ReassignmentFailed => | ||||
|             println("Partition %s reassignment failed".format(partition._1)) | ||||
|           case ReassignmentInProgress => | ||||
|             println("Partition %s reassignment in progress".format(partition._1)) | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) | ||||
|   :Map[TopicAndPartition, ReassignmentStatus] = { | ||||
|     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) | ||||
|     // for all partitions whose replica reassignment is complete, check the status | ||||
|     partitionsToBeReassigned.map { topicAndPartition => | ||||
|       (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, | ||||
|         topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, | ||||
|                                             reassignedReplicas: Seq[Int], | ||||
|                                             partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], | ||||
|                                             partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = { | ||||
|     val newReplicas = partitionsToBeReassigned(topicAndPartition) | ||||
|     partitionsBeingReassigned.get(topicAndPartition) match { | ||||
|       case Some(partition) => ReassignmentInProgress | ||||
|       case None => | ||||
|         // check if AR == RAR | ||||
|         val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) | ||||
|         if(assignedReplicas == newReplicas) | ||||
|           ReassignmentCompleted | ||||
|         else | ||||
|           ReassignmentFailed | ||||
|     } | ||||
|   } | ||||
| } | ||||
|  | @ -58,6 +58,12 @@ object ReassignPartitionsCommand extends Logging { | |||
|       .describedAs("execute") | ||||
|       .ofType(classOf[String]) | ||||
| 
 | ||||
|     val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " + | ||||
|       "new replicas they should be reassigned to, which can be obtained from the output of a dry run.") | ||||
|       .withRequiredArg | ||||
|       .describedAs("partition reassignment json file path") | ||||
|       .ofType(classOf[String]) | ||||
| 
 | ||||
|     val options = parser.parse(args : _*) | ||||
| 
 | ||||
|     for(arg <- List(zkConnectOpt)) { | ||||
|  | @ -80,7 +86,24 @@ object ReassignPartitionsCommand extends Logging { | |||
| 
 | ||||
|       var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() | ||||
| 
 | ||||
|       if(options.has(topicsToMoveJsonFileOpt)) { | ||||
|       if(options.has(statusCheckJsonFileOpt)) { | ||||
|         val jsonFile = options.valueOf(statusCheckJsonFileOpt) | ||||
|         val jsonString = Utils.readFileAsString(jsonFile) | ||||
|         val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) | ||||
| 
 | ||||
|         println("Status of partition reassignment:") | ||||
|         val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) | ||||
|         reassignedPartitionsStatus.foreach { partition => | ||||
|           partition._2 match { | ||||
|             case ReassignmentCompleted => | ||||
|               println("Reassignment of partition %s completed successfully".format(partition._1)) | ||||
|             case ReassignmentFailed => | ||||
|               println("Reassignment of partition %s failed".format(partition._1)) | ||||
|             case ReassignmentInProgress => | ||||
|               println("Reassignment of partition %s is still in progress".format(partition._1)) | ||||
|           } | ||||
|         } | ||||
|       } else if(options.has(topicsToMoveJsonFileOpt)) { | ||||
|         val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt) | ||||
|         val brokerList = options.valueOf(brokerListOpt) | ||||
|         val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) | ||||
|  | @ -107,16 +130,19 @@ object ReassignPartitionsCommand extends Logging { | |||
|         System.exit(1) | ||||
|       } | ||||
| 
 | ||||
|       if (options.has(executeOpt)) { | ||||
|         val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) | ||||
|       if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) { | ||||
|         if (options.has(executeOpt)) { | ||||
|           val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) | ||||
| 
 | ||||
|         if(reassignPartitionsCommand.reassignPartitions()) | ||||
|           println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) | ||||
|         else | ||||
|           println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) | ||||
|       } else { | ||||
|         System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + | ||||
|           "The replica assignment is \n" + partitionsToBeReassigned.toString()) | ||||
|           if(reassignPartitionsCommand.reassignPartitions()) | ||||
|             println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) | ||||
|           else | ||||
|             println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) | ||||
|         } else { | ||||
|           System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + | ||||
|             "The following is the replica assignment. Save it for the status check option.\n" + | ||||
|             ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)) | ||||
|         } | ||||
|       } | ||||
|     } catch { | ||||
|       case e: Throwable => | ||||
|  | @ -127,6 +153,32 @@ object ReassignPartitionsCommand extends Logging { | |||
|         zkClient.close() | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) | ||||
|   :Map[TopicAndPartition, ReassignmentStatus] = { | ||||
|     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) | ||||
|     partitionsToBeReassigned.map { topicAndPartition => | ||||
|       (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, | ||||
|         topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, | ||||
|                                             reassignedReplicas: Seq[Int], | ||||
|                                             partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], | ||||
|                                             partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = { | ||||
|     val newReplicas = partitionsToBeReassigned(topicAndPartition) | ||||
|     partitionsBeingReassigned.get(topicAndPartition) match { | ||||
|       case Some(partition) => ReassignmentInProgress | ||||
|       case None => | ||||
|         // check if the current replica assignment matches the expected one after reassignment | ||||
|         val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) | ||||
|         if(assignedReplicas == newReplicas) | ||||
|           ReassignmentCompleted | ||||
|         else | ||||
|           ReassignmentFailed | ||||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]]) | ||||
|  |  | |||
|  | @ -25,6 +25,7 @@ import kafka.common.{ErrorMapping, TopicAndPartition} | |||
| import kafka.consumer.ConsumerConfig | ||||
| import java.util.concurrent.atomic.AtomicInteger | ||||
| import kafka.network.RequestChannel | ||||
| import kafka.message.MessageSet | ||||
| 
 | ||||
| 
 | ||||
| case class PartitionFetchInfo(offset: Long, fetchSize: Int) | ||||
|  | @ -155,7 +156,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV | |||
|   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { | ||||
|     val fetchResponsePartitionData = requestInfo.map { | ||||
|       case (topicAndPartition, data) => | ||||
|         (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null)) | ||||
|         (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) | ||||
|     } | ||||
|     val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) | ||||
|     requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) | ||||
|  |  | |||
|  | @ -135,12 +135,17 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, | |||
|   } | ||||
| 
 | ||||
|   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { | ||||
|     val producerResponseStatus = data.map { | ||||
|       case (topicAndPartition, data) => | ||||
|         (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) | ||||
|     if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) { | ||||
|         requestChannel.closeConnection(request.processor, request) | ||||
|     } | ||||
|     else { | ||||
|       val producerResponseStatus = data.map { | ||||
|         case (topicAndPartition, data) => | ||||
|           (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) | ||||
|       } | ||||
|       val errorResponse = ProducerResponse(correlationId, producerResponseStatus) | ||||
|       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) | ||||
|     } | ||||
|     val errorResponse = ProducerResponse(correlationId, producerResponseStatus) | ||||
|     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) | ||||
|   } | ||||
| 
 | ||||
|   def emptyData(){ | ||||
|  |  | |||
|  | @ -25,13 +25,12 @@ import kafka.cluster._ | |||
| import kafka.utils._ | ||||
| import org.I0Itec.zkclient.exception.ZkNodeExistsException | ||||
| import java.net.InetAddress | ||||
| import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} | ||||
| import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient} | ||||
| import org.apache.zookeeper.Watcher.Event.KeeperState | ||||
| import java.util.UUID | ||||
| import kafka.serializer._ | ||||
| import kafka.utils.ZkUtils._ | ||||
| import kafka.common._ | ||||
| import kafka.client.ClientUtils | ||||
| import com.yammer.metrics.core.Gauge | ||||
| import kafka.metrics._ | ||||
| import scala.Some | ||||
|  | @ -91,6 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | |||
|   private val messageStreamCreated = new AtomicBoolean(false) | ||||
| 
 | ||||
|   private var sessionExpirationListener: ZKSessionExpireListener = null | ||||
|   private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null | ||||
|   private var loadBalancerListener: ZKRebalancerListener = null | ||||
| 
 | ||||
|   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null | ||||
|  | @ -272,8 +272,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | |||
|     } | ||||
|   } | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|   class ZKSessionExpireListener(val dirs: ZKGroupDirs, | ||||
|                                  val consumerIdString: String, | ||||
|                                  val topicCount: TopicCount, | ||||
|  | @ -310,6 +308,29 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | |||
| 
 | ||||
|   } | ||||
| 
 | ||||
|   class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener) | ||||
|     extends IZkDataListener { | ||||
| 
 | ||||
|     def handleDataChange(dataPath : String, data: Object) { | ||||
|       try { | ||||
|         info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance") | ||||
|         // explicitly trigger load balancing for this consumer | ||||
|         loadBalancerListener.syncedRebalance() | ||||
| 
 | ||||
|         // There is no need to re-subscribe the watcher since it will be automatically | ||||
|         // re-registered upon firing of this event by zkClient | ||||
|       } catch { | ||||
|         case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e ) | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     @throws(classOf[Exception]) | ||||
|     def handleDataDeleted(dataPath : String) { | ||||
|       // TODO: This need to be implemented when we support delete topic | ||||
|       warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time") | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   class ZKRebalancerListener(val group: String, val consumerIdString: String, | ||||
|                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) | ||||
|     extends IZkChildListener { | ||||
|  | @ -425,17 +446,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | |||
|         true | ||||
|       } | ||||
|       else { | ||||
|         val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, | ||||
|                                                             brokers, | ||||
|                                                             config.clientId, | ||||
|                                                             config.socketTimeoutMs, | ||||
|                                                             correlationId.getAndIncrement).topicsMetadata | ||||
|         val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] | ||||
|         topicsMetadata.foreach(m => { | ||||
|           val topic = m.topic | ||||
|           val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) | ||||
|           partitionsPerTopicMap.put(topic, partitions) | ||||
|         }) | ||||
|         val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq) | ||||
|         val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq.sorted)) | ||||
| 
 | ||||
|         /** | ||||
|          * fetchers must be stopped to avoid data duplication, since if the current | ||||
|  | @ -639,11 +651,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | |||
|         config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]]) | ||||
|     } | ||||
| 
 | ||||
|     // register listener for session expired event | ||||
|     // create listener for session expired event if not exist yet | ||||
|     if (sessionExpirationListener == null) | ||||
|       sessionExpirationListener = new ZKSessionExpireListener( | ||||
|         dirs, consumerIdString, topicCount, loadBalancerListener) | ||||
| 
 | ||||
|     // create listener for topic partition change event if not exist yet | ||||
|     if (topicPartitionChangeListenner == null) | ||||
|       topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener) | ||||
| 
 | ||||
|     val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams | ||||
| 
 | ||||
|     // map of {topic -> Set(thread-1, thread-2, ...)} | ||||
|  | @ -699,8 +715,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | |||
| 
 | ||||
|     topicStreamsMap.foreach { topicAndStreams => | ||||
|       // register on broker partition path changes | ||||
|       val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1 | ||||
|       zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) | ||||
|       val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 | ||||
|       zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner) | ||||
|     } | ||||
| 
 | ||||
|     // explicitly trigger load balancing for this consumer | ||||
|  |  | |||
|  | @ -129,7 +129,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg | |||
|     new Gauge[Int] { | ||||
|       def value(): Int = { | ||||
|         controllerContext.controllerLock synchronized { | ||||
|           controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader)) | ||||
|           if (!isActive()) | ||||
|             0 | ||||
|           else | ||||
|             controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader)) | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|  |  | |||
|  | @ -22,6 +22,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} | |||
| import kafka.producer._ | ||||
| import kafka.serializer.Encoder | ||||
| import kafka.utils.{Utils, Logging, SystemTime} | ||||
| import scala.util.Random | ||||
| import scala.collection.{Seq, Map} | ||||
| import scala.collection.mutable.{ArrayBuffer, HashMap, Set} | ||||
| import java.util.concurrent.atomic._ | ||||
|  | @ -36,7 +37,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig, | |||
|   extends EventHandler[K,V] with Logging { | ||||
|   val isSync = ("sync" == config.producerType) | ||||
| 
 | ||||
|   val partitionCounter = new AtomicInteger(0) | ||||
|   val correlationId = new AtomicInteger(0) | ||||
|   val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos) | ||||
| 
 | ||||
|  | @ -218,7 +218,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, | |||
|             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) | ||||
|             if (availablePartitions.isEmpty) | ||||
|               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) | ||||
|             val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size | ||||
|             val index = Utils.abs(Random.nextInt) % availablePartitions.size | ||||
|             val partitionId = availablePartitions(index).partitionId | ||||
|             sendPartitionPerTopicCache.put(topic, partitionId) | ||||
|             partitionId | ||||
|  |  | |||
|  | @ -149,7 +149,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { | |||
|     // wait until reassignment is completed | ||||
|     TestUtils.waitUntilTrue(() => { | ||||
|       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); | ||||
|       CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, | ||||
|       ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, | ||||
|       Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; | ||||
|     }, 1000) | ||||
|     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) | ||||
|  | @ -174,7 +174,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { | |||
|     // wait until reassignment is completed | ||||
|     TestUtils.waitUntilTrue(() => { | ||||
|       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); | ||||
|       CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, | ||||
|       ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, | ||||
|         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; | ||||
|     }, 1000) | ||||
|     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) | ||||
|  | @ -200,7 +200,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { | |||
|     // wait until reassignment is completed | ||||
|     TestUtils.waitUntilTrue(() => { | ||||
|       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); | ||||
|       CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, | ||||
|       ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, | ||||
|         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; | ||||
|     }, 2000) | ||||
|     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) | ||||
|  |  | |||
|  | @ -50,7 +50,8 @@ object KafkaBuild extends Build { | |||
|     buildNumber := System.getProperty("build.number", ""), | ||||
|     version <<= (buildNumber, version)  { (build, version)  => if (build == "") version else version + "+" + build}, | ||||
|     releaseName <<= (name, version, scalaVersion) {(name, version, scalaVersion) => name + "_" + scalaVersion + "-" + version}, | ||||
|     javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.5"), | ||||
|     javacOptions in compile ++= Seq("-Xlint:unchecked", "-source", "1.5"), | ||||
|     javacOptions in doc ++= Seq("-source", "1.5"), | ||||
|     parallelExecution in Test := false, // Prevent tests from overrunning each other | ||||
|     libraryDependencies ++= Seq( | ||||
|       "log4j"                 % "log4j"        % "1.2.15" exclude("javax.jms", "jms"), | ||||
|  | @ -73,7 +74,7 @@ object KafkaBuild extends Build { | |||
|   ) | ||||
| 
 | ||||
|   val hadoopSettings = Seq( | ||||
|     javacOptions ++= Seq("-Xlint:deprecation"), | ||||
|     javacOptions in compile ++= Seq("-Xlint:deprecation"), | ||||
|     libraryDependencies ++= Seq( | ||||
|       "org.apache.avro"      % "avro"               % "1.4.0", | ||||
|       "org.apache.pig"       % "pig"                % "0.8.0", | ||||
|  |  | |||
|  | @ -1010,6 +1010,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk | |||
|             cmdList = ["ssh " + host, | ||||
|                        "'JAVA_HOME=" + javaHome, | ||||
|                        "JMX_PORT=" + jmxPort, | ||||
|                        "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, | ||||
|                        kafkaRunClassBin + " kafka.perf.ProducerPerformance", | ||||
|                        "--broker-list " + brokerListStr, | ||||
|                        "--initial-message-id " + str(initMsgId), | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue