From c1ed12e44ddebe41dc464683e3d7eeb4e6d39a45 Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Fri, 8 Mar 2013 15:07:39 -0800 Subject: [PATCH] KAFKA-554 Dynamic per-topic configuration. This patch adds a mechanism for storing per-topic configurations in zookeeper and dynamically making config changes across the cluster. Reviewed by Neha and Jun. --- bin/kafka-create-topic.sh | 19 -- bin/kafka-delete-topic.sh | 19 -- bin/{kafka-list-topic.sh => kafka-topics.sh} | 2 +- .../AdminOperationException.scala} | 24 +-- .../main/scala/kafka/admin/AdminUtils.scala | 137 ++++++++++--- .../kafka/admin/CreateTopicCommand.scala | 117 ----------- ...referredReplicaLeaderElectionCommand.scala | 6 +- .../main/scala/kafka/admin/TopicCommand.scala | 185 ++++++++++++++++++ .../main/scala/kafka/cluster/Partition.scala | 5 +- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogConfig.scala | 97 ++++++++- .../src/main/scala/kafka/log/LogManager.scala | 31 ++- .../scala/kafka/network/SocketServer.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 43 ++-- ...ZooKeeper.scala => KafkaHealthcheck.scala} | 52 +++-- .../kafka/server/KafkaRequestHandler.scala | 2 +- .../main/scala/kafka/server/KafkaServer.scala | 77 ++++---- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../kafka/server/TopicConfigManager.scala | 133 +++++++++++++ .../scala/kafka/utils/CommandLineUtils.scala | 2 +- core/src/main/scala/kafka/utils/Json.scala | 31 +++ core/src/main/scala/kafka/utils/Utils.scala | 19 ++ core/src/main/scala/kafka/utils/ZkUtils.scala | 25 ++- .../scala/unit/kafka/admin/AdminTest.scala | 158 +++++++-------- .../kafka/consumer/ConsumerIteratorTest.scala | 5 +- .../ZookeeperConsumerConnectorTest.scala | 4 +- .../integration/AutoOffsetResetTest.scala | 3 +- .../unit/kafka/integration/FetcherTest.scala | 4 +- .../kafka/integration/PrimitiveApiTest.scala | 27 +-- .../kafka/integration/TopicMetadataTest.scala | 8 +- .../scala/unit/kafka/log/LogManagerTest.scala | 12 +- .../unit/kafka/producer/ProducerTest.scala | 23 +-- .../kafka/producer/SyncProducerTest.scala | 8 +- .../server/HighwatermarkPersistenceTest.scala | 6 +- .../kafka/server/LeaderElectionTest.scala | 6 +- .../unit/kafka/server/LogOffsetTest.scala | 16 +- .../unit/kafka/server/LogRecoveryTest.scala | 23 ++- .../unit/kafka/server/OffsetCommitTest.scala | 1 - .../unit/kafka/server/ReplicaFetchTest.scala | 10 +- .../kafka/server/ServerShutdownTest.scala | 4 +- .../unit/kafka/server/SimpleFetchTest.scala | 4 +- .../scala/unit/kafka/utils/JsonTest.scala | 27 +++ .../unit/kafka/utils/SchedulerTest.scala | 8 +- .../scala/unit/kafka/utils/TestUtils.scala | 13 +- 44 files changed, 906 insertions(+), 496 deletions(-) delete mode 100755 bin/kafka-create-topic.sh delete mode 100755 bin/kafka-delete-topic.sh rename bin/{kafka-list-topic.sh => kafka-topics.sh} (94%) rename core/src/main/scala/kafka/{common/KafkaZookeperClient.scala => admin/AdminOperationException.scala} (50%) delete mode 100644 core/src/main/scala/kafka/admin/CreateTopicCommand.scala create mode 100644 core/src/main/scala/kafka/admin/TopicCommand.scala rename core/src/main/scala/kafka/server/{KafkaZooKeeper.scala => KafkaHealthcheck.scala} (68%) create mode 100644 core/src/main/scala/kafka/server/TopicConfigManager.scala create mode 100644 core/src/test/scala/unit/kafka/utils/JsonTest.scala diff --git a/bin/kafka-create-topic.sh b/bin/kafka-create-topic.sh deleted file mode 100755 index fccda7b6572..00000000000 --- a/bin/kafka-create-topic.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -base_dir=$(dirname $0) -export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" -$base_dir/kafka-run-class.sh kafka.admin.CreateTopicCommand $@ diff --git a/bin/kafka-delete-topic.sh b/bin/kafka-delete-topic.sh deleted file mode 100755 index f266ae3bdb6..00000000000 --- a/bin/kafka-delete-topic.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -base_dir=$(dirname $0) -export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" -$base_dir/kafka-run-class.sh kafka.admin.DeleteTopicCommand $@ diff --git a/bin/kafka-list-topic.sh b/bin/kafka-topics.sh similarity index 94% rename from bin/kafka-list-topic.sh rename to bin/kafka-topics.sh index 1235ad025df..b3195ee4527 100755 --- a/bin/kafka-list-topic.sh +++ b/bin/kafka-topics.sh @@ -16,4 +16,4 @@ base_dir=$(dirname $0) export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" -$base_dir/kafka-run-class.sh kafka.admin.ListTopicCommand $@ +$base_dir/kafka-run-class.sh kafka.admin.TopicCommand $@ diff --git a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala b/core/src/main/scala/kafka/admin/AdminOperationException.scala similarity index 50% rename from core/src/main/scala/kafka/common/KafkaZookeperClient.scala rename to core/src/main/scala/kafka/admin/AdminOperationException.scala index bace1d228f7..a45b3f7e93a 100644 --- a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala +++ b/core/src/main/scala/kafka/admin/AdminOperationException.scala @@ -13,23 +13,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ -package kafka.common +package kafka.admin -import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZKStringSerializer, ZKConfig} -import java.util.concurrent.atomic.AtomicReference - -object KafkaZookeeperClient { - private val INSTANCE = new AtomicReference[ZkClient](null) - - def getZookeeperClient(config: ZKConfig): ZkClient = { - // TODO: This cannot be a singleton since unit tests break if we do that -// INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, -// ZKStringSerializer)) - INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer)) - INSTANCE.get() - } -} +class AdminOperationException(val error: String, cause: Throwable) extends RuntimeException(error, cause) { + def this(error: Throwable) = this(error.getMessage, error) + def this(msg: String) = this(msg, null) +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index b9ef4dc92e0..647938562d0 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -18,9 +18,13 @@ package kafka.admin import java.util.Random +import java.util.Properties import kafka.api.{TopicMetadata, PartitionMetadata} import kafka.cluster.Broker import kafka.utils.{Logging, ZkUtils} +import kafka.log.LogConfig +import kafka.server.TopicConfigManager +import kafka.utils.{Logging, Utils, ZkUtils, Json} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ @@ -30,7 +34,7 @@ import scala.Some object AdminUtils extends Logging { val rand = new Random - val AdminEpoch = -1 + val TopicConfigChangeZnodePrefix = "config_change_" /** * There are 2 goals of replica assignment: @@ -50,33 +54,74 @@ object AdminUtils extends Logging { * p3 p4 p0 p1 p2 (3nd replica) * p7 p8 p9 p5 p6 (3nd replica) */ - def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int, + def assignReplicasToBrokers(brokers: Seq[Int], + partitions: Int, + replicationFactor: Int, fixedStartIndex: Int = -1) // for testing only : Map[Int, Seq[Int]] = { - if (nPartitions <= 0) - throw new AdministrationException("number of partitions must be larger than 0") + if (partitions <= 0) + throw new AdminOperationException("number of partitions must be larger than 0") if (replicationFactor <= 0) - throw new AdministrationException("replication factor must be larger than 0") - if (replicationFactor > brokerList.size) - throw new AdministrationException("replication factor: " + replicationFactor + - " larger than available brokers: " + brokerList.size) + throw new AdminOperationException("replication factor must be larger than 0") + if (replicationFactor > brokers.size) + throw new AdminOperationException("replication factor: " + replicationFactor + + " larger than available brokers: " + brokers.size) val ret = new mutable.HashMap[Int, List[Int]]() - val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) + val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size) - var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) - for (i <- 0 until nPartitions) { - if (i > 0 && (i % brokerList.size == 0)) + var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size) + for (i <- 0 until partitions) { + if (i > 0 && (i % brokers.size == 0)) secondReplicaShift += 1 - val firstReplicaIndex = (i + startIndex) % brokerList.size - var replicaList = List(brokerList(firstReplicaIndex)) + val firstReplicaIndex = (i + startIndex) % brokers.size + var replicaList = List(brokers(firstReplicaIndex)) for (j <- 0 until replicationFactor - 1) - replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) + replicaList ::= brokers(replicaIndex(firstReplicaIndex, secondReplicaShift, j, brokers.size)) ret.put(i, replicaList.reverse) } ret.toMap } + + def deleteTopic(zkClient: ZkClient, topic: String) { + zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) + zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic)) + } + + def topicExists(zkClient: ZkClient, topic: String): Boolean = + zkClient.exists(ZkUtils.getTopicPath(topic)) + + def createTopic(zkClient: ZkClient, + topic: String, + partitions: Int, + replicationFactor: Int, + topicConfig: Properties = new Properties) { + val brokerList = ZkUtils.getSortedBrokerList(zkClient) + val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) + AdminUtils.createTopicWithAssignment(zkClient, topic, replicaAssignment, topicConfig) + } + + def createTopicWithAssignment(zkClient: ZkClient, + topic: String, + partitionReplicaAssignment: Map[Int, Seq[Int]], + config: Properties = new Properties) { + // validate arguments + Topic.validate(topic) + LogConfig.validate(config) + require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.") - def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) { + val topicPath = ZkUtils.getTopicPath(topic) + if(zkClient.exists(topicPath)) + throw new TopicExistsException("Topic \"%s\" already exists.".format(topic)) + partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)) + + // write out the config if there is any, this isn't transactional with the partition assignments + writeTopicConfig(zkClient, topic, config) + + // create the partition assignment + writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment) + } + + private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]]) { try { val zkPath = ZkUtils.getTopicPath(topic) val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2))) @@ -84,9 +129,61 @@ object AdminUtils extends Logging { debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic)) - case e2 => throw new AdministrationException(e2.toString) + case e2 => throw new AdminOperationException(e2.toString) } } + + /** + * Update the config for an existing topic and create a change notification so the change will propagate to other brokers + */ + def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { + LogConfig.validate(config) + if(!topicExists(zkClient, topic)) + throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) + + // write the new config--may not exist if there were previously no overrides + writeTopicConfig(zkClient, topic, config) + + // create the change notification + zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic)) + } + + /** + * Write out the topic config to zk, if there is any + */ + private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { + if(config.size > 0) { + val map = Map("version" -> 1, "config" -> JavaConversions.asMap(config)) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) + } + } + + /** + * Read the topic config (if any) from zk + */ + def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = { + val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true) + val props = new Properties() + if(str != null) { + Json.parseFull(str) match { + case None => // there are no config overrides + case Some(map: Map[String, _]) => + require(map("version") == 1) + map.get("config") match { + case Some(config: Map[String, String]) => + for((k,v) <- config) + props.setProperty(k, v) + case _ => throw new IllegalArgumentException("Invalid topic config: " + str) + } + + case o => throw new IllegalArgumentException("Unexpected value in config: " + str) + } + } + props + } + + def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] = + ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker]) @@ -158,12 +255,8 @@ object AdminUtils extends Logging { } } - private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { + private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) (firstReplicaIndex + shift) % nBrokers } } - -class AdministrationException(val errorMessage: String) extends RuntimeException(errorMessage) { - def this() = this(null) -} diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala deleted file mode 100644 index e76211523d7..00000000000 --- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - -import joptsimple.OptionParser -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient -import scala.collection.mutable -import kafka.common.Topic - -object CreateTopicCommand extends Logging { - - def main(args: Array[String]): Unit = { - val parser = new OptionParser - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be created.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val nPartitionsOpt = parser.accepts("partition", "number of partitions in the topic") - .withRequiredArg - .describedAs("# of partitions") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val replicationFactorOpt = parser.accepts("replica", "replication factor for each partitions in the topic") - .withRequiredArg - .describedAs("replication factor") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers") - .withRequiredArg - .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " + - "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...") - .ofType(classOf[String]) - .defaultsTo("") - - val options = parser.parse(args : _*) - - for(arg <- List(topicOpt, zkConnectOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val topic = options.valueOf(topicOpt) - val zkConnect = options.valueOf(zkConnectOpt) - val nPartitions = options.valueOf(nPartitionsOpt).intValue - val replicationFactor = options.valueOf(replicationFactorOpt).intValue - val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) - var zkClient: ZkClient = null - try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr) - println("creation succeeded!") - } catch { - case e => - println("creation failed because of " + e.getMessage) - println(Utils.stackTrace(e)) - } finally { - if (zkClient != null) - zkClient.close() - } - } - - def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") { - Topic.validate(topic) - - val brokerList = ZkUtils.getSortedBrokerList(zkClient) - - val partitionReplicaAssignment = if (replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor) - else - getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) - debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment)) - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient) - } - - def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = { - val partitionList = replicaAssignmentList.split(",") - val ret = new mutable.HashMap[Int, List[Int]]() - for (i <- 0 until partitionList.size) { - val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) - if (brokerList.size <= 0) - throw new AdministrationException("replication factor must be larger than 0") - if (brokerList.size != brokerList.toSet.size) - throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList) - if (!brokerList.toSet.subsetOf(availableBrokerList)) - throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString + - "available broker:" + availableBrokerList.toString) - ret.put(i, brokerList.toList) - if (ret(i).size != ret(0).size) - throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList) - } - ret.toMap - } -} diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index ebcf669f9b7..49342c6cd4b 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -85,7 +85,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt TopicAndPartition(topic, partition) } - case None => throw new AdministrationException("Preferred replica election data is empty") + case None => throw new AdminOperationException("Preferred replica election data is empty") } } @@ -102,9 +102,9 @@ object PreferredReplicaLeaderElectionCommand extends Logging { case nee: ZkNodeExistsException => val partitionsUndergoingPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1) - throw new AdministrationException("Preferred replica leader election currently in progress for " + + throw new AdminOperationException("Preferred replica leader election currently in progress for " + "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection)) - case e2 => throw new AdministrationException(e2.toString) + case e2 => throw new AdminOperationException(e2.toString) } } } diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala new file mode 100644 index 00000000000..d3646089a53 --- /dev/null +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple._ +import java.util.Properties +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import scala.collection._ +import scala.collection.JavaConversions._ +import kafka.common.Topic +import kafka.cluster.Broker + +object TopicCommand { + + def main(args: Array[String]): Unit = { + + val opts = new TopicCommandOptions(args) + + // should have exactly one action + val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _) + if(actions != 1) { + System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter") + opts.parser.printHelpOn(System.err) + System.exit(1) + } + + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) + + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + + if(opts.options.has(opts.createOpt)) + createTopic(zkClient, opts) + else if(opts.options.has(opts.alterOpt)) + alterTopic(zkClient, opts) + else if(opts.options.has(opts.deleteOpt)) + deleteTopic(zkClient, opts) + else if(opts.options.has(opts.listOpt)) + listTopics(zkClient) + else if(opts.options.has(opts.describeOpt)) + describeTopic(zkClient, opts) + + zkClient.close() + } + + def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + val topics = opts.options.valuesOf(opts.topicOpt) + val configs = parseTopicConfigs(opts) + for (topic <- topics) { + if (opts.options.has(opts.replicaAssignmentOpt)) { + val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) + AdminUtils.createTopicWithAssignment(zkClient, topic, assignment, configs) + } else { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) + val partitions = opts.options.valueOf(opts.partitionsOpt).intValue + val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue + AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs) + } + println("Created topic \"%s\".".format(topic)) + } + } + + def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + val topics = opts.options.valuesOf(opts.topicOpt) + val configs = parseTopicConfigs(opts) + if(opts.options.has(opts.partitionsOpt)) + Utils.croak("Changing the number of partitions is not supported.") + if(opts.options.has(opts.replicationFactorOpt)) + Utils.croak("Changing the replication factor is not supported.") + for(topic <- topics) { + AdminUtils.changeTopicConfig(zkClient, topic, configs) + println("Updated config for topic \"%s\".".format(topic)) + } + } + + def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + for(topic <- opts.options.valuesOf(opts.topicOpt)) { + AdminUtils.deleteTopic(zkClient, topic) + println("Topic \"%s\" deleted.".format(topic)) + } + } + + def listTopics(zkClient: ZkClient) { + for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + println(topic) + } + + def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + val topics = opts.options.valuesOf(opts.topicOpt) + val metadata = AdminUtils.fetchTopicMetadataFromZk(topics.toSet, zkClient) + for(md <- metadata) { + println(md.topic) + val config = AdminUtils.fetchTopicConfig(zkClient, md.topic) + println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", ")) + println("\tpartitions: " + md.partitionsMetadata.size) + for(pd <- md.partitionsMetadata) { + println("\t\tpartition " + pd.partitionId) + println("\t\tleader: " + (if(pd.leader.isDefined) formatBroker(pd.leader.get) else "none")) + println("\t\treplicas: " + pd.replicas.map(formatBroker).mkString(", ")) + println("\t\tisr: " + pd.isr.map(formatBroker).mkString(", ")) + } + } + } + + def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")" + + def parseTopicConfigs(opts: TopicCommandOptions): Properties = { + val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*")) + require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".") + val props = new Properties + configs.foreach(pair => props.setProperty(pair(0), pair(1))) + props + } + + def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = { + val partitionList = replicaAssignmentList.split(",") + val ret = new mutable.HashMap[Int, List[Int]]() + for (i <- 0 until partitionList.size) { + val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + ret.put(i, brokerList.toList) + if (ret(i).size != ret(0).size) + throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList) + } + ret.toMap + } + + class TopicCommandOptions(args: Array[String]) { + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val listOpt = parser.accepts("list", "List all available topics.") + val createOpt = parser.accepts("create", "Create a new topic.") + val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.") + val deleteOpt = parser.accepts("delete", "Delete the topic.") + val describeOpt = parser.accepts("describe", "List details for the given topics.") + val helpOpt = parser.accepts("help", "Print usage information.") + val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.") + .withRequiredArg + .describedAs("name=value") + .ofType(classOf[String]) + val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.") + .withRequiredArg + .describedAs("# of partitions") + .ofType(classOf[java.lang.Integer]) + val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.") + .withRequiredArg + .describedAs("replication factor") + .ofType(classOf[java.lang.Integer]) + val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.") + .withRequiredArg + .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " + + "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...") + .ofType(classOf[String]) + + + val options = parser.parse(args : _*) + } + +} diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index a24094a4e64..367ccd56fbb 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -17,9 +17,11 @@ package kafka.cluster import scala.collection._ +import kafka.admin.AdminUtils import kafka.utils._ import java.lang.Object import kafka.api.LeaderAndIsr +import kafka.log.LogConfig import kafka.server.ReplicaManager import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup @@ -74,7 +76,8 @@ class Partition(val topic: String, case Some(replica) => replica case None => if (isReplicaLocal(replicaId)) { - val log = logManager.getOrCreateLog(topic, partitionId) + val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic)) + val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset) val localReplica = new Replica(replicaId, this, time, offset, Some(log)) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0cc03bbc67b..631953f1b24 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -49,7 +49,7 @@ import com.yammer.metrics.core.Gauge */ @threadsafe class Log(val dir: File, - val config: LogConfig, + @volatile var config: LogConfig, val needsRecovery: Boolean, val scheduler: Scheduler, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 5a10bef3b36..dc42a74774f 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -17,7 +17,7 @@ package kafka.log -import java.io.File +import java.util.Properties import scala.collection._ import kafka.common._ @@ -46,6 +46,99 @@ case class LogConfig(val segmentSize: Int = 1024*1024, val indexInterval: Int = 4096, val fileDeleteDelayMs: Long = 60*1000, val minCleanableRatio: Double = 0.5, - val dedupe: Boolean = false) + val dedupe: Boolean = false) { + + def toProps: Properties = { + val props = new Properties() + import LogConfig._ + props.put(SegmentBytesProp, segmentSize.toString) + props.put(SegmentMsProp, segmentMs.toString) + props.put(SegmentIndexBytesProp, maxIndexSize.toString) + props.put(FlushMessagesProp, flushInterval.toString) + props.put(FlushMsProp, flushMs.toString) + props.put(RetentionBytesProp, retentionSize.toString) + props.put(RententionMsProp, retentionMs.toString) + props.put(MaxMessageBytesProp, maxMessageSize.toString) + props.put(IndexIntervalBytesProp, indexInterval.toString) + props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString) + props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString) + props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete") + props + } + +} + +object LogConfig { + val SegmentBytesProp = "segment.bytes" + val SegmentMsProp = "segment.ms" + val SegmentIndexBytesProp = "segment.index.bytes" + val FlushMessagesProp = "flush.messages" + val FlushMsProp = "flush.ms" + val RetentionBytesProp = "retention.bytes" + val RententionMsProp = "retention.ms" + val MaxMessageBytesProp = "max.message.bytes" + val IndexIntervalBytesProp = "index.interval.bytes" + val FileDeleteDelayMsProp = "file.delete.delay.ms" + val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio" + val CleanupPolicyProp = "cleanup.policy" + + val ConfigNames = Set(SegmentBytesProp, + SegmentMsProp, + SegmentIndexBytesProp, + FlushMessagesProp, + FlushMsProp, + RetentionBytesProp, + RententionMsProp, + MaxMessageBytesProp, + IndexIntervalBytesProp, + FileDeleteDelayMsProp, + MinCleanableDirtyRatioProp, + CleanupPolicyProp) + + + /** + * Parse the given properties instance into a LogConfig object + */ + def fromProps(props: Properties): LogConfig = { + new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt, + segmentMs = props.getProperty(SegmentMsProp).toLong, + maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt, + flushInterval = props.getProperty(FlushMessagesProp).toLong, + flushMs = props.getProperty(FlushMsProp).toLong, + retentionSize = props.getProperty(RetentionBytesProp).toLong, + retentionMs = props.getProperty(RententionMsProp).toLong, + maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt, + indexInterval = props.getProperty(IndexIntervalBytesProp).toInt, + fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt, + minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble, + dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe") + } + + /** + * Create a log config instance using the given properties and defaults + */ + def fromProps(defaults: Properties, overrides: Properties): LogConfig = { + val props = new Properties(defaults) + props.putAll(overrides) + fromProps(props) + } + + /** + * Check that property names are valid + */ + private def validateNames(props: Properties) { + for(name <- JavaConversions.asMap(props).keys) + require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name)) + } + + /** + * Check that the given properties contain only valid log config names, and that all values can be parsed. + */ + def validate(props: Properties) { + validateNames(props) + LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 438d802ab23..0d567e4e758 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -174,31 +174,19 @@ class LogManager(val logDirs: Array[File], /** * Get the log if it exists, otherwise return None */ - def getLog(topic: String, partition: Int): Option[Log] = { - val topicAndPartiton = TopicAndPartition(topic, partition) - val log = logs.get(topicAndPartiton) + def getLog(topicAndPartition: TopicAndPartition): Option[Log] = { + val log = logs.get(topicAndPartition) if (log == null) None else Some(log) } - /** - * Create the log if it does not exist, otherwise just return it - */ - def getOrCreateLog(topic: String, partition: Int): Log = { - val topicAndPartition = TopicAndPartition(topic, partition) - logs.get(topicAndPartition) match { - case null => createLogIfNotExists(topicAndPartition) - case log: Log => log - } - } - /** * Create a log for the given topic and the given partition * If the log already exists, just return a copy of the existing log */ - private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = { + def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = { logCreationLock synchronized { var log = logs.get(topicAndPartition) @@ -211,12 +199,16 @@ class LogManager(val logDirs: Array[File], val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition) dir.mkdirs() log = new Log(dir, - defaultConfig, + config, needsRecovery = false, scheduler, time) - info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) logs.put(topicAndPartition, log) + info("Created log for topic %s partition %d in %s with properties {%s}." + .format(topicAndPartition.topic, + topicAndPartition.partition, + dataDir.getAbsolutePath, + JavaConversions.asMap(config.toProps).mkString(", "))) log } } @@ -289,6 +281,11 @@ class LogManager(val logDirs: Array[File], * Get all the partition logs */ def allLogs(): Iterable[Log] = logs.values + + /** + * Get a map of TopicAndPartition => Log + */ + def logsByTopicPartition = logs.toMap /** * Flush any log which has exceeded its flush interval and has unwritten messages. diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 648d936473a..865f7b460d3 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -281,7 +281,7 @@ private[kafka] class Processor(val id: Int, debug("Ignoring response for closed socket.") close(key) } - }finally { + } finally { curr = requestChannel.receiveResponse(id) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f30dca1ebea..f8faf96eafc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.admin.{CreateTopicCommand, AdminUtils} +import kafka.admin.AdminUtils import kafka.api._ import kafka.message._ import kafka.network._ @@ -25,6 +25,7 @@ import kafka.log._ import kafka.utils.ZKGroupTopicDirs import org.apache.log4j.Logger import scala.collection._ +import java.util.Properties import java.util.concurrent.TimeUnit import java.util.concurrent.atomic._ import kafka.metrics.KafkaMetricsGroup @@ -367,7 +368,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { - logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match { + logManager.getLog(topicAndPartition) match { case Some(log) => fetchOffsetsBefore(log, timestamp, maxNumOffsets) case None => @@ -442,7 +443,7 @@ class KafkaApis(val requestChannel: RequestChannel, /* check if auto creation of topics is turned on */ if (config.autoCreateTopicsEnable) { try { - CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor) + AdminUtils.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)) } catch { @@ -478,24 +479,25 @@ class KafkaApis(val requestChannel: RequestChannel, if(requestLogger.isTraceEnabled) requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString) trace("Handling offset commit request " + offsetCommitRequest.toString) - val responseInfo = offsetCommitRequest.requestInfo.map( t => { - val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic) - try { - if(t._2.metadata != null && t._2.metadata.length > config.offsetMetadataMaxSize) { - (t._1, ErrorMapping.OffsetMetadataTooLargeCode) - } else { - ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + - t._1.partition, t._2.offset.toString) - (t._1, ErrorMapping.NoError) + val responseInfo = offsetCommitRequest.requestInfo.map{ + case (topicAndPartition, metaAndError) => { + val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic) + try { + if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) { + (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) + } else { + ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + + topicAndPartition.partition, metaAndError.offset.toString) + (topicAndPartition, ErrorMapping.NoError) + } + } catch { + case e => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } - } catch { - case e => - (t._1, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } - }) + } val response = new OffsetCommitResponse(responseInfo, - offsetCommitRequest.correlationId, - offsetCommitRequest.clientId) + offsetCommitRequest.correlationId, + offsetCommitRequest.clientId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -506,7 +508,6 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString) - trace("Handling offset fetch request " + offsetFetchRequest.toString) val responseInfo = offsetFetchRequest.requestInfo.map( t => { val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic) try { @@ -525,8 +526,8 @@ class KafkaApis(val requestChannel: RequestChannel, } }) val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), - offsetFetchRequest.correlationId, - offsetFetchRequest.clientId) + offsetFetchRequest.correlationId, + offsetFetchRequest.clientId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala similarity index 68% rename from core/src/main/scala/kafka/server/KafkaZooKeeper.scala rename to core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 0e6c65660f5..a0787075d3f 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -25,30 +25,36 @@ import java.net.InetAddress /** - * Handles registering broker with zookeeper in the following path: + * This class registers the broker in zookeeper to allow + * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: * /brokers/[0...N] --> host:port + * + * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise + * we are dead. */ -class KafkaZooKeeper(config: KafkaConfig) extends Logging { +class KafkaHealthcheck(private val brokerId: Int, + private val host: String, + private val port: Int, + private val zkClient: ZkClient) extends Logging { - val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId - private var zkClient: ZkClient = null + val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId + + def startup() { + zkClient.subscribeStateChanges(new SessionExpireListener) + register() + } - def startup() { - /* start client */ - info("connecting to ZK: " + config.zkConnect) - zkClient = KafkaZookeeperClient.getZookeeperClient(config) - zkClient.subscribeStateChanges(new SessionExpireListener) - registerBrokerInZk() - } - - private def registerBrokerInZk() { + /** + * Register this broker as "alive" in zookeeper + */ + def register() { val hostName = - if(config.hostName == null || config.hostName.trim.isEmpty) + if(host == null || host.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else - config.hostName + host val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, jmxPort) } /** @@ -70,21 +76,11 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging { */ @throws(classOf[Exception]) def handleNewSession() { - info("re-registering broker info in ZK for broker " + config.brokerId) - registerBrokerInZk() + info("re-registering broker info in ZK for broker " + brokerId) + register() info("done re-registering broker") info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) } } - def shutdown() { - if (zkClient != null) { - info("Closing zookeeper client...") - zkClient.close() - } - } - - def getZookeeperClient = { - zkClient - } } diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 842dcf38da3..f0949c297a0 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -67,7 +67,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, handler.shutdown for(thread <- threads) thread.join - info("shutted down completely") + info("shut down completely") } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index badd1f8dec2..9fa432d6705 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -18,6 +18,7 @@ package kafka.server import kafka.network.SocketServer +import kafka.admin._ import kafka.log.LogConfig import kafka.log.CleanerConfig import kafka.log.LogManager @@ -39,7 +40,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null - var kafkaZookeeper: KafkaZooKeeper = null + var kafkaHealthcheck: KafkaHealthcheck = null + var topicConfigManager: TopicConfigManager = null var replicaManager: ReplicaManager = null var apis: KafkaApis = null var kafkaController: KafkaController = null @@ -57,9 +59,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start scheduler */ kafkaScheduler.startup() + + /* setup zookeeper */ + zkClient = initZk() /* start log manager */ - logManager = createLogManager(config) + logManager = createLogManager(zkClient) logManager.startup() socketServer = new SocketServer(config.brokerId, @@ -68,31 +73,40 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.numNetworkThreads, config.queuedMaxRequests, config.socketRequestMaxBytes) + socketServer.startup() - socketServer.startup - - /* start client */ - kafkaZookeeper = new KafkaZooKeeper(config) - // starting relevant replicas and leader election for partitions assigned to this broker - kafkaZookeeper.startup - - info("Connecting to ZK: " + config.zkConnect) - - replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager) - - kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient) - apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, config) + replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager) + kafkaController = new KafkaController(config, zkClient) + + /* start processing requests */ + apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) - Mx4jLoader.maybeLoad + + Mx4jLoader.maybeLoad() - // start the replica manager replicaManager.startup() - // start the controller + kafkaController.startup() - // register metrics beans + + topicConfigManager = new TopicConfigManager(zkClient, logManager) + topicConfigManager.startup() + + /* tell everyone we are alive */ + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, zkClient) + kafkaHealthcheck.startup() + + registerStats() + info("started") } + + private def initZk(): ZkClient = { + info("Connecting to zookeeper on " + config.zkConnect) + val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + ZkUtils.setupCommonPaths(zkClient) + zkClient + } /** * Forces some dynamic jmx beans to be registered on server startup. @@ -118,15 +132,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(kafkaScheduler.shutdown()) if(apis != null) Utils.swallow(apis.close()) - if(kafkaZookeeper != null) - Utils.swallow(kafkaZookeeper.shutdown()) if(replicaManager != null) Utils.swallow(replicaManager.shutdown()) if(logManager != null) Utils.swallow(logManager.shutdown()) - if(kafkaController != null) Utils.swallow(kafkaController.shutdown()) + if(zkClient != null) + Utils.swallow(zkClient.close()) shutdownLatch.countDown() info("shut down completed") @@ -140,13 +153,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def getLogManager(): LogManager = logManager - private def createLogManager(config: KafkaConfig): LogManager = { - val topics = config.logCleanupPolicyMap.keys ++ - config.logSegmentBytesPerTopicMap.keys ++ - config.logFlushIntervalMsPerTopicMap.keys ++ - config.logRollHoursPerTopicMap.keys ++ - config.logRetentionBytesPerTopicMap.keys ++ - config.logRetentionHoursPerTopicMap.keys + private def createLogManager(zkClient: ZkClient): LogManager = { val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = 60 * 60 * 1000 * config.logRollHours, flushInterval = config.logFlushIntervalMessages, @@ -159,13 +166,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg fileDeleteDelayMs = config.logDeleteDelayMs, minCleanableRatio = config.logCleanerMinCleanRatio, dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe") - val logConfigs = for(topic <- topics) yield - topic -> defaultLogConfig.copy(segmentSize = config.logSegmentBytesPerTopicMap.getOrElse(topic, config.logSegmentBytes), - segmentMs = 60 * 60 * 1000 * config.logRollHoursPerTopicMap.getOrElse(topic, config.logRollHours), - flushMs = config.logFlushIntervalMsPerTopicMap.getOrElse(topic, config.logFlushIntervalMs).toLong, - retentionSize = config.logRetentionBytesPerTopicMap.getOrElse(topic, config.logRetentionBytes), - retentionMs = 60 * 60 * 1000 * config.logRetentionHoursPerTopicMap.getOrElse(topic, config.logRetentionHours), - dedupe = config.logCleanupPolicyMap.getOrElse(topic, config.logCleanupPolicy).trim.toLowerCase == "dedupe") + val defaultProps = defaultLogConfig.toProps + val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) + // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, ioBufferSize = config.logCleanerIoBufferSize, @@ -174,7 +177,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg backOffMs = config.logCleanerBackoffMs, enableCleaner = config.logCleanerEnable) new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, - topicConfigs = logConfigs.toMap, + topicConfigs = configs, defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, flushCheckMs = config.logFlushSchedulerIntervalMs, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 573601f8aaf..765d3cbb9a1 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -175,7 +175,7 @@ class ReplicaManager(val config: KafkaConfig, case Some(leaderReplica) => leaderReplica case None => throw new LeaderNotAvailableException("Leader not local for topic %s partition %d on broker %d" - .format(topic, partitionId, config.brokerId)) + .format(topic, partitionId, config.brokerId)) } } } diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala new file mode 100644 index 00000000000..5814cb7c4ee --- /dev/null +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.Properties +import scala.collection._ +import kafka.log._ +import kafka.utils._ +import kafka.admin.AdminUtils +import org.I0Itec.zkclient.{IZkChildListener, ZkClient} + +/** + * This class initiates and carries out topic config changes. + * + * It works as follows. + * + * Config is stored under the path + * /brokers/topics//config + * This znode stores the topic-overrides for this topic (but no defaults) in properties format. + * + * To avoid watching all topics for changes instead we have a notification path + * /brokers/config_changes + * The TopicConfigManager has a child watch on this path. + * + * To update a topic config we first update the topic config properties. Then we create a new sequential + * znode under the change path which contains the name of the topic that was updated, say + * /brokers/config_changes/config_change_13321 + * + * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications. + * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds + * it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification. + * For any new changes it reads the new configuration, combines it with the defaults, and updates the log config + * for all logs for that topic (if any) that it has. + * + * Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is + * down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that + * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the + * broker reads the config the both changes may have been made). In this case the broker would needlessly refresh the config twice, + * but that is harmless. + * + * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions + * on startup where a change might be missed between the initial config load and registering for change notifications. + * + */ +class TopicConfigManager(private val zkClient: ZkClient, + private val logManager: LogManager, + private val changeExpirationMs: Long = 10*60*1000, + private val time: Time = SystemTime) extends Logging { + private var lastExecutedChange = -1L + + /** + * Begin watching for config changes + */ + def startup() { + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath) + zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener) + processAllConfigChanges() + } + + /** + * Process all config changes + */ + private def processAllConfigChanges() { + val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath) + processConfigChanges(JavaConversions.asBuffer(configChanges).sorted) + } + + /** + * Process the given list of config changes + */ + private def processConfigChanges(notifications: Seq[String]) { + if (notifications.size > 0) { + info("Processing %d topic config change notification(s)...".format(notifications.size)) + val now = time.milliseconds + val logs = logManager.logsByTopicPartition.toBuffer + val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) + val lastChangeId = notifications.map(changeNumber).max + for (notification <- notifications) { + val changeId = changeNumber(notification) + if (changeId > lastExecutedChange) { + val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification + val (topicJson, stat) = ZkUtils.readData(zkClient, changeZnode) + val topic = topicJson.substring(1, topicJson.length - 1) // dequote + if (logsByTopic.contains(topic)) { + /* combine the default properties with the overrides in zk to create the new LogConfig */ + val props = new Properties(logManager.defaultConfig.toProps) + props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + val logConfig = LogConfig.fromProps(props) + for (log <- logsByTopic(topic)) + log.config = logConfig + lastExecutedChange = changeId + info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) + } else if (now - stat.getCtime > changeExpirationMs) { + /* this change is now obsolete, try to delete it unless it is the last change left */ + ZkUtils.deletePath(zkClient, changeZnode) + } + } + } + } + } + + /* get the change number from a change notification znode */ + private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong + + /** + * A listener that applies config changes to logs + */ + object ConfigChangeListener extends IZkChildListener { + override def handleChildChange(path: String, chillins: java.util.List[String]) { + try { + processConfigChanges(JavaConversions.asBuffer(chillins)) + } catch { + case e: Exception => error("Error processing config change:", e) + } + } + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala index 21c5d4adfb5..29f1209c61c 100644 --- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala +++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -10,7 +10,7 @@ object CommandLineUtils extends Logging { def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { for(arg <- required) { if(!options.has(arg)) { - error("Missing required argument \"" + arg + "\"") + System.err.println("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index a1147699d92..e300f60c6e0 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -1,6 +1,7 @@ package kafka.utils import kafka.common._ +import scala.collection._ import util.parsing.json.JSON /** @@ -11,6 +12,9 @@ object Json extends Logging { JSON.globalNumberParser = myConversionFunc val lock = new Object + /** + * Parse a JSON string into an object + */ def parseFull(input: String): Option[Any] = { lock synchronized { try { @@ -21,4 +25,31 @@ object Json extends Logging { } } } + + /** + * Encode an object into a JSON string. This method accepts any type T where + * T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T] + * Any other type will result in an exception. + * + * This method does not properly handle non-ascii characters. + */ + def encode(obj: Any): String = { + obj match { + case null => "null" + case b: Boolean => b.toString + case s: String => "\"" + s + "\"" + case n: Number => n.toString + case m: Map[_, _] => + "{" + + m.map(elem => + elem match { + case t: Tuple2[_,_] => encode(t._1) + ":" + encode(t._2) + case _ => throw new IllegalArgumentException("Invalid map element (" + elem + ") in " + obj) + }).mkString(",") + + "}" + case a: Array[_] => encode(a.toSeq) + case i: Iterable[_] => "[" + i.map(encode).mkString(",") + "]" + case other: AnyRef => throw new IllegalArgumentException("Unknown arguement of type " + other.getClass + ": " + other) + } + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 37b39753286..c8fdf4a2ea8 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -576,6 +576,25 @@ object Utils extends Logging { f } + /** + * Turn a properties map into a string + */ + def asString(props: Properties): String = { + val writer = new StringWriter() + props.store(writer, "") + writer.toString + } + + /** + * Read some properties with the given default values + */ + def readProps(s: String, defaults: Properties): Properties = { + val reader = new StringReader(s) + val props = new Properties(defaults) + props.load(reader) + props + } + /** * Read a big-endian integer from a byte array */ diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index f0aba120375..c6119d90194 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -38,6 +38,8 @@ object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" + val TopicConfigPath = "/config/topics" + val TopicConfigChangesPath = "/config/changes" val ControllerPath = "/controller" val ControllerEpochPath = "/controller_epoch" val ReassignPartitionsPath = "/admin/reassign_partitions" @@ -51,6 +53,9 @@ object ZkUtils extends Logging { getTopicPath(topic) + "/partitions" } + def getTopicConfigPath(topic: String): String = + TopicConfigPath + "/" + topic + def getController(zkClient: ZkClient): Int= { readDataMaybeNull(zkClient, ControllerPath)._1 match { case Some(controller) => controller.toInt @@ -58,17 +63,14 @@ object ZkUtils extends Logging { } } - def getTopicPartitionPath(topic: String, partitionId: Int): String ={ + def getTopicPartitionPath(topic: String, partitionId: Int): String = getTopicPartitionsPath(topic) + "/" + partitionId - } - def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String ={ + def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String = getTopicPartitionPath(topic, partitionId) + "/" + "state" - } - def getSortedBrokerList(zkClient: ZkClient): Seq[Int] ={ + def getSortedBrokerList(zkClient: ZkClient): Seq[Int] = ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).sorted - } def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = { val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted @@ -89,6 +91,11 @@ object ZkUtils extends Logging { def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr) } + + def setupCommonPaths(zkClient: ZkClient) { + for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath)) + makeSurePersistentPathExists(zkClient, path) + } def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat) : Option[LeaderIsrAndControllerEpoch] = { @@ -179,7 +186,7 @@ object ZkUtils extends Logging { debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas)) replicas.contains(brokerId.toString) } - + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val brokerInfo = @@ -312,10 +319,8 @@ object ZkUtils extends Logging { case e: ZkNodeExistsException => stat = client.writeData(path, data) return stat.getVersion - case e2 => throw e2 } } - case e2 => throw e2 } } @@ -596,7 +601,7 @@ object ZkUtils extends Logging { case nne: ZkNoNodeException => ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData)) - case e2 => throw new AdministrationException(e2.toString) + case e2 => throw new AdminOperationException(e2.toString) } } } diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 8e027b2f85d..b73e5d4f6d4 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -19,6 +19,9 @@ package kafka.admin import junit.framework.Assert._ import org.junit.Test import org.scalatest.junit.JUnit3Suite +import java.util.Properties +import kafka.utils._ +import kafka.log._ import kafka.zk.ZooKeeperTestHarness import kafka.server.KafkaConfig import kafka.utils.{Logging, ZkUtils, TestUtils} @@ -32,28 +35,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val brokerList = List(0, 1, 2, 3, 4) // test 0 replication factor - try { + intercept[AdminOperationException] { AdminUtils.assignReplicasToBrokers(brokerList, 10, 0) - fail("shouldn't allow replication factor 0") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 } // test wrong replication factor - try { + intercept[AdminOperationException] { AdminUtils.assignReplicasToBrokers(brokerList, 10, 6) - fail("shouldn't allow replication factor larger than # of brokers") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 } // correct assignment - { - val expectedAssignment = Map( + val expectedAssignment = Map( 0 -> List(0, 1, 2), 1 -> List(1, 2, 3), 2 -> List(2, 3, 4), @@ -63,65 +55,34 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { 6 -> List(1, 3, 4), 7 -> List(2, 4, 0), 8 -> List(3, 0, 1), - 9 -> List(4, 1, 2) - ) + 9 -> List(4, 1, 2)) - val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0) - val e = (expectedAssignment.toList == actualAssignment.toList) - assertTrue(expectedAssignment.toList == actualAssignment.toList) - } + val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0) + val e = (expectedAssignment.toList == actualAssignment.toList) + assertTrue(expectedAssignment.toList == actualAssignment.toList) } @Test def testManualReplicaAssignment() { - val brokerList = Set(0, 1, 2, 3, 4) + val brokers = List(0, 1, 2, 3, 4) + TestUtils.createBrokersInZk(zkClient, brokers) - // duplicated brokers - try { - val replicationAssignmentStr = "0,0,1:1,2,3" - CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) - fail("replication assginment shouldn't have duplicated brokers") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 - } - - // non-exist brokers - try { - val replicationAssignmentStr = "0,1,2:1,2,7" - CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) - fail("replication assginment shouldn't contain non-exist brokers") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 + // duplicate brokers + intercept[IllegalArgumentException] { + AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,0))) } // inconsistent replication factor - try { - val replicationAssignmentStr = "0,1,2:1,2" - CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) - fail("all partitions should have the same replication factor") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 + intercept[IllegalArgumentException] { + AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0))) } // good assignment - { - val replicationAssignmentStr = "0:1:2,1:2:3" - val expectedReplicationAssignment = Map( - 0 -> List(0, 1, 2), - 1 -> List(1, 2, 3) - ) - val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) - assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size) - for( (part, replicas) <- expectedReplicationAssignment ) { - assertEquals(replicas, actualReplicationAssignment(part)) - } - } + val assignment = Map(0 -> List(0, 1, 2), + 1 -> List(1, 2, 3)) + AdminUtils.createTopicWithAssignment(zkClient, "test", assignment) + val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test")) + assertEquals(assignment, found("test")) } @Test @@ -157,7 +118,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topic = "test" TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas) @@ -166,12 +127,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { for(i <- 0 until actualReplicaList.size) assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) - try { - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) - fail("shouldn't be able to create a topic already exists") - } catch { - case e: TopicExistsException => // this is good - case e2 => throw e2 + intercept[TopicExistsException] { + // shouldn't be able to create a topic that already exists + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) } } @@ -179,15 +137,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def testGetTopicMetadata() { val expectedReplicaAssignment = Map( 0 -> List(0, 1, 2), - 1 -> List(1, 2, 3) - ) + 1 -> List(1, 2, 3)) val leaderForPartitionMap = Map( 0 -> 0, - 1 -> 1 - ) + 1 -> 1) val topic = "auto-topic" TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3)) - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) @@ -215,7 +171,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 @@ -240,7 +196,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 @@ -266,7 +222,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -307,7 +263,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) @@ -346,7 +302,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get @@ -367,7 +323,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first @@ -404,6 +360,50 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { servers.foreach(_.shutdown()) } } + + /** + * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic + * then changes the config and checks that the new values take effect. + */ + @Test + def testTopicConfigChange() { + val partitions = 3 + val topic = "my-topic" + val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0))) + + def makeConfig(messageSize: Int, retentionMs: Long) = { + var props = new Properties() + props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString) + props.setProperty(LogConfig.RententionMsProp, retentionMs.toString) + props + } + + def checkConfig(messageSize: Int, retentionMs: Long) { + TestUtils.retry(10000) { + for(part <- 0 until partitions) { + val logOpt = server.logManager.getLog(TopicAndPartition(topic, part)) + assertTrue(logOpt.isDefined) + assertEquals(retentionMs, logOpt.get.config.retentionMs) + assertEquals(messageSize, logOpt.get.config.maxMessageSize) + } + } + } + + try { + // create a topic with a few config overrides and check that they are applied + val maxMessageSize = 1024 + val retentionMs = 1000*1000 + AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs)) + checkConfig(maxMessageSize, retentionMs) + + // now double the config values for the topic and check that it is applied + AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) + checkConfig(2*maxMessageSize, 2 * retentionMs) + } finally { + server.shutdown() + server.config.logDirs.map(Utils.rm(_)) + } + } private def checkIfReassignPartitionPathExists(): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 8ae30ea785d..fec17aa1098 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -20,6 +20,7 @@ package kafka.consumer import java.util.concurrent._ import java.util.concurrent.atomic._ +import java.util.Properties import scala.collection._ import junit.framework.Assert._ @@ -27,7 +28,7 @@ import kafka.message._ import kafka.server._ import kafka.utils.TestUtils._ import kafka.utils._ -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import org.junit.Test import kafka.serializer._ import kafka.cluster.{Broker, Cluster} @@ -60,7 +61,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { override def setUp() { super.setUp - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index f7ee914d4da..c70a435398a 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -26,7 +26,7 @@ import org.scalatest.junit.JUnit3Suite import org.apache.log4j.{Level, Logger} import kafka.message._ import kafka.serializer._ -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import org.I0Itec.zkclient.ZkClient import kafka.utils._ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} @@ -298,7 +298,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer) // create topic topic1 with 1 partition on broker 0 - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + AdminUtils.createTopic(zkClient, topic, 1, 1) // send some messages to each broker val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1) diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 4c646f0803e..c046a398263 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -32,8 +32,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L val topic = "test_topic" val group = "default_group" val testConsumer = "consumer" - val BrokerPort = 9892 - val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort))) + val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0))) val NumMessages = 10 val LargeOffset = 10000 val SmallOffset = -1 diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 5a57bd14526..845b966e3f0 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -30,7 +30,7 @@ import kafka.serializer._ import kafka.producer.{KeyedMessage, Producer} import kafka.utils.TestUtils._ import kafka.utils.TestUtils -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { @@ -55,7 +55,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { override def setUp() { super.setUp - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId))) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient) fetcher.stopAllConnections() diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 2fc08d366a4..1c6a01b6fa0 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite import scala.collection._ -import kafka.admin.{AdminUtils, CreateTopicCommand} +import kafka.admin.AdminUtils import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} import kafka.utils.{TestUtils, Utils} @@ -42,19 +42,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val configs = List(config) val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) - override def setUp() { - super.setUp - // temporarily set request handler logger to a higher level - requestHandlerLogger.setLevel(Level.FATAL) - } - - override def tearDown() { - // restore set request handler logger to a higher level - requestHandlerLogger.setLevel(Level.ERROR) - - super.tearDown - } - def testFetchRequestCanProperlySerialize() { val request = new FetchRequestBuilder() .clientId("test-client") @@ -299,7 +286,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testConsumerEmptyTopic() { val newTopic = "new-topic" - CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString) + AdminUtils.createTopic(zkClient, newTopic, 1, 1) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500) @@ -327,10 +314,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with } // wait until the messages are published - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test1", 0).get.logEndOffset == 2 }, 1000) - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test2", 0).get.logEndOffset == 2 }, 1000) - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test3", 0).get.logEndOffset == 2 }, 1000) - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test4", 0).get.logEndOffset == 2 }, 1000) + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, 1000) + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, 1000) + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, 1000) + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, 1000) val replicaId = servers.head.config.brokerId val hwWaitMs = config.replicaHighWatermarkCheckpointIntervalMs @@ -354,7 +341,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with */ def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) { for( topic <- topics ) { - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString) + AdminUtils.createTopic(zkClient, topic, 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) } } diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 5169aeabde1..be94254a76c 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -19,7 +19,7 @@ package kafka.integration import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import java.nio.ByteBuffer import junit.framework.Assert._ import org.easymock.EasyMock @@ -48,7 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testTopicMetadataRequest { // create topic val topic = "test" - CreateTopicCommand.createTopic(zkClient, topic, 1) + AdminUtils.createTopic(zkClient, topic, 1, 1) // create a topic metadata request val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0) @@ -64,7 +64,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testBasicTopicMetadata { // create topic val topic = "test" - CreateTopicCommand.createTopic(zkClient, topic, 1) + AdminUtils.createTopic(zkClient, topic, 1, 1) // set up leader for topic partition 0 val leaderForPartitionMap = Map( 0 -> configs.head.brokerId @@ -83,7 +83,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testGetAllTopicMetadata { // create topic val topic = "test" - CreateTopicCommand.createTopic(zkClient, topic, 1) + AdminUtils.createTopic(zkClient, topic, 1, 1) // set up leader for topic partition 0 val leaderForPartitionMap = Map( 0 -> configs.head.brokerId diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index fad3baac1c6..6916df4b550 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -58,7 +58,7 @@ class LogManagerTest extends JUnit3Suite { */ @Test def testCreateLog() { - val log = logManager.getOrCreateLog(name, 0) + val log = logManager.createLog(TopicAndPartition(name, 0), logConfig) val logFile = new File(logDir, name + "-0") assertTrue(logFile.exists) log.append(TestUtils.singleMessageSet("test".getBytes())) @@ -69,7 +69,7 @@ class LogManagerTest extends JUnit3Suite { */ @Test def testGetNonExistentLog() { - val log = logManager.getLog(name, 0) + val log = logManager.getLog(TopicAndPartition(name, 0)) assertEquals("No log should be found.", None, log) val logFile = new File(logDir, name + "-0") assertTrue(!logFile.exists) @@ -80,7 +80,7 @@ class LogManagerTest extends JUnit3Suite { */ @Test def testCleanupExpiredSegments() { - val log = logManager.getOrCreateLog(name, 0) + val log = logManager.createLog(TopicAndPartition(name, 0), logConfig) var offset = 0L for(i <- 0 until 200) { var set = TestUtils.singleMessageSet("test".getBytes()) @@ -120,7 +120,7 @@ class LogManagerTest extends JUnit3Suite { logManager.startup // create a log - val log = logManager.getOrCreateLog(name, 0) + val log = logManager.createLog(TopicAndPartition(name, 0), config) var offset = 0L // add a bunch of messages that should be larger than the retentionSize @@ -158,7 +158,7 @@ class LogManagerTest extends JUnit3Suite { val config = logConfig.copy(flushMs = 1000) logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time) logManager.startup - val log = logManager.getOrCreateLog(name, 0) + val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime for(i <- 0 until 200) { var set = TestUtils.singleMessageSet("test".getBytes()) @@ -182,7 +182,7 @@ class LogManagerTest extends JUnit3Suite { // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { - logManager.getOrCreateLog("test", partition) + logManager.createLog(TopicAndPartition("test", partition), logConfig) assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size) val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size) assertTrue("Load should balance evenly", counts.max <= counts.min + 1) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 04acef5a5c3..e4b057e00fc 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -27,7 +27,7 @@ import org.junit.Assert._ import org.junit.Test import kafka.utils._ import java.util -import kafka.admin.{AdminUtils, CreateTopicCommand} +import kafka.admin.AdminUtils import util.Properties import kafka.api.FetchRequestBuilder import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException} @@ -77,17 +77,15 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // restore set request handler logger to a higher level requestHandlerLogger.setLevel(Level.ERROR) server1.shutdown - server1.awaitShutdown() server2.shutdown - server2.awaitShutdown() Utils.rm(server1.config.logDirs) Utils.rm(server2.config.logDirs) super.tearDown() } - + @Test def testUpdateBrokerPartitionInfo() { - CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2) + AdminUtils.createTopic(zkClient, "new-topic", 1, 2) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) @@ -152,7 +150,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val producerConfig2 = new ProducerConfig(props2) // create topic with 1 partition and await leadership - CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2) + AdminUtils.createTopic(zkClient, "new-topic", 1, 2) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) @@ -203,7 +201,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) // create topic - CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0") + AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0))) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) @@ -213,13 +211,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val config = new ProducerConfig(props) val producer = new Producer[String, String](config) - try { - // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only - // on broker 0 - producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) - } catch { - case e => fail("Unexpected exception: " + e) - } + // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only on broker 0 + producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) // kill the broker server1.shutdown @@ -264,7 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val producer = new Producer[String, String](config) // create topics in ZK - CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1") + AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0,1))) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index b5ee31d5178..bbf04063d2b 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -19,7 +19,7 @@ package kafka.producer import java.net.SocketTimeoutException import junit.framework.Assert -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.integration.KafkaServerTestHarness import kafka.message._ import kafka.server.KafkaConfig @@ -92,7 +92,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = TestUtils.getSyncProducerConfig(server.socketServer.port) val producer = new SyncProducer(new SyncProducerConfig(props)) - CreateTopicCommand.createTopic(zkClient, "test", 1, 1) + AdminUtils.createTopic(zkClient, "test", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) @@ -135,9 +135,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { } // #2 - test that we get correct offsets when partition is owned by broker - CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1) + AdminUtils.createTopic(zkClient, "topic1", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0, 500) - CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1) + AdminUtils.createTopic(zkClient, "topic3", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500) val response2 = producer.send(request) diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 8a3e33b5a39..99635021da2 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -62,7 +62,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { assertEquals(0L, fooPartition0Hw) val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1) // create leader and follower replicas - val log0 = logManagers(0).getOrCreateLog(topic, 0) + val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig()) val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0)) partition0.addReplicaIfNotExists(leaderReplicaPartition0) val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime) @@ -101,7 +101,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { assertEquals(0L, topic1Partition0Hw) val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1) // create leader log - val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0) + val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig()) // create a local replica for topic1 val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0)) topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) @@ -117,7 +117,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { // add another partition and set highwatermark val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1) // create leader log - val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0) + val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig()) // create a local replica for topic2 val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0)) topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 129bc56da83..176718e5000 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -19,7 +19,7 @@ package kafka.server import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} @@ -61,7 +61,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val partitionId = 0 // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1))) // wait until leader is elected val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -108,7 +108,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val partitionId = 0 // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1))) // wait until leader is elected val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index f857171ef68..6801f4e4e55 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -26,7 +26,7 @@ import org.junit.{After, Before, Test} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} import kafka.utils.TestUtils._ import kafka.common.{ErrorMapping, TopicAndPartition} @@ -82,10 +82,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val part = Integer.valueOf(topicPartition.split("-").last).intValue // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + AdminUtils.createTopic(zkClient, topic, 1, 1) val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) + val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 20) @@ -120,7 +120,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = topicPartition.split("-").head // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + AdminUtils.createTopic(zkClient, topic, 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) var offsetChanged = false @@ -145,10 +145,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val part = Integer.valueOf(topicPartition.split("-").last).intValue // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1") + AdminUtils.createTopic(zkClient, topic, 3, 1) val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) + val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 20) log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) @@ -174,10 +174,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val part = Integer.valueOf(topicPartition.split("-").last).intValue // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1") + AdminUtils.createTopic(zkClient, topic, 3, 1) val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) + val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 20) log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 7430485f1fd..d2650e30677 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -19,7 +19,7 @@ package kafka.server import org.scalatest.junit.JUnit3Suite import org.junit.Assert._ import java.io.File -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.utils.TestUtils._ import kafka.utils.IntEncoder import kafka.utils.{Utils, TestUtils} @@ -53,7 +53,14 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString) producerProps.put("request.required.acks", "-1") - + + override def tearDown() { + super.tearDown() + for(server <- servers) { + server.shutdown() + Utils.rm(server.config.logDirs(0)) + } + } def testHWCheckpointNoFailuresSingleLogSegment { // start both servers @@ -64,7 +71,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1))) // wait until leader is elected var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -86,7 +93,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(numMessages, leaderHW) val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L) assertEquals(numMessages, followerHW) - servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))}) } def testHWCheckpointWithFailuresSingleLogSegment { @@ -98,7 +104,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1))) // wait until leader is elected var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -148,7 +154,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) - servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointNoFailuresMultipleLogSegments { @@ -163,7 +168,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1))) // wait until leader is elected var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -182,7 +187,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(hw, leaderHW) val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L) assertEquals(hw, followerHW) - servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointWithFailuresMultipleLogSegments { @@ -197,7 +201,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId))) // wait until leader is elected var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -241,7 +245,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) - servers.foreach(server => Utils.rm(server.config.logDirs)) } private def sendMessages(n: Int = 1) { diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 6989c95e611..c0475d07a77 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -26,7 +26,6 @@ import org.junit.{After, Before, Test} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite -import kafka.admin.CreateTopicCommand import kafka.api.{OffsetCommitRequest, OffsetFetchRequest} import kafka.utils.TestUtils._ import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index d0e35909f05..dd85c718167 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -22,9 +22,10 @@ import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage import kafka.serializer.StringEncoder -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.utils.TestUtils import junit.framework.Assert._ +import kafka.common._ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(2) @@ -50,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { // create a topic and partition and await leadership for (topic <- List(topic1,topic2)) { - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":")) + AdminUtils.createTopic(zkClient, topic, 1, 2) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) } @@ -65,9 +66,10 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { def logsMatch(): Boolean = { var result = true for (topic <- List(topic1, topic2)) { - val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset + val topicAndPart = TopicAndPartition(topic, partition) + val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total && - (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) } + (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) } } result } diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 3728f8c60c8..c5f39cb6888 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -26,7 +26,7 @@ import kafka.zk.ZooKeeperTestHarness import kafka.producer._ import kafka.utils.IntEncoder import kafka.utils.TestUtils._ -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.api.FetchRequestBuilder import kafka.utils.{TestUtils, Utils} @@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { var producer = new Producer[Int, String](new ProducerConfig(producerConfig)) // create topic - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + AdminUtils.createTopic(zkClient, topic, 1, 1) // send some messages producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 571e2df48bd..1c6f615d0c7 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -67,7 +67,7 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.replay(log) val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) - EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes() + EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes() EasyMock.replay(logManager) val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) @@ -133,7 +133,7 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.replay(log) val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) - EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes() + EasyMock.expect(logManager.getLog(TopicAndPartition(topic, 0))).andReturn(Some(log)).anyTimes() EasyMock.replay(logManager) val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala new file mode 100644 index 00000000000..4482dab56be --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala @@ -0,0 +1,27 @@ +package kafka.utils + +import junit.framework.Assert._ +import org.junit.{Test, After, Before} + +class JsonTest { + + @Test + def testJsonEncoding() { + assertEquals("null", Json.encode(null)) + assertEquals("1", Json.encode(1)) + assertEquals("1", Json.encode(1L)) + assertEquals("1", Json.encode(1.toByte)) + assertEquals("1", Json.encode(1.toShort)) + assertEquals("1.0", Json.encode(1.0)) + assertEquals("\"str\"", Json.encode("str")) + assertEquals("true", Json.encode(true)) + assertEquals("false", Json.encode(false)) + assertEquals("[]", Json.encode(Seq())) + assertEquals("[1,2,3]", Json.encode(Seq(1,2,3))) + assertEquals("[1,\"2\",[3]]", Json.encode(Seq(1,"2",Seq(3)))) + assertEquals("{}", Json.encode(Map())) + assertEquals("{\"a\":1,\"b\":2}", Json.encode(Map("a" -> 1, "b" -> 2))) + assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" -> Seq(3,4)))) + } + +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index ec27ef94e4a..b364ac2d6d6 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -76,7 +76,9 @@ class SchedulerTest { @Test def testNonPeriodicTask() { scheduler.schedule("test", counter1.getAndIncrement, delay = 0) - retry(30000, () => assertEquals(counter1.get, 1)) + retry(30000) { + assertEquals(counter1.get, 1) + } Thread.sleep(5) assertEquals("Should only run once", 1, counter1.get) } @@ -84,6 +86,8 @@ class SchedulerTest { @Test def testPeriodicTask() { scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5) - retry(30000, () => assertTrue("Should count to 20", counter1.get >= 20)) + retry(30000){ + assertTrue("Should count to 20", counter1.get >= 20) + } } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 98901c21a8b..40bfacbc888 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -23,6 +23,7 @@ import java.nio._ import java.nio.channels._ import java.util.Random import java.util.Properties +import junit.framework.AssertionFailedError import junit.framework.Assert._ import kafka.server._ import kafka.producer._ @@ -122,7 +123,7 @@ object TestUtils extends Logging { /** * Create a test config for the given node id */ - def createBrokerConfig(nodeId: Int, port: Int): Properties = { + def createBrokerConfig(nodeId: Int, port: Int = choosePort()): Properties = { val props = new Properties props.put("broker.id", nodeId.toString) props.put("host.name", "localhost") @@ -448,18 +449,20 @@ object TestUtils extends Logging { * Execute the given block. If it throws an assert error, retry. Repeat * until no error is thrown or the time limit ellapses */ - def retry(maxWaitMs: Long, block: () => Unit) { + def retry(maxWaitMs: Long)(block: => Unit) { var wait = 1L val startTime = System.currentTimeMillis() while(true) { try { - block() + block return } catch { - case e: AssertionError => - if(System.currentTimeMillis - startTime > maxWaitMs) { + case e: AssertionFailedError => + val ellapsed = System.currentTimeMillis - startTime + if(ellapsed > maxWaitMs) { throw e } else { + info("Attempt failed, sleeping for " + wait + ", and then retrying.") Thread.sleep(wait) wait += math.min(wait, 1000) }