From 0c1885b800077e4d360935a6d91fe1068a684560 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 13 Sep 2013 16:17:55 -0700 Subject: [PATCH 01/13] KAFKA-955 (followup patch) After a leader change, messages sent with ack=0 are lost; reviewed by Neha Narkhede and Jun Rao --- .../main/scala/kafka/api/ProducerRequest.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index fda3e391843..c6063511173 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -135,12 +135,17 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, } override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val producerResponseStatus = data.map { - case (topicAndPartition, data) => - (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) + if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) { + requestChannel.closeConnection(request.processor, request) + } + else { + val producerResponseStatus = data.map { + case (topicAndPartition, data) => + (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) + } + val errorResponse = ProducerResponse(correlationId, producerResponseStatus) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } - val errorResponse = ProducerResponse(correlationId, producerResponseStatus) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } def emptyData(){ From aebf746190685d055358ca122aedc424fe024afa Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Fri, 13 Sep 2013 16:29:39 -0700 Subject: [PATCH 02/13] KAFKA-1038; fetch response should use empty messageset instead of null when handling errors; patched by Jun Rao; reviewed by Neha Narkhede --- core/src/main/scala/kafka/api/FetchRequest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index a807c1f7534..fb2a2306003 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -25,6 +25,7 @@ import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.consumer.ConsumerConfig import java.util.concurrent.atomic.AtomicInteger import kafka.network.RequestChannel +import kafka.message.MessageSet case class PartitionFetchInfo(offset: Long, fetchSize: Int) @@ -155,7 +156,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, data) => - (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null)) + (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) } val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) From c6ca971738700643ecba78ad1f3998062481aab9 Mon Sep 17 00:00:00 2001 
From: Neha Narkhede Date: Tue, 17 Sep 2013 14:22:48 -0700 Subject: [PATCH 03/13] KAFKA-1030 Addition of partitions requires bouncing all the consumers of that topic; reviewed by Neha, Swapnil, Joel --- .../consumer/ZookeeperConsumerConnector.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 81bf0bda322..881f51e0cab 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -31,7 +31,6 @@ import java.util.UUID import kafka.serializer._ import kafka.utils.ZkUtils._ import kafka.common._ -import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge import kafka.metrics._ import scala.Some @@ -422,17 +421,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, true } else { - val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, - brokers, - config.clientId, - config.socketTimeoutMs, - correlationId.getAndIncrement).topicsMetadata - val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] - topicsMetadata.foreach(m => { - val topic = m.topic - val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) - partitionsPerTopicMap.put(topic, partitions) - }) + val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq) + val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq)) /** * fetchers must be stopped to avoid data duplication, since if the current From d6f95cf97a928067977fef5216e954d5147fd138 Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Tue, 17 Sep 2013 20:44:33 -0700 Subject: [PATCH 04/13] KAFKA-1053 Kafka patch review tool that integrates JIRA and reviewboard; reviewed by Joel Koshy, Swapnil Ghike and Guozhang Wang --- .reviewboardrc | 3 ++ kafka-patch-review.py | 103 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 .reviewboardrc create mode 100644 kafka-patch-review.py diff --git a/.reviewboardrc b/.reviewboardrc new file mode 100644 index 00000000000..5e8d67014c0 --- /dev/null +++ b/.reviewboardrc @@ -0,0 +1,3 @@ +REPOSITORY = 'git://git.apache.org/kafka.git' +TARGET_GROUPS = 'kafka' +GUESS_FIELDS = True diff --git a/kafka-patch-review.py b/kafka-patch-review.py new file mode 100644 index 00000000000..f1d51920577 --- /dev/null +++ b/kafka-patch-review.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python + +import argparse +import sys +import os +import time +import datetime +import tempfile +from jira.client import JIRA + +def get_jira(): + options = { + 'server': 'https://issues.apache.org/jira' + } + # read the config file + home=jira_home=os.getenv('HOME') + home=home.rstrip('/') + jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) + jira = JIRA(options,basic_auth=(jira_config['user'], jira_config['password'])) + return jira + +def main(): + ''' main(), shut up, pylint ''' + popt = argparse.ArgumentParser(description='Kafka patch review tool') + popt.add_argument('-b', '--branch', action='store', dest='branch', required=True, help='Tracking branch to create diff against') + popt.add_argument('-j', '--jira', action='store', dest='jira', required=True, help='JIRA corresponding to the reviewboard') + popt.add_argument('-s', '--summary', action='store', dest='summary', 
required=False, help='Summary for the reviewboard') + popt.add_argument('-d', '--description', action='store', dest='description', required=False, help='Description for reviewboard') + popt.add_argument('-r', '--rb', action='store', dest='reviewboard', required=False, help='Review board that needs to be updated') + popt.add_argument('-t', '--testing-done', action='store', dest='testing', required=False, help='Text for the Testing Done section of the reviewboard') + popt.add_argument('-db', '--debug', action='store_true', required=False, help='Enable debug mode') + opt = popt.parse_args() + + patch_file=tempfile.gettempdir() + "/" + opt.jira + ".patch" + if opt.reviewboard: + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d_%H:%M:%S') + patch_file=tempfile.gettempdir() + "/" + opt.jira + '_' + st + '.patch' + + git_remote_update="git remote update" + print "Updating your remote branches to pull the latest changes" + p=os.popen(git_remote_update) + p.close() + + rb_command="post-review --publish --tracking-branch " + opt.branch + " --target-groups=kafka --bugs-closed=" + opt.jira + if opt.debug: + rb_command=rb_command + " --debug" + summary="Patch for " + opt.jira + if opt.summary: + summary=opt.summary + rb_command=rb_command + " --summary \"" + summary + "\"" + if opt.description: + rb_command=rb_command + " --description \"" + opt.description + "\"" + if opt.reviewboard: + rb_command=rb_command + " -r " + opt.reviewboard + if opt.testing: + rb_command=rb_command + " --testing-done=" + opt.testing + if opt.debug: + print rb_command + p=os.popen(rb_command) + rb_url="" + for line in p: + print line + if line.startswith('http'): + rb_url = line + elif line.startswith("There don't seem to be any diffs"): + print 'ERROR: Your reviewboard was not created/updated since there was no diff to upload. The reasons that can cause this issue are 1) Your diff is not checked into your local branch. Please check in the diff to the local branch and retry 2) You are not specifying the local branch name as part of the --branch option. Please specify the remote branch name obtained from git branch -r' + p.close() + sys.exit(1) + elif line.startswith("Your review request still exists, but the diff is not attached") and not opt.debug: + print 'ERROR: Your reviewboard was not created/updated. 
Please run the script with the --debug option to troubleshoot the problem' + p.close() + sys.exit(1) + p.close() + if opt.debug: + print 'rb url=',rb_url + + git_command="git diff " + opt.branch + " > " + patch_file + if opt.debug: + print git_command + p=os.popen(git_command) + p.close() + + print 'Creating diff against', opt.branch, 'and uploading patch to JIRA',opt.jira + jira=get_jira() + issue = jira.issue(opt.jira) + attachment=open(patch_file) + jira.add_attachment(issue,attachment) + attachment.close() + + comment="Created reviewboard " + if not opt.reviewboard: + print 'Created a new reviewboard ',rb_url + else: + print 'Updated reviewboard',opt.reviewboard + comment="Updated reviewboard " + + comment = comment + rb_url + jira.add_comment(opt.jira, comment) + +if __name__ == '__main__': + sys.exit(main()) + From 61b8b2bf818c36e5844a32b24b2855c32bd34254 Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Fri, 20 Sep 2013 09:56:28 -0700 Subject: [PATCH 05/13] KAFKA-1062 Reading topic metadata from zookeeper leads to incompatible ordering of partition list; reviewed by Neha and Guozhang --- .../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 881f51e0cab..08b4b7218f6 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -422,7 +422,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } else { val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq) - val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq)) + val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq.sorted)) /** * fetchers must be stopped to avoid data duplication, since if the current From bb7b45cd5d454d64c3454b01f8f3f1e13ed26ff3 Mon Sep 17 00:00:00 2001 From: Sam Meder Date: Tue, 24 Sep 2013 08:31:18 -0700 Subject: [PATCH 06/13] kafka-946; Kafka Hadoop Consumer fails when verifying message checksum; patched by Sam Meder; reviewed by Jun Rao --- .../src/main/java/kafka/etl/KafkaETLContext.java | 2 +- .../src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java index 8e98efc1cdf..1d0e0a91798 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java @@ -196,7 +196,7 @@ public class KafkaETLContext { if (_messageIt != null && _messageIt.hasNext()) { MessageAndOffset messageAndOffset = _messageIt.next(); - ByteBuffer buf = messageAndOffset.message().payload(); + ByteBuffer buf = messageAndOffset.message().buffer(); int origSize = buf.remaining(); byte[] bytes = new byte[origSize]; buf.get(bytes, buf.position(), origSize); diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java index b0aadff332d..45cc9218101 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java +++ 
b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java @@ -16,8 +16,6 @@ */ package kafka.etl.impl; -import java.io.IOException; -import java.nio.ByteBuffer; import kafka.etl.KafkaETLKey; import kafka.etl.KafkaETLUtils; import kafka.message.Message; @@ -29,6 +27,9 @@ import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import java.io.IOException; +import java.nio.ByteBuffer; + /** * Simple implementation of KafkaETLMapper. It assumes that * input data are text timestamp (long). @@ -59,7 +60,7 @@ Mapper { byte[] bytes = KafkaETLUtils.getBytes(val); //check the checksum of message - Message message = new Message(bytes); + Message message = new Message(ByteBuffer.wrap(bytes)); long checksum = key.getChecksum(); if (checksum != message.checksum()) throw new IOException ("Invalid message checksum " From 56ada9576204b754e792a3d3c9063908f4f24b4d Mon Sep 17 00:00:00 2001 From: Joe Stein Date: Thu, 26 Sep 2013 23:01:09 -0400 Subject: [PATCH 07/13] KAFKA-939 sbt publish-local fails due to invalid javac flags passed to javadoc patch by Frank Grimes reviewed by Joe Stein --- project/Build.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 2cdbc9ebc95..33bb76f4ebb 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -50,7 +50,8 @@ object KafkaBuild extends Build { buildNumber := System.getProperty("build.number", ""), version <<= (buildNumber, version) { (build, version) => if (build == "") version else version + "+" + build}, releaseName <<= (name, version, scalaVersion) {(name, version, scalaVersion) => name + "_" + scalaVersion + "-" + version}, - javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.5"), + javacOptions in compile ++= Seq("-Xlint:unchecked", "-source", "1.5"), + javacOptions in doc ++= Seq("-source", "1.5"), parallelExecution in Test := false, // Prevent tests from overrunning each other libraryDependencies ++= Seq( "log4j" % "log4j" % "1.2.15" exclude("javax.jms", "jms"), @@ -73,7 +74,7 @@ object KafkaBuild extends Build { ) val hadoopSettings = Seq( - javacOptions ++= Seq("-Xlint:deprecation"), + javacOptions in compile ++= Seq("-Xlint:deprecation"), libraryDependencies ++= Seq( "org.apache.avro" % "avro" % "1.4.0", "org.apache.pig" % "pig" % "0.8.0", From dcbf0bf0b7e949e0f2652d6f3bd967349813bf6f Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Tue, 1 Oct 2013 13:22:46 -0700 Subject: [PATCH 08/13] KAFKA-1068 OfflinePartitionCount metrics may be incorrect after the controller failover; reviewed by Neha Narkhede and Guozhang Wang --- core/src/main/scala/kafka/controller/KafkaController.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index aef41ad5f08..88d130f5599 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -129,7 +129,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg new Gauge[Int] { def value(): Int = { controllerContext.controllerLock synchronized { - controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader)) + if (!isActive()) + 0 + else + controllerContext.partitionLeadershipInfo.count(p => 
!controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader)) } } } From 93921a3a5720a1ffd9e272d59d8a7627da28e89e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 3 Oct 2013 17:46:53 -0700 Subject: [PATCH 09/13] KAFKA-1071; The random partition selected in DefaultEventHandler is not random across producer instances; reviewed by Neha Narkhede and Jun Rao --- .../main/scala/kafka/producer/async/DefaultEventHandler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index c15103253c0..eba375b776f 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -22,6 +22,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.producer._ import kafka.serializer.Encoder import kafka.utils.{Utils, Logging, SystemTime} +import scala.util.Random import scala.collection.{Seq, Map} import scala.collection.mutable.{ArrayBuffer, HashMap, Set} import java.util.concurrent.atomic._ @@ -36,7 +37,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig, extends EventHandler[K,V] with Logging { val isSync = ("sync" == config.producerType) - val partitionCounter = new AtomicInteger(0) val correlationId = new AtomicInteger(0) val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos) @@ -217,7 +217,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) - val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size + val index = Utils.abs(Random.nextInt) % availablePartitions.size val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) partitionId From 2c6d3c7b450e50e2efaab8d55a5574b2ec1cd376 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Sun, 6 Oct 2013 09:11:07 -0700 Subject: [PATCH 10/13] kafka-1076; system tests in 0.8 are broken due to wrong log4j config; patched by Joel Koshy; reviewed by Jay Kreps and Jun Rao --- config/test-log4j.properties | 68 ++++++++++++++++++++ system_test/utils/kafka_system_test_utils.py | 1 + 2 files changed, 69 insertions(+) create mode 100644 config/test-log4j.properties diff --git a/config/test-log4j.properties b/config/test-log4j.properties new file mode 100644 index 00000000000..a3ae33f20e4 --- /dev/null +++ b/config/test-log4j.properties @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=logs/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=logs/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=logs/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=logs/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +log4j.logger.kafka.perf=DEBUG, kafkaAppender +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=INFO, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=TRACE, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false + + diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 35fc383933d..1b473eb9e1d 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -1010,6 +1010,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, + "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, kafkaRunClassBin + " kafka.perf.ProducerPerformance", "--broker-list " + brokerListStr, "--initial-message-id " + str(initMsgId), From 71ed6ca3368ff38909f502565a4bf0f39e70fc6c Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 7 Oct 2013 09:22:12 -0700 Subject: [PATCH 11/13] kafka-1073; CheckReassignmentStatus is broken; patched by Jun Rao; reviewed by Guozhang Wang, Swapnil Ghike and Neha Narkhede --- 
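A note on the patch that follows (kafka-1073): it deletes the standalone CheckReassignmentStatus tool and folds the status check into ReassignPartitionsCommand behind a new --status-check-json-file option, as the diff below shows. For orientation, a minimal Scala sketch of that check is given here, built only from the ZkUtils helpers the patch itself calls; the ZooKeeper address, session timeouts and JSON file name are placeholders, not taken from the patch.

import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Utils, ZkUtils, ZKStringSerializer}

object ReassignmentStatusSketch {
  def main(args: Array[String]) {
    // Placeholder connection settings; the tool takes these from --zookeeper.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try {
      // The JSON emitted by the dry run is parsed back into (TopicAndPartition -> new replica list).
      val jsonString = Utils.readFileAsString("reassignment.json")
      val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
      // A partition still listed under /admin/reassign_partitions is in progress; otherwise the
      // live assignment is compared against the requested replica list, as in the patch.
      val inProgress = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
      partitionsToBeReassigned.foreach { case (tp, newReplicas) =>
        val status =
          if (inProgress.contains(tp)) "still in progress"
          else if (ZkUtils.getReplicasForPartition(zkClient, tp.topic, tp.partition) == newReplicas) "completed successfully"
          else "failed"
        println("Reassignment of partition %s %s".format(tp, status))
      }
    } finally {
      zkClient.close()
    }
  }
}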
bin/kafka-check-reassignment-status.sh | 17 --- .../kafka/admin/CheckReassignmentStatus.scala | 110 ------------------ .../admin/ReassignPartitionsCommand.scala | 72 ++++++++++-- 3 files changed, 62 insertions(+), 137 deletions(-) delete mode 100755 bin/kafka-check-reassignment-status.sh delete mode 100644 core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala diff --git a/bin/kafka-check-reassignment-status.sh b/bin/kafka-check-reassignment-status.sh deleted file mode 100755 index 1f218585cdd..00000000000 --- a/bin/kafka-check-reassignment-status.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -$(dirname $0)/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@ diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala deleted file mode 100644 index 7e85f87e96d..00000000000 --- a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import joptsimple.OptionParser -import org.I0Itec.zkclient.ZkClient -import kafka.utils._ -import scala.collection.Map -import kafka.common.TopicAndPartition - -object CheckReassignmentStatus extends Logging { - - def main(args: Array[String]): Unit = { - val parser = new OptionParser - val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " + - "new replicas they should be reassigned to") - .withRequiredArg - .describedAs("partition reassignment json file path") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + - "form host:port. 
Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - - val options = parser.parse(args : _*) - - for(arg <- List(jsonFileOpt, zkConnectOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val jsonFile = options.valueOf(jsonFileOpt) - val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileAsString(jsonFile) - val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - - try { - // read the json file into a string - val partitionsToBeReassigned = Json.parseFull(jsonString) match { - case Some(reassignedPartitions) => - val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] - partitions.map { m => - val topic = m.asInstanceOf[Map[String, String]].get("topic").get - val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt - val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get - val newReplicas = replicasList.split(",").map(_.toInt) - (TopicAndPartition(topic, partition), newReplicas.toSeq) - }.toMap - case None => Map.empty[TopicAndPartition, Seq[Int]] - } - - val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) - reassignedPartitionsStatus.foreach { partition => - partition._2 match { - case ReassignmentCompleted => - println("Partition %s reassignment completed successfully".format(partition._1)) - case ReassignmentFailed => - println("Partition %s reassignment failed".format(partition._1)) - case ReassignmentInProgress => - println("Partition %s reassignment in progress".format(partition._1)) - } - } - } - } - - def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) - :Map[TopicAndPartition, ReassignmentStatus] = { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) - // for all partitions whose replica reassignment is complete, check the status - partitionsToBeReassigned.map { topicAndPartition => - (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, - topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) - } - } - - def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, - reassignedReplicas: Seq[Int], - partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], - partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = { - val newReplicas = partitionsToBeReassigned(topicAndPartition) - partitionsBeingReassigned.get(topicAndPartition) match { - case Some(partition) => ReassignmentInProgress - case None => - // check if AR == RAR - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) - if(assignedReplicas == newReplicas) - ReassignmentCompleted - else - ReassignmentFailed - } - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index f333d29bf36..c6fc4ab7c21 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -58,6 +58,12 @@ object ReassignPartitionsCommand extends Logging { .describedAs("execute") .ofType(classOf[String]) + val 
statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " + + "new replicas they should be reassigned to, which can be obtained from the output of a dry run.") + .withRequiredArg + .describedAs("partition reassignment json file path") + .ofType(classOf[String]) + val options = parser.parse(args : _*) for(arg <- List(zkConnectOpt)) { @@ -80,7 +86,24 @@ object ReassignPartitionsCommand extends Logging { var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() - if(options.has(topicsToMoveJsonFileOpt)) { + if(options.has(statusCheckJsonFileOpt)) { + val jsonFile = options.valueOf(statusCheckJsonFileOpt) + val jsonString = Utils.readFileAsString(jsonFile) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) + + println("Status of partition reassignment:") + val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) + reassignedPartitionsStatus.foreach { partition => + partition._2 match { + case ReassignmentCompleted => + println("Reassignment of partition %s completed successfully".format(partition._1)) + case ReassignmentFailed => + println("Reassignment of partition %s failed".format(partition._1)) + case ReassignmentInProgress => + println("Reassignment of partition %s is still in progress".format(partition._1)) + } + } + } else if(options.has(topicsToMoveJsonFileOpt)) { val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt) val brokerList = options.valueOf(brokerListOpt) val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) @@ -107,16 +130,19 @@ object ReassignPartitionsCommand extends Logging { System.exit(1) } - if (options.has(executeOpt)) { - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) { + if (options.has(executeOpt)) { + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) - if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) - else - println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) - } else { - System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + - "The replica assignment is \n" + partitionsToBeReassigned.toString()) + if(reassignPartitionsCommand.reassignPartitions()) + println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) + else + println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + } else { + System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + + "The following is the replica assignment. 
Save it for the status check option.\n" + + ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)) + } } } catch { case e: Throwable => @@ -127,6 +153,32 @@ object ReassignPartitionsCommand extends Logging { zkClient.close() } } + + private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) + :Map[TopicAndPartition, ReassignmentStatus] = { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) + partitionsToBeReassigned.map { topicAndPartition => + (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, + topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) + } + } + + private def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, + reassignedReplicas: Seq[Int], + partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], + partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = { + val newReplicas = partitionsToBeReassigned(topicAndPartition) + partitionsBeingReassigned.get(topicAndPartition) match { + case Some(partition) => ReassignmentInProgress + case None => + // check if the current replica assignment matches the expected one after reassignment + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) + if(assignedReplicas == newReplicas) + ReassignmentCompleted + else + ReassignmentFailed + } + } } class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]]) From 40efe0a972e8a1673b51b05f3937265d97acb01c Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 7 Oct 2013 14:00:14 -0700 Subject: [PATCH 12/13] trivial follow-up patch for kafka-1073 to fix unit tests; patched by Jun Rao --- .../main/scala/kafka/admin/ReassignPartitionsCommand.scala | 2 +- core/src/test/scala/unit/kafka/admin/AdminTest.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index c6fc4ab7c21..2f706c94d34 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -163,7 +163,7 @@ object ReassignPartitionsCommand extends Logging { } } - private def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, + def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, reassignedReplicas: Seq[Int], partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = { diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 881e69b76f6..a480881d7f3 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -191,7 +191,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // wait until reassignment is completed TestUtils.waitUntilTrue(() => { val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + 
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; }, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) @@ -216,7 +216,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // wait until reassignment is completed TestUtils.waitUntilTrue(() => { val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; }, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) @@ -242,7 +242,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // wait until reassignment is completed TestUtils.waitUntilTrue(() => { val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; }, 2000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) From 1cb217579bbb1f9d4df24a3d32bf88644697b7fe Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 7 Oct 2013 14:05:40 -0700 Subject: [PATCH 13/13] KAFKA-1075; Consumer will not rebalance upon topic partition change; reviewed by Neha Narkhede and Jun Rao --- .../consumer/ZookeeperConsumerConnector.scala | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 08b4b7218f6..36b167b28ac 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -25,7 +25,7 @@ import kafka.cluster._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException import java.net.InetAddress -import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState import java.util.UUID import kafka.serializer._ @@ -90,6 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val messageStreamCreated = new AtomicBoolean(false) private var sessionExpirationListener: ZKSessionExpireListener = null + private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null private var loadBalancerListener: ZKRebalancerListener = null private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null @@ -268,8 +269,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - - class ZKSessionExpireListener(val dirs: ZKGroupDirs, val consumerIdString: String, val topicCount: TopicCount, @@ -306,6 +305,29 @@ private[kafka] class ZookeeperConsumerConnector(val config: 
ConsumerConfig, } + class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener) + extends IZkDataListener { + + def handleDataChange(dataPath : String, data: Object) { + try { + info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance") + // explicitly trigger load balancing for this consumer + loadBalancerListener.syncedRebalance() + + // There is no need to re-subscribe the watcher since it will be automatically + // re-registered upon firing of this event by zkClient + } catch { + case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e ) + } + } + + @throws(classOf[Exception]) + def handleDataDeleted(dataPath : String) { + // TODO: This need to be implemented when we support delete topic + warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time") + } + } + class ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { @@ -626,11 +648,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]]) } - // register listener for session expired event + // create listener for session expired event if not exist yet if (sessionExpirationListener == null) sessionExpirationListener = new ZKSessionExpireListener( dirs, consumerIdString, topicCount, loadBalancerListener) + // create listener for topic partition change event if not exist yet + if (topicPartitionChangeListenner == null) + topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener) + val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams // map of {topic -> Set(thread-1, thread-2, ...)} @@ -686,8 +712,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, topicStreamsMap.foreach { topicAndStreams => // register on broker partition path changes - val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1 - zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) + val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 + zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner) } // explicitly trigger load balancing for this consumer
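The hunk above replaces the child-change subscription on /brokers/topics/&lt;topic&gt; with a data-change subscription, so that adding partitions to a topic triggers a consumer rebalance without bouncing the consumer. A minimal Scala sketch of that wiring follows; it registers an IZkDataListener the way the patch does, but the ZooKeeper address, topic name and callback bodies are placeholders, not taken from the patch.

import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import kafka.utils.ZKStringSerializer

object TopicPartitionWatchSketch {
  def main(args: Array[String]) {
    // Placeholder connection settings; the patch reuses the consumer connector's existing zkClient.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    val listener = new IZkDataListener {
      // Fires when the partition assignment stored at the topic path changes, e.g. after
      // partitions are added; the patch calls loadBalancerListener.syncedRebalance() here.
      def handleDataChange(dataPath: String, data: Object) {
        println("topic data at " + dataPath + " changed, triggering rebalance")
      }
      // Topic deletion is not supported in this Kafka version; the patch only logs a warning.
      def handleDataDeleted(dataPath: String) {
        println("unexpected deletion of " + dataPath)
      }
    }
    // One data watch per subscribed topic, matching the loop in the hunk above.
    zkClient.subscribeDataChanges("/brokers/topics/my-topic", listener)
  }
}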