KAFKA-554 Dynamic per-topic configuration. This patch adds a mechanism for storing per-topic configurations in zookeeper and dynamically making config changes across the cluster. Reviewed by Neha and Jun.

This commit is contained in:
Jay Kreps 2013-03-08 15:07:39 -08:00
parent 4f2742d60d
commit c1ed12e44d
44 changed files with 906 additions and 496 deletions

View File

@ -1,19 +0,0 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
base_dir=$(dirname $0)
export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
$base_dir/kafka-run-class.sh kafka.admin.CreateTopicCommand $@

View File

@ -1,19 +0,0 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
base_dir=$(dirname $0)
export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
$base_dir/kafka-run-class.sh kafka.admin.DeleteTopicCommand $@

View File

@ -16,4 +16,4 @@
base_dir=$(dirname $0)
export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
$base_dir/kafka-run-class.sh kafka.admin.ListTopicCommand $@
$base_dir/kafka-run-class.sh kafka.admin.TopicCommand $@

View File

@ -13,23 +13,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/
package kafka.common
package kafka.admin
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, ZKConfig}
import java.util.concurrent.atomic.AtomicReference
object KafkaZookeeperClient {
private val INSTANCE = new AtomicReference[ZkClient](null)
def getZookeeperClient(config: ZKConfig): ZkClient = {
// TODO: This cannot be a singleton since unit tests break if we do that
// INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
// ZKStringSerializer))
INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer))
INSTANCE.get()
}
class AdminOperationException(val error: String, cause: Throwable) extends RuntimeException(error, cause) {
def this(error: Throwable) = this(error.getMessage, error)
def this(msg: String) = this(msg, null)
}

View File

@ -18,9 +18,13 @@
package kafka.admin
import java.util.Random
import java.util.Properties
import kafka.api.{TopicMetadata, PartitionMetadata}
import kafka.cluster.Broker
import kafka.utils.{Logging, ZkUtils}
import kafka.log.LogConfig
import kafka.server.TopicConfigManager
import kafka.utils.{Logging, Utils, ZkUtils, Json}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import scala.collection._
@ -30,7 +34,7 @@ import scala.Some
object AdminUtils extends Logging {
val rand = new Random
val AdminEpoch = -1
val TopicConfigChangeZnodePrefix = "config_change_"
/**
* There are 2 goals of replica assignment:
@ -50,33 +54,74 @@ object AdminUtils extends Logging {
* p3 p4 p0 p1 p2 (3nd replica)
* p7 p8 p9 p5 p6 (3nd replica)
*/
def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
def assignReplicasToBrokers(brokers: Seq[Int],
partitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1) // for testing only
: Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdministrationException("number of partitions must be larger than 0")
if (partitions <= 0)
throw new AdminOperationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
throw new AdministrationException("replication factor must be larger than 0")
if (replicationFactor > brokerList.size)
throw new AdministrationException("replication factor: " + replicationFactor +
" larger than available brokers: " + brokerList.size)
throw new AdminOperationException("replication factor must be larger than 0")
if (replicationFactor > brokers.size)
throw new AdminOperationException("replication factor: " + replicationFactor +
" larger than available brokers: " + brokers.size)
val ret = new mutable.HashMap[Int, List[Int]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size)
var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
if (i > 0 && (i % brokerList.size == 0))
var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size)
for (i <- 0 until partitions) {
if (i > 0 && (i % brokers.size == 0))
secondReplicaShift += 1
val firstReplicaIndex = (i + startIndex) % brokerList.size
var replicaList = List(brokerList(firstReplicaIndex))
val firstReplicaIndex = (i + startIndex) % brokers.size
var replicaList = List(brokers(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
replicaList ::= brokers(replicaIndex(firstReplicaIndex, secondReplicaShift, j, brokers.size))
ret.put(i, replicaList.reverse)
}
ret.toMap
}
def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) {
def deleteTopic(zkClient: ZkClient, topic: String) {
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
}
def topicExists(zkClient: ZkClient, topic: String): Boolean =
zkClient.exists(ZkUtils.getTopicPath(topic))
def createTopic(zkClient: ZkClient,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties) {
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
AdminUtils.createTopicWithAssignment(zkClient, topic, replicaAssignment, topicConfig)
}
def createTopicWithAssignment(zkClient: ZkClient,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties) {
// validate arguments
Topic.validate(topic)
LogConfig.validate(config)
require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
val topicPath = ZkUtils.getTopicPath(topic)
if(zkClient.exists(topicPath))
throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
// write out the config if there is any, this isn't transactional with the partition assignments
writeTopicConfig(zkClient, topic, config)
// create the partition assignment
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment)
}
private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]]) {
try {
val zkPath = ZkUtils.getTopicPath(topic)
val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
@ -84,10 +129,62 @@ object AdminUtils extends Logging {
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
case e2 => throw new AdministrationException(e2.toString)
case e2 => throw new AdminOperationException(e2.toString)
}
}
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
*/
def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
LogConfig.validate(config)
if(!topicExists(zkClient, topic))
throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
// write the new config--may not exist if there were previously no overrides
writeTopicConfig(zkClient, topic, config)
// create the change notification
zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
}
/**
* Write out the topic config to zk, if there is any
*/
private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
if(config.size > 0) {
val map = Map("version" -> 1, "config" -> JavaConversions.asMap(config))
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
}
}
/**
* Read the topic config (if any) from zk
*/
def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = {
val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true)
val props = new Properties()
if(str != null) {
Json.parseFull(str) match {
case None => // there are no config overrides
case Some(map: Map[String, _]) =>
require(map("version") == 1)
map.get("config") match {
case Some(config: Map[String, String]) =>
for((k,v) <- config)
props.setProperty(k, v)
case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
}
case o => throw new IllegalArgumentException("Unexpected value in config: " + str)
}
}
props
}
def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap
def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
@ -158,12 +255,8 @@ object AdminUtils extends Logging {
}
}
private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
}
class AdministrationException(val errorMessage: String) extends RuntimeException(errorMessage) {
def this() = this(null)
}

View File

@ -1,117 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import joptsimple.OptionParser
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import scala.collection.mutable
import kafka.common.Topic
object CreateTopicCommand extends Logging {
def main(args: Array[String]): Unit = {
val parser = new OptionParser
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be created.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val nPartitionsOpt = parser.accepts("partition", "number of partitions in the topic")
.withRequiredArg
.describedAs("# of partitions")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val replicationFactorOpt = parser.accepts("replica", "replication factor for each partitions in the topic")
.withRequiredArg
.describedAs("replication factor")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers")
.withRequiredArg
.describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
.ofType(classOf[String])
.defaultsTo("")
val options = parser.parse(args : _*)
for(arg <- List(topicOpt, zkConnectOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
}
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val nPartitions = options.valueOf(nPartitionsOpt).intValue
val replicationFactor = options.valueOf(replicationFactorOpt).intValue
val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
println("creation succeeded!")
} catch {
case e =>
println("creation failed because of " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
if (zkClient != null)
zkClient.close()
}
}
def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
Topic.validate(topic)
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val partitionReplicaAssignment = if (replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
if (brokerList.size <= 0)
throw new AdministrationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList)
if (!brokerList.toSet.subsetOf(availableBrokerList))
throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
"available broker:" + availableBrokerList.toString)
ret.put(i, brokerList.toList)
if (ret(i).size != ret(0).size)
throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
}
ret.toMap
}
}

View File

@ -85,7 +85,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
TopicAndPartition(topic, partition)
}
case None => throw new AdministrationException("Preferred replica election data is empty")
case None => throw new AdminOperationException("Preferred replica election data is empty")
}
}
@ -102,9 +102,9 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case nee: ZkNodeExistsException =>
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
throw new AdministrationException("Preferred replica leader election currently in progress for " +
throw new AdminOperationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
case e2 => throw new AdministrationException(e2.toString)
case e2 => throw new AdminOperationException(e2.toString)
}
}
}

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import joptsimple._
import java.util.Properties
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import scala.collection._
import scala.collection.JavaConversions._
import kafka.common.Topic
import kafka.cluster.Broker
object TopicCommand {
def main(args: Array[String]): Unit = {
val opts = new TopicCommandOptions(args)
// should have exactly one action
val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
if(actions != 1) {
System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
opts.parser.printHelpOn(System.err)
System.exit(1)
}
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
if(opts.options.has(opts.createOpt))
createTopic(zkClient, opts)
else if(opts.options.has(opts.alterOpt))
alterTopic(zkClient, opts)
else if(opts.options.has(opts.deleteOpt))
deleteTopic(zkClient, opts)
else if(opts.options.has(opts.listOpt))
listTopics(zkClient)
else if(opts.options.has(opts.describeOpt))
describeTopic(zkClient, opts)
zkClient.close()
}
def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topics = opts.options.valuesOf(opts.topicOpt)
val configs = parseTopicConfigs(opts)
for (topic <- topics) {
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
AdminUtils.createTopicWithAssignment(zkClient, topic, assignment, configs)
} else {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
}
println("Created topic \"%s\".".format(topic))
}
}
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topics = opts.options.valuesOf(opts.topicOpt)
val configs = parseTopicConfigs(opts)
if(opts.options.has(opts.partitionsOpt))
Utils.croak("Changing the number of partitions is not supported.")
if(opts.options.has(opts.replicationFactorOpt))
Utils.croak("Changing the replication factor is not supported.")
for(topic <- topics) {
AdminUtils.changeTopicConfig(zkClient, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
}
def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
for(topic <- opts.options.valuesOf(opts.topicOpt)) {
AdminUtils.deleteTopic(zkClient, topic)
println("Topic \"%s\" deleted.".format(topic))
}
}
def listTopics(zkClient: ZkClient) {
for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
println(topic)
}
def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topics = opts.options.valuesOf(opts.topicOpt)
val metadata = AdminUtils.fetchTopicMetadataFromZk(topics.toSet, zkClient)
for(md <- metadata) {
println(md.topic)
val config = AdminUtils.fetchTopicConfig(zkClient, md.topic)
println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
println("\tpartitions: " + md.partitionsMetadata.size)
for(pd <- md.partitionsMetadata) {
println("\t\tpartition " + pd.partitionId)
println("\t\tleader: " + (if(pd.leader.isDefined) formatBroker(pd.leader.get) else "none"))
println("\t\treplicas: " + pd.replicas.map(formatBroker).mkString(", "))
println("\t\tisr: " + pd.isr.map(formatBroker).mkString(", "))
}
}
}
def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
def parseTopicConfigs(opts: TopicCommandOptions): Properties = {
val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*"))
require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".")
val props = new Properties
configs.foreach(pair => props.setProperty(pair(0), pair(1)))
props
}
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
ret.put(i, brokerList.toList)
if (ret(i).size != ret(0).size)
throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
}
ret.toMap
}
class TopicCommandOptions(args: Array[String]) {
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val listOpt = parser.accepts("list", "List all available topics.")
val createOpt = parser.accepts("create", "Create a new topic.")
val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
val deleteOpt = parser.accepts("delete", "Delete the topic.")
val describeOpt = parser.accepts("describe", "List details for the given topics.")
val helpOpt = parser.accepts("help", "Print usage information.")
val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.")
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.")
.withRequiredArg
.describedAs("# of partitions")
.ofType(classOf[java.lang.Integer])
val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
.withRequiredArg
.describedAs("replication factor")
.ofType(classOf[java.lang.Integer])
val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
.withRequiredArg
.describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
.ofType(classOf[String])
val options = parser.parse(args : _*)
}
}

View File

@ -17,9 +17,11 @@
package kafka.cluster
import scala.collection._
import kafka.admin.AdminUtils
import kafka.utils._
import java.lang.Object
import kafka.api.LeaderAndIsr
import kafka.log.LogConfig
import kafka.server.ReplicaManager
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
@ -74,7 +76,8 @@ class Partition(val topic: String,
case Some(replica) => replica
case None =>
if (isReplicaLocal(replicaId)) {
val log = logManager.getOrCreateLog(topic, partitionId)
val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
val localReplica = new Replica(replicaId, this, time, offset, Some(log))

View File

@ -49,7 +49,7 @@ import com.yammer.metrics.core.Gauge
*/
@threadsafe
class Log(val dir: File,
val config: LogConfig,
@volatile var config: LogConfig,
val needsRecovery: Boolean,
val scheduler: Scheduler,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

View File

@ -17,7 +17,7 @@
package kafka.log
import java.io.File
import java.util.Properties
import scala.collection._
import kafka.common._
@ -46,6 +46,99 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
val indexInterval: Int = 4096,
val fileDeleteDelayMs: Long = 60*1000,
val minCleanableRatio: Double = 0.5,
val dedupe: Boolean = false)
val dedupe: Boolean = false) {
def toProps: Properties = {
val props = new Properties()
import LogConfig._
props.put(SegmentBytesProp, segmentSize.toString)
props.put(SegmentMsProp, segmentMs.toString)
props.put(SegmentIndexBytesProp, maxIndexSize.toString)
props.put(FlushMessagesProp, flushInterval.toString)
props.put(FlushMsProp, flushMs.toString)
props.put(RetentionBytesProp, retentionSize.toString)
props.put(RententionMsProp, retentionMs.toString)
props.put(MaxMessageBytesProp, maxMessageSize.toString)
props.put(IndexIntervalBytesProp, indexInterval.toString)
props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
props
}
}
object LogConfig {
val SegmentBytesProp = "segment.bytes"
val SegmentMsProp = "segment.ms"
val SegmentIndexBytesProp = "segment.index.bytes"
val FlushMessagesProp = "flush.messages"
val FlushMsProp = "flush.ms"
val RetentionBytesProp = "retention.bytes"
val RententionMsProp = "retention.ms"
val MaxMessageBytesProp = "max.message.bytes"
val IndexIntervalBytesProp = "index.interval.bytes"
val FileDeleteDelayMsProp = "file.delete.delay.ms"
val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
val CleanupPolicyProp = "cleanup.policy"
val ConfigNames = Set(SegmentBytesProp,
SegmentMsProp,
SegmentIndexBytesProp,
FlushMessagesProp,
FlushMsProp,
RetentionBytesProp,
RententionMsProp,
MaxMessageBytesProp,
IndexIntervalBytesProp,
FileDeleteDelayMsProp,
MinCleanableDirtyRatioProp,
CleanupPolicyProp)
/**
* Parse the given properties instance into a LogConfig object
*/
def fromProps(props: Properties): LogConfig = {
new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt,
segmentMs = props.getProperty(SegmentMsProp).toLong,
maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt,
flushInterval = props.getProperty(FlushMessagesProp).toLong,
flushMs = props.getProperty(FlushMsProp).toLong,
retentionSize = props.getProperty(RetentionBytesProp).toLong,
retentionMs = props.getProperty(RententionMsProp).toLong,
maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
}
/**
* Create a log config instance using the given properties and defaults
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
props.putAll(overrides)
fromProps(props)
}
/**
* Check that property names are valid
*/
private def validateNames(props: Properties) {
for(name <- JavaConversions.asMap(props).keys)
require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
}
/**
* Check that the given properties contain only valid log config names, and that all values can be parsed.
*/
def validate(props: Properties) {
validateNames(props)
LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
}
}

View File

@ -174,31 +174,19 @@ class LogManager(val logDirs: Array[File],
/**
* Get the log if it exists, otherwise return None
*/
def getLog(topic: String, partition: Int): Option[Log] = {
val topicAndPartiton = TopicAndPartition(topic, partition)
val log = logs.get(topicAndPartiton)
def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
val log = logs.get(topicAndPartition)
if (log == null)
None
else
Some(log)
}
/**
* Create the log if it does not exist, otherwise just return it
*/
def getOrCreateLog(topic: String, partition: Int): Log = {
val topicAndPartition = TopicAndPartition(topic, partition)
logs.get(topicAndPartition) match {
case null => createLogIfNotExists(topicAndPartition)
case log: Log => log
}
}
/**
* Create a log for the given topic and the given partition
* If the log already exists, just return a copy of the existing log
*/
private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = {
def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
logCreationLock synchronized {
var log = logs.get(topicAndPartition)
@ -211,12 +199,16 @@ class LogManager(val logDirs: Array[File],
val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
dir.mkdirs()
log = new Log(dir,
defaultConfig,
config,
needsRecovery = false,
scheduler,
time)
info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
logs.put(topicAndPartition, log)
info("Created log for topic %s partition %d in %s with properties {%s}."
.format(topicAndPartition.topic,
topicAndPartition.partition,
dataDir.getAbsolutePath,
JavaConversions.asMap(config.toProps).mkString(", ")))
log
}
}
@ -290,6 +282,11 @@ class LogManager(val logDirs: Array[File],
*/
def allLogs(): Iterable[Log] = logs.values
/**
* Get a map of TopicAndPartition => Log
*/
def logsByTopicPartition = logs.toMap
/**
* Flush any log which has exceeded its flush interval and has unwritten messages.
*/

View File

@ -281,7 +281,7 @@ private[kafka] class Processor(val id: Int,
debug("Ignoring response for closed socket.")
close(key)
}
}finally {
} finally {
curr = requestChannel.receiveResponse(id)
}
}

View File

@ -17,7 +17,7 @@
package kafka.server
import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.admin.AdminUtils
import kafka.api._
import kafka.message._
import kafka.network._
@ -25,6 +25,7 @@ import kafka.log._
import kafka.utils.ZKGroupTopicDirs
import org.apache.log4j.Logger
import scala.collection._
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
import kafka.metrics.KafkaMetricsGroup
@ -367,7 +368,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match {
logManager.getLog(topicAndPartition) match {
case Some(log) =>
fetchOffsetsBefore(log, timestamp, maxNumOffsets)
case None =>
@ -442,7 +443,7 @@ class KafkaApis(val requestChannel: RequestChannel,
/* check if auto creation of topics is turned on */
if (config.autoCreateTopicsEnable) {
try {
CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
AdminUtils.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
} catch {
@ -478,21 +479,22 @@ class KafkaApis(val requestChannel: RequestChannel,
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString)
trace("Handling offset commit request " + offsetCommitRequest.toString)
val responseInfo = offsetCommitRequest.requestInfo.map( t => {
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic)
val responseInfo = offsetCommitRequest.requestInfo.map{
case (topicAndPartition, metaAndError) => {
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
try {
if(t._2.metadata != null && t._2.metadata.length > config.offsetMetadataMaxSize) {
(t._1, ErrorMapping.OffsetMetadataTooLargeCode)
if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
(topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
} else {
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
t._1.partition, t._2.offset.toString)
(t._1, ErrorMapping.NoError)
topicAndPartition.partition, metaAndError.offset.toString)
(topicAndPartition, ErrorMapping.NoError)
}
} catch {
case e =>
(t._1, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
case e => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
}
}
})
val response = new OffsetCommitResponse(responseInfo,
offsetCommitRequest.correlationId,
offsetCommitRequest.clientId)
@ -506,7 +508,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString)
trace("Handling offset fetch request " + offsetFetchRequest.toString)
val responseInfo = offsetFetchRequest.requestInfo.map( t => {
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
try {

View File

@ -25,30 +25,36 @@ import java.net.InetAddress
/**
* Handles registering broker with zookeeper in the following path:
* This class registers the broker in zookeeper to allow
* other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
* /brokers/[0...N] --> host:port
*
* Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
* we are dead.
*/
class KafkaZooKeeper(config: KafkaConfig) extends Logging {
class KafkaHealthcheck(private val brokerId: Int,
private val host: String,
private val port: Int,
private val zkClient: ZkClient) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
private var zkClient: ZkClient = null
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
def startup() {
/* start client */
info("connecting to ZK: " + config.zkConnect)
zkClient = KafkaZookeeperClient.getZookeeperClient(config)
zkClient.subscribeStateChanges(new SessionExpireListener)
registerBrokerInZk()
register()
}
private def registerBrokerInZk() {
/**
* Register this broker as "alive" in zookeeper
*/
def register() {
val hostName =
if(config.hostName == null || config.hostName.trim.isEmpty)
if(host == null || host.trim.isEmpty)
InetAddress.getLocalHost.getCanonicalHostName
else
config.hostName
host
val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort)
ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, jmxPort)
}
/**
@ -70,21 +76,11 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
*/
@throws(classOf[Exception])
def handleNewSession() {
info("re-registering broker info in ZK for broker " + config.brokerId)
registerBrokerInZk()
info("re-registering broker info in ZK for broker " + brokerId)
register()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
}
def shutdown() {
if (zkClient != null) {
info("Closing zookeeper client...")
zkClient.close()
}
}
def getZookeeperClient = {
zkClient
}
}

View File

@ -67,7 +67,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
handler.shutdown
for(thread <- threads)
thread.join
info("shutted down completely")
info("shut down completely")
}
}

View File

@ -18,6 +18,7 @@
package kafka.server
import kafka.network.SocketServer
import kafka.admin._
import kafka.log.LogConfig
import kafka.log.CleanerConfig
import kafka.log.LogManager
@ -39,7 +40,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var logManager: LogManager = null
var kafkaZookeeper: KafkaZooKeeper = null
var kafkaHealthcheck: KafkaHealthcheck = null
var topicConfigManager: TopicConfigManager = null
var replicaManager: ReplicaManager = null
var apis: KafkaApis = null
var kafkaController: KafkaController = null
@ -58,8 +60,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
/* start scheduler */
kafkaScheduler.startup()
/* setup zookeeper */
zkClient = initZk()
/* start log manager */
logManager = createLogManager(config)
logManager = createLogManager(zkClient)
logManager.startup()
socketServer = new SocketServer(config.brokerId,
@ -68,32 +73,41 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
config.numNetworkThreads,
config.queuedMaxRequests,
config.socketRequestMaxBytes)
socketServer.startup()
socketServer.startup
replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager)
kafkaController = new KafkaController(config, zkClient)
/* start client */
kafkaZookeeper = new KafkaZooKeeper(config)
// starting relevant replicas and leader election for partitions assigned to this broker
kafkaZookeeper.startup
info("Connecting to ZK: " + config.zkConnect)
replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, config)
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
Mx4jLoader.maybeLoad
// start the replica manager
Mx4jLoader.maybeLoad()
replicaManager.startup()
// start the controller
kafkaController.startup()
// register metrics beans
topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()
/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, zkClient)
kafkaHealthcheck.startup()
registerStats()
info("started")
}
private def initZk(): ZkClient = {
info("Connecting to zookeeper on " + config.zkConnect)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}
/**
* Forces some dynamic jmx beans to be registered on server startup.
*/
@ -118,15 +132,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
Utils.swallow(kafkaScheduler.shutdown())
if(apis != null)
Utils.swallow(apis.close())
if(kafkaZookeeper != null)
Utils.swallow(kafkaZookeeper.shutdown())
if(replicaManager != null)
Utils.swallow(replicaManager.shutdown())
if(logManager != null)
Utils.swallow(logManager.shutdown())
if(kafkaController != null)
Utils.swallow(kafkaController.shutdown())
if(zkClient != null)
Utils.swallow(zkClient.close())
shutdownLatch.countDown()
info("shut down completed")
@ -140,13 +153,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def getLogManager(): LogManager = logManager
private def createLogManager(config: KafkaConfig): LogManager = {
val topics = config.logCleanupPolicyMap.keys ++
config.logSegmentBytesPerTopicMap.keys ++
config.logFlushIntervalMsPerTopicMap.keys ++
config.logRollHoursPerTopicMap.keys ++
config.logRetentionBytesPerTopicMap.keys ++
config.logRetentionHoursPerTopicMap.keys
private def createLogManager(zkClient: ZkClient): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = 60 * 60 * 1000 * config.logRollHours,
flushInterval = config.logFlushIntervalMessages,
@ -159,13 +166,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
fileDeleteDelayMs = config.logDeleteDelayMs,
minCleanableRatio = config.logCleanerMinCleanRatio,
dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
val logConfigs = for(topic <- topics) yield
topic -> defaultLogConfig.copy(segmentSize = config.logSegmentBytesPerTopicMap.getOrElse(topic, config.logSegmentBytes),
segmentMs = 60 * 60 * 1000 * config.logRollHoursPerTopicMap.getOrElse(topic, config.logRollHours),
flushMs = config.logFlushIntervalMsPerTopicMap.getOrElse(topic, config.logFlushIntervalMs).toLong,
retentionSize = config.logRetentionBytesPerTopicMap.getOrElse(topic, config.logRetentionBytes),
retentionMs = 60 * 60 * 1000 * config.logRetentionHoursPerTopicMap.getOrElse(topic, config.logRetentionHours),
dedupe = config.logCleanupPolicyMap.getOrElse(topic, config.logCleanupPolicy).trim.toLowerCase == "dedupe")
val defaultProps = defaultLogConfig.toProps
val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
ioBufferSize = config.logCleanerIoBufferSize,
@ -174,7 +177,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
backOffMs = config.logCleanerBackoffMs,
enableCleaner = config.logCleanerEnable)
new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
topicConfigs = logConfigs.toMap,
topicConfigs = configs,
defaultConfig = defaultLogConfig,
cleanerConfig = cleanerConfig,
flushCheckMs = config.logFlushSchedulerIntervalMs,

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import scala.collection._
import kafka.log._
import kafka.utils._
import kafka.admin.AdminUtils
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
/**
* This class initiates and carries out topic config changes.
*
* It works as follows.
*
* Config is stored under the path
* /brokers/topics/<topic_name>/config
* This znode stores the topic-overrides for this topic (but no defaults) in properties format.
*
* To avoid watching all topics for changes instead we have a notification path
* /brokers/config_changes
* The TopicConfigManager has a child watch on this path.
*
* To update a topic config we first update the topic config properties. Then we create a new sequential
* znode under the change path which contains the name of the topic that was updated, say
* /brokers/config_changes/config_change_13321
*
* This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
* It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds
* it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification.
* For any new changes it reads the new configuration, combines it with the defaults, and updates the log config
* for all logs for that topic (if any) that it has.
*
* Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is
* down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that
* if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the
* broker reads the config the both changes may have been made). In this case the broker would needlessly refresh the config twice,
* but that is harmless.
*
* On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
* on startup where a change might be missed between the initial config load and registering for change notifications.
*
*/
class TopicConfigManager(private val zkClient: ZkClient,
private val logManager: LogManager,
private val changeExpirationMs: Long = 10*60*1000,
private val time: Time = SystemTime) extends Logging {
private var lastExecutedChange = -1L
/**
* Begin watching for config changes
*/
def startup() {
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
processAllConfigChanges()
}
/**
* Process all config changes
*/
private def processAllConfigChanges() {
val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
processConfigChanges(JavaConversions.asBuffer(configChanges).sorted)
}
/**
* Process the given list of config changes
*/
private def processConfigChanges(notifications: Seq[String]) {
if (notifications.size > 0) {
info("Processing %d topic config change notification(s)...".format(notifications.size))
val now = time.milliseconds
val logs = logManager.logsByTopicPartition.toBuffer
val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
val lastChangeId = notifications.map(changeNumber).max
for (notification <- notifications) {
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
val (topicJson, stat) = ZkUtils.readData(zkClient, changeZnode)
val topic = topicJson.substring(1, topicJson.length - 1) // dequote
if (logsByTopic.contains(topic)) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties(logManager.defaultConfig.toProps)
props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
val logConfig = LogConfig.fromProps(props)
for (log <- logsByTopic(topic))
log.config = logConfig
lastExecutedChange = changeId
info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
} else if (now - stat.getCtime > changeExpirationMs) {
/* this change is now obsolete, try to delete it unless it is the last change left */
ZkUtils.deletePath(zkClient, changeZnode)
}
}
}
}
}
/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong
/**
* A listener that applies config changes to logs
*/
object ConfigChangeListener extends IZkChildListener {
override def handleChildChange(path: String, chillins: java.util.List[String]) {
try {
processConfigChanges(JavaConversions.asBuffer(chillins))
} catch {
case e: Exception => error("Error processing config change:", e)
}
}
}
}

View File

@ -10,7 +10,7 @@ object CommandLineUtils extends Logging {
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {
error("Missing required argument \"" + arg + "\"")
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}

View File

@ -1,6 +1,7 @@
package kafka.utils
import kafka.common._
import scala.collection._
import util.parsing.json.JSON
/**
@ -11,6 +12,9 @@ object Json extends Logging {
JSON.globalNumberParser = myConversionFunc
val lock = new Object
/**
* Parse a JSON string into an object
*/
def parseFull(input: String): Option[Any] = {
lock synchronized {
try {
@ -21,4 +25,31 @@ object Json extends Logging {
}
}
}
/**
* Encode an object into a JSON string. This method accepts any type T where
* T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
* Any other type will result in an exception.
*
* This method does not properly handle non-ascii characters.
*/
def encode(obj: Any): String = {
obj match {
case null => "null"
case b: Boolean => b.toString
case s: String => "\"" + s + "\""
case n: Number => n.toString
case m: Map[_, _] =>
"{" +
m.map(elem =>
elem match {
case t: Tuple2[_,_] => encode(t._1) + ":" + encode(t._2)
case _ => throw new IllegalArgumentException("Invalid map element (" + elem + ") in " + obj)
}).mkString(",") +
"}"
case a: Array[_] => encode(a.toSeq)
case i: Iterable[_] => "[" + i.map(encode).mkString(",") + "]"
case other: AnyRef => throw new IllegalArgumentException("Unknown arguement of type " + other.getClass + ": " + other)
}
}
}

View File

@ -576,6 +576,25 @@ object Utils extends Logging {
f
}
/**
* Turn a properties map into a string
*/
def asString(props: Properties): String = {
val writer = new StringWriter()
props.store(writer, "")
writer.toString
}
/**
* Read some properties with the given default values
*/
def readProps(s: String, defaults: Properties): Properties = {
val reader = new StringReader(s)
val props = new Properties(defaults)
props.load(reader)
props
}
/**
* Read a big-endian integer from a byte array
*/

View File

@ -38,6 +38,8 @@ object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val TopicConfigPath = "/config/topics"
val TopicConfigChangesPath = "/config/changes"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
@ -51,6 +53,9 @@ object ZkUtils extends Logging {
getTopicPath(topic) + "/partitions"
}
def getTopicConfigPath(topic: String): String =
TopicConfigPath + "/" + topic
def getController(zkClient: ZkClient): Int= {
readDataMaybeNull(zkClient, ControllerPath)._1 match {
case Some(controller) => controller.toInt
@ -58,17 +63,14 @@ object ZkUtils extends Logging {
}
}
def getTopicPartitionPath(topic: String, partitionId: Int): String ={
def getTopicPartitionPath(topic: String, partitionId: Int): String =
getTopicPartitionsPath(topic) + "/" + partitionId
}
def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String ={
def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
getTopicPartitionPath(topic, partitionId) + "/" + "state"
}
def getSortedBrokerList(zkClient: ZkClient): Seq[Int] ={
def getSortedBrokerList(zkClient: ZkClient): Seq[Int] =
ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).sorted
}
def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
@ -90,6 +92,11 @@ object ZkUtils extends Logging {
getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
}
def setupCommonPaths(zkClient: ZkClient) {
for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath))
makeSurePersistentPathExists(zkClient, path)
}
def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr) match {
@ -312,10 +319,8 @@ object ZkUtils extends Logging {
case e: ZkNodeExistsException =>
stat = client.writeData(path, data)
return stat.getVersion
case e2 => throw e2
}
}
case e2 => throw e2
}
}
@ -596,7 +601,7 @@ object ZkUtils extends Logging {
case nne: ZkNoNodeException =>
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
case e2 => throw new AdministrationException(e2.toString)
case e2 => throw new AdminOperationException(e2.toString)
}
}
}

View File

@ -19,6 +19,9 @@ package kafka.admin
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
import kafka.utils._
import kafka.log._
import kafka.zk.ZooKeeperTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{Logging, ZkUtils, TestUtils}
@ -32,27 +35,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val brokerList = List(0, 1, 2, 3, 4)
// test 0 replication factor
try {
intercept[AdminOperationException] {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
fail("shouldn't allow replication factor 0")
}
catch {
case e: AdministrationException => // this is good
case e2 => throw e2
}
// test wrong replication factor
try {
intercept[AdminOperationException] {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
fail("shouldn't allow replication factor larger than # of brokers")
}
catch {
case e: AdministrationException => // this is good
case e2 => throw e2
}
// correct assignment
{
val expectedAssignment = Map(
0 -> List(0, 1, 2),
1 -> List(1, 2, 3),
@ -63,65 +55,34 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
6 -> List(1, 3, 4),
7 -> List(2, 4, 0),
8 -> List(3, 0, 1),
9 -> List(4, 1, 2)
)
9 -> List(4, 1, 2))
val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
val e = (expectedAssignment.toList == actualAssignment.toList)
assertTrue(expectedAssignment.toList == actualAssignment.toList)
}
}
@Test
def testManualReplicaAssignment() {
val brokerList = Set(0, 1, 2, 3, 4)
val brokers = List(0, 1, 2, 3, 4)
TestUtils.createBrokersInZk(zkClient, brokers)
// duplicated brokers
try {
val replicationAssignmentStr = "0,0,1:1,2,3"
CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
fail("replication assginment shouldn't have duplicated brokers")
}
catch {
case e: AdministrationException => // this is good
case e2 => throw e2
}
// non-exist brokers
try {
val replicationAssignmentStr = "0,1,2:1,2,7"
CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
fail("replication assginment shouldn't contain non-exist brokers")
}
catch {
case e: AdministrationException => // this is good
case e2 => throw e2
// duplicate brokers
intercept[IllegalArgumentException] {
AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,0)))
}
// inconsistent replication factor
try {
val replicationAssignmentStr = "0,1,2:1,2"
CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
fail("all partitions should have the same replication factor")
}
catch {
case e: AdministrationException => // this is good
case e2 => throw e2
intercept[IllegalArgumentException] {
AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0)))
}
// good assignment
{
val replicationAssignmentStr = "0:1:2,1:2:3"
val expectedReplicationAssignment = Map(
0 -> List(0, 1, 2),
1 -> List(1, 2, 3)
)
val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
for( (part, replicas) <- expectedReplicationAssignment ) {
assertEquals(replicas, actualReplicationAssignment(part))
}
}
val assignment = Map(0 -> List(0, 1, 2),
1 -> List(1, 2, 3))
AdminUtils.createTopicWithAssignment(zkClient, "test", assignment)
val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test"))
assertEquals(assignment, found("test"))
}
@Test
@ -157,7 +118,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val topic = "test"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
@ -166,12 +127,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
for(i <- 0 until actualReplicaList.size)
assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
try {
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
fail("shouldn't be able to create a topic already exists")
} catch {
case e: TopicExistsException => // this is good
case e2 => throw e2
intercept[TopicExistsException] {
// shouldn't be able to create a topic that already exists
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
}
}
@ -179,15 +137,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
def testGetTopicMetadata() {
val expectedReplicaAssignment = Map(
0 -> List(0, 1, 2),
1 -> List(1, 2, 3)
)
1 -> List(1, 2, 3))
val leaderForPartitionMap = Map(
0 -> 0,
1 -> 1
)
1 -> 1)
val topic = "auto-topic"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
@ -215,7 +171,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(0, 2, 3)
val partitionToBeReassigned = 0
@ -240,7 +196,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(1, 2, 3)
val partitionToBeReassigned = 0
@ -266,7 +222,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
@ -307,7 +263,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1))
val topic = "test"
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// put the partition in the reassigned path as well
// reassign partition 0
val newReplicas = Seq(0, 1)
@ -346,7 +302,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
@ -367,7 +323,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
@ -405,6 +361,50 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
}
/**
* This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
* then changes the config and checks that the new values take effect.
*/
@Test
def testTopicConfigChange() {
val partitions = 3
val topic = "my-topic"
val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
def makeConfig(messageSize: Int, retentionMs: Long) = {
var props = new Properties()
props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
props
}
def checkConfig(messageSize: Int, retentionMs: Long) {
TestUtils.retry(10000) {
for(part <- 0 until partitions) {
val logOpt = server.logManager.getLog(TopicAndPartition(topic, part))
assertTrue(logOpt.isDefined)
assertEquals(retentionMs, logOpt.get.config.retentionMs)
assertEquals(messageSize, logOpt.get.config.maxMessageSize)
}
}
}
try {
// create a topic with a few config overrides and check that they are applied
val maxMessageSize = 1024
val retentionMs = 1000*1000
AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
checkConfig(maxMessageSize, retentionMs)
// now double the config values for the topic and check that it is applied
AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
checkConfig(2*maxMessageSize, 2 * retentionMs)
} finally {
server.shutdown()
server.config.logDirs.map(Utils.rm(_))
}
}
private def checkIfReassignPartitionPathExists(): Boolean = {
ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
}

View File

@ -20,6 +20,7 @@ package kafka.consumer
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.Properties
import scala.collection._
import junit.framework.Assert._
@ -27,7 +28,7 @@ import kafka.message._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils._
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import org.junit.Test
import kafka.serializer._
import kafka.cluster.{Broker, Cluster}
@ -60,7 +61,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
override def setUp() {
super.setUp
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
}

View File

@ -26,7 +26,7 @@ import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.serializer._
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import org.I0Itec.zkclient.ZkClient
import kafka.utils._
import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
@ -298,7 +298,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
// create topic topic1 with 1 partition on broker 0
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
AdminUtils.createTopic(zkClient, topic, 1, 1)
// send some messages to each broker
val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)

View File

@ -32,8 +32,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
val topic = "test_topic"
val group = "default_group"
val testConsumer = "consumer"
val BrokerPort = 9892
val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort)))
val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
val NumMessages = 10
val LargeOffset = 10000
val SmallOffset = -1

View File

@ -30,7 +30,7 @@ import kafka.serializer._
import kafka.producer.{KeyedMessage, Producer}
import kafka.utils.TestUtils._
import kafka.utils.TestUtils
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
@ -55,7 +55,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
override def setUp() {
super.setUp
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)))
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
fetcher.stopAllConnections()

View File

@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.admin.{AdminUtils, CreateTopicCommand}
import kafka.admin.AdminUtils
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
import kafka.utils.{TestUtils, Utils}
@ -42,19 +42,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val configs = List(config)
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
override def setUp() {
super.setUp
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
}
override def tearDown() {
// restore set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.ERROR)
super.tearDown
}
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
.clientId("test-client")
@ -299,7 +286,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testConsumerEmptyTopic() {
val newTopic = "new-topic"
CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
AdminUtils.createTopic(zkClient, newTopic, 1, 1)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
@ -327,10 +314,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
}
// wait until the messages are published
TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test1", 0).get.logEndOffset == 2 }, 1000)
TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test2", 0).get.logEndOffset == 2 }, 1000)
TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test3", 0).get.logEndOffset == 2 }, 1000)
TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test4", 0).get.logEndOffset == 2 }, 1000)
TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, 1000)
TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, 1000)
TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, 1000)
TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, 1000)
val replicaId = servers.head.config.brokerId
val hwWaitMs = config.replicaHighWatermarkCheckpointIntervalMs
@ -354,7 +341,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
*/
def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
for( topic <- topics ) {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
AdminUtils.createTopic(zkClient, topic, 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
}
}

View File

@ -19,7 +19,7 @@ package kafka.integration
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import java.nio.ByteBuffer
import junit.framework.Assert._
import org.easymock.EasyMock
@ -48,7 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
def testTopicMetadataRequest {
// create topic
val topic = "test"
CreateTopicCommand.createTopic(zkClient, topic, 1)
AdminUtils.createTopic(zkClient, topic, 1, 1)
// create a topic metadata request
val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
@ -64,7 +64,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
def testBasicTopicMetadata {
// create topic
val topic = "test"
CreateTopicCommand.createTopic(zkClient, topic, 1)
AdminUtils.createTopic(zkClient, topic, 1, 1)
// set up leader for topic partition 0
val leaderForPartitionMap = Map(
0 -> configs.head.brokerId
@ -83,7 +83,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
def testGetAllTopicMetadata {
// create topic
val topic = "test"
CreateTopicCommand.createTopic(zkClient, topic, 1)
AdminUtils.createTopic(zkClient, topic, 1, 1)
// set up leader for topic partition 0
val leaderForPartitionMap = Map(
0 -> configs.head.brokerId

View File

@ -58,7 +58,7 @@ class LogManagerTest extends JUnit3Suite {
*/
@Test
def testCreateLog() {
val log = logManager.getOrCreateLog(name, 0)
val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
val logFile = new File(logDir, name + "-0")
assertTrue(logFile.exists)
log.append(TestUtils.singleMessageSet("test".getBytes()))
@ -69,7 +69,7 @@ class LogManagerTest extends JUnit3Suite {
*/
@Test
def testGetNonExistentLog() {
val log = logManager.getLog(name, 0)
val log = logManager.getLog(TopicAndPartition(name, 0))
assertEquals("No log should be found.", None, log)
val logFile = new File(logDir, name + "-0")
assertTrue(!logFile.exists)
@ -80,7 +80,7 @@ class LogManagerTest extends JUnit3Suite {
*/
@Test
def testCleanupExpiredSegments() {
val log = logManager.getOrCreateLog(name, 0)
val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
var offset = 0L
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
@ -120,7 +120,7 @@ class LogManagerTest extends JUnit3Suite {
logManager.startup
// create a log
val log = logManager.getOrCreateLog(name, 0)
val log = logManager.createLog(TopicAndPartition(name, 0), config)
var offset = 0L
// add a bunch of messages that should be larger than the retentionSize
@ -158,7 +158,7 @@ class LogManagerTest extends JUnit3Suite {
val config = logConfig.copy(flushMs = 1000)
logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
val log = logManager.createLog(TopicAndPartition(name, 0), config)
val lastFlush = log.lastFlushTime
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
@ -182,7 +182,7 @@ class LogManagerTest extends JUnit3Suite {
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
logManager.getOrCreateLog("test", partition)
logManager.createLog(TopicAndPartition("test", partition), logConfig)
assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size)
val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
assertTrue("Load should balance evenly", counts.max <= counts.min + 1)

View File

@ -27,7 +27,7 @@ import org.junit.Assert._
import org.junit.Test
import kafka.utils._
import java.util
import kafka.admin.{AdminUtils, CreateTopicCommand}
import kafka.admin.AdminUtils
import util.Properties
import kafka.api.FetchRequestBuilder
import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException}
@ -77,17 +77,15 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// restore set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.ERROR)
server1.shutdown
server1.awaitShutdown()
server2.shutdown
server2.awaitShutdown()
Utils.rm(server1.config.logDirs)
Utils.rm(server2.config.logDirs)
super.tearDown()
}
@Test
def testUpdateBrokerPartitionInfo() {
CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
AdminUtils.createTopic(zkClient, "new-topic", 1, 2)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@ -152,7 +150,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val producerConfig2 = new ProducerConfig(props2)
// create topic with 1 partition and await leadership
CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
AdminUtils.createTopic(zkClient, "new-topic", 1, 2)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@ -203,7 +201,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
// create topic
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)))
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@ -213,13 +211,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
try {
// Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
// on broker 0
// Available partition ids should be 0, 1, 2 and 3, all lead and hosted only on broker 0
producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
} catch {
case e => fail("Unexpected exception: " + e)
}
// kill the broker
server1.shutdown
@ -264,7 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val producer = new Producer[String, String](config)
// create topics in ZK
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0,1)))
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)

View File

@ -19,7 +19,7 @@ package kafka.producer
import java.net.SocketTimeoutException
import junit.framework.Assert
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
import kafka.message._
import kafka.server.KafkaConfig
@ -92,7 +92,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val producer = new SyncProducer(new SyncProducerConfig(props))
CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
AdminUtils.createTopic(zkClient, "test", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
@ -135,9 +135,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
}
// #2 - test that we get correct offsets when partition is owned by broker
CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
AdminUtils.createTopic(zkClient, "topic1", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0, 500)
CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
AdminUtils.createTopic(zkClient, "topic3", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500)
val response2 = producer.send(request)

View File

@ -62,7 +62,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
assertEquals(0L, fooPartition0Hw)
val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
// create leader and follower replicas
val log0 = logManagers(0).getOrCreateLog(topic, 0)
val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
partition0.addReplicaIfNotExists(leaderReplicaPartition0)
val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
@ -101,7 +101,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
assertEquals(0L, topic1Partition0Hw)
val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
// create leader log
val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0)
val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
// create a local replica for topic1
val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
@ -117,7 +117,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
// add another partition and set highwatermark
val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
// create leader log
val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0)
val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
// create a local replica for topic2
val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)

View File

@ -19,7 +19,7 @@ package kafka.server
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.utils.{ZkUtils, Utils, TestUtils}
@ -61,7 +61,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val partitionId = 0
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1)))
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@ -108,7 +108,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val partitionId = 0
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1)))
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)

View File

@ -26,7 +26,7 @@ import org.junit.{After, Before, Test}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
import kafka.utils.TestUtils._
import kafka.common.{ErrorMapping, TopicAndPartition}
@ -82,10 +82,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
AdminUtils.createTopic(zkClient, topic, 1, 1)
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(topic, part)
val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 20)
@ -120,7 +120,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val topic = topicPartition.split("-").head
// setup brokers in zookeeper as owners of partitions for this test
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
AdminUtils.createTopic(zkClient, topic, 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
var offsetChanged = false
@ -145,10 +145,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
AdminUtils.createTopic(zkClient, topic, 3, 1)
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(topic, part)
val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@ -174,10 +174,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
AdminUtils.createTopic(zkClient, topic, 3, 1)
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(topic, part)
val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))

View File

@ -19,7 +19,7 @@ package kafka.server
import org.scalatest.junit.JUnit3Suite
import org.junit.Assert._
import java.io.File
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import kafka.utils.TestUtils._
import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils}
@ -54,6 +54,13 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
producerProps.put("request.required.acks", "-1")
override def tearDown() {
super.tearDown()
for(server <- servers) {
server.shutdown()
Utils.rm(server.config.logDirs(0))
}
}
def testHWCheckpointNoFailuresSingleLogSegment {
// start both servers
@ -64,7 +71,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
// wait until leader is elected
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@ -86,7 +93,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(numMessages, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(numMessages, followerHW)
servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))})
}
def testHWCheckpointWithFailuresSingleLogSegment {
@ -98,7 +104,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
// wait until leader is elected
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@ -148,7 +154,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.close()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
servers.foreach(server => Utils.rm(server.config.logDirs))
}
def testHWCheckpointNoFailuresMultipleLogSegments {
@ -163,7 +168,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
// wait until leader is elected
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@ -182,7 +187,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(hw, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(hw, followerHW)
servers.foreach(server => Utils.rm(server.config.logDirs))
}
def testHWCheckpointWithFailuresMultipleLogSegments {
@ -197,7 +201,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId)))
// wait until leader is elected
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@ -241,7 +245,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.close()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
servers.foreach(server => Utils.rm(server.config.logDirs))
}
private def sendMessages(n: Int = 1) {

View File

@ -26,7 +26,6 @@ import org.junit.{After, Before, Test}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
import kafka.utils.TestUtils._
import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}

View File

@ -22,9 +22,10 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import kafka.producer.KeyedMessage
import kafka.serializer.StringEncoder
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import kafka.utils.TestUtils
import junit.framework.Assert._
import kafka.common._
class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(2)
@ -50,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
// create a topic and partition and await leadership
for (topic <- List(topic1,topic2)) {
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
AdminUtils.createTopic(zkClient, topic, 1, 2)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
}
@ -65,9 +66,10 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
def logsMatch(): Boolean = {
var result = true
for (topic <- List(topic1, topic2)) {
val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
val topicAndPart = TopicAndPartition(topic, partition)
val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
(expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }
(expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) }
}
result
}

View File

@ -26,7 +26,7 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.producer._
import kafka.utils.IntEncoder
import kafka.utils.TestUtils._
import kafka.admin.CreateTopicCommand
import kafka.admin.AdminUtils
import kafka.api.FetchRequestBuilder
import kafka.utils.{TestUtils, Utils}
@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
// create topic
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
AdminUtils.createTopic(zkClient, topic, 1, 1)
// send some messages
producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)

View File

@ -67,7 +67,7 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.replay(log)
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
EasyMock.replay(logManager)
val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
@ -133,7 +133,7 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.replay(log)
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
EasyMock.expect(logManager.getLog(TopicAndPartition(topic, 0))).andReturn(Some(log)).anyTimes()
EasyMock.replay(logManager)
val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])

View File

@ -0,0 +1,27 @@
package kafka.utils
import junit.framework.Assert._
import org.junit.{Test, After, Before}
class JsonTest {
@Test
def testJsonEncoding() {
assertEquals("null", Json.encode(null))
assertEquals("1", Json.encode(1))
assertEquals("1", Json.encode(1L))
assertEquals("1", Json.encode(1.toByte))
assertEquals("1", Json.encode(1.toShort))
assertEquals("1.0", Json.encode(1.0))
assertEquals("\"str\"", Json.encode("str"))
assertEquals("true", Json.encode(true))
assertEquals("false", Json.encode(false))
assertEquals("[]", Json.encode(Seq()))
assertEquals("[1,2,3]", Json.encode(Seq(1,2,3)))
assertEquals("[1,\"2\",[3]]", Json.encode(Seq(1,"2",Seq(3))))
assertEquals("{}", Json.encode(Map()))
assertEquals("{\"a\":1,\"b\":2}", Json.encode(Map("a" -> 1, "b" -> 2)))
assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" -> Seq(3,4))))
}
}

View File

@ -76,7 +76,9 @@ class SchedulerTest {
@Test
def testNonPeriodicTask() {
scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
retry(30000, () => assertEquals(counter1.get, 1))
retry(30000) {
assertEquals(counter1.get, 1)
}
Thread.sleep(5)
assertEquals("Should only run once", 1, counter1.get)
}
@ -84,6 +86,8 @@ class SchedulerTest {
@Test
def testPeriodicTask() {
scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5)
retry(30000, () => assertTrue("Should count to 20", counter1.get >= 20))
retry(30000){
assertTrue("Should count to 20", counter1.get >= 20)
}
}
}

View File

@ -23,6 +23,7 @@ import java.nio._
import java.nio.channels._
import java.util.Random
import java.util.Properties
import junit.framework.AssertionFailedError
import junit.framework.Assert._
import kafka.server._
import kafka.producer._
@ -122,7 +123,7 @@ object TestUtils extends Logging {
/**
* Create a test config for the given node id
*/
def createBrokerConfig(nodeId: Int, port: Int): Properties = {
def createBrokerConfig(nodeId: Int, port: Int = choosePort()): Properties = {
val props = new Properties
props.put("broker.id", nodeId.toString)
props.put("host.name", "localhost")
@ -448,18 +449,20 @@ object TestUtils extends Logging {
* Execute the given block. If it throws an assert error, retry. Repeat
* until no error is thrown or the time limit ellapses
*/
def retry(maxWaitMs: Long, block: () => Unit) {
def retry(maxWaitMs: Long)(block: => Unit) {
var wait = 1L
val startTime = System.currentTimeMillis()
while(true) {
try {
block()
block
return
} catch {
case e: AssertionError =>
if(System.currentTimeMillis - startTime > maxWaitMs) {
case e: AssertionFailedError =>
val ellapsed = System.currentTimeMillis - startTime
if(ellapsed > maxWaitMs) {
throw e
} else {
info("Attempt failed, sleeping for " + wait + ", and then retrying.")
Thread.sleep(wait)
wait += math.min(wait, 1000)
}