KAFKA-569 Split up utils package and do some cleanup. Patch reviewed by Neha.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1397765 13f79535-47bb-0310-9956-ffa450edef68
Edward Jay Kreps 2012-10-13 03:35:02 +00:00
parent 3015d0a447
commit d1a22b2e3b
57 changed files with 570 additions and 549 deletions

View File

@ -35,7 +35,7 @@ object Kafka extends Logging {
val verifiableProps = serverConfig.props
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
metricsConfig.reporters.foreach(reporterType => {
val reporter = Utils.getObject[KafkaMetricsReporter](reporterType)
val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
reporter.init(verifiableProps)
if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
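For reference, Utils.createObject (the renamed getObject) acts as a reflective factory for classes named in configuration. A minimal sketch of such a helper, assuming a public no-argument constructor (the real Kafka implementation may differ):

object ReflectionSketch {
  // Instantiate a class by its fully qualified name and cast it to the expected type.
  // Assumes the class exposes a public no-argument constructor.
  def createObject[T <: AnyRef](className: String): T =
    Class.forName(className).newInstance().asInstanceOf[T]
}

// Hypothetical usage, mirroring the call site above:
// val reporter = ReflectionSketch.createObject[KafkaMetricsReporter]("my.custom.CsvReporter")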

View File

@ -18,7 +18,8 @@
package kafka.admin
import java.util.Random
import kafka.api.{TopicMetadata, PartitionMetadata}
import kafka.api.{TopicMetadata, PartitionMetadata, TopicMetadataRequest, TopicMetadataResponse}
import kafka.common._
import kafka.cluster.Broker
import kafka.utils.{Logging, Utils, ZkUtils}
import org.I0Itec.zkclient.ZkClient

View File

@ -34,12 +34,12 @@ object CheckReassignmentStatus extends Logging {
val jsonFile = options.valueOf(jsonFileOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val jsonString = Utils.readFileIntoString(jsonFile)
val jsonString = Utils.readFileAsString(jsonFile)
val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
try {
// read the json file into a string
val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match {
val partitionsToBeReassigned = Json.parseFull(jsonString) match {
case Some(reassignedPartitions) =>
val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
partitions.map { m =>
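The SyncJSON to Json rename points at a thin wrapper around Scala's built-in parser. A minimal sketch of such a wrapper, assuming it merely serializes access to scala.util.parsing.json.JSON (which was not thread safe at the time), plus a hypothetical reassignment document:

import scala.util.parsing.json.JSON

object JsonSketch {
  private val lock = new Object

  // Parse a JSON string into Some(value), or None on malformed input.
  // JSON.parseFull is not thread safe, so calls are serialized behind a lock.
  def parseFull(input: String): Option[Any] = lock synchronized {
    JSON.parseFull(input)
  }
}

// Illustrative input: JsonSketch.parseFull("""[{"topic": "foo", "partition": "0", "replicas": "1,2"}]""")
// yields Some(List(Map("topic" -> "foo", "partition" -> "0", "replicas" -> "1,2")))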

View File

@ -42,11 +42,11 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val options = parser.parse(args : _*)
Utils.checkRequiredArgs(parser, options, jsonFileOpt, zkConnectOpt)
CommandLineUtils.checkRequiredArgs(parser, options, jsonFileOpt, zkConnectOpt)
val jsonFile = options.valueOf(jsonFileOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val jsonString = Utils.readFileIntoString(jsonFile)
val jsonString = Utils.readFileAsString(jsonFile)
var zkClient: ZkClient = null
try {
@ -77,7 +77,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = {
SyncJSON.parseFull(jsonString) match {
Json.parseFull(jsonString) match {
case Some(partitionList) =>
val partitions = (partitionList.asInstanceOf[List[Any]])
Set.empty[TopicAndPartition] ++ partitions.map { m =>
@ -93,7 +93,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
val jsonData = Utils.arrayToJson(partitionsUndergoingPreferredReplicaElection.map { p =>
Utils.stringMapToJsonString(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)))
Utils.stringMapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)))
}.toArray)
try {
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
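The payload written to the election path is a JSON array of per-partition objects built from Utils.stringMapToJson and Utils.arrayToJson. A rough sketch of what such helpers might look like (assumed, not the actual Utils code), producing output along the lines of [ { "topic": "foo", "partition": "0" } ]:

object JsonWriteSketch {
  // Render a Map[String, String] as a flat JSON object with quoted keys and values.
  def stringMapToJson(m: Map[String, String]): String =
    m.map { case (k, v) => "\"" + k + "\": \"" + v + "\"" }.mkString("{ ", ", ", " }")

  // Join already-serialized JSON fragments into a JSON array.
  def arrayToJson(elements: Array[String]): String =
    elements.mkString("[ ", ", ", " ]")
}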

View File

@ -50,12 +50,12 @@ object ReassignPartitionsCommand extends Logging {
val jsonFile = options.valueOf(jsonFileOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val jsonString = Utils.readFileIntoString(jsonFile)
val jsonString = Utils.readFileAsString(jsonFile)
var zkClient: ZkClient = null
try {
// read the json file into a string
val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match {
val partitionsToBeReassigned = Json.parseFull(jsonString) match {
case Some(reassignedPartitions) =>
val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
partitions.map { m =>

View File

@ -0,0 +1,92 @@
package kafka.api
import java.nio._
import kafka.common._
/**
* Helper functions specific to parsing or serializing requests and responses
*/
object ApiUtils {
val ProtocolEncoding = "UTF-8"
/**
* Read size prefixed string where the size is stored as a 2 byte short.
* @param buffer The buffer to read from
*/
def readShortString(buffer: ByteBuffer): String = {
val size: Int = buffer.getShort()
if(size < 0)
return null
val bytes = new Array[Byte](size)
buffer.get(bytes)
new String(bytes, ProtocolEncoding)
}
/**
* Write a size prefixed string where the size is stored as a 2 byte short
* @param buffer The buffer to write to
* @param string The string to write
*/
def writeShortString(buffer: ByteBuffer, string: String) {
if(string == null) {
buffer.putShort(-1)
} else if(string.length > Short.MaxValue) {
throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
} else {
buffer.putShort(string.length.asInstanceOf[Short])
buffer.put(string.getBytes(ProtocolEncoding))
}
}
/**
* Return size of a size prefixed string where the size is stored as a 2 byte short
* @param string The string to write
*/
def shortStringLength(string: String): Int = {
if(string == null) {
2
} else {
val encodedString = string.getBytes(ProtocolEncoding)
if(encodedString.length > Short.MaxValue) {
throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
} else {
2 + encodedString.length
}
}
}
/**
* Read an integer out of the bytebuffer from the current position and check that it falls within the given
* range. If not, throw KafkaException.
*/
def readIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
val value = buffer.getInt
if(value < range._1 || value > range._2)
throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
/**
* Read a short out of the bytebuffer from the current position and check that it falls within the given
* range. If not, throw KafkaException.
*/
def readShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = {
val value = buffer.getShort
if(value < range._1 || value > range._2)
throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
/**
* Read a long out of the bytebuffer from the current position and check that it falls within the given
* range. If not, throw KafkaException.
*/
def readLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
val value = buffer.getLong
if(value < range._1 || value > range._2)
throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
}
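A short round-trip example of the helpers above, showing how a size-prefixed string travels over the wire (the clientId value is illustrative):

import java.nio.ByteBuffer
import kafka.api.ApiUtils

object ShortStringExample extends App {
  val clientId = "console-producer"

  // Allocate exactly what the encoding needs: a 2 byte length prefix plus the UTF-8 payload.
  val buffer = ByteBuffer.allocate(ApiUtils.shortStringLength(clientId))
  ApiUtils.writeShortString(buffer, clientId)
  buffer.rewind()

  // Reading it back yields the original string; a stored length of -1 decodes to null.
  assert(ApiUtils.readShortString(buffer) == clientId)
}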

View File

@ -18,7 +18,8 @@
package kafka.api
import java.nio.ByteBuffer
import kafka.utils.{nonthreadsafe, Utils}
import kafka.utils.nonthreadsafe
import kafka.api.ApiUtils._
import scala.collection.immutable.Map
import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
@ -35,13 +36,13 @@ object FetchRequest {
def readFrom(buffer: ByteBuffer): FetchRequest = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val clientId = readShortString(buffer)
val replicaId = buffer.getInt
val maxWait = buffer.getInt
val minBytes = buffer.getInt
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
@ -71,14 +72,14 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
writeShortString(buffer, clientId)
buffer.putInt(replicaId)
buffer.putInt(maxWait)
buffer.putInt(minBytes)
buffer.putInt(requestInfoGroupedByTopic.size) // topic count
requestInfoGroupedByTopic.foreach {
case (topic, partitionFetchInfos) =>
Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
writeShortString(buffer, topic)
buffer.putInt(partitionFetchInfos.size) // partition count
partitionFetchInfos.foreach {
case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) =>
@ -92,7 +93,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
def sizeInBytes: Int = {
2 + /* versionId */
4 + /* correlationId */
Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
shortStringLength(clientId) +
4 + /* replicaId */
4 + /* maxWait */
4 + /* minBytes */
@ -100,7 +101,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
val (topic, partitionFetchInfos) = currTopic
foldedTopics +
Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
shortStringLength(topic) +
4 + /* partition count */
partitionFetchInfos.size * (
4 + /* partition id */
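sizeInBytes and writeTo are meant to stay in lockstep: callers allocate a buffer of exactly sizeInBytes and let the request serialize itself into it (BoundedByteBufferSend elsewhere in the codebase does essentially this). A hedged usage sketch:

import java.nio.ByteBuffer
import kafka.api.FetchRequest

object RequestSerializationSketch {
  // Allocate exactly the advertised size and let the request lay itself out in the buffer.
  def serialize(request: FetchRequest): ByteBuffer = {
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    buffer // ready to hand to a channel, or back to FetchRequest.readFrom
  }
}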

View File

@ -22,7 +22,7 @@ import java.nio.channels.GatheringByteChannel
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.message.{MessageSet, ByteBufferMessageSet}
import kafka.network.{MultiSend, Send}
import kafka.utils.Utils
import kafka.api.ApiUtils._
object FetchResponsePartitionData {
def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
@ -85,7 +85,7 @@ class PartitionDataSend(val partitionId: Int,
object TopicData {
def readFrom(buffer: ByteBuffer): TopicData = {
val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
@ -96,7 +96,7 @@ object TopicData {
}
def headerSize(topic: String) =
Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
shortStringLength(topic) +
4 /* partition count */
}
@ -115,7 +115,7 @@ class TopicDataSend(val topicData: TopicData) extends Send {
override def complete = sent >= size
private val buffer = ByteBuffer.allocate(topicData.headerSize)
Utils.writeShortString(buffer, topicData.topic)
writeShortString(buffer, topicData.topic)
buffer.putInt(topicData.partitionData.size)
buffer.rewind()

View File

@ -20,6 +20,7 @@ package kafka.api
import java.nio._
import kafka.utils._
import kafka.api.ApiUtils._
import collection.mutable.Map
import collection.mutable.HashMap
@ -30,14 +31,14 @@ object LeaderAndIsr {
}
case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
override def toString(): String = {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leader.toString)
jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
jsonDataMap.put("ISR", isr.mkString(","))
Utils.stringMapToJsonString(jsonDataMap)
Utils.stringMapToJson(jsonDataMap)
}
}
@ -46,11 +47,11 @@ object PartitionStateInfo {
def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
val leader = buffer.getInt
val leaderGenId = buffer.getInt
val ISRString = Utils.readShortString(buffer, "UTF-8")
val ISR = ISRString.split(",").map(_.toInt).toList
val isrString = readShortString(buffer)
val isr = isrString.split(",").map(_.toInt).toList
val zkVersion = buffer.getInt
val replicationFactor = buffer.getInt
PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, ISR, zkVersion), replicationFactor)
PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, isr, zkVersion), replicationFactor)
}
}
@ -58,7 +59,7 @@ case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val replicationFac
def writeTo(buffer: ByteBuffer) {
buffer.putInt(leaderAndIsr.leader)
buffer.putInt(leaderAndIsr.leaderEpoch)
Utils.writeShortString(buffer, leaderAndIsr.isr.mkString(","), "UTF-8")
writeShortString(buffer, leaderAndIsr.isr.mkString(","))
buffer.putInt(leaderAndIsr.zkVersion)
buffer.putInt(replicationFactor)
}
@ -79,13 +80,13 @@ object LeaderAndIsrRequest {
def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val clientId = readShortString(buffer)
val ackTimeoutMs = buffer.getInt
val partitionStateInfosCount = buffer.getInt
val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
for(i <- 0 until partitionStateInfosCount){
val topic = Utils.readShortString(buffer, "UTF-8")
val topic = readShortString(buffer)
val partition = buffer.getInt
val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
@ -108,11 +109,11 @@ case class LeaderAndIsrRequest (versionId: Short,
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
buffer.putInt(partitionStateInfos.size)
for((key, value) <- partitionStateInfos){
Utils.writeShortString(buffer, key._1, "UTF-8")
writeShortString(buffer, key._1)
buffer.putInt(key._2)
value.writeTo(buffer)
}

View File

@ -20,28 +20,29 @@ package kafka.api
import kafka.common.ErrorMapping
import java.nio.ByteBuffer
import kafka.utils.Utils
import kafka.api.ApiUtils._
import collection.mutable.HashMap
import collection.Map
object LeaderAndISRResponse {
def readFrom(buffer: ByteBuffer): LeaderAndISRResponse = {
object LeaderAndIsrResponse {
def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
val versionId = buffer.getShort
val errorCode = buffer.getShort
val numEntries = buffer.getInt
val responseMap = new HashMap[(String, Int), Short]()
for (i<- 0 until numEntries){
val topic = Utils.readShortString(buffer, "UTF-8")
val topic = readShortString(buffer)
val partition = buffer.getInt
val partitionErrorCode = buffer.getShort
responseMap.put((topic, partition), partitionErrorCode)
}
new LeaderAndISRResponse(versionId, responseMap, errorCode)
new LeaderAndIsrResponse(versionId, responseMap, errorCode)
}
}
case class LeaderAndISRResponse(versionId: Short,
case class LeaderAndIsrResponse(versionId: Short,
responseMap: Map[(String, Int), Short],
errorCode: Short = ErrorMapping.NoError)
extends RequestOrResponse {
@ -58,7 +59,7 @@ case class LeaderAndISRResponse(versionId: Short,
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
for ((key:(String, Int), value) <- responseMap){
Utils.writeShortString(buffer, key._1, "UTF-8")
writeShortString(buffer, key._1)
buffer.putInt(key._2)
buffer.putShort(value)
}

View File

@ -20,6 +20,7 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Utils
import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
object OffsetRequest {
@ -33,11 +34,11 @@ object OffsetRequest {
def readFrom(buffer: ByteBuffer): OffsetRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val clientId = readShortString(buffer)
val replicaId = buffer.getInt
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
val topic = Utils.readShortString(buffer)
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
@ -64,13 +65,13 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
writeShortString(buffer, clientId)
buffer.putInt(replicaId)
buffer.putInt(requestInfoGroupedByTopic.size) // topic count
requestInfoGroupedByTopic.foreach {
case((topic, partitionInfos)) =>
Utils.writeShortString(buffer, topic)
writeShortString(buffer, topic)
buffer.putInt(partitionInfos.size) // partition count
partitionInfos.foreach {
case (TopicAndPartition(_, partition), partitionInfo) =>
@ -83,13 +84,13 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
def sizeInBytes =
2 + /* versionId */
Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
shortStringLength(clientId) +
4 + /* replicaId */
4 + /* topic count */
requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
val (topic, partitionInfos) = currTopic
foldedTopics +
Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
shortStringLength(topic) +
4 + /* partition count */
partitionInfos.size * (
4 + /* partition */

View File

@ -20,6 +20,7 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.utils.Utils
import kafka.api.ApiUtils._
object OffsetResponse {
@ -28,7 +29,7 @@ object OffsetResponse {
val versionId = buffer.getShort
val numTopics = buffer.getInt
val pairs = (1 to numTopics).flatMap(_ => {
val topic = Utils.readShortString(buffer)
val topic = readShortString(buffer)
val numPartitions = buffer.getInt
(1 to numPartitions).map(_ => {
val partition = buffer.getInt
@ -61,7 +62,7 @@ case class OffsetResponse(versionId: Short,
offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
val (topic, errorAndOffsetsMap) = currTopic
foldedTopics +
Utils.shortStringLength(topic) +
shortStringLength(topic) +
4 + /* partition count */
errorAndOffsetsMap.foldLeft(0)((foldedPartitions, currPartition) => {
foldedPartitions +
@ -78,7 +79,7 @@ case class OffsetResponse(versionId: Short,
buffer.putInt(offsetsGroupedByTopic.size) // topic count
offsetsGroupedByTopic.foreach {
case((topic, errorAndOffsetsMap)) =>
Utils.writeShortString(buffer, topic)
writeShortString(buffer, topic)
buffer.putInt(errorAndOffsetsMap.size) // partition count
errorAndOffsetsMap.foreach {
case((TopicAndPartition(_, partition), errorAndOffsets)) =>

View File

@ -22,6 +22,7 @@ import kafka.message._
import kafka.utils._
import scala.collection.Map
import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
object ProducerRequest {
@ -30,14 +31,14 @@ object ProducerRequest {
def readFrom(buffer: ByteBuffer): ProducerRequest = {
val versionId: Short = buffer.getShort
val correlationId: Int = buffer.getInt
val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val clientId: String = readShortString(buffer)
val requiredAcks: Short = buffer.getShort
val ackTimeoutMs: Int = buffer.getInt
//build the topic structure
val topicCount = buffer.getInt
val partitionDataPairs = (1 to topicCount).flatMap(_ => {
// process topic
val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partition = buffer.getInt
@ -75,7 +76,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
writeShortString(buffer, clientId)
buffer.putShort(requiredAcks)
buffer.putInt(ackTimeoutMs)
@ -83,7 +84,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
buffer.putInt(dataGroupedByTopic.size) //the number of topics
dataGroupedByTopic.foreach {
case (topic, topicAndPartitionData) =>
Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic
writeShortString(buffer, topic) //write the topic
buffer.putInt(topicAndPartitionData.size) //the number of partitions
topicAndPartitionData.foreach(partitionAndData => {
val partition = partitionAndData._1.partition
@ -100,13 +101,13 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
def sizeInBytes: Int = {
2 + /* versionId */
4 + /* correlationId */
Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */
shortStringLength(clientId) + /* client id */
2 + /* requiredAcks */
4 + /* ackTimeoutMs */
4 + /* number of topics */
dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
foldedTopics +
Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
shortStringLength(currTopic._1) +
4 + /* the number of partitions */
{
currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {

View File

@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import kafka.utils.Utils
import scala.collection.Map
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.api.ApiUtils._
object ProducerResponse {
@ -29,7 +30,7 @@ object ProducerResponse {
val correlationId = buffer.getInt
val topicCount = buffer.getInt
val statusPairs = (1 to topicCount).flatMap(_ => {
val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partition = buffer.getInt
@ -64,7 +65,7 @@ case class ProducerResponse(versionId: Short,
4 + /* topic count */
groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
foldedTopics +
Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
shortStringLength(currTopic._1) +
4 + /* partition count for this topic */
currTopic._2.size * {
4 + /* partition id */
@ -83,7 +84,7 @@ case class ProducerResponse(versionId: Short,
groupedStatus.foreach(topicStatus => {
val (topic, errorsAndOffsets) = topicStatus
Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
writeShortString(buffer, topic)
buffer.putInt(errorsAndOffsets.size) // partition count
errorsAndOffsets.foreach {
case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) =>

View File

@ -29,12 +29,12 @@ object RequestKeys {
val StopReplicaKey: Short = 5
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map( ProduceKey -> ("Produce", ProducerRequest.readFrom),
FetchKey -> ("Fetch", FetchRequest.readFrom),
OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) )
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
FetchKey -> ("Fetch", FetchRequest.readFrom),
OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom))
def nameForKey(key: Short): String = {
keyToNameAndDeserializerMap.get(key) match {
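keyToNameAndDeserializerMap pairs each request key with a readable name and a deserializer, which lets the server decode incoming requests generically. A sketch of how a dispatcher might use it (illustrative only, not the actual KafkaApis code):

import java.nio.ByteBuffer
import kafka.api.{RequestKeys, RequestOrResponse}

object DispatchSketch {
  // Look up the deserializer registered for a request key and decode the payload.
  def decode(requestKey: Short, payload: ByteBuffer): RequestOrResponse =
    RequestKeys.keyToNameAndDeserializerMap.get(requestKey) match {
      case Some((_, deserializer)) => deserializer(payload)
      case None => throw new IllegalArgumentException("Unknown request key " + requestKey)
    }
}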

View File

@ -19,12 +19,6 @@ package kafka.api
import java.nio._
object RequestOrResponse {
val DefaultCharset = "UTF-8"
}
object Request {
val OrdinaryConsumerId: Int = -1
val DebuggingConsumerId: Int = -2

View File

@ -19,7 +19,7 @@ package kafka.api
import java.nio._
import kafka.utils._
import kafka.api.ApiUtils._
object StopReplicaRequest {
val CurrentVersion = 1.shortValue()
@ -28,13 +28,12 @@ object StopReplicaRequest {
def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val clientId = readShortString(buffer)
val ackTimeoutMs = buffer.getInt
val topicPartitionPairCount = buffer.getInt
val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]()
for (i <- 0 until topicPartitionPairCount) {
topicPartitionPairSet.add(Utils.readShortString(buffer, "UTF-8"), buffer.getInt)
}
for (i <- 0 until topicPartitionPairCount)
topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet)
}
}
@ -51,11 +50,11 @@ case class StopReplicaRequest(versionId: Short,
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
buffer.putInt(partitions.size)
for ((topic, partitionId) <- partitions){
Utils.writeShortString(buffer, topic, "UTF-8")
writeShortString(buffer, topic)
buffer.putInt(partitionId)
}
}

View File

@ -22,6 +22,7 @@ import kafka.utils.Utils
import collection.mutable.HashMap
import collection.mutable.Map
import kafka.common.ErrorMapping
import kafka.api.ApiUtils._
object StopReplicaResponse {
@ -32,7 +33,7 @@ object StopReplicaResponse {
val responseMap = new HashMap[(String, Int), Short]()
for (i<- 0 until numEntries){
val topic = Utils.readShortString(buffer, "UTF-8")
val topic = readShortString(buffer)
val partition = buffer.getInt
val partitionErrorCode = buffer.getShort()
responseMap.put((topic, partition), partitionErrorCode)
@ -58,7 +59,7 @@ case class StopReplicaResponse(val versionId: Short,
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
for ((key:(String, Int), value) <- responseMap){
Utils.writeShortString(buffer, key._1, "UTF-8")
writeShortString(buffer, key._1)
buffer.putInt(key._2)
buffer.putShort(value)
}

View File

@ -19,7 +19,8 @@ package kafka.api
import kafka.cluster.Broker
import java.nio.ByteBuffer
import kafka.utils.Utils._
import kafka.api.ApiUtils._
import kafka.utils.Logging
import collection.mutable.ListBuffer
import kafka.common.{KafkaException, ErrorMapping}
@ -54,9 +55,9 @@ case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
object TopicMetadata {
def readFrom(buffer: ByteBuffer): TopicMetadata = {
val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue))
val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
val topic = readShortString(buffer)
val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
val partitionsMetadata = new ListBuffer[PartitionMetadata]()
for(i <- 0 until numPartitions)
partitionsMetadata += PartitionMetadata.readFrom(buffer)
@ -64,7 +65,7 @@ object TopicMetadata {
}
}
case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) {
case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging {
def sizeInBytes: Int = {
var size: Int = 2 /* error code */
size += shortStringLength(topic)
@ -87,8 +88,8 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
object PartitionMetadata {
def readFrom(buffer: ByteBuffer): PartitionMetadata = {
val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue))
val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
val doesLeaderExist = getLeaderRequest(buffer.get)
val leader = doesLeaderExist match {
case LeaderExists => /* leader exists */
@ -97,14 +98,14 @@ object PartitionMetadata {
}
/* list of all replicas */
val numReplicas = getShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
val numReplicas = readShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
val replicas = new Array[Broker](numReplicas)
for(i <- 0 until numReplicas) {
replicas(i) = Broker.readFrom(buffer)
}
/* list of in-sync replicas */
val numIsr = getShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
val numIsr = readShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
val isr = new Array[Broker](numIsr)
for(i <- 0 until numIsr) {
isr(i) = Broker.readFrom(buffer)
@ -122,8 +123,11 @@ object PartitionMetadata {
}
}
case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
errorCode: Short = ErrorMapping.NoError) {
case class PartitionMetadata(partitionId: Int,
val leader: Option[Broker],
replicas: Seq[Broker],
isr: Seq[Broker] = Seq.empty,
errorCode: Short = ErrorMapping.NoError) extends Logging {
def sizeInBytes: Int = {
var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/

View File

@ -18,11 +18,11 @@
package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Utils._
import kafka.api.ApiUtils._
import collection.mutable.ListBuffer
import kafka.utils._
import kafka.utils.Logging
object TopicMetadataRequest {
object TopicMetadataRequest extends Logging {
val CurrentVersion = 1.shortValue()
val DefaultClientId = ""
@ -33,11 +33,11 @@ object TopicMetadataRequest {
def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
val clientId = readShortString(buffer)
val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
val topics = new ListBuffer[String]()
for(i <- 0 until numTopics)
topics += readShortString(buffer, "UTF-8")
topics += readShortString(buffer)
val topicsList = topics.toList
debug("topic = %s".format(topicsList.head))
new TopicMetadataRequest(versionId, clientId, topics.toList)
@ -54,7 +54,7 @@ def this(topics: Seq[String]) =
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
writeShortString(buffer, clientId)
buffer.putInt(topics.size)
topics.foreach(topic => writeShortString(buffer, topic))
}

View File

@ -0,0 +1,60 @@
package kafka.client
import scala.collection._
import kafka.cluster._
import kafka.api._
import kafka.producer._
import kafka.common.KafkaException
import kafka.utils.{Utils, Logging}
/**
* Helper functions common to clients (producer, consumer, or admin)
*/
object ClientUtils extends Logging{
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
while(i < brokers.size && !fetchMetaDataSucceeded) {
val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
info("Fetching metadata for topic %s".format(topics))
try {
topicMetadataResponse = producer.send(topicMetadataRequest)
fetchMetaDataSucceeded = true
}
catch {
case e =>
warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
t = e
} finally {
i = i + 1
producer.close()
}
}
if(!fetchMetaDataSucceeded){
throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
}
return topicMetadataResponse
}
/**
* Parse a list of broker urls in the form host1:port1, host2:port2, ...
*/
def parseBrokerList(brokerListStr: String): Seq[Broker] = {
val brokersStr = Utils.parseCsvList(brokerListStr)
brokersStr.zipWithIndex.map(b =>{
val brokerStr = b._1
val brokerId = b._2
val brokerInfos = brokerStr.split(":")
val hostName = brokerInfos(0)
val port = brokerInfos(1).toInt
val creatorId = hostName + "-" + System.currentTimeMillis()
new Broker(brokerId, creatorId, hostName, port)
})
}
}
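A brief usage sketch of the new ClientUtils helpers, assuming a broker list string of the form host1:port1,host2:port2 and a reachable broker (Utils.parseCsvList is assumed to split on commas and drop empty entries):

import kafka.client.ClientUtils

object ClientUtilsUsageSketch extends App {
  // Parse a comma-separated broker list into Broker objects with synthetic ids 0, 1, ...
  val brokers = ClientUtils.parseBrokerList("localhost:9092,localhost:9093")

  // Ask the brokers in order for metadata until one of them answers.
  val response = ClientUtils.fetchTopicMetadata(Set("test-topic"), brokers)
  response.topicsMetadata.foreach(tm =>
    println(tm.topic + " has " + tm.partitionsMetadata.size + " partitions"))
}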

View File

@ -18,6 +18,7 @@
package kafka.cluster
import kafka.utils.Utils._
import kafka.api.ApiUtils._
import java.nio.ByteBuffer
import kafka.common.BrokerNotAvailableException

View File

@ -42,7 +42,7 @@ class Partition(val topic: String,
var leaderReplicaIdOpt: Option[Int] = None
var inSyncReplicas: Set[Replica] = Set.empty[Replica]
private val assignedReplicaMap = new Pool[Int,Replica]
private val leaderISRUpdateLock = new Object
private val leaderIsrUpdateLock = new Object
private var zkVersion: Int = LeaderAndIsr.initialZKVersion
private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
@ -90,7 +90,7 @@ class Partition(val topic: String,
}
def leaderReplicaIfLocal(): Option[Replica] = {
leaderISRUpdateLock synchronized {
leaderIsrUpdateLock synchronized {
leaderReplicaIdOpt match {
case Some(leaderReplicaId) =>
if (leaderReplicaId == localBrokerId)
@ -114,17 +114,17 @@ class Partition(val topic: String,
/**
* If the leaderEpoch of the incoming request is higher than the locally cached epoch, make this replica the leader or a follower of the new leader.
*/
def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
leaderISRUpdateLock synchronized {
if (leaderEpoch >= leaderAndISR.leaderEpoch){
def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
leaderIsrUpdateLock synchronized {
if (leaderEpoch >= leaderAndIsr.leaderEpoch){
info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become %s request"
.format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
.format(leaderEpoch, leaderAndIsr.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
return false
}
if(isMakingLeader)
makeLeader(topic, partitionId, leaderAndISR)
makeLeader(topic, partitionId, leaderAndIsr)
else
makeFollower(topic, partitionId, leaderAndISR)
makeFollower(topic, partitionId, leaderAndIsr)
true
}
}
@ -136,17 +136,17 @@ class Partition(val topic: String,
* 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
* 4. set the new leader and ISR
*/
private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
private def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) {
trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
// stop replica fetcher thread, if any
replicaFetcherManager.removeFetcher(topic, partitionId)
val newInSyncReplicas = leaderAndISR.isr.map(r => getOrCreateReplica(r)).toSet
val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
// reset LogEndOffset for remote replicas
assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
inSyncReplicas = newInSyncReplicas
leaderEpoch = leaderAndISR.leaderEpoch
zkVersion = leaderAndISR.zkVersion
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt = Some(localBrokerId)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(getReplica().get)
@ -158,9 +158,9 @@ class Partition(val topic: String,
* 3. set the leader and set ISR to empty
* 4. start a fetcher to the new leader
*/
private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
val newLeaderBrokerId: Int = leaderAndISR.leader
private def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) = {
trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
val newLeaderBrokerId: Int = leaderAndIsr.leader
info("Starting the follower state transition to follow leader %d for topic %s partition %d"
.format(newLeaderBrokerId, topic, partitionId))
ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match {
@ -171,8 +171,8 @@ class Partition(val topic: String,
val localReplica = getOrCreateReplica()
localReplica.log.get.truncateTo(localReplica.highWatermark)
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndISR.leaderEpoch
zkVersion = leaderAndISR.zkVersion
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt = Some(newLeaderBrokerId)
// start fetcher thread to current leader
replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
@ -182,8 +182,8 @@ class Partition(val topic: String,
}
}
def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
leaderISRUpdateLock synchronized {
def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
leaderIsrUpdateLock synchronized {
debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId))
val replica = getOrCreateReplica(replicaId)
replica.logEndOffset = offset
@ -198,7 +198,7 @@ class Partition(val topic: String,
val newInSyncReplicas = inSyncReplicas + replica
info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", ")))
// update ISR in ZK and cache
updateISR(newInSyncReplicas)
updateIsr(newInSyncReplicas)
replicaManager.isrExpandRate.mark()
}
maybeIncrementLeaderHW(leaderReplica)
@ -208,7 +208,7 @@ class Partition(val topic: String,
}
def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
leaderISRUpdateLock synchronized {
leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(_) =>
val numAcks = inSyncReplicas.count(r => {
@ -247,8 +247,8 @@ class Partition(val topic: String,
.format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
}
def maybeShrinkISR(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) {
leaderISRUpdateLock synchronized {
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) {
leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes)
@ -257,7 +257,7 @@ class Partition(val topic: String,
assert(newInSyncReplicas.size > 0)
info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
updateISR(newInSyncReplicas)
updateIsr(newInSyncReplicas)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
replicaManager.isrShrinkRate.mark()
@ -289,15 +289,15 @@ class Partition(val topic: String,
stuckReplicas ++ slowReplicas
}
private def updateISR(newISR: Set[Replica]) {
info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(", ")))
val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
private def updateIsr(newIsr: Set[Replica]) {
info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString(), zkVersion)
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndIsr.toString(), zkVersion)
if (updateSucceeded){
inSyncReplicas = newISR
inSyncReplicas = newIsr
zkVersion = newVersion
trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newISR.mkString(","), zkVersion))
trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
} else {
info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
}
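updateIsr leans on ZkUtils.conditionalUpdatePersistentPath behaving as a compare-and-set on the ZooKeeper node version: the write only applies if the cached zkVersion still matches, which stops a stale leader from overwriting a newer ISR. A sketch of that contract as inferred from the call above (not copied from ZkUtils):

import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

object ConditionalUpdateSketch {
  // Attempt a versioned write; adopt the new version on success, otherwise keep the old state.
  def tryUpdate(zkClient: ZkClient, path: String, data: String, cachedVersion: Int): Option[Int] = {
    val (updated, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, data, cachedVersion)
    if (updated) Some(newVersion) else None
  }
}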

View File

@ -25,7 +25,7 @@ import java.util.Properties
import java.util.Random
import java.io.PrintStream
import kafka.message._
import kafka.utils.{Utils, Logging}
import kafka.utils.{Utils, Logging, ZkUtils, CommandLineUtils}
import kafka.utils.ZKStringSerializer
import kafka.serializer.StringDecoder
@ -109,8 +109,7 @@ object ConsoleConsumer extends Logging {
"skip it instead of halt.")
val options: OptionSet = tryParse(parser, args)
Utils.checkRequiredArgs(parser, options, zkConnectOpt)
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1) {
error("Exactly one of whitelist/blacklist/topic is required.")
@ -145,14 +144,14 @@ object ConsoleConsumer extends Logging {
val connector = Consumer.create(config)
if(options.has(resetBeginningOpt))
tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
connector.shutdown()
// if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
if(!options.has(groupIdOpt))
tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
if(!options.has(groupIdOpt))
ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
}
})

View File

@ -28,6 +28,7 @@ import kafka.utils.ZkUtils._
import kafka.utils.{ShutdownableThread, SystemTime}
import kafka.utils.Utils._
import kafka.common.TopicAndPartition
import kafka.client.ClientUtils
/**
* Usage:
@ -52,7 +53,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
cond.await()
val brokers = getAllBrokersInCluster(zkClient)
val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
val leaderForPartitionsMap = new HashMap[(String, Int), Broker]
topicsMetadata.foreach(
tmd => {

View File

@ -20,7 +20,7 @@ package kafka.consumer
import scala.collection._
import org.I0Itec.zkclient.ZkClient
import java.util.regex.Pattern
import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
private[kafka] trait TopicCount {
def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
@ -88,7 +88,7 @@ private[kafka] object TopicCount extends Logging {
else {
var topMap : Map[String,Int] = null
try {
SyncJSON.parseFull(topicCountString) match {
Json.parseFull(topicCountString) match {
case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
}

View File

@ -31,6 +31,7 @@ import java.util.UUID
import kafka.serializer.Decoder
import kafka.utils.ZkUtils._
import kafka.common._
import kafka.client.ClientUtils
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Utils._
@ -390,7 +391,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val brokers = getAllBrokersInCluster(zkClient)
val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
topicsMetadata.foreach(m =>{

View File

@ -119,7 +119,7 @@ class RequestSendThread(val controllerId: Int,
var response: RequestOrResponse = null
request.requestId.get match {
case RequestKeys.LeaderAndIsrKey =>
response = LeaderAndISRResponse.readFrom(receive.buffer)
response = LeaderAndIsrResponse.readFrom(receive.buffer)
case RequestKeys.StopReplicaKey =>
response = StopReplicaResponse.readFrom(receive.buffer)
}

View File

@ -65,9 +65,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
} else {
info("Loading log '" + dir.getName() + "'")
val topic = Utils.getTopicPartition(dir.getName)._1
val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
val topicPartition = parseTopicPartitionName(dir.getName)
val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
val log = new Log(dir,
maxLogFileSize,
config.maxMessageSize,
@ -78,10 +78,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
config.logIndexIntervalBytes,
time,
config.brokerId)
val topicPartition = Utils.getTopicPartition(dir.getName)
logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
val parts = logs.get(topicPartition._1)
parts.put(topicPartition._2, log)
logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]())
val parts = logs.get(topicPartition.topic)
parts.put(topicPartition.partition, log)
}
}
}
@ -168,7 +167,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
/* Runs through the log removing segments older than a certain age */
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
val topic = Utils.getTopicPartition(log.name)._1
val topic = parseTopicPartitionName(log.name).topic
val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs)
val total = log.deleteSegments(toBeDeleted)
@ -180,7 +179,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
* is at least logRetentionSize bytes in size
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
val topic = Utils.getTopicPartition(log.dir.getName)._1
val topic = parseTopicPartitionName(log.dir.getName).topic
val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
var diff = log.size - maxLogRetentionSize
@ -256,5 +255,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
def topics(): Iterable[String] = logs.keys
private def parseTopicPartitionName(name: String): TopicAndPartition = {
val index = name.lastIndexOf('-')
TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
}
}
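parseTopicPartitionName splits a log directory name on the last dash, so topic names that themselves contain dashes still resolve correctly. A standalone restatement of that logic with an illustrative directory name:

import kafka.common.TopicAndPartition

object LogDirNameSketch extends App {
  // Same idea as LogManager.parseTopicPartitionName: only the last '-' separates the partition id.
  def parse(name: String): TopicAndPartition = {
    val index = name.lastIndexOf('-')
    TopicAndPartition(name.substring(0, index), name.substring(index + 1).toInt)
  }

  assert(parse("my-topic-3") == TopicAndPartition("my-topic", 3))
}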

View File

@ -116,7 +116,7 @@ class Message(val buffer: ByteBuffer) {
buffer.rewind()
// now compute the checksum and fill it in
Utils.putUnsignedInt(buffer, CrcOffset, computeChecksum)
Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
}
def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) =
@ -140,7 +140,7 @@ class Message(val buffer: ByteBuffer) {
/**
* Retrieve the previously computed CRC for this message
*/
def checksum: Long = Utils.getUnsignedInt(buffer, CrcOffset)
def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)
/**
* Returns true if the crc stored with the message matches the crc computed off the message contents
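Message checksums are unsigned 32-bit values, which do not fit in a signed Int, so the renamed helpers go through a Long. A minimal sketch of what read/writeUnsignedInt likely amount to (assumed; the real Utils may differ in detail):

import java.nio.ByteBuffer

object UnsignedIntSketch {
  // Interpret the 4 bytes at the given index as an unsigned value in [0, 2^32).
  def readUnsignedInt(buffer: ByteBuffer, index: Int): Long =
    buffer.getInt(index) & 0xffffffffL

  // Store the low 32 bits of the value at the given index.
  def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit =
    buffer.putInt(index, (value & 0xffffffffL).toInt)
}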

View File

@ -28,7 +28,7 @@ class KafkaMetricsConfig(props: VerifiableProperties) {
* Comma-separated list of reporter types. These classes should be on the
* classpath and will be instantiated at run-time.
*/
val reporters = Utils.getCSVList(props.getString("kafka.metrics.reporters", ""))
val reporters = Utils.parseCsvList(props.getString("kafka.metrics.reporters", ""))
/**
* The metrics polling interval (in seconds).

View File

@ -21,6 +21,8 @@ import kafka.api.{TopicMetadataRequest, TopicMetadata}
import kafka.common.KafkaException
import kafka.utils.{Logging, Utils}
import kafka.common.ErrorMapping
import kafka.cluster.Broker
import kafka.client.ClientUtils
class BrokerPartitionInfo(producerConfig: ProducerConfig,
@ -28,7 +30,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
topicPartitionInfo: HashMap[String, TopicMetadata])
extends Logging {
val brokerList = producerConfig.brokerList
val brokers = Utils.getAllBrokersFromBrokerList(brokerList)
val brokers = ClientUtils.parseBrokerList(brokerList)
/**
* Return a sequence of (brokerId, numPartitions).
@ -71,7 +73,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
*/
def updateInfo(topics: Set[String]) = {
var topicsMetadata: Seq[TopicMetadata] = Nil
val topicMetadataResponse = Utils.getTopicMetadata(topics, brokers)
val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers)
topicsMetadata = topicMetadataResponse.topicsMetadata
// throw partition specific exception
topicsMetadata.foreach(tmd =>{
@ -88,6 +90,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
})
producerPool.updateProducer(topicsMetadata)
}
}
case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

View File

@ -17,6 +17,8 @@
package kafka.producer
import kafka.utils.Utils
private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
private val random = new java.util.Random
@ -24,6 +26,6 @@ private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
if(key == null)
random.nextInt(numPartitions)
else
math.abs(key.hashCode) % numPartitions
Utils.abs(key.hashCode) % numPartitions
}
}
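math.abs is a subtle trap here: math.abs(Int.MinValue) is still Int.MinValue, so an unlucky hashCode could yield a negative partition index. Utils.abs is presumably a hash-safe absolute value; one common way to write it, shown as a hedged sketch alongside the partitioning step:

object PartitionerSketch {
  // Masking the sign bit never returns a negative number, unlike math.abs on Int.MinValue.
  def abs(n: Int): Int = n & 0x7fffffff

  // Partition selection for keyed messages, as in DefaultPartitioner above.
  def partitionFor(key: AnyRef, numPartitions: Int): Int =
    abs(key.hashCode) % numPartitions
}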

View File

@ -18,6 +18,7 @@ package kafka.producer
import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler}
import kafka.utils._
import java.util.Random
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.serializer.Encoder
import java.util.concurrent.atomic.AtomicBoolean
@ -33,13 +34,14 @@ extends Logging {
private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
private val random = new Random
private var sync: Boolean = true
private var producerSendThread: ProducerSendThread[K,V] = null
config.producerType match {
case "sync" =>
case "async" =>
sync = false
val asyncProducerID = Utils.getNextRandomInt
val asyncProducerID = random.nextInt(Int.MaxValue)
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue,
eventHandler, config.queueTime, config.batchSize)
producerSendThread.start
@ -49,8 +51,8 @@ extends Logging {
def this(config: ProducerConfig) =
this(config,
new DefaultEventHandler[K,V](config,
Utils.getObject[Partitioner[K]](config.partitionerClass),
Utils.getObject[Encoder[V]](config.serializerClass),
Utils.createObject[Partitioner[K]](config.partitionerClass),
Utils.createObject[Encoder[V]](config.serializerClass),
new ProducerPool(config)))
/**

View File

@ -78,7 +78,7 @@ class ProducerConfig private (val props: VerifiableProperties)
*
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
val compressedTopics = Utils.getCSVList(props.getString("compressed.topics", null))
val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))
/**
* The producer using the zookeeper software load balancer maintains a ZK cache that gets

View File

@ -24,6 +24,7 @@ import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
import scala.collection.{Seq, Map}
import scala.collection.mutable.{ListBuffer, HashMap}
import java.util.concurrent.atomic._
import kafka.api.{TopicMetadata, ProducerRequest}
@ -35,6 +36,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
val counter = new AtomicInteger(0)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
private val lock = new Object()
@ -185,8 +187,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
if(numPartitions <= 0)
throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
"\n Valid values are > 0")
val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
else partitioner.partition(key, numPartitions)
val partition =
if(key == null)
Utils.abs(counter.getAndIncrement()) % numPartitions
else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
"\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")

View File

@ -57,22 +57,22 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.FetchKey => handleFetchRequest(request)
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndISRRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
}
request.apiLocalCompleteTimeNs = SystemTime.nanoseconds
}
def handleLeaderAndISRRequest(request: RequestChannel.Request) {
val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
trace("Handling leader and isr request " + leaderAndISRRequest)
requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
trace("Handling leader and ISR request " + leaderAndIsrRequest)
try {
val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest)
val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
} catch {
case e: KafkaStorageException =>
fatal("Disk error during leadership change.", e)

View File

@ -22,6 +22,7 @@ import kafka.message.Message
import kafka.consumer.ConsumerConfig
import java.net.InetAddress
import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig}
import scala.collection._
/**
* Configuration settings for the kafka server
@ -73,32 +74,32 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the default number of log partitions per topic */
val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
/* the directory in which the log data is kept */
/* the directories in which the log data is kept */
val logDir = props.getString("log.dir")
/* the maximum size of a single log file */
val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
/* the maximum size of a single log file for some specific topic */
val logFileSizeMap = Utils.getTopicFileSize(props.getString("topic.log.file.size", ""))
val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt)
/* the maximum time before a new log segment is rolled out */
val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
/* the number of hours before rolling out a new log segment for some specific topic */
val logRollHoursMap = Utils.getTopicRollHours(props.getString("topic.log.roll.hours", ""))
val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt)
/* the number of hours to keep a log file before deleting it */
val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = Utils.getTopicRetentionHours(props.getString("topic.log.retention.hours", ""))
val logRetentionHoursMap = props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt)
/* the maximum size of the log before deleting it */
val logRetentionSize = props.getLong("log.retention.size", -1)
/* the maximum size of the log for some specific topic before deleting it */
val logRetentionSizeMap = Utils.getTopicRetentionSize(props.getString("topic.log.retention.size", ""))
val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong)
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
@ -113,7 +114,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val flushIntervalMap = Utils.getTopicFlushIntervals(props.getString("topic.flush.intervals.ms", ""))
val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
/* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000)
@ -161,4 +162,5 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* number of fetcher threads used to replicate messages from a source broker.
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
val numReplicaFetchers = props.getInt("replica.fetchers", 1)
}

View File

@ -33,7 +33,10 @@ object ReplicaManager {
val UnknownLogEndOffset = -1L
}
class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler,
class ReplicaManager(val config: KafkaConfig,
time: Time,
val zkClient: ZkClient,
kafkaScheduler: KafkaScheduler,
val logManager: LogManager) extends Logging with KafkaMetricsGroup {
private val allPartitions = new Pool[(String, Int), Partition]
private var leaderPartitions = new mutable.HashSet[Partition]()
@ -85,7 +88,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
def startup() {
// start ISR expiration thread
kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
}
def stopReplica(topic: String, partitionId: Int): Short = {
@ -221,17 +224,17 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
}
}
private def maybeShrinkISR(): Unit = {
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
leaderPartitionsLock synchronized {
leaderPartitions.foreach(partition => partition.maybeShrinkISR(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes))
leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes))
}
}
def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
val partitionOpt = getPartition(topic, partitionId)
if(partitionOpt.isDefined){
partitionOpt.get.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
} else {
warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
}

View File

@ -69,8 +69,8 @@ object DumpLogSegments {
print(" keysize: " + msg.keySize)
if(printContents) {
if(msg.hasKey)
print(" key: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
print(" payload: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
}
println()
}

View File

@ -19,7 +19,7 @@ package kafka.tools
import kafka.message.Message
import joptsimple.OptionParser
import kafka.utils.{Utils, Logging}
import kafka.utils.{Utils, CommandLineUtils, Logging}
import kafka.producer.{ProducerData, ProducerConfig, Producer}
import scala.collection.JavaConversions._
import java.util.concurrent.CountDownLatch
@ -81,8 +81,7 @@ object MirrorMaker extends Logging {
System.exit(0)
}
Utils.checkRequiredArgs(
parser, options, consumerConfigOpt, producerConfigOpt)
CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
println("Exactly one of whitelist or blacklist is required.")
System.exit(1)

View File

@ -22,14 +22,14 @@ import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
import kafka.producer.{ProducerData, ProducerConfig, Producer}
import kafka.consumer._
import kafka.utils.{ZKStringSerializer, Logging}
import kafka.utils.{ZKStringSerializer, Logging, ZkUtils}
import kafka.api.OffsetRequest
import org.I0Itec.zkclient._
import kafka.message.{CompressionCodec, Message}
object ReplayLogProducer extends Logging {
private val GROUPID: String = "replay-log-producer"
private val GroupId: String = "replay-log-producer"
def main(args: Array[String]) {
val config = new Config(args)
@ -38,12 +38,12 @@ object ReplayLogProducer extends Logging {
val allDone = new CountDownLatch(config.numThreads)
// if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
tryCleanupZookeeper(config.zkConnect, GROUPID)
ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)
Thread.sleep(500)
// consumer properties
val consumerProps = new Properties
consumerProps.put("groupid", GROUPID)
consumerProps.put("groupid", GroupId)
consumerProps.put("zk.connect", config.zkConnect)
consumerProps.put("consumer.timeout.ms", "10000")
consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString)
@ -137,18 +137,6 @@ object ReplayLogProducer extends Logging {
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
}
def tryCleanupZookeeper(zkUrl: String, groupId: String) {
try {
val dir = "/consumers/" + groupId
info("Cleaning up temporary zookeeper data under " + dir + ".")
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
zk.close()
} catch {
case _ => // swallow
}
}
class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
val props = new Properties()

View File

@ -20,6 +20,7 @@ package kafka.tools
import joptsimple._
import kafka.utils._
import kafka.consumer._
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker
import scala.collection.JavaConversions._
@ -30,7 +31,7 @@ import scala.collection.JavaConversions._
*/
object SimpleConsumerShell extends Logging {
def USE_LEADER_REPLICA = -1
def UseLeaderReplica = -1
def main(args: Array[String]): Unit = {
@ -52,7 +53,7 @@ object SimpleConsumerShell extends Logging {
.withRequiredArg
.describedAs("replica id")
.ofType(classOf[java.lang.Integer])
.defaultsTo(USE_LEADER_REPLICA)
.defaultsTo(UseLeaderReplica)
val offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end")
.withOptionalArg()
.describedAs("consume offset")
@ -115,8 +116,8 @@ object SimpleConsumerShell extends Logging {
// getting topic metadata
info("Getting topic metatdata...")
val metadataTargetBrokers = Utils.getAllBrokersFromBrokerList(options.valueOf(brokerListOpt))
val topicsMetadata = Utils.getTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
System.exit(1)
@ -133,7 +134,7 @@ object SimpleConsumerShell extends Logging {
// validating replica id and initializing target broker
var fetchTargetBroker: Broker = null
var replicaOpt: Option[Broker] = null
if(replicaId == USE_LEADER_REPLICA) {
if(replicaId == UseLeaderReplica) {
replicaOpt = partitionMetadataOpt.get.leader
if(!replicaOpt.isDefined) {
System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))

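The shell now resolves its --broker-list argument through ClientUtils.parseBrokerList, which is not itself part of this diff. As an illustration only, the sketch below mirrors the removed Utils.getAllBrokersFromBrokerList (visible further down in this commit), turning a "host:port,host:port" string into Broker objects; the object name BrokerListExample is hypothetical and the real ClientUtils helper may differ in detail.

import kafka.cluster.Broker

object BrokerListExample {
  // Mirrors the removed Utils.getAllBrokersFromBrokerList; illustration only
  def parseBrokerList(brokerListStr: String): Seq[Broker] = {
    val brokerStrs = brokerListStr.split("\\s*,\\s*").filter(!_.isEmpty)
    brokerStrs.zipWithIndex.map { case (brokerStr, brokerId) =>
      val pieces = brokerStr.split(":")
      val host = pieces(0)
      val port = pieces(1).toInt
      // Broker(id, creatorId, host, port), as in the removed helper
      new Broker(brokerId, host + "-" + System.currentTimeMillis(), host, port)
    }.toSeq
  }

  def main(args: Array[String]) {
    // e.g. the value a user would pass to --broker-list
    parseBrokerList("broker1:9092, broker2:9092").foreach(println)
  }
}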
View File

@ -0,0 +1,20 @@
package kafka.utils
import joptsimple.{OptionSpec, OptionSet, OptionParser}
/**
* Helper functions for dealing with command line utilities
*/
object CommandLineUtils extends Logging {
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {
error("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
}
}
}
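A quick sketch of how a tool would use the new helper; the option names and the ExampleTool object are hypothetical, and the joptsimple calls follow the same pattern SimpleConsumerShell uses above. checkRequiredArgs prints an error plus the parser's help text and exits if any listed option is missing.

import joptsimple.OptionParser
import kafka.utils.CommandLineUtils

object ExampleTool {
  def main(args: Array[String]) {
    val parser = new OptionParser
    val zkConnectOpt = parser.accepts("zookeeper", "The zookeeper connect string.")
                             .withRequiredArg
                             .describedAs("zk connect")
                             .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "The topic to operate on.")
                         .withRequiredArg
                         .describedAs("topic")
                         .ofType(classOf[String])
    val options = parser.parse(args: _*)

    // Exits with an error and usage output if either option was not supplied
    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, topicOpt)

    println("zookeeper=" + options.valueOf(zkConnectOpt) + " topic=" + options.valueOf(topicOpt))
  }
}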

View File

@ -0,0 +1,24 @@
package kafka.utils
import kafka.common._
import util.parsing.json.JSON
/**
* A wrapper that synchronizes access to Scala's JSON parser (scala.util.parsing.json.JSON), which is not thread-safe.
*/
object Json extends Logging {
val myConversionFunc = {input : String => input.toInt}
JSON.globalNumberParser = myConversionFunc
val lock = new Object
def parseFull(input: String): Option[Any] = {
lock synchronized {
try {
JSON.parseFull(input)
} catch {
case t =>
throw new KafkaException("Can't parse json string: %s".format(input), t)
}
}
}
}
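Json replaces the SyncJSON wrapper that this commit removes from Utils.scala: it serializes access to scala.util.parsing.json.JSON (whose parser keeps mutable state), installs an Int number parser, and wraps parser exceptions in KafkaException. A minimal usage sketch, assuming the patched kafka.utils classes are on the classpath; the input is shaped like the partition-to-replicas maps ZkUtils reads, but the literal itself is made up for illustration.

import kafka.utils.Json

object JsonExample {
  def main(args: Array[String]) {
    val data = """{ "0":["1","2"], "1":["2","3"] }"""
    Json.parseFull(data) match {
      case Some(m) =>
        // Same downcast style ZkUtils uses on parsed partition maps
        val assignment = m.asInstanceOf[Map[String, Seq[String]]]
        assignment.foreach { case (partition, replicas) =>
          println("partition %s -> replicas %s".format(partition, replicas.mkString(",")))
        }
      case None =>
        println("not valid json")
    }
  }
}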

View File

@ -26,26 +26,22 @@ import java.util.zip.CRC32
import javax.management._
import scala.collection._
import scala.collection.mutable
import org.I0Itec.zkclient.ZkClient
import java.util.{Random, Properties}
import joptsimple.{OptionSpec, OptionSet, OptionParser}
import java.util.Properties
import kafka.common.KafkaException
import kafka.cluster.Broker
import util.parsing.json.JSON
import kafka.api.RequestOrResponse
import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
import kafka.producer.{ProducerPool, SyncProducer}
/**
* Helper functions!
* General helper functions!
*
* This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in
* the standard library etc.
*
* If you are making a new helper function and want to add it to this class please ensure the following:
* 1. It has documentation
* 2. It is the most general possible utility, not just the thing you needed in one particular place
* 3. You have tests for it if it is nontrivial in any way
*/
object Utils extends Logging {
val random = new Random
def getNextRandomInt(): Int = random.nextInt
def getNextRandomInt(upper: Int): Int = random.nextInt(upper)
/**
* Wrap the given function in a java.lang.Runnable
@ -151,55 +147,6 @@ object Utils extends Logging {
}
bytes
}
/**
* Read size prefixed string where the size is stored as a 2 byte short.
* @param buffer The buffer to read from
* @param encoding The encoding in which to read the string
*/
def readShortString(buffer: ByteBuffer, encoding: String = RequestOrResponse.DefaultCharset): String = {
val size: Int = buffer.getShort()
if(size < 0)
return null
val bytes = new Array[Byte](size)
buffer.get(bytes)
new String(bytes, encoding)
}
/**
* Write a size prefixed string where the size is stored as a 2 byte short
* @param buffer The buffer to write to
* @param string The string to write
* @param encoding The encoding in which to write the string
*/
def writeShortString(buffer: ByteBuffer, string: String, encoding: String = RequestOrResponse.DefaultCharset) {
if(string == null) {
buffer.putShort(-1)
} else if(string.length > Short.MaxValue) {
throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
} else {
buffer.putShort(string.length.asInstanceOf[Short])
buffer.put(string.getBytes(encoding))
}
}
/**
* Return size of a size prefixed string where the size is stored as a 2 byte short
* @param string The string to write
* @param encoding The encoding in which to write the string
*/
def shortStringLength(string: String, encoding: String = RequestOrResponse.DefaultCharset): Int = {
if(string == null) {
2
} else {
val encodedString = string.getBytes(encoding)
if(encodedString.length > Short.MaxValue) {
throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
} else {
2 + encodedString.length
}
}
}
/**
* Read a properties file from the given path
@ -212,27 +159,6 @@ object Utils extends Logging {
props
}
def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
val value = buffer.getInt
if(value < range._1 || value > range._2)
throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
def getShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = {
val value = buffer.getShort
if(value < range._1 || value > range._2)
throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
val value = buffer.getLong
if(value < range._1 || value > range._2)
throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
/**
* Open a channel for the given file
*/
@ -278,7 +204,7 @@ object Utils extends Logging {
* @param buffer The buffer to translate
* @param encoding The encoding to use in translating bytes to characters
*/
def toString(buffer: ByteBuffer, encoding: String): String = {
def readString(buffer: ByteBuffer, encoding: String): String = {
val bytes = new Array[Byte](buffer.remaining)
buffer.get(bytes)
new String(bytes, encoding)
@ -365,7 +291,7 @@ object Utils extends Logging {
* @param buffer The buffer to read from
* @return The integer read, as a long to avoid signedness
*/
def getUnsignedInt(buffer: ByteBuffer): Long =
def readUnsignedInt(buffer: ByteBuffer): Long =
buffer.getInt() & 0xffffffffL
/**
@ -375,7 +301,7 @@ object Utils extends Logging {
* @param index the index from which to read the integer
* @return The integer read, as a long to avoid signedness
*/
def getUnsignedInt(buffer: ByteBuffer, index: Int): Long =
def readUnsignedInt(buffer: ByteBuffer, index: Int): Long =
buffer.getInt(index) & 0xffffffffL
/**
@ -383,7 +309,7 @@ object Utils extends Logging {
* @param buffer The buffer to write to
* @param value The value to write
*/
def putUnsignedInt(buffer: ByteBuffer, value: Long): Unit =
def writeUnsignedInt(buffer: ByteBuffer, value: Long): Unit =
buffer.putInt((value & 0xffffffffL).asInstanceOf[Int])
/**
@ -392,7 +318,7 @@ object Utils extends Logging {
* @param index The position in the buffer at which to begin writing
* @param value The value to write
*/
def putUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit =
def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit =
buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int])
/**
@ -458,6 +384,10 @@ object Utils extends Logging {
}
}
/**
* Throw an exception if the given value is null, else return it. You can use this like:
* val myValue = Utils.notNull(expressionThatShouldntBeNull)
*/
def notNull[V](v: V) = {
if(v == null)
throw new KafkaException("Value cannot be null.")
@ -465,16 +395,17 @@ object Utils extends Logging {
v
}
def getHostPort(hostport: String) : (String, Int) = {
/**
* Parse a host and port out of a string
*/
def parseHostPort(hostport: String) : (String, Int) = {
val splits = hostport.split(":")
(splits(0), splits(1).toInt)
}
def getTopicPartition(topicPartition: String) : (String, Int) = {
val index = topicPartition.lastIndexOf('-')
(topicPartition.substring(0,index), topicPartition.substring(index+1).toInt)
}
/**
* Get the stack trace from an exception as a string
*/
def stackTrace(e: Throwable): String = {
val sw = new StringWriter;
val pw = new PrintWriter(sw);
@ -486,113 +417,30 @@ object Utils extends Logging {
* Parse a comma-separated string of key:value pairs into a map of key/value pairs.
* The expected input format is key1:val1, key2:val2, ...
*/
private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = {
val map = new mutable.HashMap[K, V]
if("".equals(allCSVals))
return map
val csVals = allCSVals.split(",")
for(i <- 0 until csVals.length)
{
try{
val tempSplit = csVals(i).split(":")
info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim))
map += tempSplit(0).asInstanceOf[K] -> Integer.parseInt(tempSplit(1).trim).asInstanceOf[V]
} catch {
case _ => error(exceptionMsg + ": " + csVals(i))
}
}
map
def parseCsvMap(str: String): Map[String, String] = {
val map = new mutable.HashMap[String, String]
if("".equals(str))
return map
val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*"))
keyVals.map(pair => (pair(0), pair(1))).toMap
}
def getCSVList(csvList: String): Seq[String] = {
/**
* Parse a comma separated string into a sequence of strings.
* Whitespace surrounding the comma will be removed.
*/
def parseCsvList(csvList: String): Seq[String] = {
if(csvList == null)
Seq.empty[String]
else {
csvList.split(",").filter(v => !v.equals(""))
csvList.split("\\s*,\\s*").filter(v => !v.equals(""))
}
}
def seqToCSV(seq: Seq[String]): String = {
var csvString = ""
for (i <- 0 until seq.size) {
if (i > 0)
csvString = csvString + ','
csvString = csvString + seq(i)
}
csvString
}
def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
val successMsg = "The retention hours for "
val map: Map[String, Int] = getCSVMap(retentionHours, exceptionMsg, successMsg)
map.foreach{case(topic, hrs) =>
require(hrs > 0, "Log retention hours value for topic " + topic + " is " + hrs +
" which is not greater than 0.")}
map
}
def getTopicRollHours(rollHours: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: "
val successMsg = "The roll hours for "
val map: Map[String, Int] = getCSVMap(rollHours, exceptionMsg, successMsg)
map.foreach{case(topic, hrs) =>
require(hrs > 0, "Log roll hours value for topic " + topic + " is " + hrs +
" which is not greater than 0.")}
map
}
def getTopicFileSize(fileSizes: String): Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: "
val successMsg = "The log file size for "
val map: Map[String, Int] = getCSVMap(fileSizes, exceptionMsg, successMsg)
map.foreach{case(topic, size) =>
require(size > 0, "Log file size value for topic " + topic + " is " + size +
" which is not greater than 0.")}
map
}
def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = {
val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: "
val successMsg = "The log retention size for "
val map: Map[String, Long] = getCSVMap(retentionSizes, exceptionMsg, successMsg)
map.foreach{case(topic, size) =>
require(size > 0, "Log retention size value for topic " + topic + " is " + size +
" which is not greater than 0.")}
map
}
def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: "
val successMsg = "The flush interval for "
val map: Map[String, Int] = getCSVMap(allIntervals, exceptionMsg, successMsg)
map.foreach{case(topic, interval) =>
require(interval > 0, "Flush interval value for topic " + topic + " is " + interval +
" ms which is not greater than 0.")}
map
}
def getTopicPartitions(allPartitions: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.partition.counts in server.properties: "
val successMsg = "The number of partitions for topic "
val map: Map[String, Int] = getCSVMap(allPartitions, exceptionMsg, successMsg)
map.foreach{case(topic, count) =>
require(count > 0, "The number of partitions for topic " + topic + " is " + count +
" which is not greater than 0.")}
map
}
def getConsumerTopicMap(consumerTopicString: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for embeddedconsumer.topics in consumer.properties: "
val successMsg = "The number of consumer threads for topic "
val map: Map[String, Int] = getCSVMap(consumerTopicString, exceptionMsg, successMsg)
map.foreach{case(topic, count) =>
require(count > 0, "The number of consumer threads for topic " + topic + " is " + count +
" which is not greater than 0.")}
map
}
def getObject[T<:AnyRef](className: String): T = {
/**
* Create an instance of the class with the given class name
*/
def createObject[T<:AnyRef](className: String): T = {
className match {
case null => null.asInstanceOf[T]
case _ =>
@ -604,27 +452,15 @@ object Utils extends Logging {
}
}
def propertyExists(prop: String): Boolean = {
if(prop == null)
false
else if(prop.compareTo("") == 0)
false
else true
}
/**
* Is the given string null or empty ("")?
*/
def nullOrEmpty(s: String): Boolean = s == null || s.equals("")
def tryCleanupZookeeper(zkUrl: String, groupId: String) {
try {
val dir = "/consumers/" + groupId
info("Cleaning up temporary zookeeper data under " + dir + ".")
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
zk.close()
} catch {
case _ => // swallow
}
}
def stringMapToJsonString(jsonDataMap: Map[String, String]): String = {
/**
* Format a Map[String, String] as JSON
*/
def stringMapToJson(jsonDataMap: Map[String, String]): String = {
val builder = new StringBuilder
builder.append("{ ")
var numElements = 0
@ -639,6 +475,9 @@ object Utils extends Logging {
builder.toString
}
/**
* Format an arbitrary map as JSON
*/
def mapToJson[T <: Any](map: Map[String, Seq[String]]): String = {
val builder = new StringBuilder
builder.append("{ ")
@ -654,6 +493,9 @@ object Utils extends Logging {
builder.toString
}
/**
* Format a string array as json
*/
def arrayToJson[T <: Any](arr: Array[String]): String = {
val builder = new StringBuilder
builder.append("[ ")
@ -668,57 +510,6 @@ object Utils extends Logging {
builder.toString
}
def getAllBrokersFromBrokerList(brokerListStr: String): Seq[Broker] = {
val brokersStr = Utils.getCSVList(brokerListStr)
brokersStr.zipWithIndex.map(b =>{
val brokerStr = b._1
val brokerId = b._2
val brokerInfos = brokerStr.split(":")
val hostName = brokerInfos(0)
val port = brokerInfos(1).toInt
val creatorId = hostName + "-" + System.currentTimeMillis()
new Broker(brokerId, creatorId, hostName, port)
})
}
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {
error("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
}
}
def getTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
while(i < brokers.size && !fetchMetaDataSucceeded) {
val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
info("Fetching metadata for topic %s".format(topics))
try {
topicMetadataResponse = producer.send(topicMetadataRequest)
fetchMetaDataSucceeded = true
}
catch {
case e =>
warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
t = e
} finally {
i = i + 1
producer.close()
}
}
if(!fetchMetaDataSucceeded){
throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
}
return topicMetadataResponse
}
/**
* Create a circular (looping) iterator over a collection.
@ -731,35 +522,25 @@ object Utils extends Logging {
stream.iterator
}
def readFileIntoString(path: String): String = {
/**
* Attempt to read a file as a string
*/
def readFileAsString(path: String, charset: Charset = Charset.defaultCharset()): String = {
val stream = new FileInputStream(new File(path))
try {
val fc = stream.getChannel()
val bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size())
Charset.defaultCharset().decode(bb).toString()
charset.decode(bb).toString()
}
finally {
stream.close()
}
}
}
/**
* A wrapper that synchronizes JSON in scala, which is not threadsafe.
*/
object SyncJSON extends Logging {
val myConversionFunc = {input : String => input.toInt}
JSON.globalNumberParser = myConversionFunc
val lock = new Object
def parseFull(input: String): Option[Any] = {
lock synchronized {
try {
JSON.parseFull(input)
} catch {
case t =>
throw new KafkaException("Can't parse json string: %s".format(input), t)
}
}
}
/**
* Turn the given integer into a non-negative value by clearing the sign bit; Int.MinValue maps to 0.
* Unlike java.lang.Math.abs and scala.math.abs this never returns a negative number (they return Int.MinValue for Int.MinValue).
*/
def abs(n: Int) = n & 0x7fffffff
}
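The renamed readUnsignedInt/writeUnsignedInt helpers pack a value in the range [0, 2^32) into the four bytes of a signed Int slot and recover it as a Long, which is how message checksums are stored; MessageTest further down uses the indexed write variant to corrupt a CRC. A small round-trip sketch, assuming the patched kafka.utils.Utils is on the classpath (UnsignedIntExample is just an illustrative name).

import java.nio.ByteBuffer
import kafka.utils.Utils

object UnsignedIntExample {
  def main(args: Array[String]) {
    val buffer = ByteBuffer.allocate(4)
    val crc = 3735928559L                      // 0xDEADBEEF, too large for a signed Int
    Utils.writeUnsignedInt(buffer, 0, crc)     // stored as the signed Int -559038737
    println(buffer.getInt(0))                  // -559038737
    println(Utils.readUnsignedInt(buffer, 0))  // 3735928559, recovered as a Long
  }
}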

View File

@ -18,7 +18,7 @@
package kafka.utils
import java.util.Properties
import collection.mutable
import scala.collection._
class VerifiableProperties(val props: Properties) extends Logging {
private val referenceSet = mutable.HashSet[String]()
@ -156,6 +156,23 @@ class VerifiableProperties(val props: Properties) extends Logging {
require(containsKey(name), "Missing required property '" + name + "'")
getProperty(name)
}
/**
* Get a Map[String, String] from a property list in the form k1:v1, k2:v2, ...
*/
def getMap(name: String, valid: String => Boolean): Map[String, String] = {
try {
val m = Utils.parseCsvMap(getString(name, ""))
m.foreach {
case(key, value) =>
if(!valid(value))
throw new IllegalArgumentException("Invalid entry '%s' = '%s' for property '%s'".format(key, value, name))
}
m
} catch {
case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(name, e.getMessage))
}
}
def verify() {
info("Verifying properties")

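getMap is the primitive behind all of the per-topic overrides in KafkaConfig above: the raw property value is split by Utils.parseCsvMap and every value is checked against the supplied predicate before the caller converts it to a number. A hedged sketch with made-up topic names and sizes:

import java.util.Properties
import kafka.utils.VerifiableProperties

object GetMapExample {
  def main(args: Array[String]) {
    val props = new Properties
    props.put("topic.log.file.size", "orders:1073741824, clicks:536870912") // hypothetical values
    val verifiable = new VerifiableProperties(props)
    // The same call KafkaConfig makes: validate the raw strings, then convert them
    val sizes = verifiable.getMap("topic.log.file.size", _.toInt > 0)
                          .map { case (topic, size) => (topic, size.toInt) }
    println(sizes) // e.g. Map(orders -> 1073741824, clicks -> 536870912)
  }
}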
View File

@ -75,8 +75,8 @@ object ZkUtils extends Logging {
}
def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
val leaderAndISRPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndISRPath)
val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
val leaderAndIsrOpt = leaderAndIsrInfo._1
val stat = leaderAndIsrInfo._2
leaderAndIsrOpt match {
@ -86,12 +86,12 @@ object ZkUtils extends Logging {
}
def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
SyncJSON.parseFull(leaderAndIsrStr) match {
Json.parseFull(leaderAndIsrStr) match {
case Some(m) =>
val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
val isr = Utils.getCSVList(isrString).map(r => r.toInt)
val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
isr.toString(), zkPathVersion, topic, partition))
@ -104,7 +104,7 @@ object ZkUtils extends Logging {
val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
leaderAndIsrOpt match {
case Some(leaderAndIsr) =>
SyncJSON.parseFull(leaderAndIsr) match {
Json.parseFull(leaderAndIsr) match {
case Some(m) =>
Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
case None => None
@ -122,7 +122,7 @@ object ZkUtils extends Logging {
val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
leaderAndIsrOpt match {
case Some(leaderAndIsr) =>
SyncJSON.parseFull(leaderAndIsr) match {
Json.parseFull(leaderAndIsr) match {
case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
case Some(m) => m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
}
@ -138,10 +138,10 @@ object ZkUtils extends Logging {
val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
leaderAndIsrOpt match {
case Some(leaderAndIsr) =>
SyncJSON.parseFull(leaderAndIsr) match {
Json.parseFull(leaderAndIsr) match {
case Some(m) =>
val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
Utils.getCSVList(ISRString).map(r => r.toInt)
val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
Utils.parseCsvList(isrString).map(r => r.toInt)
case None => Seq.empty[Int]
}
case None => Seq.empty[Int]
@ -155,7 +155,7 @@ object ZkUtils extends Logging {
val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
SyncJSON.parseFull(jsonPartitionMap) match {
Json.parseFull(jsonPartitionMap) match {
case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
case None => Seq.empty[Int]
case Some(seq) => seq.map(_.toInt)
@ -328,7 +328,7 @@ object ZkUtils extends Logging {
case e2 => throw e2
}
}
def deletePath(client: ZkClient, path: String): Boolean = {
try {
client.delete(path)
@ -351,6 +351,16 @@ object ZkUtils extends Logging {
case e2 => throw e2
}
}
def maybeDeletePath(zkUrl: String, dir: String) {
try {
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
zk.close()
} catch {
case _ => // swallow
}
}
def readData(client: ZkClient, path: String): (String, Stat) = {
val stat: Stat = new Stat()
@ -413,7 +423,7 @@ object ZkUtils extends Logging {
val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
SyncJSON.parseFull(jsonPartitionMap) match {
Json.parseFull(jsonPartitionMap) match {
case Some(m) =>
val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
for((partition, replicas) <- replicaMap){
@ -449,7 +459,7 @@ object ZkUtils extends Logging {
val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
SyncJSON.parseFull(jsonPartitionMap) match {
Json.parseFull(jsonPartitionMap) match {
case Some(m) =>
val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
for((partition, replicas) <- replicaMap){
@ -471,7 +481,7 @@ object ZkUtils extends Logging {
val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
val partitionMap = jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
SyncJSON.parseFull(jsonPartitionMap) match {
Json.parseFull(jsonPartitionMap) match {
case Some(m) =>
val m1 = m.asInstanceOf[Map[String, Seq[String]]]
m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
@ -535,7 +545,7 @@ object ZkUtils extends Logging {
}
def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] = {
SyncJSON.parseFull(jsonData) match {
Json.parseFull(jsonData) match {
case Some(m) =>
val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
replicaMap.map { reassignedPartitions =>
@ -590,7 +600,7 @@ object ZkUtils extends Logging {
}
def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = {
SyncJSON.parseFull(jsonData) match {
Json.parseFull(jsonData) match {
case Some(m) =>
val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]]
val partitions = topicAndPartitions.map { p =>

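parseLeaderAndIsr above reads the ZooKeeper value through Json.parseFull and Utils.parseCsvList, with every field stored as a string and the ISR encoded as a comma-separated list. The sketch below decodes such a value by hand; the JSON literal is only an illustration of the format implied by the parsing code, not necessarily byte-for-byte what Kafka writes to ZooKeeper.

import kafka.utils.{Json, Utils}

object LeaderAndIsrParseExample {
  def main(args: Array[String]) {
    val zkData = """{ "leader":"2", "leaderEpoch":"5", "ISR":"1,2,3" }"""
    Json.parseFull(zkData) match {
      case Some(m) =>
        val fields = m.asInstanceOf[Map[String, String]]
        val leader = fields("leader").toInt
        val epoch  = fields("leaderEpoch").toInt
        val isr    = Utils.parseCsvList(fields("ISR")).map(_.toInt)
        println("leader=%d epoch=%d isr=%s".format(leader, epoch, isr.mkString(",")))
      case None =>
        println("invalid leaderAndIsr data")
    }
  }
}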
View File

@ -62,7 +62,7 @@ private class ConsumerThread(stream: KafkaStream[Message]) extends Thread {
override def run() {
println("Starting consumer thread..")
for (messageAndMetadata <- stream) {
println("consumed: " + Utils.toString(messageAndMetadata.message.payload, "UTF-8"))
println("consumed: " + Utils.readString(messageAndMetadata.message.payload, "UTF-8"))
}
shutdownLatch.countDown
println("thread shutdown !" )

View File

@ -82,18 +82,18 @@ object SerializationTestUtils{
private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = {
val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1)
val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
val map = Map(((topic1, 0), PartitionStateInfo(leaderAndISR1, 3)),
((topic2, 0), PartitionStateInfo(leaderAndISR2, 3)))
def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
val leaderAndIsr1 = new LeaderAndIsr(leader1, 1, isr1, 1)
val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
new LeaderAndIsrRequest(map)
}
def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
((topic2, 0), ErrorMapping.NoError))
new LeaderAndISRResponse(1, responseMap)
new LeaderAndIsrResponse(1, responseMap)
}
def createTestStopReplicaRequest() : StopReplicaRequest = {
@ -145,8 +145,8 @@ object SerializationTestUtils{
}
class RequestResponseSerializationTest extends JUnitSuite {
private val leaderAndISRRequest = SerializationTestUtils.createTestLeaderAndISRRequest
private val leaderAndISRResponse = SerializationTestUtils.createTestLeaderAndISRResponse
private val leaderAndIsrRequest = SerializationTestUtils.createTestLeaderAndIsrRequest
private val leaderAndIsrResponse = SerializationTestUtils.createTestLeaderAndIsrResponse
private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest
private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse
private val producerRequest = SerializationTestUtils.createTestProducerRequest
@ -160,19 +160,19 @@ class RequestResponseSerializationTest extends JUnitSuite {
@Test
def testSerializationAndDeserialization() {
var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes())
leaderAndISRRequest.writeTo(buffer)
var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndIsrRequest.sizeInBytes())
leaderAndIsrRequest.writeTo(buffer)
buffer.rewind()
val deserializedLeaderAndISRRequest = LeaderAndIsrRequest.readFrom(buffer)
assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest,
deserializedLeaderAndISRRequest)
val deserializedLeaderAndIsrRequest = LeaderAndIsrRequest.readFrom(buffer)
assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndIsrRequest,
deserializedLeaderAndIsrRequest)
buffer = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes())
leaderAndISRResponse.writeTo(buffer)
buffer = ByteBuffer.allocate(leaderAndIsrResponse.sizeInBytes())
leaderAndIsrResponse.writeTo(buffer)
buffer.rewind()
val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(buffer)
assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndISRResponse,
deserializedLeaderAndISRResponse)
val deserializedLeaderAndIsrResponse = LeaderAndIsrResponse.readFrom(buffer)
assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndIsrResponse,
deserializedLeaderAndIsrResponse)
buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes())
stopReplicaRequest.writeTo(buffer)

View File

@ -286,7 +286,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// send some messages to each broker
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec)
val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.toString(m.payload, "UTF-8")).
val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.readString(m.payload, "UTF-8")).
sortWith((s, t) => s.compare(t) == -1)
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@ -401,7 +401,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertTrue(iterator.hasNext)
val message = iterator.next.message
messages ::= message
debug("received message: " + Utils.toString(message.payload, "UTF-8"))
debug("received message: " + Utils.readString(message.payload, "UTF-8"))
}
}
}

View File

@ -105,7 +105,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertTrue(iterator.hasNext)
val message = iterator.next.message
messages ::= message
debug("received message: " + Utils.toString(message.payload, "UTF-8"))
debug("received message: " + Utils.readString(message.payload, "UTF-8"))
}
}
}

View File

@ -159,7 +159,7 @@ class LogManagerTest extends JUnit3Suite {
override val logFileSize = 1024 *1024 *1024
override val flushSchedulerThreadRate = 50
override val flushInterval = Int.MaxValue
override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
override val flushIntervalMap = Map("timebasedflush" -> 100)
}
logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
logManager.startup

View File

@ -66,7 +66,7 @@ class MessageTest extends JUnitSuite {
assertTrue("Auto-computed checksum should be valid", v.message.isValid)
// garble checksum
val badChecksum: Int = (v.message.checksum + 1 % Int.MaxValue).toInt
Utils.putUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
Utils.writeUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
assertFalse("Message with invalid checksum should be invalid", v.message.isValid)
}
}

View File

@ -25,21 +25,21 @@ import kafka.log.Log
import org.junit.Assert._
import kafka.utils._
class ISRExpirationTest extends JUnit3Suite {
class IsrExpirationTest extends JUnit3Suite {
var topicPartitionISR: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaMaxLagTimeMs = 100L
override val replicaMaxLagBytes = 10L
})
val topic = "foo"
def testISRExpirationForStuckFollowers() {
def testIsrExpirationForStuckFollowers() {
val time = new MockTime
val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
// create one partition and all replicas
val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log)
val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
@ -58,12 +58,12 @@ class ISRExpirationTest extends JUnit3Suite {
EasyMock.verify(log)
}
def testISRExpirationForSlowFollowers() {
def testIsrExpirationForSlowFollowers() {
val time = new MockTime
// create leader replica
val log = getLogWithLogEndOffset(15L, 1)
// add one partition
val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log)
val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// set remote replicas leo to something low, like 4
@ -77,7 +77,7 @@ class ISRExpirationTest extends JUnit3Suite {
EasyMock.verify(log)
}
private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
localLog: Log): Partition = {
val leaderId=config.brokerId
val replicaManager = new ReplicaManager(config, time, null, null, null)

View File

@ -379,18 +379,18 @@ object TestUtils extends Logging {
val partition = leaderForPartition._1
val leader = leaderForPartition._2
try{
val currentLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
var newLeaderAndISR: LeaderAndIsr = null
if(currentLeaderAndISROpt == None)
newLeaderAndISR = new LeaderAndIsr(leader, List(leader))
val currentLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
var newLeaderAndIsr: LeaderAndIsr = null
if(currentLeaderAndIsrOpt == None)
newLeaderAndIsr = new LeaderAndIsr(leader, List(leader))
else{
newLeaderAndISR = currentLeaderAndISROpt.get
newLeaderAndISR.leader = leader
newLeaderAndISR.leaderEpoch += 1
newLeaderAndISR.zkVersion += 1
newLeaderAndIsr = currentLeaderAndIsrOpt.get
newLeaderAndIsr.leader = leader
newLeaderAndIsr.leaderEpoch += 1
newLeaderAndIsr.zkVersion += 1
}
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
newLeaderAndISR.toString)
newLeaderAndIsr.toString)
} catch {
case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
}

View File

@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException
import org.apache.log4j.Logger
import kafka.message.Message
import kafka.utils.Utils
import kafka.utils.ZkUtils
import java.util.{Random, Properties}
import kafka.consumer._
import java.text.SimpleDateFormat
@ -48,7 +48,7 @@ object ConsumerPerformance {
}
// clean up zookeeper state for this group id for every perf run
Utils.tryCleanupZookeeper(config.consumerConfig.zkConnect, config.consumerConfig.groupId)
ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId)
val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)