KAFKA-1845 KafkaConfig should use ConfigDef patch by Andrii Biletskyi reviewed by Gwen Shapira

This commit is contained in:
Joe Stein 2015-03-05 09:53:27 -05:00
parent 3a9f4b833b
commit 8f0003f9b6
48 changed files with 1528 additions and 522 deletions

1
.gitignore vendored
View File

@ -7,6 +7,7 @@ lib_managed/
src_managed/
project/boot/
project/plugins/project/
patch-process/*
.idea
.svn
.classpath

View File

@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@ -26,22 +26,22 @@ import org.apache.kafka.common.utils.Utils;
/**
* This class is used for specifying the set of expected configurations, their type, their defaults, their
* documentation, and any special validation logic used for checking the correctness of the values the user provides.
* <p>
* <p/>
* Usage of this class looks something like this:
*
* <p/>
* <pre>
* ConfigDef defs = new ConfigDef();
* defs.define(&quot;config_name&quot;, Type.STRING, &quot;default string value&quot;, &quot;This configuration is used for blah blah blah.&quot;);
* defs.define(&quot;another_config_name&quot;, Type.INT, 42, Range.atLeast(0), &quot;More documentation on this config&quot;);
*
*
* Properties props = new Properties();
* props.setProperty(&quot;config_name&quot;, &quot;some value&quot;);
* Map&lt;String, Object&gt; configs = defs.parse(props);
*
*
* String someConfig = (String) configs.get(&quot;config_name&quot;); // will return &quot;some value&quot;
* int anotherConfig = (Integer) configs.get(&quot;another_config_name&quot;); // will return default value of 42
* </pre>
*
* <p/>
* This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
* functionality for accessing configs.
*/
@ -53,7 +53,7 @@ public class ConfigDef {
/**
* Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
*
*
* @return new unmodifiable {@link Set} instance containing the keys
*/
public Set<String> names() {
@ -62,90 +62,121 @@ public class ConfigDef {
/**
* Define a new configuration
*
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
* @param validator A validator to use in checking the correctness of the config
* @param importance The importance of this config: is this something you will likely need to change.
*
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
* @param validator A validator to use in checking the correctness of the config
* @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @param required Should the config fail if given property is not set and doesn't have default value specified
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name,
Type type,
Object defaultValue,
Validator validator,
Importance importance,
String documentation) {
public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
boolean required) {
if (configKeys.containsKey(name))
throw new ConfigException("Configuration " + name + " is defined twice.");
Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE
: parseType(name, defaultValue, type);
configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, required));
return this;
}
/**
* Define a new required configuration
*
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
* @param validator A validator to use in checking the correctness of the config
* @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
return define(name, type, defaultValue, validator, importance, documentation, true);
}
/**
* Define a new configuration with no special validation logic
*
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
* @param importance The importance of this config: is this something you will likely need to change.
*
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
* @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) {
return define(name, type, defaultValue, null, importance, documentation);
return define(name, type, defaultValue, null, importance, documentation, true);
}
/**
* Define a required parameter with no default value
*
* @param name The name of the config parameter
* @param type The type of the config
* @param validator A validator to use in checking the correctness of the config
* @param importance The importance of this config: is this something you will likely need to change.
*
* @param name The name of the config parameter
* @param type The type of the config
* @param validator A validator to use in checking the correctness of the config
* @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) {
return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation);
return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation, true);
}
/**
* Define a required parameter with no default value and no special validation logic
*
* @param name The name of the config parameter
* @param type The type of the config
* @param importance The importance of this config: is this something you will likely need to change.
*
* @param name The name of the config parameter
* @param type The type of the config
* @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name, Type type, Importance importance, String documentation) {
return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation);
return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, true);
}
/**
* Define a required parameter with no default value and no special validation logic
*
* @param name The name of the config parameter
* @param type The type of the config
* @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config
* @param required Should the config fail if given property is not set and doesn't have default value specified
* @return This ConfigDef so you can chain calls
*/
public ConfigDef define(String name, Type type, Importance importance, String documentation, boolean required) {
return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
}
/**
* Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
* that the keys of the map are strings, but the values can either be strings or they may already be of the
* appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
* programmatically constructed map.
*
*
* @param props The configs to parse and validate
* @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
* the appropriate type (int, string, etc)
* the appropriate type (int, string, etc)
*/
public Map<String, Object> parse(Map<?, ?> props) {
/* parse all known keys */
Map<String, Object> values = new HashMap<String, Object>();
for (ConfigKey key : configKeys.values()) {
Object value;
// props map contains setting - assign ConfigKey value
if (props.containsKey(key.name))
value = parseType(key.name, props.get(key.name), key.type);
else if (key.defaultValue == NO_DEFAULT_VALUE)
throw new ConfigException("Missing required configuration \"" + key.name
+ "\" which has no default value.");
// props map doesn't contain setting, the key is required and no default value specified - it's an error
else if (key.defaultValue == NO_DEFAULT_VALUE && key.required)
throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
// props map doesn't contain setting, no default value specified and the key is not required - assign it to null
else if (!key.hasDefault() && !key.required)
value = null;
// otherwise assign setting it's default value
else
value = key.defaultValue;
if (key.validator != null)
@ -157,10 +188,10 @@ public class ConfigDef {
/**
* Parse a value according to its expected type.
*
* @param name The config name
*
* @param name The config name
* @param value The config value
* @param type The expected type
* @param type The expected type
* @return The parsed object
*/
private Object parseType(String name, Object value, Type type) {
@ -185,8 +216,7 @@ public class ConfigDef {
if (value instanceof String)
return trimmed;
else
throw new ConfigException(name, value, "Expected value to be a string, but it was a "
+ value.getClass().getName());
throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
case INT:
if (value instanceof Integer) {
return (Integer) value;
@ -195,6 +225,14 @@ public class ConfigDef {
} else {
throw new ConfigException(name, value, "Expected value to be an number.");
}
case SHORT:
if (value instanceof Short) {
return (Short) value;
} else if (value instanceof String) {
return Short.parseShort(trimmed);
} else {
throw new ConfigException(name, value, "Expected value to be an number.");
}
case LONG:
if (value instanceof Integer)
return ((Integer) value).longValue();
@ -242,7 +280,7 @@ public class ConfigDef {
* The config types
*/
public enum Type {
BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS;
BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS;
}
public enum Importance {
@ -270,7 +308,7 @@ public class ConfigDef {
/**
* A numeric range that checks only the lower bound
*
*
* @param min The minimum acceptable value
*/
public static Range atLeast(Number min) {
@ -303,7 +341,7 @@ public class ConfigDef {
}
public static class ValidString implements Validator {
private final List<String> validStrings;
List<String> validStrings;
private ValidString(List<String> validStrings) {
this.validStrings = validStrings;
@ -316,14 +354,15 @@ public class ConfigDef {
@Override
public void ensureValid(String name, Object o) {
String s = (String) o;
if (!validStrings.contains(s))
if (!validStrings.contains(s)) {
throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
}
}
public String toString() {
return "[" + Utils.join(validStrings, ", ") + "]";
}
}
private static class ConfigKey {
@ -333,13 +372,9 @@ public class ConfigDef {
public final Object defaultValue;
public final Validator validator;
public final Importance importance;
public final boolean required;
public ConfigKey(String name,
Type type,
Object defaultValue,
Validator validator,
Importance importance,
String documentation) {
public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required) {
super();
this.name = name;
this.type = type;
@ -349,6 +384,7 @@ public class ConfigDef {
if (this.validator != null)
this.validator.ensureValid(name, defaultValue);
this.documentation = documentation;
this.required = required;
}
public boolean hasDefault() {
@ -408,4 +444,4 @@ public class ConfigDef {
b.append("</table>");
return b.toString();
}
}
}

View File

@ -21,7 +21,7 @@ import scala.collection.JavaConversions._
import joptsimple.OptionParser
import metrics.KafkaMetricsReporter
import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
import kafka.utils.{CommandLineUtils, Utils, Logging}
import kafka.utils.{VerifiableProperties, CommandLineUtils, Utils, Logging}
object Kafka extends Logging {
@ -47,13 +47,13 @@ object Kafka extends Logging {
props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)))
}
new KafkaConfig(props)
KafkaConfig.fromProps(props)
}
def main(args: Array[String]): Unit = {
try {
val serverConfig = getKafkaConfigFromArgs(args)
KafkaMetricsReporter.startReporters(serverConfig.props)
KafkaMetricsReporter.startReporters(new VerifiableProperties(serverConfig.toProps))
val kafkaServerStartable = new KafkaServerStartable(serverConfig)
// attach shutdown handler to catch control-c

View File

@ -1012,7 +1012,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
// if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
// is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
// eventually be restored as the leader.
if (newIsr.isEmpty && !LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(zkClient,
if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
newIsr = leaderAndIsr.isr

View File

@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
case true =>
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +

File diff suppressed because it is too large Load Diff

View File

@ -453,7 +453,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
private def generateBrokerId: Int = {
try {
ZkUtils.getBrokerSequenceId(zkClient, config.MaxReservedBrokerId)
ZkUtils.getBrokerSequenceId(zkClient, config.maxReservedBrokerId)
} catch {
case e: Exception =>
error("Failed to generate broker.id due to ", e)

View File

@ -89,7 +89,7 @@ class ReplicaFetcherThread(name:String,
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
if (!LogConfig.fromProps(brokerConfig.props.props, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
if (!LogConfig.fromProps(brokerConfig.toProps, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +

View File

@ -44,7 +44,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
override lazy val configs = {
val cfgs = TestUtils.createBrokerConfigs(serverCount)
cfgs.map(_.putAll(serverConfig))
cfgs.map(new KafkaConfig(_))
cfgs.map(KafkaConfig.fromProps)
}
var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()

View File

@ -44,7 +44,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
private var server: KafkaServer = null
private val props = TestUtils.createBrokerConfig(brokerId, port)
private val config = new KafkaConfig(props)
private val config = KafkaConfig.fromProps(props)
private val topic = "topic"
private val numRecords = 2000

View File

@ -40,17 +40,18 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val serverMessageMaxBytes = producerBufferSize/2
val numServers = 2
val configs =
for(props <- TestUtils.createBrokerConfigs(numServers, false))
yield new KafkaConfig(props) {
override val zkConnect = TestZKUtils.zookeeperConnect
override val autoCreateTopicsEnable = false
override val messageMaxBytes = serverMessageMaxBytes
// Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
override val offsetsTopicPartitions = 1
}
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
// Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
val configs =
for (props <- TestUtils.createBrokerConfigs(numServers, false))
yield KafkaConfig.fromProps(props, overridingProps)
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null

View File

@ -37,12 +37,14 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
val numServers = 2
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
val configs =
for(props <- TestUtils.createBrokerConfigs(numServers, false))
yield new KafkaConfig(props) {
override val zkConnect = TestZKUtils.zookeeperConnect
override val numPartitions = 4
}
for (props <- TestUtils.createBrokerConfigs(numServers, false))
yield KafkaConfig.fromProps(props, overridingProps)
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null

View File

@ -55,10 +55,10 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
override def setUp() {
super.setUp()
// start all the servers
val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3))
val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4))
servers ++= List(server1, server2, server3, server4)
brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))

View File

@ -145,7 +145,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
// create brokers
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
@ -176,7 +176,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
// create brokers
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
@ -207,7 +207,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1))
val topic = "test"
// create brokers
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
@ -236,7 +236,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
def testReassigningNonExistingPartition() {
val topic = "test"
// create brokers
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// reassign partition 0
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
@ -262,7 +262,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
reassignPartitionsCommand.reassignPartitions
// create brokers
val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// wait until reassignment completes
TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
@ -298,7 +298,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val partition = 1
val preferredReplica = 0
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_))
val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps)
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
@ -318,7 +318,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val topic = "test"
val partition = 1
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_))
val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// create the topic
TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
@ -365,7 +365,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
def testTopicConfigChange() {
val partitions = 3
val topic = "my-topic"
val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
def makeConfig(messageSize: Int, retentionMs: Long) = {
var props = new Properties()

View File

@ -26,7 +26,7 @@ import kafka.integration.KafkaServerTestHarness
class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_))
val configs = TestUtils.createBrokerConfigs(3, false, true).map(KafkaConfig.fromProps)
@Test
def testGroupWideDeleteInZK() {

View File

@ -99,7 +99,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerConfigs = TestUtils.createBrokerConfigs(4, false)
brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
// create brokers
val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@ -263,7 +263,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topicAndPartition = TopicAndPartition(topic, 0)
// create brokers
val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// wait until replica log is created on every broker

View File

@ -18,6 +18,7 @@
package kafka.consumer
import java.util.Properties
import java.util.concurrent._
import java.util.concurrent.atomic._
import scala.collection._
@ -36,11 +37,14 @@ import kafka.integration.KafkaServerTestHarness
class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
val numNodes = 1
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
yield new KafkaConfig(props) {
override val zkConnect = TestZKUtils.zookeeperConnect
}
yield KafkaConfig.fromProps(props, overridingProps)
val messages = new mutable.HashMap[Int, Seq[Message]]
val topic = "topic"
val group = "group1"

View File

@ -42,12 +42,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val numNodes = 2
val numParts = 2
val topic = "topic1"
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
yield new KafkaConfig(props) {
override val zkConnect = zookeeperConnect
override val numPartitions = numParts
}
for (props <- TestUtils.createBrokerConfigs(numNodes))
yield KafkaConfig.fromProps(props, overridingProps)
val group = "group1"
val consumer0 = "consumer0"
val consumer1 = "consumer1"

View File

@ -31,7 +31,7 @@ import junit.framework.Assert._
class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
val topic = "test_topic"
val group = "default_group"

View File

@ -36,7 +36,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
val numNodes = 1
val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
yield new KafkaConfig(props)
yield KafkaConfig.fromProps(props)
val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
val topic = "topic"
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))

View File

@ -42,7 +42,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val port = TestUtils.choosePort()
val props = TestUtils.createBrokerConfig(0, port)
val config = new KafkaConfig(props)
val config = KafkaConfig.fromProps(props)
val configs = List(config)
def testFetchRequestCanProperlySerialize() {

View File

@ -49,10 +49,10 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
override def setUp() {
super.setUp()
// start all the servers
val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3))
val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4))
servers ++= List(server1, server2, server3, server4)
}

View File

@ -32,7 +32,7 @@ import kafka.client.ClientUtils
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p))
val configs = props.map(p => KafkaConfig.fromProps(p))
private var server1: KafkaServer = null
val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))

View File

@ -91,7 +91,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
private def startBrokers(cluster: Seq[Properties]) {
for (props <- cluster) {
val config = new KafkaConfig(props)
val config = KafkaConfig.fromProps(props)
val server = createServer(config)
configs ++= List(config)
servers ++= List(server)

View File

@ -17,6 +17,8 @@
package kafka.javaapi.consumer
import java.util.Properties
import kafka.server._
import kafka.message._
import kafka.serializer._
@ -42,12 +44,15 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val numNodes = 2
val numParts = 2
val topic = "topic1"
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
yield new KafkaConfig(props) {
override val numPartitions = numParts
override val zkConnect = zookeeperConnect
}
for (props <- TestUtils.createBrokerConfigs(numNodes))
yield KafkaConfig.fromProps(props, overridingProps)
val group = "group1"
val consumer1 = "consumer1"
val nMessages = 2

View File

@ -38,7 +38,7 @@ class LogTest extends JUnitSuite {
def setUp() {
logDir = TestUtils.tempDir()
val props = TestUtils.createBrokerConfig(0, -1)
config = new KafkaConfig(props)
config = KafkaConfig.fromProps(props)
}
@After

View File

@ -57,7 +57,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
val logDirZkPath = propsZk.getProperty("log.dir")
logDirZk = new File(logDirZkPath)
config = new KafkaConfig(propsZk)
config = KafkaConfig.fromProps(propsZk)
server = TestUtils.createServer(config)
simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "")
}

View File

@ -17,6 +17,8 @@
package kafka.consumer
import java.util.Properties
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricPredicate
import org.junit.Test
@ -38,12 +40,15 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val numNodes = 2
val numParts = 2
val topic = "topic1"
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
val configs =
for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic=true))
yield new KafkaConfig(props) {
override val zkConnect = zookeeperConnect
override val numPartitions = numParts
}
for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic = true))
yield KafkaConfig.fromProps(props, overridingProps)
val nMessages = 2
override def tearDown() {

View File

@ -37,7 +37,7 @@ import kafka.utils._
class AsyncProducerTest extends JUnit3Suite {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p))
val configs = props.map(p => KafkaConfig.fromProps(p))
override def setUp() {
super.setUp()

View File

@ -51,10 +51,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
props1.put("num.partitions", "4")
private val config1 = new KafkaConfig(props1)
private val config1 = KafkaConfig.fromProps(props1)
private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
props2.put("num.partitions", "4")
private val config2 = new KafkaConfig(props2)
private val config2 = KafkaConfig.fromProps(props2)
override def setUp() {
super.setUp()

View File

@ -33,7 +33,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
private var messageBytes = new Array[Byte](2);
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1, false).head))
val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, false).head))
val zookeeperConnect = TestZKUtils.zookeeperConnect
@Test

View File

@ -34,7 +34,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
props.put("advertised.host.name", advertisedHostName)
props.put("advertised.port", advertisedPort.toString)
server = TestUtils.createServer(new KafkaConfig(props))
server = TestUtils.createServer(KafkaConfig.fromProps(props))
}
override def tearDown() {

View File

@ -27,7 +27,7 @@ import org.scalatest.junit.JUnit3Suite
class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort)))
override val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.choosePort)))
@Test
def testConfigChange() {

View File

@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean
class HighwatermarkPersistenceTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps)
val topic = "foo"
val logManagers = configs map { config =>
TestUtils.createLogManager(

View File

@ -16,6 +16,8 @@
*/
package kafka.server
import java.util.Properties
import org.scalatest.junit.JUnit3Suite
import collection.mutable.HashMap
import collection.mutable.Map
@ -29,11 +31,15 @@ import java.util.concurrent.atomic.AtomicBoolean
class IsrExpirationTest extends JUnit3Suite {
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaLagTimeMaxMs = 100L
override val replicaFetchWaitMaxMs = 100
override val replicaLagMaxMessages = 10L
})
val replicaLagTimeMaxMs = 100L
val replicaFetchWaitMaxMs = 100
val replicaLagMaxMessages = 10L
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
val topic = "foo"
val time = new MockTime

View File

@ -0,0 +1,403 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package unit.kafka.server
import java.util.Properties
import kafka.message._
import kafka.server.{Defaults, KafkaConfig}
import org.apache.kafka.common.config.ConfigException
import org.junit.{Assert, Test}
import org.scalatest.junit.JUnit3Suite
import scala.collection.Map
import scala.util.Random._
class KafkaConfigConfigDefTest extends JUnit3Suite {
@Test
def testFromPropsDefaults() {
val defaults = new Properties()
defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
// some ordinary setting
defaults.put(KafkaConfig.AdvertisedPortProp, "1818")
val props = new Properties(defaults)
val config = KafkaConfig.fromProps(props)
Assert.assertEquals(1818, config.advertisedPort)
Assert.assertEquals("KafkaConfig defaults should be retained", Defaults.ConnectionsMaxIdleMs, config.connectionsMaxIdleMs)
}
@Test
def testFromPropsEmpty() {
// only required
val p = new Properties()
p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
val actualConfig = KafkaConfig.fromProps(p)
val expectedConfig = new KafkaConfig(zkConnect = "127.0.0.1:2181")
Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect)
Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs)
Assert.assertEquals(expectedConfig.zkConnectionTimeoutMs, actualConfig.zkConnectionTimeoutMs)
Assert.assertEquals(expectedConfig.zkSyncTimeMs, actualConfig.zkSyncTimeMs)
Assert.assertEquals(expectedConfig.maxReservedBrokerId, actualConfig.maxReservedBrokerId)
Assert.assertEquals(expectedConfig.brokerId, actualConfig.brokerId)
Assert.assertEquals(expectedConfig.messageMaxBytes, actualConfig.messageMaxBytes)
Assert.assertEquals(expectedConfig.numNetworkThreads, actualConfig.numNetworkThreads)
Assert.assertEquals(expectedConfig.numIoThreads, actualConfig.numIoThreads)
Assert.assertEquals(expectedConfig.backgroundThreads, actualConfig.backgroundThreads)
Assert.assertEquals(expectedConfig.queuedMaxRequests, actualConfig.queuedMaxRequests)
Assert.assertEquals(expectedConfig.port, actualConfig.port)
Assert.assertEquals(expectedConfig.hostName, actualConfig.hostName)
Assert.assertEquals(expectedConfig.advertisedHostName, actualConfig.advertisedHostName)
Assert.assertEquals(expectedConfig.advertisedPort, actualConfig.advertisedPort)
Assert.assertEquals(expectedConfig.socketSendBufferBytes, actualConfig.socketSendBufferBytes)
Assert.assertEquals(expectedConfig.socketReceiveBufferBytes, actualConfig.socketReceiveBufferBytes)
Assert.assertEquals(expectedConfig.socketRequestMaxBytes, actualConfig.socketRequestMaxBytes)
Assert.assertEquals(expectedConfig.maxConnectionsPerIp, actualConfig.maxConnectionsPerIp)
Assert.assertEquals(expectedConfig.maxConnectionsPerIpOverrides, actualConfig.maxConnectionsPerIpOverrides)
Assert.assertEquals(expectedConfig.connectionsMaxIdleMs, actualConfig.connectionsMaxIdleMs)
Assert.assertEquals(expectedConfig.numPartitions, actualConfig.numPartitions)
Assert.assertEquals(expectedConfig.logDirs, actualConfig.logDirs)
Assert.assertEquals(expectedConfig.logSegmentBytes, actualConfig.logSegmentBytes)
Assert.assertEquals(expectedConfig.logRollTimeMillis, actualConfig.logRollTimeMillis)
Assert.assertEquals(expectedConfig.logRollTimeJitterMillis, actualConfig.logRollTimeJitterMillis)
Assert.assertEquals(expectedConfig.logRetentionTimeMillis, actualConfig.logRetentionTimeMillis)
Assert.assertEquals(expectedConfig.logRetentionBytes, actualConfig.logRetentionBytes)
Assert.assertEquals(expectedConfig.logCleanupIntervalMs, actualConfig.logCleanupIntervalMs)
Assert.assertEquals(expectedConfig.logCleanupPolicy, actualConfig.logCleanupPolicy)
Assert.assertEquals(expectedConfig.logCleanerThreads, actualConfig.logCleanerThreads)
Assert.assertEquals(expectedConfig.logCleanerIoMaxBytesPerSecond, actualConfig.logCleanerIoMaxBytesPerSecond, 0.0)
Assert.assertEquals(expectedConfig.logCleanerDedupeBufferSize, actualConfig.logCleanerDedupeBufferSize)
Assert.assertEquals(expectedConfig.logCleanerIoBufferSize, actualConfig.logCleanerIoBufferSize)
Assert.assertEquals(expectedConfig.logCleanerDedupeBufferLoadFactor, actualConfig.logCleanerDedupeBufferLoadFactor, 0.0)
Assert.assertEquals(expectedConfig.logCleanerBackoffMs, actualConfig.logCleanerBackoffMs)
Assert.assertEquals(expectedConfig.logCleanerMinCleanRatio, actualConfig.logCleanerMinCleanRatio, 0.0)
Assert.assertEquals(expectedConfig.logCleanerEnable, actualConfig.logCleanerEnable)
Assert.assertEquals(expectedConfig.logCleanerDeleteRetentionMs, actualConfig.logCleanerDeleteRetentionMs)
Assert.assertEquals(expectedConfig.logIndexSizeMaxBytes, actualConfig.logIndexSizeMaxBytes)
Assert.assertEquals(expectedConfig.logIndexIntervalBytes, actualConfig.logIndexIntervalBytes)
Assert.assertEquals(expectedConfig.logFlushIntervalMessages, actualConfig.logFlushIntervalMessages)
Assert.assertEquals(expectedConfig.logDeleteDelayMs, actualConfig.logDeleteDelayMs)
Assert.assertEquals(expectedConfig.logFlushSchedulerIntervalMs, actualConfig.logFlushSchedulerIntervalMs)
Assert.assertEquals(expectedConfig.logFlushIntervalMs, actualConfig.logFlushIntervalMs)
Assert.assertEquals(expectedConfig.logFlushOffsetCheckpointIntervalMs, actualConfig.logFlushOffsetCheckpointIntervalMs)
Assert.assertEquals(expectedConfig.numRecoveryThreadsPerDataDir, actualConfig.numRecoveryThreadsPerDataDir)
Assert.assertEquals(expectedConfig.autoCreateTopicsEnable, actualConfig.autoCreateTopicsEnable)
Assert.assertEquals(expectedConfig.minInSyncReplicas, actualConfig.minInSyncReplicas)
Assert.assertEquals(expectedConfig.controllerSocketTimeoutMs, actualConfig.controllerSocketTimeoutMs)
Assert.assertEquals(expectedConfig.controllerMessageQueueSize, actualConfig.controllerMessageQueueSize)
Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor)
Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs)
Assert.assertEquals(expectedConfig.replicaLagMaxMessages, actualConfig.replicaLagMaxMessages)
Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs)
Assert.assertEquals(expectedConfig.replicaSocketReceiveBufferBytes, actualConfig.replicaSocketReceiveBufferBytes)
Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes)
Assert.assertEquals(expectedConfig.replicaFetchWaitMaxMs, actualConfig.replicaFetchWaitMaxMs)
Assert.assertEquals(expectedConfig.replicaFetchMinBytes, actualConfig.replicaFetchMinBytes)
Assert.assertEquals(expectedConfig.numReplicaFetchers, actualConfig.numReplicaFetchers)
Assert.assertEquals(expectedConfig.replicaHighWatermarkCheckpointIntervalMs, actualConfig.replicaHighWatermarkCheckpointIntervalMs)
Assert.assertEquals(expectedConfig.fetchPurgatoryPurgeIntervalRequests, actualConfig.fetchPurgatoryPurgeIntervalRequests)
Assert.assertEquals(expectedConfig.producerPurgatoryPurgeIntervalRequests, actualConfig.producerPurgatoryPurgeIntervalRequests)
Assert.assertEquals(expectedConfig.autoLeaderRebalanceEnable, actualConfig.autoLeaderRebalanceEnable)
Assert.assertEquals(expectedConfig.leaderImbalancePerBrokerPercentage, actualConfig.leaderImbalancePerBrokerPercentage)
Assert.assertEquals(expectedConfig.leaderImbalanceCheckIntervalSeconds, actualConfig.leaderImbalanceCheckIntervalSeconds)
Assert.assertEquals(expectedConfig.uncleanLeaderElectionEnable, actualConfig.uncleanLeaderElectionEnable)
Assert.assertEquals(expectedConfig.controlledShutdownMaxRetries, actualConfig.controlledShutdownMaxRetries)
Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs)
Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable)
Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize)
Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize)
Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor)
Assert.assertEquals(expectedConfig.offsetsTopicPartitions, actualConfig.offsetsTopicPartitions)
Assert.assertEquals(expectedConfig.offsetsTopicSegmentBytes, actualConfig.offsetsTopicSegmentBytes)
Assert.assertEquals(expectedConfig.offsetsTopicCompressionCodec, actualConfig.offsetsTopicCompressionCodec)
Assert.assertEquals(expectedConfig.offsetsRetentionMinutes, actualConfig.offsetsRetentionMinutes)
Assert.assertEquals(expectedConfig.offsetsRetentionCheckIntervalMs, actualConfig.offsetsRetentionCheckIntervalMs)
Assert.assertEquals(expectedConfig.offsetCommitTimeoutMs, actualConfig.offsetCommitTimeoutMs)
Assert.assertEquals(expectedConfig.offsetCommitRequiredAcks, actualConfig.offsetCommitRequiredAcks)
Assert.assertEquals(expectedConfig.deleteTopicEnable, actualConfig.deleteTopicEnable)
Assert.assertEquals(expectedConfig.compressionType, actualConfig.compressionType)
}
private def atLeastXIntProp(x: Int): String = (nextInt(Int.MaxValue - 1) + x).toString
private def atLeastOneIntProp: String = atLeastXIntProp(1)
private def inRangeIntProp(fromInc: Int, toInc: Int): String = (nextInt(toInc + 1 - fromInc) + fromInc).toString
@Test
def testFromPropsToProps() {
import scala.util.Random._
val expected = new Properties()
KafkaConfig.configNames().foreach(name => {
name match {
case KafkaConfig.ZkConnectProp => expected.setProperty(name, "127.0.0.1:2181")
case KafkaConfig.ZkSessionTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.ZkConnectionTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.ZkSyncTimeMsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.NumNetworkThreadsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.NumIoThreadsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.BackgroundThreadsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.QueuedMaxRequestsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.PortProp => expected.setProperty(name, "1234")
case KafkaConfig.HostNameProp => expected.setProperty(name, nextString(10))
case KafkaConfig.AdvertisedHostNameProp => expected.setProperty(name, nextString(10))
case KafkaConfig.AdvertisedPortProp => expected.setProperty(name, "4321")
case KafkaConfig.SocketRequestMaxBytesProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.MaxConnectionsPerIpProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.MaxConnectionsPerIpOverridesProp => expected.setProperty(name, "127.0.0.1:2, 127.0.0.2:3")
case KafkaConfig.NumPartitionsProp => expected.setProperty(name, "2")
case KafkaConfig.LogDirsProp => expected.setProperty(name, "/tmp/logs,/tmp/logs2")
case KafkaConfig.LogDirProp => expected.setProperty(name, "/tmp/log")
case KafkaConfig.LogSegmentBytesProp => expected.setProperty(name, atLeastXIntProp(Message.MinHeaderSize))
case KafkaConfig.LogRollTimeMillisProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.LogRollTimeHoursProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.LogRetentionTimeMillisProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.LogRetentionTimeMinutesProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.LogRetentionTimeHoursProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.LogCleanupIntervalMsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.LogCleanupPolicyProp => expected.setProperty(name, randFrom(Defaults.Compact, Defaults.Delete))
case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
case KafkaConfig.LogCleanerMinCleanRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
case KafkaConfig.LogCleanerEnableProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.LogIndexSizeMaxBytesProp => expected.setProperty(name, atLeastXIntProp(4))
case KafkaConfig.LogFlushIntervalMessagesProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.AutoCreateTopicsEnableProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.MinInSyncReplicasProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.AutoLeaderRebalanceEnableProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.ControlledShutdownEnableProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.OffsetsLoadBufferSizeProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.OffsetsTopicPartitionsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.OffsetsTopicSegmentBytesProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.OffsetsTopicCompressionCodecProp => expected.setProperty(name, randFrom(GZIPCompressionCodec.codec.toString,
SnappyCompressionCodec.codec.toString, LZ4CompressionCodec.codec.toString, NoCompressionCodec.codec.toString))
case KafkaConfig.OffsetsRetentionMinutesProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.OffsetCommitTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.DeleteTopicEnableProp => expected.setProperty(name, randFrom("true", "false"))
// explicit, non trivial validations or with transient dependencies
// require(brokerId >= -1 && brokerId <= maxReservedBrokerId)
case KafkaConfig.MaxReservedBrokerIdProp => expected.setProperty(name, "100")
case KafkaConfig.BrokerIdProp => expected.setProperty(name, inRangeIntProp(0, 100))
// require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024)
case KafkaConfig.LogCleanerThreadsProp => expected.setProperty(name, "2")
case KafkaConfig.LogCleanerDedupeBufferSizeProp => expected.setProperty(name, (1024 * 1024 * 3 + 1).toString)
// require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs)
case KafkaConfig.ReplicaFetchWaitMaxMsProp => expected.setProperty(name, "321")
case KafkaConfig.ReplicaSocketTimeoutMsProp => expected.setProperty(name, atLeastXIntProp(321))
// require(replicaFetchMaxBytes >= messageMaxBytes)
case KafkaConfig.MessageMaxBytesProp => expected.setProperty(name, "1234")
case KafkaConfig.ReplicaFetchMaxBytesProp => expected.setProperty(name, atLeastXIntProp(1234))
// require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs)
case KafkaConfig.ReplicaLagTimeMaxMsProp => expected.setProperty(name, atLeastXIntProp(321))
//require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor)
case KafkaConfig.OffsetCommitRequiredAcksProp => expected.setProperty(name, "-1")
case KafkaConfig.OffsetsTopicReplicationFactorProp => expected.setProperty(name, inRangeIntProp(-1, Short.MaxValue))
//BrokerCompressionCodec.isValid(compressionType)
case KafkaConfig.CompressionTypeProp => expected.setProperty(name, randFrom(BrokerCompressionCodec.brokerCompressionOptions))
case nonNegativeIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
}
})
val actual = KafkaConfig.fromProps(expected).toProps
Assert.assertEquals(expected, actual)
}
@Test
def testFromPropsInvalid() {
def getBaseProperties(): Properties = {
val validRequiredProperties = new Properties()
validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1")
validRequiredProperties
}
// to ensure a basis is valid - bootstraps all needed validation
KafkaConfig.fromProps(getBaseProperties())
KafkaConfig.configNames().foreach(name => {
name match {
case KafkaConfig.ZkConnectProp => // ignore string
case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.HostNameProp => // ignore string
case KafkaConfig.AdvertisedHostNameProp => //ignore string
case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number")
case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogDirsProp => // ignore string
case KafkaConfig.LogDirProp => // ignore string
case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinHeaderSize - 1)
case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(getBaseProperties(), name, "unknown_policy", "0")
case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "1024")
case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ControllerMessageQueueSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaLagMaxMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})
}
@Test
def testSpecificProperties(): Unit = {
val defaults = new Properties()
defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
// For ZkConnectionTimeoutMs
defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
defaults.put(KafkaConfig.BrokerIdProp, "1")
defaults.put(KafkaConfig.HostNameProp, "127.0.0.1")
defaults.put(KafkaConfig.PortProp, "1122")
defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
defaults.put(KafkaConfig.LogRollTimeHoursProp, "12")
defaults.put(KafkaConfig.LogRollTimeJitterHoursProp, "11")
defaults.put(KafkaConfig.LogRetentionTimeHoursProp, "10")
//For LogFlushIntervalMsProp
defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
val config = KafkaConfig.fromProps(defaults)
Assert.assertEquals("127.0.0.1:2181", config.zkConnect)
Assert.assertEquals(1234, config.zkConnectionTimeoutMs)
Assert.assertEquals(1, config.maxReservedBrokerId)
Assert.assertEquals(1, config.brokerId)
Assert.assertEquals("127.0.0.1", config.hostName)
Assert.assertEquals(1122, config.advertisedPort)
Assert.assertEquals("127.0.0.1", config.advertisedHostName)
Assert.assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
Assert.assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
Assert.assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
Assert.assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
Assert.assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
Assert.assertEquals(123L, config.logFlushIntervalMs)
Assert.assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
}
private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) {
values.foreach((value) => {
val props = validRequiredProps
props.setProperty(name, value.toString)
intercept[Exception] {
KafkaConfig.fromProps(props)
}
})
}
private def randFrom[T](choices: T*): T = {
import scala.util.Random
choices(Random.nextInt(choices.size))
}
private def randFrom[T](choices: List[T]): T = {
import scala.util.Random
choices(Random.nextInt(choices.size))
}
}

View File

@ -17,6 +17,7 @@
package kafka.server
import org.apache.kafka.common.config.ConfigException
import org.junit.Test
import junit.framework.Assert._
import org.scalatest.junit.JUnit3Suite
@ -31,7 +32,7 @@ class KafkaConfigTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("log.retention.hours", "1")
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
}
@ -41,7 +42,7 @@ class KafkaConfigTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("log.retention.minutes", "30")
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
}
@ -51,7 +52,7 @@ class KafkaConfigTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("log.retention.ms", "1800000")
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
}
@ -60,7 +61,7 @@ class KafkaConfigTest extends JUnit3Suite {
def testLogRetentionTimeNoConfigProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
}
@ -71,7 +72,7 @@ class KafkaConfigTest extends JUnit3Suite {
props.put("log.retention.minutes", "30")
props.put("log.retention.hours", "1")
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
}
@ -82,7 +83,7 @@ class KafkaConfigTest extends JUnit3Suite {
props.put("log.retention.ms", "1800000")
props.put("log.retention.minutes", "10")
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
}
@ -95,7 +96,7 @@ class KafkaConfigTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, port)
props.put("host.name", hostName)
val serverConfig = new KafkaConfig(props)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.advertisedHostName, hostName)
assertEquals(serverConfig.advertisedPort, port)
@ -111,7 +112,7 @@ class KafkaConfigTest extends JUnit3Suite {
props.put("advertised.host.name", advertisedHostName)
props.put("advertised.port", advertisedPort.toString)
val serverConfig = new KafkaConfig(props)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.advertisedHostName, advertisedHostName)
assertEquals(serverConfig.advertisedPort, advertisedPort)
@ -120,7 +121,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testUncleanLeaderElectionDefault() {
val props = TestUtils.createBrokerConfig(0, 8181)
val serverConfig = new KafkaConfig(props)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
}
@ -129,7 +130,7 @@ class KafkaConfigTest extends JUnit3Suite {
def testUncleanElectionDisabled() {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("unclean.leader.election.enable", String.valueOf(false))
val serverConfig = new KafkaConfig(props)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
}
@ -138,7 +139,7 @@ class KafkaConfigTest extends JUnit3Suite {
def testUncleanElectionEnabled() {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("unclean.leader.election.enable", String.valueOf(true))
val serverConfig = new KafkaConfig(props)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
}
@ -148,8 +149,8 @@ class KafkaConfigTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("unclean.leader.election.enable", "invalid")
intercept[IllegalArgumentException] {
new KafkaConfig(props)
intercept[ConfigException] {
KafkaConfig.fromProps(props)
}
}
@ -158,7 +159,7 @@ class KafkaConfigTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("log.roll.ms", "1800000")
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
}
@ -169,7 +170,7 @@ class KafkaConfigTest extends JUnit3Suite {
props.put("log.roll.ms", "1800000")
props.put("log.roll.hours", "1")
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
}
@ -178,7 +179,7 @@ class KafkaConfigTest extends JUnit3Suite {
def testLogRollTimeNoConfigProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val cfg = new KafkaConfig(props)
val cfg = KafkaConfig.fromProps(props)
assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis )
}
@ -186,7 +187,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testDefaultCompressionType() {
val props = TestUtils.createBrokerConfig(0, 8181)
val serverConfig = new KafkaConfig(props)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.compressionType, "producer")
}
@ -195,7 +196,7 @@ class KafkaConfigTest extends JUnit3Suite {
def testValidCompressionType() {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("compression.type", "gzip")
val serverConfig = new KafkaConfig(props)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.compressionType, "gzip")
}
@ -205,7 +206,7 @@ class KafkaConfigTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("compression.type", "abc")
intercept[IllegalArgumentException] {
new KafkaConfig(props)
KafkaConfig.fromProps(props)
}
}
}

View File

@ -43,8 +43,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
override def setUp() {
super.setUp()
// start both servers
val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
servers ++= List(server1, server2)
}
@ -117,7 +117,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
// start another controller
val controllerId = 2
val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
val controllerContext = new ControllerContext(zkClient, 6000)
controllerContext.liveBrokers = brokers.toSet

View File

@ -50,7 +50,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val logDirPath = config.getProperty("log.dir")
logDir = new File(logDirPath)
time = new MockTime()
server = TestUtils.createServer(new KafkaConfig(config), time)
server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
}

View File

@ -16,6 +16,8 @@
*/
package kafka.server
import java.util.Properties
import kafka.utils.TestUtils._
import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils}
@ -31,12 +33,18 @@ import org.junit.Assert._
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val configs = TestUtils.createBrokerConfigs(2, false).map(new KafkaConfig(_) {
override val replicaLagTimeMaxMs = 5000L
override val replicaLagMaxMessages = 10L
override val replicaFetchWaitMaxMs = 1000
override val replicaFetchMinBytes = 20
})
val replicaLagTimeMaxMs = 5000L
val replicaLagMaxMessages = 10L
val replicaFetchWaitMaxMs = 1000
val replicaFetchMinBytes = 20
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
val configs = TestUtils.createBrokerConfigs(2, false).map(KafkaConfig.fromProps(_, overridingProps))
val topic = "new-topic"
val partitionId = 0

View File

@ -49,7 +49,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
val logDirPath = config.getProperty("log.dir")
logDir = new File(logDirPath)
time = new MockTime()
server = TestUtils.createServer(new KafkaConfig(config), time)
server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client")
val consumerMetadataRequest = ConsumerMetadataRequest(group)
Stream.continually {

View File

@ -28,7 +28,7 @@ import kafka.common._
class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(2,false)
val configs = props.map(p => new KafkaConfig(p))
val configs = props.map(p => KafkaConfig.fromProps(p))
var brokers: Seq[KafkaServer] = null
val topic1 = "foo"
val topic2 = "bar"

View File

@ -39,7 +39,7 @@ class ReplicaManagerTest extends JUnit3Suite {
@Test
def testHighWaterMarkDirectoryMapping() {
val props = TestUtils.createBrokerConfig(1)
val config = new KafkaConfig(props)
val config = KafkaConfig.fromProps(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
@ -56,7 +56,7 @@ class ReplicaManagerTest extends JUnit3Suite {
def testHighwaterMarkRelativeDirectoryMapping() {
val props = TestUtils.createBrokerConfig(1)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = new KafkaConfig(props)
val config = KafkaConfig.fromProps(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
@ -72,7 +72,7 @@ class ReplicaManagerTest extends JUnit3Suite {
@Test
def testIllegalRequiredAcks() {
val props = TestUtils.createBrokerConfig(1)
val config = new KafkaConfig(props)
val config = KafkaConfig.fromProps(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()

View File

@ -25,9 +25,9 @@ import java.io.File
class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
var config1 = new KafkaConfig(props1)
var config1 = KafkaConfig.fromProps(props1)
var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
var config2 = new KafkaConfig(props2)
var config2 = KafkaConfig.fromProps(props2)
val brokerMetaPropsFile = "meta.properties"
@ -52,7 +52,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
val server1 = new KafkaServer(config1)
val server2 = new KafkaServer(config2)
val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
val config3 = new KafkaConfig(props3)
val config3 = KafkaConfig.fromProps(props3)
val server3 = new KafkaServer(config3)
server1.startup()
assertEquals(server1.config.brokerId,1001)
@ -78,7 +78,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath +
"," + TestUtils.tempDir().getAbsolutePath
props1.setProperty("log.dir",logDirs)
config1 = new KafkaConfig(props1)
config1 = KafkaConfig.fromProps(props1)
var server1 = new KafkaServer(config1)
server1.startup()
server1.shutdown()
@ -86,7 +86,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
// addition to log.dirs after generation of a broker.id from zk should be copied over
val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
props1.setProperty("log.dir",newLogDirs)
config1 = new KafkaConfig(props1)
config1 = KafkaConfig.fromProps(props1)
server1 = new KafkaServer(config1)
server1.startup()
server1.shutdown()

View File

@ -34,7 +34,7 @@ import junit.framework.Assert._
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val port = TestUtils.choosePort
val props = TestUtils.createBrokerConfig(0, port)
val config = new KafkaConfig(props)
val config = KafkaConfig.fromProps(props)
val host = "localhost"
val topic = "test"
@ -105,7 +105,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCleanShutdownWithDeleteTopicEnabled() {
val newProps = TestUtils.createBrokerConfig(0, port)
newProps.setProperty("delete.topic.enable", "true")
val newConfig = new KafkaConfig(newProps)
val newConfig = KafkaConfig.fromProps(newProps)
val server = new KafkaServer(newConfig)
server.startup()
server.shutdown()
@ -118,7 +118,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCleanShutdownAfterFailedStartup() {
val newProps = TestUtils.createBrokerConfig(0, port)
newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
val newConfig = new KafkaConfig(newProps)
val newConfig = KafkaConfig.fromProps(newProps)
val server = new KafkaServer(newConfig)
try {
server.startup()

View File

@ -33,7 +33,7 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
val zooKeeperConnect = props.get("zookeeper.connect")
props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
val server = TestUtils.createServer(new KafkaConfig(props))
val server = TestUtils.createServer(KafkaConfig.fromProps(props))
val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
assertTrue(pathExists)
@ -48,12 +48,12 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerId = 0
val props1 = TestUtils.createBrokerConfig(brokerId)
val server1 = TestUtils.createServer(new KafkaConfig(props1))
val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
val props2 = TestUtils.createBrokerConfig(brokerId)
try {
TestUtils.createServer(new KafkaConfig(props2))
TestUtils.createServer(KafkaConfig.fromProps(props2))
fail("Registering a broker with a conflicting id should fail")
} catch {
case e : RuntimeException =>

View File

@ -24,7 +24,7 @@ import kafka.log.Log
import kafka.message.{ByteBufferMessageSet, Message}
import scala.Some
import java.util.Collections
import java.util.{Properties, Collections}
import java.util.concurrent.atomic.AtomicBoolean
import collection.JavaConversions._
@ -35,11 +35,16 @@ import junit.framework.Assert._
class SimpleFetchTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaLagTimeMaxMs = 100L
override val replicaFetchWaitMaxMs = 100
override val replicaLagMaxMessages = 10L
})
val replicaLagTimeMaxMs = 100L
val replicaFetchWaitMaxMs = 100
val replicaLagMaxMessages = 10L
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
// set the replica manager with the partition
val time = new MockTime

View File

@ -53,7 +53,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testUpdateLeaderAndIsr() {
val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
val configs = TestUtils.createBrokerConfigs(1).map(KafkaConfig.fromProps)
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
EasyMock.expect(log)