kafka-1667; topic-level configuration not validated; patched by Dmytro Kostiuchenko; reviewed by Jun Rao

This commit is contained in:
Dmytro Kostiuchenko 2014-11-25 14:36:31 -08:00 committed by Jun Rao
parent 9f8b8dad2b
commit 834b641980
6 changed files with 218 additions and 60 deletions

View File

@ -19,6 +19,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* This class is used for specifying the set of expected configurations, their type, their defaults, their
@ -48,6 +49,14 @@ public class ConfigDef {
private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
/**
* Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
* @return new unmodifiable {@link Set} instance containing the keys
*/
public Set<String> names() {
return Collections.unmodifiableSet(configKeys.keySet());
}
/**
* Define a new configuration
* @param name The name of the config parameter

View File

@ -256,7 +256,7 @@ object TopicCommand {
.ofType(classOf[String])
val nl = System.getProperty("line.separator")
val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
"The following is a list of valid configurations: " + nl + LogConfig.ConfigNames.map("\t" + _).mkString(nl) + nl +
"The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
"See the Kafka documentation for full details on the topic configs.")
.withRequiredArg
.describedAs("name=value")

View File

@ -21,7 +21,7 @@ import java.util.Properties
import org.apache.kafka.common.utils.Utils
import scala.collection._
import kafka.common._
import org.apache.kafka.common.config.ConfigDef
object Defaults {
val SegmentSize = 1024 * 1024
@ -44,21 +44,20 @@ object Defaults {
/**
* Configuration settings for a log
* @param segmentSize The soft maximum for the size of a segment file in the log
* @param segmentSize The hard maximum for the size of a segment file in the log
* @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
* @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling
* @param flushInterval The number of messages that can be written to the log before a flush is forced
* @param flushMs The amount of time the log can have dirty data before a flush is forced
* @param retentionSize The approximate total number of bytes this log can use
* @param retentionMs The age approximate maximum age of the last segment that is retained
* @param retentionMs The approximate maximum age of the last segment that is retained
* @param maxIndexSize The maximum size of an index file
* @param indexInterval The approximate number of bytes between index entries
* @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
* @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
* @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
* @param compact Should old segments in this log be deleted or deduplicated?
* @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
* but included here for topic-specific configuration validation purposes
* @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled
* @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
*
*/
@ -106,6 +105,10 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
}
object LogConfig {
val Delete = "delete"
val Compact = "compact"
val SegmentBytesProp = "segment.bytes"
val SegmentMsProp = "segment.ms"
val SegmentJitterMsProp = "segment.jitter.ms"
@ -123,46 +126,84 @@ object LogConfig {
val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
val ConfigNames = Set(SegmentBytesProp,
SegmentMsProp,
SegmentJitterMsProp,
SegmentIndexBytesProp,
FlushMessagesProp,
FlushMsProp,
RetentionBytesProp,
RententionMsProp,
MaxMessageBytesProp,
IndexIntervalBytesProp,
FileDeleteDelayMsProp,
DeleteRetentionMsProp,
MinCleanableDirtyRatioProp,
CleanupPolicyProp,
UncleanLeaderElectionEnableProp,
MinInSyncReplicasProp)
val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
val SegmentJitterMsDoc = "The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment" +
" rolling"
val FlushIntervalDoc = "The number of messages that can be written to the log before a flush is forced"
val FlushMsDoc = "The amount of time the log can have dirty data before a flush is forced"
val RetentionSizeDoc = "The approximate total number of bytes this log can use"
val RetentionMsDoc = "The approximate maximum age of the last segment that is retained"
val MaxIndexSizeDoc = "The maximum size of an index file"
val MaxMessageSizeDoc = "The maximum size of a message"
val IndexIntervalDoc = "The approximate number of bytes between index entries"
val FileDeleteDelayMsDoc = "The time to wait before deleting a file from the filesystem"
val DeleteRetentionMsDoc = "The time to retain delete markers in the log. Only applicable for logs that are being" +
" compacted."
val MinCleanableRatioDoc = "The ratio of bytes that are available for cleaning to the bytes already cleaned"
val CompactDoc = "Should old segments in this log be deleted or deduplicated?"
val UncleanLeaderElectionEnableDoc = "Indicates whether unclean leader election is enabled"
val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop accepting writes with" +
" -1 (or all) required acks"
private val configDef = {
import ConfigDef.Range._
import ConfigDef.ValidString._
import ConfigDef.Type._
import ConfigDef.Importance._
import java.util.Arrays.asList
new ConfigDef()
.define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(0), MEDIUM, SegmentSizeDoc)
.define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM, SegmentMsDoc)
.define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc)
.define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc)
.define(FlushMessagesProp, LONG, Defaults.FlushInterval, atLeast(0), MEDIUM, FlushIntervalDoc)
.define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc)
// can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize
.define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc)
.define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc)
.define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc)
.define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM, IndexIntervalDoc)
.define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM, DeleteRetentionMsDoc)
.define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc)
.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
MinCleanableRatioDoc)
.define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(asList(Compact, Delete)), MEDIUM,
CompactDoc)
// we validate true/false explicitly to fail in case of typo
.define(UncleanLeaderElectionEnableProp, STRING, Defaults.UncleanLeaderElectionEnable.toString,
in(asList(true.toString, false.toString)), MEDIUM, UncleanLeaderElectionEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
}
def configNames() = {
import JavaConversions._
configDef.names().toList.sorted
}
/**
* Parse the given properties instance into a LogConfig object
*/
def fromProps(props: Properties): LogConfig = {
new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt,
segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong,
segmentJitterMs = props.getProperty(SegmentJitterMsProp, Defaults.SegmentJitterMs.toString).toLong,
maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt,
flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong,
flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong,
retentionSize = props.getProperty(RetentionBytesProp, Defaults.RetentionSize.toString).toLong,
retentionMs = props.getProperty(RententionMsProp, Defaults.RetentionMs.toString).toLong,
maxMessageSize = props.getProperty(MaxMessageBytesProp, Defaults.MaxMessageSize.toString).toInt,
indexInterval = props.getProperty(IndexIntervalBytesProp, Defaults.IndexInterval.toString).toInt,
fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString).toInt,
deleteRetentionMs = props.getProperty(DeleteRetentionMsProp, Defaults.DeleteRetentionMs.toString).toLong,
minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp,
Defaults.MinCleanableDirtyRatio.toString).toDouble,
compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
.trim.toLowerCase != "delete",
uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
minInSyncReplicas = props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt)
import kafka.utils.Utils.evaluateDefaults
val parsed = configDef.parse(evaluateDefaults(props))
new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int],
segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long],
segmentJitterMs = parsed.get(SegmentJitterMsProp).asInstanceOf[Long],
maxIndexSize = parsed.get(SegmentIndexBytesProp).asInstanceOf[Int],
flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long],
maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
deleteRetentionMs = parsed.get(DeleteRetentionMsProp).asInstanceOf[Long],
minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete,
uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[String].toBoolean,
minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int])
}
/**
@ -179,30 +220,17 @@ object LogConfig {
*/
def validateNames(props: Properties) {
import JavaConversions._
val names = configDef.names()
for(name <- props.keys)
require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
require(names.contains(name), "Unknown configuration \"%s\".".format(name))
}
/**
* Check that the given properties contain only valid log config names, and that all values can be parsed.
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/
def validate(props: Properties) {
validateNames(props)
validateMinInSyncReplicas(props)
LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
}
/**
* Check that MinInSyncReplicas is reasonable
* Unfortunately, we can't validate its smaller than number of replicas
* since we don't have this information here
*/
private def validateMinInSyncReplicas(props: Properties) {
val minIsr = props.getProperty(MinInSyncReplicasProp)
if (minIsr != null && minIsr.toInt < 1) {
throw new InvalidConfigException("Wrong value " + minIsr + " of min.insync.replicas in topic configuration; " +
" Valid values are at least 1")
}
configDef.parse(props)
}
}

View File

@ -506,6 +506,32 @@ object Utils extends Logging {
writer.toString
}
/**
* Turn {@linkplain java.util.Properties} with default values into a {@linkplain java.util.Map}. Following example
* illustrates difference from the cast
* <pre>
* val defaults = new Properties()
* defaults.put("foo", "bar")
* val props = new Properties(defaults)
*
* props.getProperty("foo") // "bar"
* props.get("foo") // null
* evaluateDefaults(props).get("foo") // "bar"
* </pre>
*
* @param props properties to evaluate
* @return new java.util.Map instance
*/
def evaluateDefaults(props: Properties): java.util.Map[String, String] = {
import java.util._
import JavaConversions.asScalaSet
val evaluated = new HashMap[String, String]()
for (name <- props.stringPropertyNames()) {
evaluated.put(name, props.getProperty(name))
}
evaluated
}
/**
* Read some properties with the given default values
*/

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import org.apache.kafka.common.config.ConfigException
import org.scalatest.junit.JUnit3Suite
import org.junit.{Assert, Test}
import java.util.Properties
class LogConfigTest extends JUnit3Suite {
@Test
def testFromPropsDefaults() {
val defaults = new Properties()
defaults.put(LogConfig.SegmentBytesProp, "4242")
val props = new Properties(defaults)
val config = LogConfig.fromProps(props)
Assert.assertEquals(4242, config.segmentSize)
Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize)
}
@Test
def testFromPropsEmpty() {
val p = new Properties()
val config = LogConfig.fromProps(p)
Assert.assertEquals(LogConfig(), config)
}
@Test
def testFromPropsToProps() {
import scala.util.Random._
val expected = new Properties()
LogConfig.configNames().foreach((name) => {
name match {
case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete))
case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
}
})
val actual = LogConfig.fromProps(expected).toProps
Assert.assertEquals(expected, actual)
}
@Test
def testFromPropsInvalid() {
LogConfig.configNames().foreach((name) => {
name match {
case LogConfig.UncleanLeaderElectionEnableProp => return
case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
}
})
}
private def assertPropertyInvalid(name: String, values: AnyRef*) {
values.foreach((value) => {
val props = new Properties
props.setProperty(name, value.toString)
intercept[ConfigException] {
LogConfig.fromProps(props)
}
})
}
private def randFrom[T](choices: T*): T = {
import scala.util.Random
choices(Random.nextInt(choices.size))
}
}

View File

@ -17,6 +17,8 @@
package kafka.integration
import org.apache.kafka.common.config.ConfigException
import scala.collection.mutable.MutableList
import scala.util.Random
import org.apache.log4j.{Level, Logger}
@ -155,7 +157,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val topicProps = new Properties()
topicProps.put("unclean.leader.election.enable", "invalid")
intercept[IllegalArgumentException] {
intercept[ConfigException] {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
}
}