From d1a22b2e3b59c2cf00adabd75d29ddd53938bacb Mon Sep 17 00:00:00 2001 From: Edward Jay Kreps Date: Sat, 13 Oct 2012 03:35:02 +0000 Subject: [PATCH] KAFKA-569 Split up utils package and do some cleanup. Patch reviewed by Neha. git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1397765 13f79535-47bb-0310-9956-ffa450edef68 --- core/src/main/scala/kafka/Kafka.scala | 2 +- .../main/scala/kafka/admin/AdminUtils.scala | 3 +- .../kafka/admin/CheckReassignmentStatus.scala | 4 +- ...referredReplicaLeaderElectionCommand.scala | 8 +- .../admin/ReassignPartitionsCommand.scala | 4 +- core/src/main/scala/kafka/api/ApiUtils.scala | 92 +++++ .../main/scala/kafka/api/FetchRequest.scala | 15 +- .../main/scala/kafka/api/FetchResponse.scala | 8 +- .../scala/kafka/api/LeaderAndIsrRequest.scala | 21 +- ...ponse.scala => LeaderAndIsrResponse.scala} | 13 +- .../main/scala/kafka/api/OffsetRequest.scala | 13 +- .../main/scala/kafka/api/OffsetResponse.scala | 7 +- .../scala/kafka/api/ProducerRequest.scala | 13 +- .../scala/kafka/api/ProducerResponse.scala | 7 +- .../main/scala/kafka/api/RequestKeys.scala | 12 +- .../scala/kafka/api/RequestOrResponse.scala | 6 - .../scala/kafka/api/StopReplicaRequest.scala | 13 +- .../scala/kafka/api/StopReplicaResponse.scala | 5 +- .../main/scala/kafka/api/TopicMetadata.scala | 24 +- .../kafka/api/TopicMetadataRequest.scala | 14 +- .../main/scala/kafka/client/ClientUtils.scala | 60 +++ .../src/main/scala/kafka/cluster/Broker.scala | 1 + .../main/scala/kafka/cluster/Partition.scala | 62 +-- .../kafka/consumer/ConsoleConsumer.scala | 11 +- .../consumer/ConsumerFetcherManager.scala | 3 +- .../scala/kafka/consumer/TopicCount.scala | 4 +- .../consumer/ZookeeperConsumerConnector.scala | 3 +- .../controller/ControllerChannelManager.scala | 2 +- .../src/main/scala/kafka/log/LogManager.scala | 22 +- .../main/scala/kafka/message/Message.scala | 4 +- .../kafka/metrics/KafkaMetricsConfig.scala | 2 +- .../kafka/producer/BrokerPartitionInfo.scala | 7 +- .../kafka/producer/DefaultPartitioner.scala | 4 +- .../main/scala/kafka/producer/Producer.scala | 8 +- .../scala/kafka/producer/ProducerConfig.scala | 2 +- .../producer/async/DefaultEventHandler.scala | 9 +- .../main/scala/kafka/server/KafkaApis.scala | 16 +- .../main/scala/kafka/server/KafkaConfig.scala | 14 +- .../scala/kafka/server/ReplicaManager.scala | 13 +- .../scala/kafka/tools/DumpLogSegments.scala | 4 +- .../main/scala/kafka/tools/MirrorMaker.scala | 5 +- .../scala/kafka/tools/ReplayLogProducer.scala | 20 +- .../kafka/tools/SimpleConsumerShell.scala | 11 +- .../scala/kafka/utils/CommandLineUtils.scala | 20 + core/src/main/scala/kafka/utils/Json.scala | 24 ++ core/src/main/scala/kafka/utils/Utils.scala | 357 ++++-------------- .../kafka/utils/VerifiableProperties.scala | 19 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 42 ++- .../other/kafka/TestZKConsumerOffsets.scala | 2 +- .../RequestResponseSerializationTest.scala | 38 +- .../ZookeeperConsumerConnectorTest.scala | 4 +- .../ZookeeperConsumerConnectorTest.scala | 2 +- .../scala/unit/kafka/log/LogManagerTest.scala | 2 +- .../unit/kafka/message/MessageTest.scala | 2 +- .../unit/kafka/server/ISRExpirationTest.scala | 14 +- .../scala/unit/kafka/utils/TestUtils.scala | 18 +- .../kafka/perf/ConsumerPerformance.scala | 4 +- 57 files changed, 570 insertions(+), 549 deletions(-) create mode 100644 core/src/main/scala/kafka/api/ApiUtils.scala rename core/src/main/scala/kafka/api/{LeaderAndISRResponse.scala => LeaderAndIsrResponse.scala} (85%) create mode 100644 core/src/main/scala/kafka/client/ClientUtils.scala create mode 100644 core/src/main/scala/kafka/utils/CommandLineUtils.scala create mode 100644 core/src/main/scala/kafka/utils/Json.scala diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 0fe6471f92f..84a4a058503 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -35,7 +35,7 @@ object Kafka extends Logging { val verifiableProps = serverConfig.props val metricsConfig = new KafkaMetricsConfig(verifiableProps) metricsConfig.reporters.foreach(reporterType => { - val reporter = Utils.getObject[KafkaMetricsReporter](reporterType) + val reporter = Utils.createObject[KafkaMetricsReporter](reporterType) reporter.init(verifiableProps) if (reporter.isInstanceOf[KafkaMetricsReporterMBean]) Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 40976c5a0d4..933922473b9 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -18,7 +18,8 @@ package kafka.admin import java.util.Random -import kafka.api.{TopicMetadata, PartitionMetadata} +import kafka.api.{TopicMetadata, PartitionMetadata, TopicMetadataRequest, TopicMetadataResponse} +import kafka.common._ import kafka.cluster.Broker import kafka.utils.{Logging, Utils, ZkUtils} import org.I0Itec.zkclient.ZkClient diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala index c9eda48f921..e1d99d2d8f7 100644 --- a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala +++ b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala @@ -34,12 +34,12 @@ object CheckReassignmentStatus extends Logging { val jsonFile = options.valueOf(jsonFileOpt) val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileIntoString(jsonFile) + val jsonString = Utils.readFileAsString(jsonFile) val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) try { // read the json file into a string - val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match { + val partitionsToBeReassigned = Json.parseFull(jsonString) match { case Some(reassignedPartitions) => val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] partitions.map { m => diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 6ad02039463..2ee59df0138 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -42,11 +42,11 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val options = parser.parse(args : _*) - Utils.checkRequiredArgs(parser, options, jsonFileOpt, zkConnectOpt) + CommandLineUtils.checkRequiredArgs(parser, options, jsonFileOpt, zkConnectOpt) val jsonFile = options.valueOf(jsonFileOpt) val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileIntoString(jsonFile) + val jsonString = Utils.readFileAsString(jsonFile) var zkClient: ZkClient = null try { @@ -77,7 +77,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { } def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = { - SyncJSON.parseFull(jsonString) match { + Json.parseFull(jsonString) match { case Some(partitionList) => val partitions = (partitionList.asInstanceOf[List[Any]]) Set.empty[TopicAndPartition] ++ partitions.map { m => @@ -93,7 +93,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) { val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath val jsonData = Utils.arrayToJson(partitionsUndergoingPreferredReplicaElection.map { p => - Utils.stringMapToJsonString(Map(("topic" -> p.topic), ("partition" -> p.partition.toString))) + Utils.stringMapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString))) }.toArray) try { ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index b50140bc86a..6b6b1bdc0c5 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -50,12 +50,12 @@ object ReassignPartitionsCommand extends Logging { val jsonFile = options.valueOf(jsonFileOpt) val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileIntoString(jsonFile) + val jsonString = Utils.readFileAsString(jsonFile) var zkClient: ZkClient = null try { // read the json file into a string - val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match { + val partitionsToBeReassigned = Json.parseFull(jsonString) match { case Some(reassignedPartitions) => val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] partitions.map { m => diff --git a/core/src/main/scala/kafka/api/ApiUtils.scala b/core/src/main/scala/kafka/api/ApiUtils.scala new file mode 100644 index 00000000000..ba1d199ad46 --- /dev/null +++ b/core/src/main/scala/kafka/api/ApiUtils.scala @@ -0,0 +1,92 @@ +package kafka.api + +import java.nio._ +import kafka.common._ + +/** + * Helper functions specific to parsing or serializing requests and responses + */ +object ApiUtils { + + val ProtocolEncoding = "UTF-8" + + /** + * Read size prefixed string where the size is stored as a 2 byte short. + * @param buffer The buffer to read from + */ + def readShortString(buffer: ByteBuffer): String = { + val size: Int = buffer.getShort() + if(size < 0) + return null + val bytes = new Array[Byte](size) + buffer.get(bytes) + new String(bytes, ProtocolEncoding) + } + + /** + * Write a size prefixed string where the size is stored as a 2 byte short + * @param buffer The buffer to write to + * @param string The string to write + */ + def writeShortString(buffer: ByteBuffer, string: String) { + if(string == null) { + buffer.putShort(-1) + } else if(string.length > Short.MaxValue) { + throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") + } else { + buffer.putShort(string.length.asInstanceOf[Short]) + buffer.put(string.getBytes(ProtocolEncoding)) + } + } + + /** + * Return size of a size prefixed string where the size is stored as a 2 byte short + * @param string The string to write + */ + def shortStringLength(string: String): Int = { + if(string == null) { + 2 + } else { + val encodedString = string.getBytes(ProtocolEncoding) + if(encodedString.length > Short.MaxValue) { + throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") + } else { + 2 + encodedString.length + } + } + } + + /** + * Read an integer out of the bytebuffer from the current position and check that it falls within the given + * range. If not, throw KafkaException. + */ + def readIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = { + val value = buffer.getInt + if(value < range._1 || value > range._2) + throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") + else value + } + + /** + * Read a short out of the bytebuffer from the current position and check that it falls within the given + * range. If not, throw KafkaException. + */ + def readShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = { + val value = buffer.getShort + if(value < range._1 || value > range._2) + throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") + else value + } + + /** + * Read a long out of the bytebuffer from the current position and check that it falls within the given + * range. If not, throw KafkaException. + */ + def readLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = { + val value = buffer.getLong + if(value < range._1 || value > range._2) + throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") + else value + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index a80731eef63..88f620e329e 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -18,7 +18,8 @@ package kafka.api import java.nio.ByteBuffer -import kafka.utils.{nonthreadsafe, Utils} +import kafka.utils.nonthreadsafe +import kafka.api.ApiUtils._ import scala.collection.immutable.Map import kafka.common.TopicAndPartition import kafka.consumer.ConsumerConfig @@ -35,13 +36,13 @@ object FetchRequest { def readFrom(buffer: ByteBuffer): FetchRequest = { val versionId = buffer.getShort val correlationId = buffer.getInt - val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val clientId = readShortString(buffer) val replicaId = buffer.getInt val maxWait = buffer.getInt val minBytes = buffer.getInt val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { - val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val topic = readShortString(buffer) val partitionCount = buffer.getInt (1 to partitionCount).map(_ => { val partitionId = buffer.getInt @@ -71,14 +72,14 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) - Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset) + writeShortString(buffer, clientId) buffer.putInt(replicaId) buffer.putInt(maxWait) buffer.putInt(minBytes) buffer.putInt(requestInfoGroupedByTopic.size) // topic count requestInfoGroupedByTopic.foreach { case (topic, partitionFetchInfos) => - Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) + writeShortString(buffer, topic) buffer.putInt(partitionFetchInfos.size) // partition count partitionFetchInfos.foreach { case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) => @@ -92,7 +93,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, def sizeInBytes: Int = { 2 + /* versionId */ 4 + /* correlationId */ - Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + + shortStringLength(clientId) + 4 + /* replicaId */ 4 + /* maxWait */ 4 + /* minBytes */ @@ -100,7 +101,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { val (topic, partitionFetchInfos) = currTopic foldedTopics + - Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) + + shortStringLength(topic) + 4 + /* partition count */ partitionFetchInfos.size * ( 4 + /* partition id */ diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index a0bc24ff113..174b3d73d6d 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -22,7 +22,7 @@ import java.nio.channels.GatheringByteChannel import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.message.{MessageSet, ByteBufferMessageSet} import kafka.network.{MultiSend, Send} -import kafka.utils.Utils +import kafka.api.ApiUtils._ object FetchResponsePartitionData { def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = { @@ -85,7 +85,7 @@ class PartitionDataSend(val partitionId: Int, object TopicData { def readFrom(buffer: ByteBuffer): TopicData = { - val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val topic = readShortString(buffer) val partitionCount = buffer.getInt val topicPartitionDataPairs = (1 to partitionCount).map(_ => { val partitionId = buffer.getInt @@ -96,7 +96,7 @@ object TopicData { } def headerSize(topic: String) = - Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) + + shortStringLength(topic) + 4 /* partition count */ } @@ -115,7 +115,7 @@ class TopicDataSend(val topicData: TopicData) extends Send { override def complete = sent >= size private val buffer = ByteBuffer.allocate(topicData.headerSize) - Utils.writeShortString(buffer, topicData.topic) + writeShortString(buffer, topicData.topic) buffer.putInt(topicData.partitionData.size) buffer.rewind() diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index fd8ef8a8686..850ba543007 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -20,6 +20,7 @@ package kafka.api import java.nio._ import kafka.utils._ +import kafka.api.ApiUtils._ import collection.mutable.Map import collection.mutable.HashMap @@ -30,14 +31,14 @@ object LeaderAndIsr { } case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) { - def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion) + def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion) override def toString(): String = { val jsonDataMap = new HashMap[String, String] jsonDataMap.put("leader", leader.toString) jsonDataMap.put("leaderEpoch", leaderEpoch.toString) jsonDataMap.put("ISR", isr.mkString(",")) - Utils.stringMapToJsonString(jsonDataMap) + Utils.stringMapToJson(jsonDataMap) } } @@ -46,11 +47,11 @@ object PartitionStateInfo { def readFrom(buffer: ByteBuffer): PartitionStateInfo = { val leader = buffer.getInt val leaderGenId = buffer.getInt - val ISRString = Utils.readShortString(buffer, "UTF-8") - val ISR = ISRString.split(",").map(_.toInt).toList + val isrString = readShortString(buffer) + val isr = isrString.split(",").map(_.toInt).toList val zkVersion = buffer.getInt val replicationFactor = buffer.getInt - PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, ISR, zkVersion), replicationFactor) + PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, isr, zkVersion), replicationFactor) } } @@ -58,7 +59,7 @@ case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val replicationFac def writeTo(buffer: ByteBuffer) { buffer.putInt(leaderAndIsr.leader) buffer.putInt(leaderAndIsr.leaderEpoch) - Utils.writeShortString(buffer, leaderAndIsr.isr.mkString(","), "UTF-8") + writeShortString(buffer, leaderAndIsr.isr.mkString(",")) buffer.putInt(leaderAndIsr.zkVersion) buffer.putInt(replicationFactor) } @@ -79,13 +80,13 @@ object LeaderAndIsrRequest { def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = { val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) + val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt val partitionStateInfosCount = buffer.getInt val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo] for(i <- 0 until partitionStateInfosCount){ - val topic = Utils.readShortString(buffer, "UTF-8") + val topic = readShortString(buffer) val partition = buffer.getInt val partitionStateInfo = PartitionStateInfo.readFrom(buffer) @@ -108,11 +109,11 @@ case class LeaderAndIsrRequest (versionId: Short, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) + writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) buffer.putInt(partitionStateInfos.size) for((key, value) <- partitionStateInfos){ - Utils.writeShortString(buffer, key._1, "UTF-8") + writeShortString(buffer, key._1) buffer.putInt(key._2) value.writeTo(buffer) } diff --git a/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala similarity index 85% rename from core/src/main/scala/kafka/api/LeaderAndISRResponse.scala rename to core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala index 33e0ecb6364..c80f0858a7a 100644 --- a/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala @@ -20,28 +20,29 @@ package kafka.api import kafka.common.ErrorMapping import java.nio.ByteBuffer import kafka.utils.Utils +import kafka.api.ApiUtils._ import collection.mutable.HashMap import collection.Map -object LeaderAndISRResponse { - def readFrom(buffer: ByteBuffer): LeaderAndISRResponse = { +object LeaderAndIsrResponse { + def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = { val versionId = buffer.getShort val errorCode = buffer.getShort val numEntries = buffer.getInt val responseMap = new HashMap[(String, Int), Short]() for (i<- 0 until numEntries){ - val topic = Utils.readShortString(buffer, "UTF-8") + val topic = readShortString(buffer) val partition = buffer.getInt val partitionErrorCode = buffer.getShort responseMap.put((topic, partition), partitionErrorCode) } - new LeaderAndISRResponse(versionId, responseMap, errorCode) + new LeaderAndIsrResponse(versionId, responseMap, errorCode) } } -case class LeaderAndISRResponse(versionId: Short, +case class LeaderAndIsrResponse(versionId: Short, responseMap: Map[(String, Int), Short], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse { @@ -58,7 +59,7 @@ case class LeaderAndISRResponse(versionId: Short, buffer.putShort(errorCode) buffer.putInt(responseMap.size) for ((key:(String, Int), value) <- responseMap){ - Utils.writeShortString(buffer, key._1, "UTF-8") + writeShortString(buffer, key._1) buffer.putInt(key._2) buffer.putShort(value) } diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 1c58e08e363..433dc49e6c5 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -20,6 +20,7 @@ package kafka.api import java.nio.ByteBuffer import kafka.utils.Utils import kafka.common.TopicAndPartition +import kafka.api.ApiUtils._ object OffsetRequest { @@ -33,11 +34,11 @@ object OffsetRequest { def readFrom(buffer: ByteBuffer): OffsetRequest = { val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) + val clientId = readShortString(buffer) val replicaId = buffer.getInt val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { - val topic = Utils.readShortString(buffer) + val topic = readShortString(buffer) val partitionCount = buffer.getInt (1 to partitionCount).map(_ => { val partitionId = buffer.getInt @@ -64,13 +65,13 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) + writeShortString(buffer, clientId) buffer.putInt(replicaId) buffer.putInt(requestInfoGroupedByTopic.size) // topic count requestInfoGroupedByTopic.foreach { case((topic, partitionInfos)) => - Utils.writeShortString(buffer, topic) + writeShortString(buffer, topic) buffer.putInt(partitionInfos.size) // partition count partitionInfos.foreach { case (TopicAndPartition(_, partition), partitionInfo) => @@ -83,13 +84,13 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ def sizeInBytes = 2 + /* versionId */ - Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + + shortStringLength(clientId) + 4 + /* replicaId */ 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { val (topic, partitionInfos) = currTopic foldedTopics + - Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) + + shortStringLength(topic) + 4 + /* partition count */ partitionInfos.size * ( 4 + /* partition */ diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala index 242b496966f..a80386733d5 100644 --- a/core/src/main/scala/kafka/api/OffsetResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetResponse.scala @@ -20,6 +20,7 @@ package kafka.api import java.nio.ByteBuffer import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.utils.Utils +import kafka.api.ApiUtils._ object OffsetResponse { @@ -28,7 +29,7 @@ object OffsetResponse { val versionId = buffer.getShort val numTopics = buffer.getInt val pairs = (1 to numTopics).flatMap(_ => { - val topic = Utils.readShortString(buffer) + val topic = readShortString(buffer) val numPartitions = buffer.getInt (1 to numPartitions).map(_ => { val partition = buffer.getInt @@ -61,7 +62,7 @@ case class OffsetResponse(versionId: Short, offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { val (topic, errorAndOffsetsMap) = currTopic foldedTopics + - Utils.shortStringLength(topic) + + shortStringLength(topic) + 4 + /* partition count */ errorAndOffsetsMap.foldLeft(0)((foldedPartitions, currPartition) => { foldedPartitions + @@ -78,7 +79,7 @@ case class OffsetResponse(versionId: Short, buffer.putInt(offsetsGroupedByTopic.size) // topic count offsetsGroupedByTopic.foreach { case((topic, errorAndOffsetsMap)) => - Utils.writeShortString(buffer, topic) + writeShortString(buffer, topic) buffer.putInt(errorAndOffsetsMap.size) // partition count errorAndOffsetsMap.foreach { case((TopicAndPartition(_, partition), errorAndOffsets)) => diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 9396e03d6da..65abd025814 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -22,6 +22,7 @@ import kafka.message._ import kafka.utils._ import scala.collection.Map import kafka.common.TopicAndPartition +import kafka.api.ApiUtils._ object ProducerRequest { @@ -30,14 +31,14 @@ object ProducerRequest { def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort val correlationId: Int = buffer.getInt - val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val clientId: String = readShortString(buffer) val requiredAcks: Short = buffer.getShort val ackTimeoutMs: Int = buffer.getInt //build the topic structure val topicCount = buffer.getInt val partitionDataPairs = (1 to topicCount).flatMap(_ => { // process topic - val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val topic = readShortString(buffer) val partitionCount = buffer.getInt (1 to partitionCount).map(_ => { val partition = buffer.getInt @@ -75,7 +76,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) - Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset) + writeShortString(buffer, clientId) buffer.putShort(requiredAcks) buffer.putInt(ackTimeoutMs) @@ -83,7 +84,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, buffer.putInt(dataGroupedByTopic.size) //the number of topics dataGroupedByTopic.foreach { case (topic, topicAndPartitionData) => - Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic + writeShortString(buffer, topic) //write the topic buffer.putInt(topicAndPartitionData.size) //the number of partitions topicAndPartitionData.foreach(partitionAndData => { val partition = partitionAndData._1.partition @@ -100,13 +101,13 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, def sizeInBytes: Int = { 2 + /* versionId */ 4 + /* correlationId */ - Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */ + shortStringLength(clientId) + /* client id */ 2 + /* requiredAcks */ 4 + /* ackTimeoutMs */ 4 + /* number of topics */ dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { foldedTopics + - Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) + + shortStringLength(currTopic._1) + 4 + /* the number of partitions */ { currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => { diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 6de9c9301df..f56a93a4ddd 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import kafka.utils.Utils import scala.collection.Map import kafka.common.{TopicAndPartition, ErrorMapping} +import kafka.api.ApiUtils._ object ProducerResponse { @@ -29,7 +30,7 @@ object ProducerResponse { val correlationId = buffer.getInt val topicCount = buffer.getInt val statusPairs = (1 to topicCount).flatMap(_ => { - val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val topic = readShortString(buffer) val partitionCount = buffer.getInt (1 to partitionCount).map(_ => { val partition = buffer.getInt @@ -64,7 +65,7 @@ case class ProducerResponse(versionId: Short, 4 + /* topic count */ groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => { foldedTopics + - Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) + + shortStringLength(currTopic._1) + 4 + /* partition count for this topic */ currTopic._2.size * { 4 + /* partition id */ @@ -83,7 +84,7 @@ case class ProducerResponse(versionId: Short, groupedStatus.foreach(topicStatus => { val (topic, errorsAndOffsets) = topicStatus - Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) + writeShortString(buffer, topic) buffer.putInt(errorsAndOffsets.size) // partition count errorsAndOffsets.foreach { case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) => diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index 94e13f0b033..b000eb7a434 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -29,12 +29,12 @@ object RequestKeys { val StopReplicaKey: Short = 5 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= - Map( ProduceKey -> ("Produce", ProducerRequest.readFrom), - FetchKey -> ("Fetch", FetchRequest.readFrom), - OffsetsKey -> ("Offsets", OffsetRequest.readFrom), - MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom), - LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom), - StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) ) + Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), + FetchKey -> ("Fetch", FetchRequest.readFrom), + OffsetsKey -> ("Offsets", OffsetRequest.readFrom), + MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom), + LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom), + StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom)) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala index c0c2d8e4ac8..83ad42c5e57 100644 --- a/core/src/main/scala/kafka/api/RequestOrResponse.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -19,12 +19,6 @@ package kafka.api import java.nio._ - -object RequestOrResponse { - val DefaultCharset = "UTF-8" -} - - object Request { val OrdinaryConsumerId: Int = -1 val DebuggingConsumerId: Int = -2 diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 99a5f95bb23..c3db6f988ff 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -19,7 +19,7 @@ package kafka.api import java.nio._ -import kafka.utils._ +import kafka.api.ApiUtils._ object StopReplicaRequest { val CurrentVersion = 1.shortValue() @@ -28,13 +28,12 @@ object StopReplicaRequest { def readFrom(buffer: ByteBuffer): StopReplicaRequest = { val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) + val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt val topicPartitionPairCount = buffer.getInt val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]() - for (i <- 0 until topicPartitionPairCount) { - topicPartitionPairSet.add(Utils.readShortString(buffer, "UTF-8"), buffer.getInt) - } + for (i <- 0 until topicPartitionPairCount) + topicPartitionPairSet.add(readShortString(buffer), buffer.getInt) new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet) } } @@ -51,11 +50,11 @@ case class StopReplicaRequest(versionId: Short, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) + writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) buffer.putInt(partitions.size) for ((topic, partitionId) <- partitions){ - Utils.writeShortString(buffer, topic, "UTF-8") + writeShortString(buffer, topic) buffer.putInt(partitionId) } } diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala index 29e5209e714..d8495700f5a 100644 --- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -22,6 +22,7 @@ import kafka.utils.Utils import collection.mutable.HashMap import collection.mutable.Map import kafka.common.ErrorMapping +import kafka.api.ApiUtils._ object StopReplicaResponse { @@ -32,7 +33,7 @@ object StopReplicaResponse { val responseMap = new HashMap[(String, Int), Short]() for (i<- 0 until numEntries){ - val topic = Utils.readShortString(buffer, "UTF-8") + val topic = readShortString(buffer) val partition = buffer.getInt val partitionErrorCode = buffer.getShort() responseMap.put((topic, partition), partitionErrorCode) @@ -58,7 +59,7 @@ case class StopReplicaResponse(val versionId: Short, buffer.putShort(errorCode) buffer.putInt(responseMap.size) for ((key:(String, Int), value) <- responseMap){ - Utils.writeShortString(buffer, key._1, "UTF-8") + writeShortString(buffer, key._1) buffer.putInt(key._2) buffer.putShort(value) } diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala index f538db3b061..e2d03e818c6 100644 --- a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -19,7 +19,8 @@ package kafka.api import kafka.cluster.Broker import java.nio.ByteBuffer -import kafka.utils.Utils._ +import kafka.api.ApiUtils._ +import kafka.utils.Logging import collection.mutable.ListBuffer import kafka.common.{KafkaException, ErrorMapping} @@ -54,9 +55,9 @@ case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 } object TopicMetadata { def readFrom(buffer: ByteBuffer): TopicMetadata = { - val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue)) + val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val topic = readShortString(buffer) - val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) + val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) val partitionsMetadata = new ListBuffer[PartitionMetadata]() for(i <- 0 until numPartitions) partitionsMetadata += PartitionMetadata.readFrom(buffer) @@ -64,7 +65,7 @@ object TopicMetadata { } } -case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) { +case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging { def sizeInBytes: Int = { var size: Int = 2 /* error code */ size += shortStringLength(topic) @@ -87,8 +88,8 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat object PartitionMetadata { def readFrom(buffer: ByteBuffer): PartitionMetadata = { - val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue)) - val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */ + val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) + val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */ val doesLeaderExist = getLeaderRequest(buffer.get) val leader = doesLeaderExist match { case LeaderExists => /* leader exists */ @@ -97,14 +98,14 @@ object PartitionMetadata { } /* list of all replicas */ - val numReplicas = getShortInRange(buffer, "number of all replicas", (0, Short.MaxValue)) + val numReplicas = readShortInRange(buffer, "number of all replicas", (0, Short.MaxValue)) val replicas = new Array[Broker](numReplicas) for(i <- 0 until numReplicas) { replicas(i) = Broker.readFrom(buffer) } /* list of in-sync replicas */ - val numIsr = getShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue)) + val numIsr = readShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue)) val isr = new Array[Broker](numIsr) for(i <- 0 until numIsr) { isr(i) = Broker.readFrom(buffer) @@ -122,8 +123,11 @@ object PartitionMetadata { } } -case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty, - errorCode: Short = ErrorMapping.NoError) { +case class PartitionMetadata(partitionId: Int, + val leader: Option[Broker], + replicas: Seq[Broker], + isr: Seq[Broker] = Seq.empty, + errorCode: Short = ErrorMapping.NoError) extends Logging { def sizeInBytes: Int = { var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/ diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 7ce953a87ab..70c42e306a8 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -18,11 +18,11 @@ package kafka.api import java.nio.ByteBuffer -import kafka.utils.Utils._ +import kafka.api.ApiUtils._ import collection.mutable.ListBuffer -import kafka.utils._ +import kafka.utils.Logging -object TopicMetadataRequest { +object TopicMetadataRequest extends Logging { val CurrentVersion = 1.shortValue() val DefaultClientId = "" @@ -33,11 +33,11 @@ object TopicMetadataRequest { def readFrom(buffer: ByteBuffer): TopicMetadataRequest = { val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) - val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue)) + val clientId = readShortString(buffer) + val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue)) val topics = new ListBuffer[String]() for(i <- 0 until numTopics) - topics += readShortString(buffer, "UTF-8") + topics += readShortString(buffer) val topicsList = topics.toList debug("topic = %s".format(topicsList.head)) new TopicMetadataRequest(versionId, clientId, topics.toList) @@ -54,7 +54,7 @@ def this(topics: Seq[String]) = def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) + writeShortString(buffer, clientId) buffer.putInt(topics.size) topics.foreach(topic => writeShortString(buffer, topic)) } diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala new file mode 100644 index 00000000000..aeead2d8998 --- /dev/null +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -0,0 +1,60 @@ +package kafka.client + +import scala.collection._ +import kafka.cluster._ +import kafka.api._ +import kafka.producer._ +import kafka.common.KafkaException +import kafka.utils.{Utils, Logging} + +/** + * Helper functions common to clients (producer, consumer, or admin) + */ +object ClientUtils extends Logging{ + + def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = { + var fetchMetaDataSucceeded: Boolean = false + var i: Int = 0 + val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq) + var topicMetadataResponse: TopicMetadataResponse = null + var t: Throwable = null + while(i < brokers.size && !fetchMetaDataSucceeded) { + val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i)) + info("Fetching metadata for topic %s".format(topics)) + try { + topicMetadataResponse = producer.send(topicMetadataRequest) + fetchMetaDataSucceeded = true + } + catch { + case e => + warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e) + t = e + } finally { + i = i + 1 + producer.close() + } + } + if(!fetchMetaDataSucceeded){ + throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t) + } + return topicMetadataResponse + } + + /** + * Parse a list of broker urls in the form host1:port1, host2:port2, ... + */ + def parseBrokerList(brokerListStr: String): Seq[Broker] = { + val brokersStr = Utils.parseCsvList(brokerListStr) + + brokersStr.zipWithIndex.map(b =>{ + val brokerStr = b._1 + val brokerId = b._2 + val brokerInfos = brokerStr.split(":") + val hostName = brokerInfos(0) + val port = brokerInfos(1).toInt + val creatorId = hostName + "-" + System.currentTimeMillis() + new Broker(brokerId, creatorId, hostName, port) + }) + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 9b57b420092..03a75f00b2c 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -18,6 +18,7 @@ package kafka.cluster import kafka.utils.Utils._ +import kafka.api.ApiUtils._ import java.nio.ByteBuffer import kafka.common.BrokerNotAvailableException diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index fdabd6bf7e1..d3be2daf836 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -42,7 +42,7 @@ class Partition(val topic: String, var leaderReplicaIdOpt: Option[Int] = None var inSyncReplicas: Set[Replica] = Set.empty[Replica] private val assignedReplicaMap = new Pool[Int,Replica] - private val leaderISRUpdateLock = new Object + private val leaderIsrUpdateLock = new Object private var zkVersion: Int = LeaderAndIsr.initialZKVersion private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1 this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId) @@ -90,7 +90,7 @@ class Partition(val topic: String, } def leaderReplicaIfLocal(): Option[Replica] = { - leaderISRUpdateLock synchronized { + leaderIsrUpdateLock synchronized { leaderReplicaIdOpt match { case Some(leaderReplicaId) => if (leaderReplicaId == localBrokerId) @@ -114,17 +114,17 @@ class Partition(val topic: String, /** * If the leaderEpoch of the incoming request is higher than locally cached epoch, make it the new leader of follower to the new leader. */ - def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = { - leaderISRUpdateLock synchronized { - if (leaderEpoch >= leaderAndISR.leaderEpoch){ + def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, isMakingLeader: Boolean): Boolean = { + leaderIsrUpdateLock synchronized { + if (leaderEpoch >= leaderAndIsr.leaderEpoch){ info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become %s request" - .format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower")) + .format(leaderEpoch, leaderAndIsr.leaderEpoch, if(isMakingLeader) "leader" else "follower")) return false } if(isMakingLeader) - makeLeader(topic, partitionId, leaderAndISR) + makeLeader(topic, partitionId, leaderAndIsr) else - makeFollower(topic, partitionId, leaderAndISR) + makeFollower(topic, partitionId, leaderAndIsr) true } } @@ -136,17 +136,17 @@ class Partition(val topic: String, * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) * 4. set the new leader and ISR */ - private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) { - trace("Started to become leader at the request %s".format(leaderAndISR.toString())) + private def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) { + trace("Started to become leader at the request %s".format(leaderAndIsr.toString())) // stop replica fetcher thread, if any replicaFetcherManager.removeFetcher(topic, partitionId) - val newInSyncReplicas = leaderAndISR.isr.map(r => getOrCreateReplica(r)).toSet + val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet // reset LogEndOffset for remote replicas assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) inSyncReplicas = newInSyncReplicas - leaderEpoch = leaderAndISR.leaderEpoch - zkVersion = leaderAndISR.zkVersion + leaderEpoch = leaderAndIsr.leaderEpoch + zkVersion = leaderAndIsr.zkVersion leaderReplicaIdOpt = Some(localBrokerId) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(getReplica().get) @@ -158,9 +158,9 @@ class Partition(val topic: String, * 3. set the leader and set ISR to empty * 4. start a fetcher to the new leader */ - private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = { - trace("Started to become follower at the request %s".format(leaderAndISR.toString())) - val newLeaderBrokerId: Int = leaderAndISR.leader + private def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) = { + trace("Started to become follower at the request %s".format(leaderAndIsr.toString())) + val newLeaderBrokerId: Int = leaderAndIsr.leader info("Starting the follower state transition to follow leader %d for topic %s partition %d" .format(newLeaderBrokerId, topic, partitionId)) ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match { @@ -171,8 +171,8 @@ class Partition(val topic: String, val localReplica = getOrCreateReplica() localReplica.log.get.truncateTo(localReplica.highWatermark) inSyncReplicas = Set.empty[Replica] - leaderEpoch = leaderAndISR.leaderEpoch - zkVersion = leaderAndISR.zkVersion + leaderEpoch = leaderAndIsr.leaderEpoch + zkVersion = leaderAndIsr.zkVersion leaderReplicaIdOpt = Some(newLeaderBrokerId) // start fetcher thread to current leader replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) @@ -182,8 +182,8 @@ class Partition(val topic: String, } } - def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) { - leaderISRUpdateLock synchronized { + def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) { + leaderIsrUpdateLock synchronized { debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId)) val replica = getOrCreateReplica(replicaId) replica.logEndOffset = offset @@ -198,7 +198,7 @@ class Partition(val topic: String, val newInSyncReplicas = inSyncReplicas + replica info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", "))) // update ISR in ZK and cache - updateISR(newInSyncReplicas) + updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() } maybeIncrementLeaderHW(leaderReplica) @@ -208,7 +208,7 @@ class Partition(val topic: String, } def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = { - leaderISRUpdateLock synchronized { + leaderIsrUpdateLock synchronized { leaderReplicaIfLocal() match { case Some(_) => val numAcks = inSyncReplicas.count(r => { @@ -247,8 +247,8 @@ class Partition(val topic: String, .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(","))) } - def maybeShrinkISR(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) { - leaderISRUpdateLock synchronized { + def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) { + leaderIsrUpdateLock synchronized { leaderReplicaIfLocal() match { case Some(leaderReplica) => val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes) @@ -257,7 +257,7 @@ class Partition(val topic: String, assert(newInSyncReplicas.size > 0) info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in zk and in cache - updateISR(newInSyncReplicas) + updateIsr(newInSyncReplicas) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) replicaManager.isrShrinkRate.mark() @@ -289,15 +289,15 @@ class Partition(val topic: String, stuckReplicas ++ slowReplicas } - private def updateISR(newISR: Set[Replica]) { - info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(", "))) - val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion) + private def updateIsr(newIsr: Set[Replica]) { + info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", "))) + val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString(), zkVersion) + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndIsr.toString(), zkVersion) if (updateSucceeded){ - inSyncReplicas = newISR + inSyncReplicas = newIsr zkVersion = newVersion - trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newISR.mkString(","), zkVersion)) + trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion)) } else { info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) } diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 81fd4f52686..d2bb2803315 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -25,7 +25,7 @@ import java.util.Properties import java.util.Random import java.io.PrintStream import kafka.message._ -import kafka.utils.{Utils, Logging} +import kafka.utils.{Utils, Logging, ZkUtils, CommandLineUtils} import kafka.utils.ZKStringSerializer import kafka.serializer.StringDecoder @@ -109,8 +109,7 @@ object ConsoleConsumer extends Logging { "skip it instead of halt.") val options: OptionSet = tryParse(parser, args) - Utils.checkRequiredArgs(parser, options, zkConnectOpt) - + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) if (topicOrFilterOpt.size != 1) { error("Exactly one of whitelist/blacklist/topic is required.") @@ -145,14 +144,14 @@ object ConsoleConsumer extends Logging { val connector = Consumer.create(config) if(options.has(resetBeginningOpt)) - tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!options.has(groupIdOpt)) - tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt)) + if(!options.has(groupIdOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) } }) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 3c248a9ec96..47fedf49e9e 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -28,6 +28,7 @@ import kafka.utils.ZkUtils._ import kafka.utils.{ShutdownableThread, SystemTime} import kafka.utils.Utils._ import kafka.common.TopicAndPartition +import kafka.client.ClientUtils /** * Usage: @@ -52,7 +53,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, cond.await() val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata val leaderForPartitionsMap = new HashMap[(String, Int), Broker] topicsMetadata.foreach( tmd => { diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index 06bce24d662..9cf171eaf52 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -20,7 +20,7 @@ package kafka.consumer import scala.collection._ import org.I0Itec.zkclient.ZkClient import java.util.regex.Pattern -import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging} +import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging} private[kafka] trait TopicCount { def getConsumerThreadIdsPerTopic: Map[String, Set[String]] @@ -88,7 +88,7 @@ private[kafka] object TopicCount extends Logging { else { var topMap : Map[String,Int] = null try { - SyncJSON.parseFull(topicCountString) match { + Json.parseFull(topicCountString) match { case Some(m) => topMap = m.asInstanceOf[Map[String,Int]] case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString) } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 23bdb68772b..13e87e456dc 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -31,6 +31,7 @@ import java.util.UUID import kafka.serializer.Decoder import kafka.utils.ZkUtils._ import kafka.common._ +import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup import kafka.utils.Utils._ @@ -390,7 +391,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic val consumersPerTopicMap = getConsumersPerTopic(zkClient, group) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int] topicsMetadata.foreach(m =>{ diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 137a0970198..3791a036332 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -119,7 +119,7 @@ class RequestSendThread(val controllerId: Int, var response: RequestOrResponse = null request.requestId.get match { case RequestKeys.LeaderAndIsrKey => - response = LeaderAndISRResponse.readFrom(receive.buffer) + response = LeaderAndIsrResponse.readFrom(receive.buffer) case RequestKeys.StopReplicaKey => response = StopReplicaResponse.readFrom(receive.buffer) } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 23f1c2ddd02..7f5e9aa5daf 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -65,9 +65,9 @@ private[kafka] class LogManager(val config: KafkaConfig, warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") } else { info("Loading log '" + dir.getName() + "'") - val topic = Utils.getTopicPartition(dir.getName)._1 - val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) + val topicPartition = parseTopicPartitionName(dir.getName) + val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs) + val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize) val log = new Log(dir, maxLogFileSize, config.maxMessageSize, @@ -78,10 +78,9 @@ private[kafka] class LogManager(val config: KafkaConfig, config.logIndexIntervalBytes, time, config.brokerId) - val topicPartition = Utils.getTopicPartition(dir.getName) - logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]()) - val parts = logs.get(topicPartition._1) - parts.put(topicPartition._2, log) + logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]()) + val parts = logs.get(topicPartition.topic) + parts.put(topicPartition.partition, log) } } } @@ -168,7 +167,7 @@ private[kafka] class LogManager(val config: KafkaConfig, /* Runs through the log removing segments older than a certain age */ private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds - val topic = Utils.getTopicPartition(log.name)._1 + val topic = parseTopicPartitionName(log.name).topic val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs) val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs) val total = log.deleteSegments(toBeDeleted) @@ -180,7 +179,7 @@ private[kafka] class LogManager(val config: KafkaConfig, * is at least logRetentionSize bytes in size */ private def cleanupSegmentsToMaintainSize(log: Log): Int = { - val topic = Utils.getTopicPartition(log.dir.getName)._1 + val topic = parseTopicPartitionName(log.dir.getName).topic val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize) if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0 var diff = log.size - maxLogRetentionSize @@ -256,5 +255,10 @@ private[kafka] class LogManager(val config: KafkaConfig, def topics(): Iterable[String] = logs.keys + + private def parseTopicPartitionName(name: String): TopicAndPartition = { + val index = name.lastIndexOf('-') + TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) + } } diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index aff46e4530d..aedab429596 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -116,7 +116,7 @@ class Message(val buffer: ByteBuffer) { buffer.rewind() // now compute the checksum and fill it in - Utils.putUnsignedInt(buffer, CrcOffset, computeChecksum) + Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum) } def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = @@ -140,7 +140,7 @@ class Message(val buffer: ByteBuffer) { /** * Retrieve the previously computed CRC for this message */ - def checksum: Long = Utils.getUnsignedInt(buffer, CrcOffset) + def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset) /** * Returns true if the crc stored with the message matches the crc computed off the message contents diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala index 35c4f22f87b..84f6208610e 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala @@ -28,7 +28,7 @@ class KafkaMetricsConfig(props: VerifiableProperties) { * Comma-separated list of reporter types. These classes should be on the * classpath and will be instantiated at run-time. */ - val reporters = Utils.getCSVList(props.getString("kafka.metrics.reporters", "")) + val reporters = Utils.parseCsvList(props.getString("kafka.metrics.reporters", "")) /** * The metrics polling interval (in seconds). diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index f881b29f896..cdb3cb6486d 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -21,6 +21,8 @@ import kafka.api.{TopicMetadataRequest, TopicMetadata} import kafka.common.KafkaException import kafka.utils.{Logging, Utils} import kafka.common.ErrorMapping +import kafka.cluster.Broker +import kafka.client.ClientUtils class BrokerPartitionInfo(producerConfig: ProducerConfig, @@ -28,7 +30,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, topicPartitionInfo: HashMap[String, TopicMetadata]) extends Logging { val brokerList = producerConfig.brokerList - val brokers = Utils.getAllBrokersFromBrokerList(brokerList) + val brokers = ClientUtils.parseBrokerList(brokerList) /** * Return a sequence of (brokerId, numPartitions). @@ -71,7 +73,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, */ def updateInfo(topics: Set[String]) = { var topicsMetadata: Seq[TopicMetadata] = Nil - val topicMetadataResponse = Utils.getTopicMetadata(topics, brokers) + val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers) topicsMetadata = topicMetadataResponse.topicsMetadata // throw partition specific exception topicsMetadata.foreach(tmd =>{ @@ -88,6 +90,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, }) producerPool.updateProducer(topicsMetadata) } + } case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int]) diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala index 34592246928..ccbb76c1066 100644 --- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala +++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala @@ -17,6 +17,8 @@ package kafka.producer +import kafka.utils.Utils + private[kafka] class DefaultPartitioner[T] extends Partitioner[T] { private val random = new java.util.Random @@ -24,6 +26,6 @@ private[kafka] class DefaultPartitioner[T] extends Partitioner[T] { if(key == null) random.nextInt(numPartitions) else - math.abs(key.hashCode) % numPartitions + Utils.abs(key.hashCode) % numPartitions } } diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 0baa5a5aa7a..84c52789aed 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -18,6 +18,7 @@ package kafka.producer import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler} import kafka.utils._ +import java.util.Random import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import kafka.serializer.Encoder import java.util.concurrent.atomic.AtomicBoolean @@ -33,13 +34,14 @@ extends Logging { private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize) + private val random = new Random private var sync: Boolean = true private var producerSendThread: ProducerSendThread[K,V] = null config.producerType match { case "sync" => case "async" => sync = false - val asyncProducerID = Utils.getNextRandomInt + val asyncProducerID = random.nextInt(Int.MaxValue) producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue, eventHandler, config.queueTime, config.batchSize) producerSendThread.start @@ -49,8 +51,8 @@ extends Logging { def this(config: ProducerConfig) = this(config, new DefaultEventHandler[K,V](config, - Utils.getObject[Partitioner[K]](config.partitionerClass), - Utils.getObject[Encoder[V]](config.serializerClass), + Utils.createObject[Partitioner[K]](config.partitionerClass), + Utils.createObject[Encoder[V]](config.serializerClass), new ProducerPool(config))) /** diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index c14061f218e..2977095dc76 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -78,7 +78,7 @@ class ProducerConfig private (val props: VerifiableProperties) * * If the compression codec is NoCompressionCodec, compression is disabled for all topics */ - val compressedTopics = Utils.getCSVList(props.getString("compressed.topics", null)) + val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null)) /** * The producer using the zookeeper software load balancer maintains a ZK cache that gets diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 3f3f8d2af3b..9b8a9bc8cc6 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -24,6 +24,7 @@ import kafka.serializer.Encoder import kafka.utils.{Utils, Logging} import scala.collection.{Seq, Map} import scala.collection.mutable.{ListBuffer, HashMap} +import java.util.concurrent.atomic._ import kafka.api.{TopicMetadata, ProducerRequest} @@ -35,6 +36,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, extends EventHandler[K,V] with Logging { val isSync = ("sync" == config.producerType) + val counter = new AtomicInteger(0) val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos) private val lock = new Object() @@ -185,8 +187,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig, if(numPartitions <= 0) throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions + "\n Valid values are > 0") - val partition = if(key == null) Utils.getNextRandomInt(numPartitions) - else partitioner.partition(key, numPartitions) + val partition = + if(key == null) + Utils.abs(counter.getAndIncrement()) % numPartitions + else + partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition + "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 133083a4154..4a868d26d07 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -57,22 +57,22 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.FetchKey => handleFetchRequest(request) case RequestKeys.OffsetsKey => handleOffsetRequest(request) case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) - case RequestKeys.LeaderAndIsrKey => handleLeaderAndISRRequest(request) + case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) case requestId => throw new KafkaException("No mapping found for handler id " + requestId) } request.apiLocalCompleteTimeNs = SystemTime.nanoseconds } - def handleLeaderAndISRRequest(request: RequestChannel.Request) { - val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] + def handleLeaderAndIsrRequest(request: RequestChannel.Request) { + val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) - trace("Handling leader and isr request " + leaderAndISRRequest) + requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest) + trace("Handling leader and ISR request " + leaderAndIsrRequest) try { - val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest) - val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse))) + val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) + val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, responseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) } catch { case e: KafkaStorageException => fatal("Disk error during leadership change.", e) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4252c897db7..43cf3b11f80 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -22,6 +22,7 @@ import kafka.message.Message import kafka.consumer.ConsumerConfig import java.net.InetAddress import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig} +import scala.collection._ /** * Configuration settings for the kafka server @@ -73,32 +74,32 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the default number of log partitions per topic */ val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) - /* the directory in which the log data is kept */ + /* the directories in which the log data is kept */ val logDir = props.getString("log.dir") /* the maximum size of a single log file */ val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) /* the maximum size of a single log file for some specific topic */ - val logFileSizeMap = Utils.getTopicFileSize(props.getString("topic.log.file.size", "")) + val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt) /* the maximum time before a new log segment is rolled out */ val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue)) /* the number of hours before rolling out a new log segment for some specific topic */ - val logRollHoursMap = Utils.getTopicRollHours(props.getString("topic.log.roll.hours", "")) + val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt) /* the number of hours to keep a log file before deleting it */ val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue)) /* the number of hours to keep a log file before deleting it for some specific topic*/ - val logRetentionHoursMap = Utils.getTopicRetentionHours(props.getString("topic.log.retention.hours", "")) + val logRetentionHoursMap = props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt) /* the maximum size of the log before deleting it */ val logRetentionSize = props.getLong("log.retention.size", -1) /* the maximum size of the log for some specific topic before deleting it */ - val logRetentionSizeMap = Utils.getTopicRetentionSize(props.getString("topic.log.retention.size", "")) + val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong) /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue)) @@ -113,7 +114,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue)) /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ - val flushIntervalMap = Utils.getTopicFlushIntervals(props.getString("topic.flush.intervals.ms", "")) + val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt) /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */ val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000) @@ -161,4 +162,5 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* number of fetcher threads used to replicate messages from a source broker. * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ val numReplicaFetchers = props.getInt("replica.fetchers", 1) + } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1e3e2a87ab5..8461dbeea8a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -33,7 +33,10 @@ object ReplicaManager { val UnknownLogEndOffset = -1L } -class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, +class ReplicaManager(val config: KafkaConfig, + time: Time, + val zkClient: ZkClient, + kafkaScheduler: KafkaScheduler, val logManager: LogManager) extends Logging with KafkaMetricsGroup { private val allPartitions = new Pool[(String, Int), Partition] private var leaderPartitions = new mutable.HashSet[Partition]() @@ -85,7 +88,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient def startup() { // start ISR expiration thread - kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) + kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) } def stopReplica(topic: String, partitionId: Int): Short = { @@ -221,17 +224,17 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient } } - private def maybeShrinkISR(): Unit = { + private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") leaderPartitionsLock synchronized { - leaderPartitions.foreach(partition => partition.maybeShrinkISR(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)) + leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)) } } def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = { val partitionOpt = getPartition(topic, partitionId) if(partitionOpt.isDefined){ - partitionOpt.get.updateLeaderHWAndMaybeExpandISR(replicaId, offset) + partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset) } else { warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId)) } diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index dc11990e0dd..edf8b09a8eb 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -69,8 +69,8 @@ object DumpLogSegments { print(" keysize: " + msg.keySize) if(printContents) { if(msg.hasKey) - print(" key: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) - print(" payload: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) + print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) + print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) } println() } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 3438f2c910b..8a2588d6113 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -19,7 +19,7 @@ package kafka.tools import kafka.message.Message import joptsimple.OptionParser -import kafka.utils.{Utils, Logging} +import kafka.utils.{Utils, CommandLineUtils, Logging} import kafka.producer.{ProducerData, ProducerConfig, Producer} import scala.collection.JavaConversions._ import java.util.concurrent.CountDownLatch @@ -81,8 +81,7 @@ object MirrorMaker extends Logging { System.exit(0) } - Utils.checkRequiredArgs( - parser, options, consumerConfigOpt, producerConfigOpt) + CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt) if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) { println("Exactly one of whitelist or blacklist is required.") System.exit(1) diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 83bbc940d6a..952b034931d 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -22,14 +22,14 @@ import java.util.concurrent.{Executors, CountDownLatch} import java.util.Properties import kafka.producer.{ProducerData, ProducerConfig, Producer} import kafka.consumer._ -import kafka.utils.{ZKStringSerializer, Logging} +import kafka.utils.{ZKStringSerializer, Logging, ZkUtils} import kafka.api.OffsetRequest import org.I0Itec.zkclient._ import kafka.message.{CompressionCodec, Message} object ReplayLogProducer extends Logging { - private val GROUPID: String = "replay-log-producer" + private val GroupId: String = "replay-log-producer" def main(args: Array[String]) { val config = new Config(args) @@ -38,12 +38,12 @@ object ReplayLogProducer extends Logging { val allDone = new CountDownLatch(config.numThreads) // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - tryCleanupZookeeper(config.zkConnect, GROUPID) + ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId) Thread.sleep(500) // consumer properties val consumerProps = new Properties - consumerProps.put("groupid", GROUPID) + consumerProps.put("groupid", GroupId) consumerProps.put("zk.connect", config.zkConnect) consumerProps.put("consumer.timeout.ms", "10000") consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString) @@ -137,18 +137,6 @@ object ReplayLogProducer extends Logging { val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue) } - def tryCleanupZookeeper(zkUrl: String, groupId: String) { - try { - val dir = "/consumers/" + groupId - info("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _ => // swallow - } - } - class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging { val shutdownLatch = new CountDownLatch(1) val props = new Properties() diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 92811374e1b..f696050e8d8 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -20,6 +20,7 @@ package kafka.tools import joptsimple._ import kafka.utils._ import kafka.consumer._ +import kafka.client.ClientUtils import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} import kafka.cluster.Broker import scala.collection.JavaConversions._ @@ -30,7 +31,7 @@ import scala.collection.JavaConversions._ */ object SimpleConsumerShell extends Logging { - def USE_LEADER_REPLICA = -1 + def UseLeaderReplica = -1 def main(args: Array[String]): Unit = { @@ -52,7 +53,7 @@ object SimpleConsumerShell extends Logging { .withRequiredArg .describedAs("replica id") .ofType(classOf[java.lang.Integer]) - .defaultsTo(USE_LEADER_REPLICA) + .defaultsTo(UseLeaderReplica) val offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end") .withOptionalArg() .describedAs("consume offset") @@ -115,8 +116,8 @@ object SimpleConsumerShell extends Logging { // getting topic metadata info("Getting topic metatdata...") - val metadataTargetBrokers = Utils.getAllBrokersFromBrokerList(options.valueOf(brokerListOpt)) - val topicsMetadata = Utils.getTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata + val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) + val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata)) System.exit(1) @@ -133,7 +134,7 @@ object SimpleConsumerShell extends Logging { // validating replica id and initializing target broker var fetchTargetBroker: Broker = null var replicaOpt: Option[Broker] = null - if(replicaId == USE_LEADER_REPLICA) { + if(replicaId == UseLeaderReplica) { replicaOpt = partitionMetadataOpt.get.leader if(!replicaOpt.isDefined) { System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId)) diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala new file mode 100644 index 00000000000..5516afa50de --- /dev/null +++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -0,0 +1,20 @@ +package kafka.utils + +import joptsimple.{OptionSpec, OptionSet, OptionParser} + +/** + * Helper functions for dealing with command line utilities + */ +object CommandLineUtils { + + def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { + for(arg <- required) { + if(!options.has(arg)) { + error("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala new file mode 100644 index 00000000000..a1147699d92 --- /dev/null +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -0,0 +1,24 @@ +package kafka.utils + +import kafka.common._ +import util.parsing.json.JSON + +/** + * A wrapper that synchronizes JSON in scala, which is not threadsafe. + */ +object Json extends Logging { + val myConversionFunc = {input : String => input.toInt} + JSON.globalNumberParser = myConversionFunc + val lock = new Object + + def parseFull(input: String): Option[Any] = { + lock synchronized { + try { + JSON.parseFull(input) + } catch { + case t => + throw new KafkaException("Can't parse json string: %s".format(input), t) + } + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index c6f71d56f61..753234e547f 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -26,26 +26,22 @@ import java.util.zip.CRC32 import javax.management._ import scala.collection._ import scala.collection.mutable -import org.I0Itec.zkclient.ZkClient -import java.util.{Random, Properties} -import joptsimple.{OptionSpec, OptionSet, OptionParser} +import java.util.Properties import kafka.common.KafkaException -import kafka.cluster.Broker -import util.parsing.json.JSON -import kafka.api.RequestOrResponse -import kafka.api.{TopicMetadataRequest, TopicMetadataResponse} -import kafka.producer.{ProducerPool, SyncProducer} /** - * Helper functions! + * General helper functions! + * + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way */ object Utils extends Logging { - val random = new Random - - def getNextRandomInt(): Int = random.nextInt - - def getNextRandomInt(upper: Int): Int = random.nextInt(upper) /** * Wrap the given function in a java.lang.Runnable @@ -151,55 +147,6 @@ object Utils extends Logging { } bytes } - - /** - * Read size prefixed string where the size is stored as a 2 byte short. - * @param buffer The buffer to read from - * @param encoding The encoding in which to read the string - */ - def readShortString(buffer: ByteBuffer, encoding: String = RequestOrResponse.DefaultCharset): String = { - val size: Int = buffer.getShort() - if(size < 0) - return null - val bytes = new Array[Byte](size) - buffer.get(bytes) - new String(bytes, encoding) - } - - /** - * Write a size prefixed string where the size is stored as a 2 byte short - * @param buffer The buffer to write to - * @param string The string to write - * @param encoding The encoding in which to write the string - */ - def writeShortString(buffer: ByteBuffer, string: String, encoding: String = RequestOrResponse.DefaultCharset) { - if(string == null) { - buffer.putShort(-1) - } else if(string.length > Short.MaxValue) { - throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") - } else { - buffer.putShort(string.length.asInstanceOf[Short]) - buffer.put(string.getBytes(encoding)) - } - } - - /** - * Return size of a size prefixed string where the size is stored as a 2 byte short - * @param string The string to write - * @param encoding The encoding in which to write the string - */ - def shortStringLength(string: String, encoding: String = RequestOrResponse.DefaultCharset): Int = { - if(string == null) { - 2 - } else { - val encodedString = string.getBytes(encoding) - if(encodedString.length > Short.MaxValue) { - throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") - } else { - 2 + encodedString.length - } - } - } /** * Read a properties file from the given path @@ -212,27 +159,6 @@ object Utils extends Logging { props } - def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = { - val value = buffer.getInt - if(value < range._1 || value > range._2) - throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") - else value - } - - def getShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = { - val value = buffer.getShort - if(value < range._1 || value > range._2) - throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") - else value - } - - def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = { - val value = buffer.getLong - if(value < range._1 || value > range._2) - throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") - else value - } - /** * Open a channel for the given file */ @@ -278,7 +204,7 @@ object Utils extends Logging { * @param buffer The buffer to translate * @param encoding The encoding to use in translating bytes to characters */ - def toString(buffer: ByteBuffer, encoding: String): String = { + def readString(buffer: ByteBuffer, encoding: String): String = { val bytes = new Array[Byte](buffer.remaining) buffer.get(bytes) new String(bytes, encoding) @@ -365,7 +291,7 @@ object Utils extends Logging { * @param buffer The buffer to read from * @return The integer read, as a long to avoid signedness */ - def getUnsignedInt(buffer: ByteBuffer): Long = + def readUnsignedInt(buffer: ByteBuffer): Long = buffer.getInt() & 0xffffffffL /** @@ -375,7 +301,7 @@ object Utils extends Logging { * @param index the index from which to read the integer * @return The integer read, as a long to avoid signedness */ - def getUnsignedInt(buffer: ByteBuffer, index: Int): Long = + def readUnsignedInt(buffer: ByteBuffer, index: Int): Long = buffer.getInt(index) & 0xffffffffL /** @@ -383,7 +309,7 @@ object Utils extends Logging { * @param buffer The buffer to write to * @param value The value to write */ - def putUnsignedInt(buffer: ByteBuffer, value: Long): Unit = + def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit = buffer.putInt((value & 0xffffffffL).asInstanceOf[Int]) /** @@ -392,7 +318,7 @@ object Utils extends Logging { * @param index The position in the buffer at which to begin writing * @param value The value to write */ - def putUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = + def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int]) /** @@ -458,6 +384,10 @@ object Utils extends Logging { } } + /** + * Throw an exception if the given value is null, else return it. You can use this like: + * val myValue = Utils.notNull(expressionThatShouldntBeNull) + */ def notNull[V](v: V) = { if(v == null) throw new KafkaException("Value cannot be null.") @@ -465,16 +395,17 @@ object Utils extends Logging { v } - def getHostPort(hostport: String) : (String, Int) = { + /** + * Parse a host and port out of a string + */ + def parseHostPort(hostport: String) : (String, Int) = { val splits = hostport.split(":") (splits(0), splits(1).toInt) } - def getTopicPartition(topicPartition: String) : (String, Int) = { - val index = topicPartition.lastIndexOf('-') - (topicPartition.substring(0,index), topicPartition.substring(index+1).toInt) - } - + /** + * Get the stack trace from an exception as a string + */ def stackTrace(e: Throwable): String = { val sw = new StringWriter; val pw = new PrintWriter(sw); @@ -486,113 +417,30 @@ object Utils extends Logging { * This method gets comma seperated values which contains key,value pairs and returns a map of * key value pairs. the format of allCSVal is key1:val1, key2:val2 .... */ - private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = { - val map = new mutable.HashMap[K, V] - if("".equals(allCSVals)) - return map - val csVals = allCSVals.split(",") - for(i <- 0 until csVals.length) - { - try{ - val tempSplit = csVals(i).split(":") - info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim)) - map += tempSplit(0).asInstanceOf[K] -> Integer.parseInt(tempSplit(1).trim).asInstanceOf[V] - } catch { - case _ => error(exceptionMsg + ": " + csVals(i)) - } - } - map + def parseCsvMap(str: String): Map[String, String] = { + val map = new mutable.HashMap[String, String] + if("".equals(str)) + return map + val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*")) + keyVals.map(pair => (pair(0), pair(1))).toMap } - - def getCSVList(csvList: String): Seq[String] = { + + /** + * Parse a comma separated string into a sequence of strings. + * Whitespace surrounding the comma will be removed. + */ + def parseCsvList(csvList: String): Seq[String] = { if(csvList == null) Seq.empty[String] else { - csvList.split(",").filter(v => !v.equals("")) + csvList.split("\\s*,\\s*").filter(v => !v.equals("")) } } - def seqToCSV(seq: Seq[String]): String = { - var csvString = "" - for (i <- 0 until seq.size) { - if (i > 0) - csvString = csvString + ',' - csvString = csvString + seq(i) - } - csvString - } - - def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: " - val successMsg = "The retention hours for " - val map: Map[String, Int] = getCSVMap(retentionHours, exceptionMsg, successMsg) - map.foreach{case(topic, hrs) => - require(hrs > 0, "Log retention hours value for topic " + topic + " is " + hrs + - " which is not greater than 0.")} - map - } - - def getTopicRollHours(rollHours: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: " - val successMsg = "The roll hours for " - val map: Map[String, Int] = getCSVMap(rollHours, exceptionMsg, successMsg) - map.foreach{case(topic, hrs) => - require(hrs > 0, "Log roll hours value for topic " + topic + " is " + hrs + - " which is not greater than 0.")} - map - } - - def getTopicFileSize(fileSizes: String): Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: " - val successMsg = "The log file size for " - val map: Map[String, Int] = getCSVMap(fileSizes, exceptionMsg, successMsg) - map.foreach{case(topic, size) => - require(size > 0, "Log file size value for topic " + topic + " is " + size + - " which is not greater than 0.")} - map - } - - def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = { - val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: " - val successMsg = "The log retention size for " - val map: Map[String, Long] = getCSVMap(retentionSizes, exceptionMsg, successMsg) - map.foreach{case(topic, size) => - require(size > 0, "Log retention size value for topic " + topic + " is " + size + - " which is not greater than 0.")} - map - } - - def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: " - val successMsg = "The flush interval for " - val map: Map[String, Int] = getCSVMap(allIntervals, exceptionMsg, successMsg) - map.foreach{case(topic, interval) => - require(interval > 0, "Flush interval value for topic " + topic + " is " + interval + - " ms which is not greater than 0.")} - map - } - - def getTopicPartitions(allPartitions: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.partition.counts in server.properties: " - val successMsg = "The number of partitions for topic " - val map: Map[String, Int] = getCSVMap(allPartitions, exceptionMsg, successMsg) - map.foreach{case(topic, count) => - require(count > 0, "The number of partitions for topic " + topic + " is " + count + - " which is not greater than 0.")} - map - } - - def getConsumerTopicMap(consumerTopicString: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for embeddedconsumer.topics in consumer.properties: " - val successMsg = "The number of consumer threads for topic " - val map: Map[String, Int] = getCSVMap(consumerTopicString, exceptionMsg, successMsg) - map.foreach{case(topic, count) => - require(count > 0, "The number of consumer threads for topic " + topic + " is " + count + - " which is not greater than 0.")} - map - } - - def getObject[T<:AnyRef](className: String): T = { + /** + * Create an instance of the class with the given class name + */ + def createObject[T<:AnyRef](className: String): T = { className match { case null => null.asInstanceOf[T] case _ => @@ -604,27 +452,15 @@ object Utils extends Logging { } } - def propertyExists(prop: String): Boolean = { - if(prop == null) - false - else if(prop.compareTo("") == 0) - false - else true - } + /** + * Is the given string null or empty ("")? + */ + def nullOrEmpty(s: String): Boolean = s == null || s.equals("") - def tryCleanupZookeeper(zkUrl: String, groupId: String) { - try { - val dir = "/consumers/" + groupId - info("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _ => // swallow - } - } - - def stringMapToJsonString(jsonDataMap: Map[String, String]): String = { + /** + * Format a Map[String, String] as JSON + */ + def stringMapToJson(jsonDataMap: Map[String, String]): String = { val builder = new StringBuilder builder.append("{ ") var numElements = 0 @@ -639,6 +475,9 @@ object Utils extends Logging { builder.toString } + /** + * Format an arbitrary map as JSON + */ def mapToJson[T <: Any](map: Map[String, Seq[String]]): String = { val builder = new StringBuilder builder.append("{ ") @@ -654,6 +493,9 @@ object Utils extends Logging { builder.toString } + /** + * Format a string array as json + */ def arrayToJson[T <: Any](arr: Array[String]): String = { val builder = new StringBuilder builder.append("[ ") @@ -668,57 +510,6 @@ object Utils extends Logging { builder.toString } - def getAllBrokersFromBrokerList(brokerListStr: String): Seq[Broker] = { - val brokersStr = Utils.getCSVList(brokerListStr) - - brokersStr.zipWithIndex.map(b =>{ - val brokerStr = b._1 - val brokerId = b._2 - val brokerInfos = brokerStr.split(":") - val hostName = brokerInfos(0) - val port = brokerInfos(1).toInt - val creatorId = hostName + "-" + System.currentTimeMillis() - new Broker(brokerId, creatorId, hostName, port) - }) - } - - def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { - for(arg <- required) { - if(!options.has(arg)) { - error("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - } - - def getTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = { - var fetchMetaDataSucceeded: Boolean = false - var i: Int = 0 - val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq) - var topicMetadataResponse: TopicMetadataResponse = null - var t: Throwable = null - while(i < brokers.size && !fetchMetaDataSucceeded) { - val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i)) - info("Fetching metadata for topic %s".format(topics)) - try { - topicMetadataResponse = producer.send(topicMetadataRequest) - fetchMetaDataSucceeded = true - } - catch { - case e => - warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e) - t = e - } finally { - i = i + 1 - producer.close() - } - } - if(!fetchMetaDataSucceeded){ - throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t) - } - return topicMetadataResponse - } /** * Create a circular (looping) iterator over a collection. @@ -731,35 +522,25 @@ object Utils extends Logging { stream.iterator } - def readFileIntoString(path: String): String = { + /** + * Attempt to read a file as a string + */ + def readFileAsString(path: String, charset: Charset = Charset.defaultCharset()): String = { val stream = new FileInputStream(new File(path)) try { val fc = stream.getChannel() val bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()) - Charset.defaultCharset().decode(bb).toString() + charset.decode(bb).toString() } finally { stream.close() } } -} - -/** - * A wrapper that synchronizes JSON in scala, which is not threadsafe. - */ -object SyncJSON extends Logging { - val myConversionFunc = {input : String => input.toInt} - JSON.globalNumberParser = myConversionFunc - val lock = new Object - - def parseFull(input: String): Option[Any] = { - lock synchronized { - try { - JSON.parseFull(input) - } catch { - case t => - throw new KafkaException("Can't parse json string: %s".format(input), t) - } - } - } + + /** + * Get the absolute value of the given number. If the number is Int.MinValue return 0. + * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!). + */ + def abs(n: Int) = n & 0x7fffffff + } \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala index 22aaba84836..30c2758fc8b 100644 --- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala +++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -18,7 +18,7 @@ package kafka.utils import java.util.Properties -import collection.mutable +import scala.collection._ class VerifiableProperties(val props: Properties) extends Logging { private val referenceSet = mutable.HashSet[String]() @@ -156,6 +156,23 @@ class VerifiableProperties(val props: Properties) extends Logging { require(containsKey(name), "Missing required property '" + name + "'") getProperty(name) } + + /** + * Get a Map[String, String] from a property list in the form k1:v2, k2:v2, ... + */ + def getMap(name: String, valid: String => Boolean): Map[String, String] = { + try { + val m = Utils.parseCsvMap(getString(name, "")) + m.foreach { + case(key, value) => + if(!valid(value)) + throw new IllegalArgumentException("Invalid entry '%s' = '%s' for property '%s'".format(key, value, name)) + } + m + } catch { + case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(name, e.getMessage)) + } + } def verify() { info("Verifying properties") diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 4c709503cd8..114bc98e29a 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -75,8 +75,8 @@ object ZkUtils extends Logging { } def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { - val leaderAndISRPath = getTopicPartitionLeaderAndIsrPath(topic, partition) - val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndISRPath) + val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition) + val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath) val leaderAndIsrOpt = leaderAndIsrInfo._1 val stat = leaderAndIsrInfo._2 leaderAndIsrOpt match { @@ -86,12 +86,12 @@ object ZkUtils extends Logging { } def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = { - SyncJSON.parseFull(leaderAndIsrStr) match { + Json.parseFull(leaderAndIsrStr) match { case Some(m) => val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get - val isr = Utils.getCSVList(isrString).map(r => r.toInt) + val isr = Utils.parseCsvList(isrString).map(r => r.toInt) val zkPathVersion = stat.getVersion debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch, isr.toString(), zkPathVersion, topic, partition)) @@ -104,7 +104,7 @@ object ZkUtils extends Logging { val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 leaderAndIsrOpt match { case Some(leaderAndIsr) => - SyncJSON.parseFull(leaderAndIsr) match { + Json.parseFull(leaderAndIsr) match { case Some(m) => Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt) case None => None @@ -122,7 +122,7 @@ object ZkUtils extends Logging { val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 leaderAndIsrOpt match { case Some(leaderAndIsr) => - SyncJSON.parseFull(leaderAndIsr) match { + Json.parseFull(leaderAndIsr) match { case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition)) case Some(m) => m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt } @@ -138,10 +138,10 @@ object ZkUtils extends Logging { val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 leaderAndIsrOpt match { case Some(leaderAndIsr) => - SyncJSON.parseFull(leaderAndIsr) match { + Json.parseFull(leaderAndIsr) match { case Some(m) => - val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get - Utils.getCSVList(ISRString).map(r => r.toInt) + val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get + Utils.parseCsvList(isrString).map(r => r.toInt) case None => Seq.empty[Int] } case None => Seq.empty[Int] @@ -155,7 +155,7 @@ object ZkUtils extends Logging { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 jsonPartitionMapOpt match { case Some(jsonPartitionMap) => - SyncJSON.parseFull(jsonPartitionMap) match { + Json.parseFull(jsonPartitionMap) match { case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match { case None => Seq.empty[Int] case Some(seq) => seq.map(_.toInt) @@ -328,7 +328,7 @@ object ZkUtils extends Logging { case e2 => throw e2 } } - + def deletePath(client: ZkClient, path: String): Boolean = { try { client.delete(path) @@ -351,6 +351,16 @@ object ZkUtils extends Logging { case e2 => throw e2 } } + + def maybeDeletePath(zkUrl: String, dir: String) { + try { + val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) + zk.deleteRecursive(dir) + zk.close() + } catch { + case _ => // swallow + } + } def readData(client: ZkClient, path: String): (String, Stat) = { val stat: Stat = new Stat() @@ -413,7 +423,7 @@ object ZkUtils extends Logging { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 jsonPartitionMapOpt match { case Some(jsonPartitionMap) => - SyncJSON.parseFull(jsonPartitionMap) match { + Json.parseFull(jsonPartitionMap) match { case Some(m) => val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] for((partition, replicas) <- replicaMap){ @@ -449,7 +459,7 @@ object ZkUtils extends Logging { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 jsonPartitionMapOpt match { case Some(jsonPartitionMap) => - SyncJSON.parseFull(jsonPartitionMap) match { + Json.parseFull(jsonPartitionMap) match { case Some(m) => val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] for((partition, replicas) <- replicaMap){ @@ -471,7 +481,7 @@ object ZkUtils extends Logging { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 val partitionMap = jsonPartitionMapOpt match { case Some(jsonPartitionMap) => - SyncJSON.parseFull(jsonPartitionMap) match { + Json.parseFull(jsonPartitionMap) match { case Some(m) => val m1 = m.asInstanceOf[Map[String, Seq[String]]] m1.map(p => (p._1.toInt, p._2.map(_.toInt))) @@ -535,7 +545,7 @@ object ZkUtils extends Logging { } def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] = { - SyncJSON.parseFull(jsonData) match { + Json.parseFull(jsonData) match { case Some(m) => val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] replicaMap.map { reassignedPartitions => @@ -590,7 +600,7 @@ object ZkUtils extends Logging { } def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = { - SyncJSON.parseFull(jsonData) match { + Json.parseFull(jsonData) match { case Some(m) => val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]] val partitions = topicAndPartitions.map { p => diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala index fa709de7614..22601115b70 100644 --- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala +++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala @@ -62,7 +62,7 @@ private class ConsumerThread(stream: KafkaStream[Message]) extends Thread { override def run() { println("Starting consumer thread..") for (messageAndMetadata <- stream) { - println("consumed: " + Utils.toString(messageAndMetadata.message.payload, "UTF-8")) + println("consumed: " + Utils.readString(messageAndMetadata.message.payload, "UTF-8")) } shutdownLatch.countDown println("thread shutdown !" ) diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index b4a56530d08..c50f91aaeec 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -82,18 +82,18 @@ object SerializationTestUtils{ private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) - def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = { - val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1) - val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2) - val map = Map(((topic1, 0), PartitionStateInfo(leaderAndISR1, 3)), - ((topic2, 0), PartitionStateInfo(leaderAndISR2, 3))) + def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = { + val leaderAndIsr1 = new LeaderAndIsr(leader1, 1, isr1, 1) + val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2) + val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)), + ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3))) new LeaderAndIsrRequest(map) } - def createTestLeaderAndISRResponse() : LeaderAndISRResponse = { + def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = { val responseMap = Map(((topic1, 0), ErrorMapping.NoError), ((topic2, 0), ErrorMapping.NoError)) - new LeaderAndISRResponse(1, responseMap) + new LeaderAndIsrResponse(1, responseMap) } def createTestStopReplicaRequest() : StopReplicaRequest = { @@ -145,8 +145,8 @@ object SerializationTestUtils{ } class RequestResponseSerializationTest extends JUnitSuite { - private val leaderAndISRRequest = SerializationTestUtils.createTestLeaderAndISRRequest - private val leaderAndISRResponse = SerializationTestUtils.createTestLeaderAndISRResponse + private val leaderAndIsrRequest = SerializationTestUtils.createTestLeaderAndIsrRequest + private val leaderAndIsrResponse = SerializationTestUtils.createTestLeaderAndIsrResponse private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse private val producerRequest = SerializationTestUtils.createTestProducerRequest @@ -160,19 +160,19 @@ class RequestResponseSerializationTest extends JUnitSuite { @Test def testSerializationAndDeserialization() { - var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes()) - leaderAndISRRequest.writeTo(buffer) + var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndIsrRequest.sizeInBytes()) + leaderAndIsrRequest.writeTo(buffer) buffer.rewind() - val deserializedLeaderAndISRRequest = LeaderAndIsrRequest.readFrom(buffer) - assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest, - deserializedLeaderAndISRRequest) + val deserializedLeaderAndIsrRequest = LeaderAndIsrRequest.readFrom(buffer) + assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndIsrRequest, + deserializedLeaderAndIsrRequest) - buffer = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes()) - leaderAndISRResponse.writeTo(buffer) + buffer = ByteBuffer.allocate(leaderAndIsrResponse.sizeInBytes()) + leaderAndIsrResponse.writeTo(buffer) buffer.rewind() - val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(buffer) - assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndISRResponse, - deserializedLeaderAndISRResponse) + val deserializedLeaderAndIsrResponse = LeaderAndIsrResponse.readFrom(buffer) + assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndIsrResponse, + deserializedLeaderAndIsrResponse) buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes()) stopReplicaRequest.writeTo(buffer) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index f08b1561083..18f3f80435f 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -286,7 +286,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // send some messages to each broker val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) - val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.toString(m.payload, "UTF-8")). + val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.readString(m.payload, "UTF-8")). sortWith((s, t) => s.compare(t) == -1) val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -401,7 +401,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertTrue(iterator.hasNext) val message = iterator.next.message messages ::= message - debug("received message: " + Utils.toString(message.payload, "UTF-8")) + debug("received message: " + Utils.readString(message.payload, "UTF-8")) } } } diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index d489872d7b2..e5cc7923a1d 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -105,7 +105,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertTrue(iterator.hasNext) val message = iterator.next.message messages ::= message - debug("received message: " + Utils.toString(message.payload, "UTF-8")) + debug("received message: " + Utils.readString(message.payload, "UTF-8")) } } } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index cf304b50e94..536101f5c99 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -159,7 +159,7 @@ class LogManagerTest extends JUnit3Suite { override val logFileSize = 1024 *1024 *1024 override val flushSchedulerThreadRate = 50 override val flushInterval = Int.MaxValue - override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100") + override val flushIntervalMap = Map("timebasedflush" -> 100) } logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) logManager.startup diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index fc75dd6022e..f905612485b 100644 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -66,7 +66,7 @@ class MessageTest extends JUnitSuite { assertTrue("Auto-computed checksum should be valid", v.message.isValid) // garble checksum val badChecksum: Int = (v.message.checksum + 1 % Int.MaxValue).toInt - Utils.putUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum) + Utils.writeUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum) assertFalse("Message with invalid checksum should be invalid", v.message.isValid) } } diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index e8bf168c77c..3dfb406b8f4 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -25,21 +25,21 @@ import kafka.log.Log import org.junit.Assert._ import kafka.utils._ -class ISRExpirationTest extends JUnit3Suite { +class IsrExpirationTest extends JUnit3Suite { - var topicPartitionISR: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() + var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaMaxLagTimeMs = 100L override val replicaMaxLagBytes = 10L }) val topic = "foo" - def testISRExpirationForStuckFollowers() { + def testIsrExpirationForStuckFollowers() { val time = new MockTime val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L // create one partition and all replicas - val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log) + val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log) assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get @@ -58,12 +58,12 @@ class ISRExpirationTest extends JUnit3Suite { EasyMock.verify(log) } - def testISRExpirationForSlowFollowers() { + def testIsrExpirationForSlowFollowers() { val time = new MockTime // create leader replica val log = getLogWithLogEndOffset(15L, 1) // add one partition - val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log) + val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log) assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get // set remote replicas leo to something low, like 4 @@ -77,7 +77,7 @@ class ISRExpirationTest extends JUnit3Suite { EasyMock.verify(log) } - private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, config: KafkaConfig, + private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig, localLog: Log): Partition = { val leaderId=config.brokerId val replicaManager = new ReplicaManager(config, time, null, null, null) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index ab3ac8416a2..a5c663ce7e0 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -379,18 +379,18 @@ object TestUtils extends Logging { val partition = leaderForPartition._1 val leader = leaderForPartition._2 try{ - val currentLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) - var newLeaderAndISR: LeaderAndIsr = null - if(currentLeaderAndISROpt == None) - newLeaderAndISR = new LeaderAndIsr(leader, List(leader)) + val currentLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + var newLeaderAndIsr: LeaderAndIsr = null + if(currentLeaderAndIsrOpt == None) + newLeaderAndIsr = new LeaderAndIsr(leader, List(leader)) else{ - newLeaderAndISR = currentLeaderAndISROpt.get - newLeaderAndISR.leader = leader - newLeaderAndISR.leaderEpoch += 1 - newLeaderAndISR.zkVersion += 1 + newLeaderAndIsr = currentLeaderAndIsrOpt.get + newLeaderAndIsr.leader = leader + newLeaderAndIsr.leaderEpoch += 1 + newLeaderAndIsr.zkVersion += 1 } ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), - newLeaderAndISR.toString) + newLeaderAndIsr.toString) } catch { case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe) } diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index 96ad30704e8..fb9106bcb9a 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import java.nio.channels.ClosedByInterruptException import org.apache.log4j.Logger import kafka.message.Message -import kafka.utils.Utils +import kafka.utils.ZkUtils import java.util.{Random, Properties} import kafka.consumer._ import java.text.SimpleDateFormat @@ -48,7 +48,7 @@ object ConsumerPerformance { } // clean up zookeeper state for this group id for every perf run - Utils.tryCleanupZookeeper(config.consumerConfig.zkConnect, config.consumerConfig.groupId) + ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId) val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)