kafka-2249; KafkaConfig does not preserve original Properties; patched by Gwen Shapira; reviewed by Jun Rao

Gwen Shapira 2015-06-18 14:07:33 -07:00 committed by Jun Rao
parent ba86f0a25d
commit 5c90407454
22 changed files with 442 additions and 680 deletions
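
The change, in a sentence: KafkaConfig and LogConfig stop copying every setting into dozens of constructor fields and instead wrap org.apache.kafka.common.config.AbstractConfig, so the exact Properties the broker was started with stay reachable through originals(). A rough sketch of the new usage; the property values and the object name are illustrative, not from the patch, and zookeeper.connect is the one required setting here:

import java.util.Properties
import kafka.server.KafkaConfig

object KafkaConfigSketch extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")
  props.put("broker.id", "0")
  props.put("log.retention.hours", "168")

  val config = KafkaConfig.fromProps(props)

  // Typed accessors are parsed once through the shared ConfigDef.
  println(config.brokerId)                               // 0
  println(config.logRetentionTimeMillis)                 // 604800000

  // originals() keeps the raw entries the broker was started with, which is
  // what the per-topic LogConfig plumbing and metric reporters now consume.
  println(config.originals.get("log.retention.hours"))   // 168
}
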


@@ -57,15 +57,19 @@ public class AbstractConfig {
return values.get(key);
}
public int getInt(String key) {
public Short getShort(String key) {
return (Short) get(key);
}
public Integer getInt(String key) {
return (Integer) get(key);
}
public long getLong(String key) {
public Long getLong(String key) {
return (Long) get(key);
}
public double getDouble(String key) {
public Double getDouble(String key) {
return (Double) get(key);
}
@@ -92,7 +96,7 @@ public class AbstractConfig {
return keys;
}
public Map<String, ?> originals() {
public Map<String, Object> originals() {
Map<String, Object> copy = new HashMap<String, Object>();
copy.putAll(originals);
return copy;
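
The getters above now return boxed types (Integer, Long, Short, Double) instead of primitives, and originals() hands back a fresh copy of the raw map. The boxed returns are what let the Scala rewrite later in this patch probe optional settings with Option(...) instead of carrying separate _foo: Option[...] constructor fields: a setting presumably defined with a null default simply comes back as null when unset. A small sketch of that pattern with illustrative values and a made-up object name; the getLong/getInt calls and the log.roll.* property names are taken from the patch itself:

import java.util.Properties
import kafka.server.KafkaConfig

object BoxedGetterSketch extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")
  props.put("log.roll.hours", "12")    // hours set, log.roll.ms left unset

  val config = KafkaConfig.fromProps(props)

  // getLong returns a java.lang.Long, so the unset log.roll.ms comes back as
  // null, Option(...) turns it into None, and the hours-based fallback wins.
  val rollMs: java.lang.Long =
    Option(config.getLong(KafkaConfig.LogRollTimeMillisProp))
      .getOrElse(60 * 60 * 1000L * config.getInt(KafkaConfig.LogRollTimeHoursProp))

  println(rollMs)                       // 43200000
  println(config.logRollTimeMillis)     // same value, computed inside KafkaConfig
}
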


@@ -86,7 +86,7 @@ class Partition(val topic: String,
case Some(replica) => replica
case None =>
if (isReplicaLocal(replicaId)) {
val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic))
val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read


@@ -325,7 +325,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
@@ -1013,7 +1013,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
// if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
// is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
// eventually be restored as the leader.
if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(zkClient,
if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
newIsr = leaderAndIsr.isr


@@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
case true =>
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
if (!LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +


@@ -18,92 +18,52 @@
package kafka.log
import java.util.Properties
import kafka.server.KafkaConfig
import org.apache.kafka.common.utils.Utils
import scala.collection._
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import kafka.message.BrokerCompressionCodec
import kafka.message.Message
object Defaults {
val SegmentSize = 1024 * 1024
val SegmentMs = Long.MaxValue
val SegmentJitterMs = 0L
val FlushInterval = Long.MaxValue
val FlushMs = Long.MaxValue
val RetentionSize = Long.MaxValue
val RetentionMs = Long.MaxValue
val MaxMessageSize = Int.MaxValue
val MaxIndexSize = 1024 * 1024
val IndexInterval = 4096
val FileDeleteDelayMs = 60 * 1000L
val DeleteRetentionMs = 24 * 60 * 60 * 1000L
val MinCleanableDirtyRatio = 0.5
val Compact = false
val UncleanLeaderElectionEnable = true
val MinInSyncReplicas = 1
val CompressionType = "producer"
val SegmentSize = kafka.server.Defaults.LogSegmentBytes
val SegmentMs = kafka.server.Defaults.LogRollHours * 60 * 60 * 1000L
val SegmentJitterMs = kafka.server.Defaults.LogRollJitterHours * 60 * 60 * 1000L
val FlushInterval = kafka.server.Defaults.LogFlushIntervalMessages
val FlushMs = kafka.server.Defaults.LogFlushSchedulerIntervalMs
val RetentionSize = kafka.server.Defaults.LogRetentionBytes
val RetentionMs = kafka.server.Defaults.LogRetentionHours * 60 * 60 * 1000L
val MaxMessageSize = kafka.server.Defaults.MessageMaxBytes
val MaxIndexSize = kafka.server.Defaults.LogIndexSizeMaxBytes
val IndexInterval = kafka.server.Defaults.LogIndexIntervalBytes
val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
val Compact = kafka.server.Defaults.LogCleanupPolicy
val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
val CompressionType = kafka.server.Defaults.CompressionType
}
/**
* Configuration settings for a log
* @param segmentSize The hard maximum for the size of a segment file in the log
* @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
* @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling
* @param flushInterval The number of messages that can be written to the log before a flush is forced
* @param flushMs The amount of time the log can have dirty data before a flush is forced
* @param retentionSize The approximate total number of bytes this log can use
* @param retentionMs The approximate maximum age of the last segment that is retained
* @param maxIndexSize The maximum size of an index file
* @param indexInterval The approximate number of bytes between index entries
* @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
* @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
* @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
* @param compact Should old segments in this log be deleted or deduplicated?
* @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled
* @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
* @param compressionType compressionType for a given topic
*
*/
case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
segmentMs: Long = Defaults.SegmentMs,
segmentJitterMs: Long = Defaults.SegmentJitterMs,
flushInterval: Long = Defaults.FlushInterval,
flushMs: Long = Defaults.FlushMs,
retentionSize: Long = Defaults.RetentionSize,
retentionMs: Long = Defaults.RetentionMs,
maxMessageSize: Int = Defaults.MaxMessageSize,
maxIndexSize: Int = Defaults.MaxIndexSize,
indexInterval: Int = Defaults.IndexInterval,
fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
compact: Boolean = Defaults.Compact,
uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
compressionType: String = Defaults.CompressionType) {
case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) {
def toProps: Properties = {
val props = new Properties()
import LogConfig._
props.put(SegmentBytesProp, segmentSize.toString)
props.put(SegmentMsProp, segmentMs.toString)
props.put(SegmentJitterMsProp, segmentJitterMs.toString)
props.put(SegmentIndexBytesProp, maxIndexSize.toString)
props.put(FlushMessagesProp, flushInterval.toString)
props.put(FlushMsProp, flushMs.toString)
props.put(RetentionBytesProp, retentionSize.toString)
props.put(RetentionMsProp, retentionMs.toString)
props.put(MaxMessageBytesProp, maxMessageSize.toString)
props.put(IndexIntervalBytesProp, indexInterval.toString)
props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
props.put(CompressionTypeProp, compressionType)
props
}
val segmentSize = getInt(LogConfig.SegmentBytesProp)
val segmentMs = getLong(LogConfig.SegmentMsProp)
val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)
val maxIndexSize = getInt(LogConfig.SegmentIndexBytesProp)
val flushInterval = getLong(LogConfig.FlushMessagesProp)
val flushMs = getLong(LogConfig.FlushMsProp)
val retentionSize = getLong(LogConfig.RetentionBytesProp)
val retentionMs = getLong(LogConfig.RetentionMsProp)
val maxMessageSize = getInt(LogConfig.MaxMessageBytesProp)
val indexInterval = getInt(LogConfig.IndexIntervalBytesProp)
val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase != LogConfig.Delete
val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
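
LogConfig is now built straight from a java.util.Map (typically a Properties holding the per-topic overrides), and the vals above are parsed eagerly through the shared ConfigDef. A hedged sketch of constructing one; the override values and the object name are illustrative:

import java.util.Properties
import kafka.log.LogConfig

object LogConfigSketch extends App {
  val overrides = new Properties()
  overrides.put(LogConfig.SegmentBytesProp, "10485760")   // 10 MB segments
  overrides.put(LogConfig.CleanupPolicyProp, "compact")

  val topicConfig = LogConfig(overrides)

  println(topicConfig.segmentSize)    // 10485760
  println(topicConfig.compact)        // true: any policy other than "delete"
  println(topicConfig.retentionMs)    // 604800000, the broker-level default
}
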
@@ -111,6 +71,10 @@ case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
object LogConfig {
def main(args: Array[String]) {
System.out.println(configDef.toHtmlTable)
}
val Delete = "delete"
val Compact = "compact"
@@ -179,7 +143,7 @@ object LogConfig {
.define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc)
.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
MinCleanableRatioDoc)
.define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(Compact, Delete), MEDIUM,
.define(CleanupPolicyProp, STRING, Defaults.Compact, in(Compact, Delete), MEDIUM,
CompactDoc)
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
MEDIUM, UncleanLeaderElectionEnableDoc)
@@ -187,44 +151,22 @@ object LogConfig {
.define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
}
def apply(): LogConfig = LogConfig(new Properties())
def configNames() = {
import JavaConversions._
configDef.names().toList.sorted
}
/**
* Parse the given properties instance into a LogConfig object
*/
def fromProps(props: Properties): LogConfig = {
import kafka.utils.CoreUtils.evaluateDefaults
val parsed = configDef.parse(evaluateDefaults(props))
new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int],
segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long],
segmentJitterMs = parsed.get(SegmentJitterMsProp).asInstanceOf[Long],
maxIndexSize = parsed.get(SegmentIndexBytesProp).asInstanceOf[Int],
flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long],
maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
deleteRetentionMs = parsed.get(DeleteRetentionMsProp).asInstanceOf[Long],
minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete,
uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int],
compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase())
}
/**
* Create a log config instance using the given properties and defaults
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
props.putAll(defaults)
props.putAll(overrides)
fromProps(props)
LogConfig(props)
}
/**
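
The rewritten two-argument fromProps just above used to layer the defaults behind the overrides with new Properties(defaults); it now copies both into one flat table. The distinction matters because java.util.Properties keeps constructor-supplied defaults in a separate layer that getProperty consults but the Hashtable views (containsKey, entrySet, putAll) never see, so anything that iterates entries can silently miss them. A small self-contained sketch of that behavior, with illustrative keys and a made-up object name:

import java.util.Properties

object PropertiesDefaultsSketch extends App {
  val defaults = new Properties()
  defaults.put("retention.ms", "604800000")

  // Old style: defaults live in a hidden second layer behind the overrides.
  val layered = new Properties(defaults)
  layered.put("segment.bytes", "10485760")

  println(layered.getProperty("retention.ms"))   // 604800000 (defaults layer is consulted)
  println(layered.containsKey("retention.ms"))   // false (Hashtable views never see it)

  // New style in this patch: flatten both layers into a single table up front.
  val flat = new Properties()
  flat.putAll(defaults)
  flat.putAll(layered)
  println(flat.containsKey("retention.ms"))      // true
}
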


@@ -356,7 +356,7 @@ class LogManager(val logDirs: Array[File],
.format(topicAndPartition.topic,
topicAndPartition.partition,
dataDir.getAbsolutePath,
{import JavaConversions._; config.toProps.mkString(", ")}))
{import JavaConversions._; config.originals.mkString(", ")}))
log
}
}


@@ -428,9 +428,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val aliveBrokers = metadataCache.getAliveBrokers
val offsetsTopicReplicationFactor =
if (aliveBrokers.length > 0)
Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
else
config.offsetsTopicReplicationFactor
config.offsetsTopicReplicationFactor.toInt
AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
offsetsTopicReplicationFactor,
offsetManager.offsetsTopicConfig)


@@ -26,7 +26,7 @@ import kafka.consumer.ConsumerConfig
import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.protocol.SecurityProtocol
import scala.collection.{mutable, immutable, JavaConversions, Map}
@@ -141,6 +141,10 @@ object Defaults {
object KafkaConfig {
def main(args: Array[String]) {
System.out.println(configDef.toHtmlTable)
}
/** ********* Zookeeper Configuration ***********/
val ZkConnectProp = "zookeeper.connect"
val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
@@ -482,14 +486,14 @@ object KafkaConfig {
.define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc)
.define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc)
.define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc)
.define(LeaderImbalanceCheckIntervalSecondsProp, INT, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
.define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
.define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
.define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
/** ********* Controlled shutdown configuration ***********/
.define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
.define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
.define(ControlledShutdownRetryBackoffMsProp, LONG, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
.define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)
/** ********* Consumer coordinator configuration ***********/
@@ -519,139 +523,6 @@ object KafkaConfig {
configDef.names().toList.sorted
}
/**
* Parse the given properties instance into a KafkaConfig object
*/
def fromProps(props: Properties): KafkaConfig = {
import kafka.utils.CoreUtils.evaluateDefaults
val parsed = configDef.parse(evaluateDefaults(props))
new KafkaConfig(
/** ********* Zookeeper Configuration ***********/
zkConnect = parsed.get(ZkConnectProp).asInstanceOf[String],
zkSessionTimeoutMs = parsed.get(ZkSessionTimeoutMsProp).asInstanceOf[Int],
_zkConnectionTimeoutMs = Option(parsed.get(ZkConnectionTimeoutMsProp)).map(_.asInstanceOf[Int]),
zkSyncTimeMs = parsed.get(ZkSyncTimeMsProp).asInstanceOf[Int],
/** ********* General Configuration ***********/
maxReservedBrokerId = parsed.get(MaxReservedBrokerIdProp).asInstanceOf[Int],
brokerId = parsed.get(BrokerIdProp).asInstanceOf[Int],
messageMaxBytes = parsed.get(MessageMaxBytesProp).asInstanceOf[Int],
numNetworkThreads = parsed.get(NumNetworkThreadsProp).asInstanceOf[Int],
numIoThreads = parsed.get(NumIoThreadsProp).asInstanceOf[Int],
backgroundThreads = parsed.get(BackgroundThreadsProp).asInstanceOf[Int],
queuedMaxRequests = parsed.get(QueuedMaxRequestsProp).asInstanceOf[Int],
/** ********* Socket Server Configuration ***********/
port = parsed.get(PortProp).asInstanceOf[Int],
hostName = parsed.get(HostNameProp).asInstanceOf[String],
_listeners = Option(parsed.get(ListenersProp)).map(_.asInstanceOf[String]),
_advertisedHostName = Option(parsed.get(AdvertisedHostNameProp)).map(_.asInstanceOf[String]),
_advertisedPort = Option(parsed.get(AdvertisedPortProp)).map(_.asInstanceOf[Int]),
_advertisedListeners = Option(parsed.get(AdvertisedListenersProp)).map(_.asInstanceOf[String]),
socketSendBufferBytes = parsed.get(SocketSendBufferBytesProp).asInstanceOf[Int],
socketReceiveBufferBytes = parsed.get(SocketReceiveBufferBytesProp).asInstanceOf[Int],
socketRequestMaxBytes = parsed.get(SocketRequestMaxBytesProp).asInstanceOf[Int],
maxConnectionsPerIp = parsed.get(MaxConnectionsPerIpProp).asInstanceOf[Int],
_maxConnectionsPerIpOverrides = parsed.get(MaxConnectionsPerIpOverridesProp).asInstanceOf[String],
connectionsMaxIdleMs = parsed.get(ConnectionsMaxIdleMsProp).asInstanceOf[Long],
/** ********* Log Configuration ***********/
numPartitions = parsed.get(NumPartitionsProp).asInstanceOf[Int],
_logDir = parsed.get(LogDirProp).asInstanceOf[String],
_logDirs = Option(parsed.get(LogDirsProp)).map(_.asInstanceOf[String]),
logSegmentBytes = parsed.get(LogSegmentBytesProp).asInstanceOf[Int],
logRollTimeHours = parsed.get(LogRollTimeHoursProp).asInstanceOf[Int],
_logRollTimeMillis = Option(parsed.get(LogRollTimeMillisProp)).map(_.asInstanceOf[Long]),
logRollTimeJitterHours = parsed.get(LogRollTimeJitterHoursProp).asInstanceOf[Int],
_logRollTimeJitterMillis = Option(parsed.get(LogRollTimeJitterMillisProp)).map(_.asInstanceOf[Long]),
logRetentionTimeHours = parsed.get(LogRetentionTimeHoursProp).asInstanceOf[Int],
_logRetentionTimeMins = Option(parsed.get(LogRetentionTimeMinutesProp)).map(_.asInstanceOf[Int]),
_logRetentionTimeMillis = Option(parsed.get(LogRetentionTimeMillisProp)).map(_.asInstanceOf[Long]),
logRetentionBytes = parsed.get(LogRetentionBytesProp).asInstanceOf[Long],
logCleanupIntervalMs = parsed.get(LogCleanupIntervalMsProp).asInstanceOf[Long],
logCleanupPolicy = parsed.get(LogCleanupPolicyProp).asInstanceOf[String],
logCleanerThreads = parsed.get(LogCleanerThreadsProp).asInstanceOf[Int],
logCleanerIoMaxBytesPerSecond = parsed.get(LogCleanerIoMaxBytesPerSecondProp).asInstanceOf[Double],
logCleanerDedupeBufferSize = parsed.get(LogCleanerDedupeBufferSizeProp).asInstanceOf[Long],
logCleanerIoBufferSize = parsed.get(LogCleanerIoBufferSizeProp).asInstanceOf[Int],
logCleanerDedupeBufferLoadFactor = parsed.get(LogCleanerDedupeBufferLoadFactorProp).asInstanceOf[Double],
logCleanerBackoffMs = parsed.get(LogCleanerBackoffMsProp).asInstanceOf[Long],
logCleanerMinCleanRatio = parsed.get(LogCleanerMinCleanRatioProp).asInstanceOf[Double],
logCleanerEnable = parsed.get(LogCleanerEnableProp).asInstanceOf[Boolean],
logCleanerDeleteRetentionMs = parsed.get(LogCleanerDeleteRetentionMsProp).asInstanceOf[Long],
logIndexSizeMaxBytes = parsed.get(LogIndexSizeMaxBytesProp).asInstanceOf[Int],
logIndexIntervalBytes = parsed.get(LogIndexIntervalBytesProp).asInstanceOf[Int],
logFlushIntervalMessages = parsed.get(LogFlushIntervalMessagesProp).asInstanceOf[Long],
logDeleteDelayMs = parsed.get(LogDeleteDelayMsProp).asInstanceOf[Long],
logFlushSchedulerIntervalMs = parsed.get(LogFlushSchedulerIntervalMsProp).asInstanceOf[Long],
_logFlushIntervalMs = Option(parsed.get(LogFlushIntervalMsProp)).map(_.asInstanceOf[Long]),
logFlushOffsetCheckpointIntervalMs = parsed.get(LogFlushOffsetCheckpointIntervalMsProp).asInstanceOf[Int],
numRecoveryThreadsPerDataDir = parsed.get(NumRecoveryThreadsPerDataDirProp).asInstanceOf[Int],
autoCreateTopicsEnable = parsed.get(AutoCreateTopicsEnableProp).asInstanceOf[Boolean],
minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int],
/** ********* Replication configuration ***********/
controllerSocketTimeoutMs = parsed.get(ControllerSocketTimeoutMsProp).asInstanceOf[Int],
defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int],
replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long],
replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int],
replicaSocketReceiveBufferBytes = parsed.get(ReplicaSocketReceiveBufferBytesProp).asInstanceOf[Int],
replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int],
replicaFetchWaitMaxMs = parsed.get(ReplicaFetchWaitMaxMsProp).asInstanceOf[Int],
replicaFetchMinBytes = parsed.get(ReplicaFetchMinBytesProp).asInstanceOf[Int],
replicaFetchBackoffMs = parsed.get(ReplicaFetchBackoffMsProp).asInstanceOf[Int],
numReplicaFetchers = parsed.get(NumReplicaFetchersProp).asInstanceOf[Int],
replicaHighWatermarkCheckpointIntervalMs = parsed.get(ReplicaHighWatermarkCheckpointIntervalMsProp).asInstanceOf[Long],
fetchPurgatoryPurgeIntervalRequests = parsed.get(FetchPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
producerPurgatoryPurgeIntervalRequests = parsed.get(ProducerPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
autoLeaderRebalanceEnable = parsed.get(AutoLeaderRebalanceEnableProp).asInstanceOf[Boolean],
leaderImbalancePerBrokerPercentage = parsed.get(LeaderImbalancePerBrokerPercentageProp).asInstanceOf[Int],
leaderImbalanceCheckIntervalSeconds = parsed.get(LeaderImbalanceCheckIntervalSecondsProp).asInstanceOf[Int],
uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]),
interBrokerProtocolVersion = ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]),
/** ********* Controlled shutdown configuration ***********/
controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int],
controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int],
controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean],
/** ********* Consumer coordinator configuration ***********/
consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int],
consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int],
/** ********* Offset management configuration ***********/
offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int],
offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int],
offsetsTopicReplicationFactor = parsed.get(OffsetsTopicReplicationFactorProp).asInstanceOf[Short],
offsetsTopicPartitions = parsed.get(OffsetsTopicPartitionsProp).asInstanceOf[Int],
offsetsTopicSegmentBytes = parsed.get(OffsetsTopicSegmentBytesProp).asInstanceOf[Int],
offsetsTopicCompressionCodec = Option(parsed.get(OffsetsTopicCompressionCodecProp)).map(_.asInstanceOf[Int]).map(value => CompressionCodec.getCompressionCodec(value)).orNull,
offsetsRetentionMinutes = parsed.get(OffsetsRetentionMinutesProp).asInstanceOf[Int],
offsetsRetentionCheckIntervalMs = parsed.get(OffsetsRetentionCheckIntervalMsProp).asInstanceOf[Long],
offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int],
offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short],
deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean],
compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String],
metricNumSamples = parsed.get(MetricNumSamplesProp).asInstanceOf[Int],
metricSampleWindowMs = parsed.get(MetricSampleWindowMsProp).asInstanceOf[Long],
_metricReporterClasses = parsed.get(MetricReporterClassesProp).asInstanceOf[java.util.List[String]]
)
}
/**
* Create a log config instance using the given properties and defaults
*/
def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
val props = new Properties(defaults)
props.putAll(overrides)
fromProps(props)
}
/**
* Check that property names are valid
*/
@@ -662,171 +533,149 @@ object KafkaConfig {
require(names.contains(name), "Unknown configuration \"%s\".".format(name))
}
/**
* Check that the given properties contain only valid kafka config names and that all values can be parsed and are valid
*/
def validate(props: Properties) {
validateNames(props)
configDef.parse(props)
// to bootstrap KafkaConfig.validateValues()
KafkaConfig.fromProps(props)
def fromProps(props: Properties): KafkaConfig = {
KafkaConfig(props)
}
def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
val props = new Properties()
props.putAll(defaults)
props.putAll(overrides)
fromProps(props)
}
}
class KafkaConfig (/** ********* Zookeeper Configuration ***********/
val zkConnect: String,
val zkSessionTimeoutMs: Int = Defaults.ZkSessionTimeoutMs,
private val _zkConnectionTimeoutMs: Option[Int] = None,
val zkSyncTimeMs: Int = Defaults.ZkSyncTimeMs,
case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) {
/** ********* General Configuration ***********/
val maxReservedBrokerId: Int = Defaults.MaxReservedBrokerId,
var brokerId: Int = Defaults.BrokerId,
val messageMaxBytes: Int = Defaults.MessageMaxBytes,
val numNetworkThreads: Int = Defaults.NumNetworkThreads,
val numIoThreads: Int = Defaults.NumIoThreads,
val backgroundThreads: Int = Defaults.BackgroundThreads,
val queuedMaxRequests: Int = Defaults.QueuedMaxRequests,
/** ********* Zookeeper Configuration ***********/
val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp)
val zkConnectionTimeoutMs: java.lang.Integer =
Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
/** ********* Socket Server Configuration ***********/
val port: Int = Defaults.Port,
val hostName: String = Defaults.HostName,
private val _listeners: Option[String] = None,
private val _advertisedHostName: Option[String] = None,
private val _advertisedPort: Option[Int] = None,
private val _advertisedListeners: Option[String] = None,
val socketSendBufferBytes: Int = Defaults.SocketSendBufferBytes,
val socketReceiveBufferBytes: Int = Defaults.SocketReceiveBufferBytes,
val socketRequestMaxBytes: Int = Defaults.SocketRequestMaxBytes,
val maxConnectionsPerIp: Int = Defaults.MaxConnectionsPerIp,
private val _maxConnectionsPerIpOverrides: String = Defaults.MaxConnectionsPerIpOverrides,
val connectionsMaxIdleMs: Long = Defaults.ConnectionsMaxIdleMs,
/** ********* General Configuration ***********/
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
/** ********* Log Configuration ***********/
val numPartitions: Int = Defaults.NumPartitions,
private val _logDir: String = Defaults.LogDir,
private val _logDirs: Option[String] = None,
val logSegmentBytes: Int = Defaults.LogSegmentBytes,
val logRollTimeHours: Int = Defaults.LogRollHours,
private val _logRollTimeMillis: Option[Long] = None,
val logRollTimeJitterHours: Int = Defaults.LogRollJitterHours,
private val _logRollTimeJitterMillis: Option[Long] = None,
val logRetentionTimeHours: Int = Defaults.LogRetentionHours,
private val _logRetentionTimeMins: Option[Int] = None,
private val _logRetentionTimeMillis: Option[Long] = None,
val logRetentionBytes: Long = Defaults.LogRetentionBytes,
val logCleanupIntervalMs: Long = Defaults.LogCleanupIntervalMs,
val logCleanupPolicy: String = Defaults.LogCleanupPolicy,
val logCleanerThreads: Int = Defaults.LogCleanerThreads,
val logCleanerIoMaxBytesPerSecond: Double = Defaults.LogCleanerIoMaxBytesPerSecond,
val logCleanerDedupeBufferSize: Long = Defaults.LogCleanerDedupeBufferSize,
val logCleanerIoBufferSize: Int = Defaults.LogCleanerIoBufferSize,
val logCleanerDedupeBufferLoadFactor: Double = Defaults.LogCleanerDedupeBufferLoadFactor,
val logCleanerBackoffMs: Long = Defaults.LogCleanerBackoffMs,
val logCleanerMinCleanRatio: Double = Defaults.LogCleanerMinCleanRatio,
val logCleanerEnable: Boolean = Defaults.LogCleanerEnable,
val logCleanerDeleteRetentionMs: Long = Defaults.LogCleanerDeleteRetentionMs,
val logIndexSizeMaxBytes: Int = Defaults.LogIndexSizeMaxBytes,
val logIndexIntervalBytes: Int = Defaults.LogIndexIntervalBytes,
val logFlushIntervalMessages: Long = Defaults.LogFlushIntervalMessages,
val logDeleteDelayMs: Long = Defaults.LogDeleteDelayMs,
val logFlushSchedulerIntervalMs: Long = Defaults.LogFlushSchedulerIntervalMs,
private val _logFlushIntervalMs: Option[Long] = None,
val logFlushOffsetCheckpointIntervalMs: Int = Defaults.LogFlushOffsetCheckpointIntervalMs,
val numRecoveryThreadsPerDataDir: Int = Defaults.NumRecoveryThreadsPerDataDir,
val autoCreateTopicsEnable: Boolean = Defaults.AutoCreateTopicsEnable,
val minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = Defaults.ControllerSocketTimeoutMs,
val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor,
val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs,
val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs,
val replicaSocketReceiveBufferBytes: Int = Defaults.ReplicaSocketReceiveBufferBytes,
val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes,
val replicaFetchWaitMaxMs: Int = Defaults.ReplicaFetchWaitMaxMs,
val replicaFetchMinBytes: Int = Defaults.ReplicaFetchMinBytes,
val replicaFetchBackoffMs: Int = Defaults.ReplicaFetchBackoffMs,
val numReplicaFetchers: Int = Defaults.NumReplicaFetchers,
val replicaHighWatermarkCheckpointIntervalMs: Long = Defaults.ReplicaHighWatermarkCheckpointIntervalMs,
val fetchPurgatoryPurgeIntervalRequests: Int = Defaults.FetchPurgatoryPurgeIntervalRequests,
val producerPurgatoryPurgeIntervalRequests: Int = Defaults.ProducerPurgatoryPurgeIntervalRequests,
val autoLeaderRebalanceEnable: Boolean = Defaults.AutoLeaderRebalanceEnable,
val leaderImbalancePerBrokerPercentage: Int = Defaults.LeaderImbalancePerBrokerPercentage,
val leaderImbalanceCheckIntervalSeconds: Int = Defaults.LeaderImbalanceCheckIntervalSeconds,
val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
val interBrokerSecurityProtocol: SecurityProtocol = SecurityProtocol.valueOf(Defaults.InterBrokerSecurityProtocol),
val interBrokerProtocolVersion: ApiVersion = ApiVersion(Defaults.InterBrokerProtocolVersion),
/** ********* Controlled shutdown configuration ***********/
val controlledShutdownMaxRetries: Int = Defaults.ControlledShutdownMaxRetries,
val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs,
val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable,
/** ********* Consumer coordinator configuration ***********/
val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs,
val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs,
/** ********* Offset management configuration ***********/
val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize,
val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize,
val offsetsTopicReplicationFactor: Short = Defaults.OffsetsTopicReplicationFactor,
val offsetsTopicPartitions: Int = Defaults.OffsetsTopicPartitions,
val offsetsTopicSegmentBytes: Int = Defaults.OffsetsTopicSegmentBytes,
val offsetsTopicCompressionCodec: CompressionCodec = CompressionCodec.getCompressionCodec(Defaults.OffsetsTopicCompressionCodec),
val offsetsRetentionMinutes: Int = Defaults.OffsetsRetentionMinutes,
val offsetsRetentionCheckIntervalMs: Long = Defaults.OffsetsRetentionCheckIntervalMs,
val offsetCommitTimeoutMs: Int = Defaults.OffsetCommitTimeoutMs,
val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks,
val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable,
val compressionType: String = Defaults.CompressionType,
val metricSampleWindowMs: Long = Defaults.MetricSampleWindowMs,
val metricNumSamples: Int = Defaults.MetricNumSamples,
private val _metricReporterClasses: java.util.List[String] = util.Arrays.asList(Defaults.MetricReporterClasses)
) {
val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs)
val listeners = getListeners()
val advertisedHostName: String = _advertisedHostName.getOrElse(hostName)
val advertisedPort: Int = _advertisedPort.getOrElse(port)
val advertisedListeners = getAdvertisedListeners()
val logDirs = CoreUtils.parseCsvList(_logDirs.getOrElse(_logDir))
val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours)
val logRollTimeJitterMillis = _logRollTimeJitterMillis.getOrElse(60 * 60 * 1000L * logRollTimeJitterHours)
val logRetentionTimeMillis = getLogRetentionTimeMillis
val logFlushIntervalMs = _logFlushIntervalMs.getOrElse(logFlushSchedulerIntervalMs)
/** ********* Socket Server Configuration ***********/
val hostName = getString(KafkaConfig.HostNameProp)
val port = getInt(KafkaConfig.PortProp)
val advertisedHostName = Option(getString(KafkaConfig.AdvertisedHostNameProp)).getOrElse(hostName)
val advertisedPort: java.lang.Integer = Option(getInt(KafkaConfig.AdvertisedPortProp)).getOrElse(port)
val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp)
val socketRequestMaxBytes = getInt(KafkaConfig.SocketRequestMaxBytesProp)
val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp)
val maxConnectionsPerIpOverrides: Map[String, Int] =
getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)}
getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
val metricReporterClasses: java.util.List[MetricsReporter] = getMetricClasses(_metricReporterClasses)
/** ********* Log Configuration ***********/
val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
val logDirs = CoreUtils.parseCsvList( Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
val logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
val logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp)
val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
val logCleanupPolicy = getString(KafkaConfig.LogCleanupPolicyProp)
val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
val logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
val logCleanerDedupeBufferSize = getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
val logCleanerDedupeBufferLoadFactor = getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
val logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
val logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
val logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
val logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
val logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp))
val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp)
val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp)
val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp)
val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp)
val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
val numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
val uncleanLeaderElectionEnable = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp))
/** ********* Controlled shutdown configuration ***********/
val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp)
val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp)
val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)
/** ********* Consumer coordinator configuration ***********/
val consumerMinSessionTimeoutMs = getInt(KafkaConfig.ConsumerMinSessionTimeoutMsProp)
val consumerMaxSessionTimeoutMs = getInt(KafkaConfig.ConsumerMaxSessionTimeoutMsProp)
/** ********* Offset management configuration ***********/
val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
val offsetsLoadBufferSize = getInt(KafkaConfig.OffsetsLoadBufferSizeProp)
val offsetsTopicReplicationFactor = getShort(KafkaConfig.OffsetsTopicReplicationFactorProp)
val offsetsTopicPartitions = getInt(KafkaConfig.OffsetsTopicPartitionsProp)
val offsetCommitTimeoutMs = getInt(KafkaConfig.OffsetCommitTimeoutMsProp)
val offsetCommitRequiredAcks = getShort(KafkaConfig.OffsetCommitRequiredAcksProp)
val offsetsTopicSegmentBytes = getInt(KafkaConfig.OffsetsTopicSegmentBytesProp)
val offsetsTopicCompressionCodec = Option(getInt(KafkaConfig.OffsetsTopicCompressionCodecProp)).map(value => CompressionCodec.getCompressionCodec(value)).orNull
/** ********* Metric Configuration **************/
val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter])
val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
val compressionType = getString(KafkaConfig.CompressionTypeProp)
val listeners = getListeners
val advertisedListeners = getAdvertisedListeners
val logRetentionTimeMillis = getLogRetentionTimeMillis
private def getLogRetentionTimeMillis: Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
val millis = {
_logRetentionTimeMillis.getOrElse(
_logRetentionTimeMins match {
case Some(mins) => millisInMinute * mins
case None => millisInHour * logRetentionTimeHours
}
)
}
if (millis < 0) return -1
millis
val millis: java.lang.Long =
Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
case Some(mins) => millisInMinute * mins
case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
})
if (millis < 0) return -1
millis
}
private def getMap(propName: String, propValue: String): Map[String, String] = {
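
getLogRetentionTimeMillis above resolves retention with the usual precedence: log.retention.ms wins over log.retention.minutes, which wins over log.retention.hours, and a negative value means unlimited. A quick sketch with illustrative values and a made-up object name:

import java.util.Properties
import kafka.server.KafkaConfig

object RetentionPrecedenceSketch extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")
  props.put("log.retention.hours", "168")     // coarse default: one week
  props.put("log.retention.minutes", "30")    // finer-grained override

  // minutes beats hours; log.retention.ms (unset here) would beat both.
  println(KafkaConfig.fromProps(props).logRetentionTimeMillis)   // 1800000
}
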
@@ -855,9 +704,9 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
// If the user did not define listeners but did define host or port, let's use them in backward compatible way
// If none of those are defined, we default to PLAINTEXT://:9092
private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
if (_listeners.isDefined) {
validateUniquePortAndProtocol(_listeners.get)
CoreUtils.listenerListToEndPoints(_listeners.get)
if (getString(KafkaConfig.ListenersProp) != null) {
validateUniquePortAndProtocol(getString(KafkaConfig.ListenersProp))
CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp))
} else {
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port)
}
@@ -867,11 +716,12 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
// If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
// If none of these are defined, we'll use the listeners
private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
if (_advertisedListeners.isDefined) {
validateUniquePortAndProtocol(_advertisedListeners.get)
CoreUtils.listenerListToEndPoints(_advertisedListeners.get)
} else if (_advertisedHostName.isDefined || _advertisedPort.isDefined ) {
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort)
if (getString(KafkaConfig.AdvertisedListenersProp) != null) {
validateUniquePortAndProtocol(getString(KafkaConfig.AdvertisedListenersProp))
CoreUtils.listenerListToEndPoints(getString(KafkaConfig.AdvertisedListenersProp))
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp))
} else {
getListeners()
}
@@ -886,7 +736,7 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
val reporterName = iterator.next()
if (!reporterName.isEmpty) {
val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName)
reporter.configure(toProps.asInstanceOf[java.util.Map[String, _]])
reporter.configure(originals)
reporterList.add(reporter)
}
}
@@ -895,19 +745,13 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
}
validateValues()
private def validateValues() {
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1")
require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
require(logDirs.size > 0)
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
@@ -920,127 +764,4 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/
require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
" Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
}
def toProps: Properties = {
val props = new Properties()
import kafka.server.KafkaConfig._
/** ********* Zookeeper Configuration ***********/
props.put(ZkConnectProp, zkConnect)
props.put(ZkSessionTimeoutMsProp, zkSessionTimeoutMs.toString)
_zkConnectionTimeoutMs.foreach(value => props.put(ZkConnectionTimeoutMsProp, value.toString))
props.put(ZkSyncTimeMsProp, zkSyncTimeMs.toString)
/** ********* General Configuration ***********/
props.put(MaxReservedBrokerIdProp, maxReservedBrokerId.toString)
props.put(BrokerIdProp, brokerId.toString)
props.put(MessageMaxBytesProp, messageMaxBytes.toString)
props.put(NumNetworkThreadsProp, numNetworkThreads.toString)
props.put(NumIoThreadsProp, numIoThreads.toString)
props.put(BackgroundThreadsProp, backgroundThreads.toString)
props.put(QueuedMaxRequestsProp, queuedMaxRequests.toString)
/** ********* Socket Server Configuration ***********/
props.put(PortProp, port.toString)
props.put(HostNameProp, hostName)
_listeners.foreach(props.put(ListenersProp, _))
_advertisedHostName.foreach(props.put(AdvertisedHostNameProp, _))
_advertisedPort.foreach(value => props.put(AdvertisedPortProp, value.toString))
_advertisedListeners.foreach(props.put(AdvertisedListenersProp, _))
props.put(SocketSendBufferBytesProp, socketSendBufferBytes.toString)
props.put(SocketReceiveBufferBytesProp, socketReceiveBufferBytes.toString)
props.put(SocketRequestMaxBytesProp, socketRequestMaxBytes.toString)
props.put(MaxConnectionsPerIpProp, maxConnectionsPerIp.toString)
props.put(MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides)
props.put(ConnectionsMaxIdleMsProp, connectionsMaxIdleMs.toString)
/** ********* Log Configuration ***********/
props.put(NumPartitionsProp, numPartitions.toString)
props.put(LogDirProp, _logDir)
_logDirs.foreach(value => props.put(LogDirsProp, value))
props.put(LogSegmentBytesProp, logSegmentBytes.toString)
props.put(LogRollTimeHoursProp, logRollTimeHours.toString)
_logRollTimeMillis.foreach(v => props.put(LogRollTimeMillisProp, v.toString))
props.put(LogRollTimeJitterHoursProp, logRollTimeJitterHours.toString)
_logRollTimeJitterMillis.foreach(v => props.put(LogRollTimeJitterMillisProp, v.toString))
props.put(LogRetentionTimeHoursProp, logRetentionTimeHours.toString)
_logRetentionTimeMins.foreach(v => props.put(LogRetentionTimeMinutesProp, v.toString))
_logRetentionTimeMillis.foreach(v => props.put(LogRetentionTimeMillisProp, v.toString))
props.put(LogRetentionBytesProp, logRetentionBytes.toString)
props.put(LogCleanupIntervalMsProp, logCleanupIntervalMs.toString)
props.put(LogCleanupPolicyProp, logCleanupPolicy)
props.put(LogCleanerThreadsProp, logCleanerThreads.toString)
props.put(LogCleanerIoMaxBytesPerSecondProp, logCleanerIoMaxBytesPerSecond.toString)
props.put(LogCleanerDedupeBufferSizeProp, logCleanerDedupeBufferSize.toString)
props.put(LogCleanerIoBufferSizeProp, logCleanerIoBufferSize.toString)
props.put(LogCleanerDedupeBufferLoadFactorProp, logCleanerDedupeBufferLoadFactor.toString)
props.put(LogCleanerBackoffMsProp, logCleanerBackoffMs.toString)
props.put(LogCleanerMinCleanRatioProp, logCleanerMinCleanRatio.toString)
props.put(LogCleanerEnableProp, logCleanerEnable.toString)
props.put(LogCleanerDeleteRetentionMsProp, logCleanerDeleteRetentionMs.toString)
props.put(LogIndexSizeMaxBytesProp, logIndexSizeMaxBytes.toString)
props.put(LogIndexIntervalBytesProp, logIndexIntervalBytes.toString)
props.put(LogFlushIntervalMessagesProp, logFlushIntervalMessages.toString)
props.put(LogDeleteDelayMsProp, logDeleteDelayMs.toString)
props.put(LogFlushSchedulerIntervalMsProp, logFlushSchedulerIntervalMs.toString)
_logFlushIntervalMs.foreach(v => props.put(LogFlushIntervalMsProp, v.toString))
props.put(LogFlushOffsetCheckpointIntervalMsProp, logFlushOffsetCheckpointIntervalMs.toString)
props.put(NumRecoveryThreadsPerDataDirProp, numRecoveryThreadsPerDataDir.toString)
props.put(AutoCreateTopicsEnableProp, autoCreateTopicsEnable.toString)
props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
/** ********* Replication configuration ***********/
props.put(ControllerSocketTimeoutMsProp, controllerSocketTimeoutMs.toString)
props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString)
props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString)
props.put(ReplicaSocketReceiveBufferBytesProp, replicaSocketReceiveBufferBytes.toString)
props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString)
props.put(ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
props.put(ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
props.put(ReplicaFetchBackoffMsProp, replicaFetchBackoffMs.toString)
props.put(NumReplicaFetchersProp, numReplicaFetchers.toString)
props.put(ReplicaHighWatermarkCheckpointIntervalMsProp, replicaHighWatermarkCheckpointIntervalMs.toString)
props.put(FetchPurgatoryPurgeIntervalRequestsProp, fetchPurgatoryPurgeIntervalRequests.toString)
props.put(ProducerPurgatoryPurgeIntervalRequestsProp, producerPurgatoryPurgeIntervalRequests.toString)
props.put(AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
props.put(LeaderImbalancePerBrokerPercentageProp, leaderImbalancePerBrokerPercentage.toString)
props.put(LeaderImbalanceCheckIntervalSecondsProp, leaderImbalanceCheckIntervalSeconds.toString)
props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
props.put(InterBrokerSecurityProtocolProp, interBrokerSecurityProtocol.toString)
props.put(InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
/** ********* Controlled shutdown configuration ***********/
props.put(ControlledShutdownMaxRetriesProp, controlledShutdownMaxRetries.toString)
props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString)
props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString)
/** ********* Consumer coordinator configuration ***********/
props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString)
props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString)
/** ********* Offset management configuration ***********/
props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString)
props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString)
props.put(OffsetsTopicReplicationFactorProp, offsetsTopicReplicationFactor.toString)
props.put(OffsetsTopicPartitionsProp, offsetsTopicPartitions.toString)
props.put(OffsetsTopicSegmentBytesProp, offsetsTopicSegmentBytes.toString)
props.put(OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString)
props.put(OffsetsRetentionMinutesProp, offsetsRetentionMinutes.toString)
props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString)
props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString)
props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString)
props.put(DeleteTopicEnableProp, deleteTopicEnable.toString)
props.put(CompressionTypeProp, compressionType.toString)
props.put(MetricNumSamplesProp, metricNumSamples.toString)
props.put(MetricSampleWindowMsProp, metricSampleWindowMs.toString)
props.put(MetricReporterClassesProp, JavaConversions.collectionAsScalaIterable(_metricReporterClasses).mkString(","))
props
}
}


@@ -17,6 +17,9 @@
package kafka.server
import java.util
import java.util.Properties
import kafka.admin._
import kafka.log.LogConfig
import kafka.log.CleanerConfig
@@ -388,23 +391,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def boundPort(): Int = socketServer.boundPort()
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = config.logRollTimeMillis,
segmentJitterMs = config.logRollTimeJitterMillis,
flushInterval = config.logFlushIntervalMessages,
flushMs = config.logFlushIntervalMs.toLong,
retentionSize = config.logRetentionBytes,
retentionMs = config.logRetentionTimeMillis,
maxMessageSize = config.messageMaxBytes,
maxIndexSize = config.logIndexSizeMaxBytes,
indexInterval = config.logIndexIntervalBytes,
deleteRetentionMs = config.logCleanerDeleteRetentionMs,
fileDeleteDelayMs = config.logDeleteDelayMs,
minCleanableRatio = config.logCleanerMinCleanRatio,
compact = config.logCleanupPolicy.trim.toLowerCase == "compact",
minInSyncReplicas = config.minInSyncReplicas,
compressionType = config.compressionType)
val defaultProps = defaultLogConfig.toProps
val defaultProps = copyKafkaConfigToLog(config.originals)
val defaultLogConfig = LogConfig(defaultProps)
val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
@@ -428,6 +417,38 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
time = time)
}
// Copy the subset of properties that are relevant to Logs
// I'm listing out individual properties here since the names are slightly different in each Config class...
private def copyKafkaConfigToLog(serverProps: java.util.Map[String, Object]): java.util.Map[String, Object] = {
val logProps = new util.HashMap[String, Object]()
val entryset = serverProps.entrySet.iterator
while (entryset.hasNext) {
val entry = entryset.next
entry.getKey match {
case KafkaConfig.LogSegmentBytesProp => logProps.put(LogConfig.SegmentBytesProp, entry.getValue)
case KafkaConfig.LogRollTimeMillisProp => logProps.put(LogConfig.SegmentMsProp, entry.getValue)
case KafkaConfig.LogRollTimeJitterMillisProp => logProps.put(LogConfig.SegmentJitterMsProp, entry.getValue)
case KafkaConfig.LogIndexSizeMaxBytesProp => logProps.put(LogConfig.SegmentIndexBytesProp, entry.getValue)
case KafkaConfig.LogFlushIntervalMessagesProp => logProps.put(LogConfig.FlushMessagesProp, entry.getValue)
case KafkaConfig.LogFlushIntervalMsProp => logProps.put(LogConfig.FlushMsProp, entry.getValue)
case KafkaConfig.LogRetentionBytesProp => logProps.put(LogConfig.RetentionBytesProp, entry.getValue)
case KafkaConfig.LogRetentionTimeMillisProp => logProps.put(LogConfig.RetentionMsProp, entry.getValue)
case KafkaConfig.MessageMaxBytesProp => logProps.put(LogConfig.MaxMessageBytesProp, entry.getValue)
case KafkaConfig.LogIndexIntervalBytesProp => logProps.put(LogConfig.IndexIntervalBytesProp, entry.getValue)
case KafkaConfig.LogCleanerDeleteRetentionMsProp => logProps.put(LogConfig.DeleteRetentionMsProp, entry.getValue)
case KafkaConfig.LogDeleteDelayMsProp => logProps.put(LogConfig.FileDeleteDelayMsProp, entry.getValue)
case KafkaConfig.LogCleanerMinCleanRatioProp => logProps.put(LogConfig.MinCleanableDirtyRatioProp, entry.getValue)
case KafkaConfig.LogCleanupPolicyProp => logProps.put(LogConfig.CleanupPolicyProp, entry.getValue)
case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
case _ => // we just leave those out
}
}
logProps
}
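A minimal sketch of how this helper is meant to be used, mirroring the createLogManager change above and the fromProps call sites elsewhere in this patch (the zkClient value and the topic name are placeholders, not part of the commit):

    // illustrative only: broker defaults -> default LogConfig -> per-topic LogConfig
    val defaultProps = copyKafkaConfigToLog(config.originals)        // keep only the log-related keys
    val defaultLogConfig = LogConfig(defaultProps)
    // per-topic overrides read from ZooKeeper are layered on top of the broker defaults
    val topicConfig = LogConfig.fromProps(defaultLogConfig.originals,
      AdminUtils.fetchTopicConfig(zkClient, "some-topic"))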
private def createOffsetManager(): OffsetManager = {
val offsetManagerConfig = OffsetManagerConfig(
maxMetadataSize = config.offsetMetadataMaxSize,

View File

@ -90,7 +90,7 @@ class ReplicaFetcherThread(name:String,
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
if (!LogConfig.fromProps(brokerConfig.toProps, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String,
// any logic for partitions whose leader has changed
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs)
delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
}
}

View File

@ -101,9 +101,10 @@ class TopicConfigManager(private val zkClient: ZkClient,
val topic = json.substring(1, json.length - 1) // hacky way to dequote
if (logsByTopic.contains(topic)) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties(logManager.defaultConfig.toProps)
val props = new Properties()
props.putAll(logManager.defaultConfig.originals)
props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
val logConfig = LogConfig.fromProps(props)
val logConfig = LogConfig(props)
for (log <- logsByTopic(topic))
log.config = logConfig
info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))

View File

@ -253,32 +253,6 @@ object CoreUtils extends Logging {
s.substring(0, s.length - oldSuffix.length) + newSuffix
}
/**
* Turn {@linkplain java.util.Properties} with default values into a {@linkplain java.util.Map}. Following example
* illustrates difference from the cast
* <pre>
* val defaults = new Properties()
* defaults.put("foo", "bar")
* val props = new Properties(defaults)
*
* props.getProperty("foo") // "bar"
* props.get("foo") // null
* evaluateDefaults(props).get("foo") // "bar"
* </pre>
*
* @param props properties to evaluate
* @return new java.util.Map instance
*/
def evaluateDefaults(props: Properties): java.util.Map[String, String] = {
import java.util._
import JavaConversions.asScalaSet
val evaluated = new HashMap[String, String]()
for (name <- props.stringPropertyNames()) {
evaluated.put(name, props.getProperty(name))
}
evaluated
}
/**
* Read a big-endian integer from a byte array
*/

View File

@ -17,6 +17,7 @@
package kafka
import java.util.Properties
import java.util.concurrent.atomic._
import kafka.common._
import kafka.message._
@ -33,10 +34,13 @@ object StressTestLog {
def main(args: Array[String]) {
val dir = TestUtils.tempDir()
val time = new MockTime
val logProperties = new Properties()
logProperties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer)
logProperties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer)
logProperties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer)
val log = new Log(dir = dir,
config = LogConfig(segmentSize = 64*1024*1024,
maxMessageSize = Int.MaxValue,
maxIndexSize = 1024*1024),
config = LogConfig(logProperties),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
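Since LogConfig values are now parsed and validated through the common ConfigDef (as the ConfigException test in LogConfigTest below shows), the tests populate a Properties with boxed Java numbers; the : java.lang.Integer / : java.lang.Long ascriptions simply pin down which boxed type ends up in the Object-valued map. A small illustrative snippet of the same pattern (the values are arbitrary):

    val logProperties = new Properties()
    logProperties.put(LogConfig.SegmentBytesProp, 64 * 1024 * 1024: java.lang.Integer)  // Int boxed as Integer
    logProperties.put(LogConfig.FlushMessagesProp, 10000L: java.lang.Long)              // Long boxed as Long
    val config = LogConfig(logProperties)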

View File

@ -20,7 +20,7 @@ package kafka
import java.io._
import java.nio._
import java.nio.channels._
import java.util.Random
import java.util.{Properties, Random}
import kafka.log._
import kafka.utils._
import kafka.message._
@ -110,7 +110,10 @@ object TestLinearWriteSpeed {
writables(i) = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer)
} else if(options.has(logOpt)) {
val segmentSize = rand.nextInt(512)*1024*1024 + 64*1024*1024 // vary size to avoid herd effect
writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(segmentSize=segmentSize, flushInterval = flushInterval), scheduler, messageSet)
val logProperties = new Properties()
logProperties.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
logProperties.put(LogConfig.FlushMessagesProp, flushInterval: java.lang.Long)
writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet)
} else {
System.err.println("Must specify what to write to with one of --log, --channel, or --mmap")
System.exit(1)

View File

@ -26,7 +26,7 @@ import org.junit.Assert._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import java.util.{ Collection, ArrayList }
import java.util.{Properties, Collection, ArrayList}
import kafka.server.KafkaConfig
import org.apache.kafka.common.record.CompressionType
import scala.collection.JavaConversions._
@ -54,9 +54,10 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
@Test
def testBrokerSideCompression() {
val messageCompressionCode = CompressionCodec.getCompressionCodec(messageCompression)
val logProps = new Properties()
logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
/* configure broker-side compression */
val log = new Log(logDir, logConfig.copy(compressionType = brokerCompression), recoveryPoint = 0L, time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
/* append two messages */
log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes), new Message("there".getBytes)))

View File

@ -17,6 +17,8 @@
package kafka.log
import java.util.Properties
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Test}
@ -35,7 +37,11 @@ import org.apache.kafka.common.utils.Utils
class CleanerTest extends JUnitSuite {
val dir = TestUtils.tempDir()
val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val logConfig = LogConfig(logProps)
val time = new MockTime()
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
@ -50,8 +56,11 @@ class CleanerTest extends JUnitSuite {
@Test
def testCleanSegments() {
val cleaner = makeCleaner(Int.MaxValue)
val log = makeLog(config = logConfig.copy(segmentSize = 1024))
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
@ -72,7 +81,10 @@ class CleanerTest extends JUnitSuite {
@Test
def testCleaningWithDeletes() {
val cleaner = makeCleaner(Int.MaxValue)
val log = makeLog(config = logConfig.copy(segmentSize = 1024))
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append messages with the keys 0 through N
while(log.numberOfSegments < 2)
@ -98,7 +110,11 @@ class CleanerTest extends JUnitSuite {
val cleaner = makeCleaner(Int.MaxValue)
// create a log with compaction turned off so we can append unkeyed messages
val log = makeLog(config = logConfig.copy(segmentSize = 1024, compact = false))
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append unkeyed messages
while(log.numberOfSegments < 2)
@ -114,7 +130,9 @@ class CleanerTest extends JUnitSuite {
val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
// turn on compaction and compact the log
val compactedLog = makeLog(config = logConfig.copy(segmentSize = 1024))
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val compactedLog = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
@ -139,7 +157,10 @@ class CleanerTest extends JUnitSuite {
@Test
def testCleanSegmentsWithAbort() {
val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
val log = makeLog(config = logConfig.copy(segmentSize = 1024))
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
@ -159,7 +180,11 @@ class CleanerTest extends JUnitSuite {
@Test
def testSegmentGrouping() {
val cleaner = makeCleaner(Int.MaxValue)
val log = makeLog(config = logConfig.copy(segmentSize = 300, indexInterval = 1))
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append some messages to the log
var i = 0
@ -208,7 +233,12 @@ class CleanerTest extends JUnitSuite {
@Test
def testSegmentGroupingWithSparseOffsets() {
val cleaner = makeCleaner(Int.MaxValue)
val log = makeLog(config = logConfig.copy(segmentSize = 1024, indexInterval = 1))
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// fill up first segment
while (log.numberOfSegments == 1)
@ -288,7 +318,12 @@ class CleanerTest extends JUnitSuite {
@Test
def testRecoveryAfterCrash() {
val cleaner = makeCleaner(Int.MaxValue)
val config = logConfig.copy(segmentSize = 300, indexInterval = 1, fileDeleteDelayMs = 10)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {
// Recover log file and check that after recovery, keys are as expected

View File

@ -18,6 +18,7 @@
package kafka.log
import java.io.File
import java.util.Properties
import kafka.common.TopicAndPartition
import kafka.message._
@ -127,8 +128,13 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
for(i <- 0 until parts) {
val dir = new File(logDir, "log-" + i)
dir.mkdirs()
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val log = new Log(dir = dir,
LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true),
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)

View File

@ -25,23 +25,10 @@ import org.scalatest.junit.JUnit3Suite
class LogConfigTest extends JUnit3Suite {
@Test
def testFromPropsDefaults() {
val defaults = new Properties()
defaults.put(LogConfig.SegmentBytesProp, "4242")
val props = new Properties(defaults)
val config = LogConfig.fromProps(props)
Assert.assertEquals(4242, config.segmentSize)
Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize)
Assert.assertEquals("producer", config.compressionType)
}
@Test
def testFromPropsEmpty() {
val p = new Properties()
val config = LogConfig.fromProps(p)
val config = LogConfig(p)
Assert.assertEquals(LogConfig(), config)
}
@ -62,7 +49,7 @@ class LogConfigTest extends JUnit3Suite {
}
})
val actual = LogConfig.fromProps(expected).toProps
val actual = LogConfig(expected).originals
Assert.assertEquals(expected, actual)
}
@ -86,7 +73,7 @@ class LogConfigTest extends JUnit3Suite {
val props = new Properties
props.setProperty(name, value.toString)
intercept[ConfigException] {
LogConfig.fromProps(props)
LogConfig(props)
}
})
}
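After this change the tests exercise two construction paths: LogConfig(props) fills anything not supplied from the ConfigDef defaults, while LogConfig.fromProps(defaults, overrides) layers explicit overrides on top of an existing configuration's originals. A hedged sketch of both (the property values are arbitrary):

    val defaultProps = new Properties()
    defaultProps.put(LogConfig.RetentionMsProp, "86400000")
    val defaultConfig = LogConfig(defaultProps)
    val overrides = new Properties()
    overrides.put(LogConfig.SegmentBytesProp, "4242")
    val direct  = LogConfig(overrides)                                    // unspecified keys take ConfigDef defaults
    val layered = LogConfig.fromProps(defaultConfig.originals, overrides) // defaults first, then per-topic overrides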

View File

@ -18,6 +18,7 @@
package kafka.log
import java.io._
import java.util.Properties
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
@ -30,7 +31,11 @@ class LogManagerTest extends JUnit3Suite {
val time: MockTime = new MockTime()
val maxRollInterval = 100
val maxLogAgeMs = 10*60*60*1000
val logConfig = LogConfig(segmentSize = 1024, maxIndexSize = 4096, retentionMs = maxLogAgeMs)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
logProps.put(LogConfig.RetentionMsProp, maxLogAgeMs: java.lang.Integer)
val logConfig = LogConfig(logProps)
var logDir: File = null
var logManager: LogManager = null
val name = "kafka"
@ -113,8 +118,11 @@ class LogManagerTest extends JUnit3Suite {
def testCleanupSegmentsToMaintainSize() {
val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
logManager.shutdown()
val config = logConfig.copy(segmentSize = 10 * setSize, retentionSize = 5L * 10L * setSize + 10L)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 10 * setSize: java.lang.Integer)
logProps.put(LogConfig.RetentionBytesProp, 5L * 10L * setSize + 10L: java.lang.Long)
val config = LogConfig.fromProps(logConfig.originals, logProps)
logManager = createLogManager()
logManager.startup
@ -154,7 +162,10 @@ class LogManagerTest extends JUnit3Suite {
@Test
def testTimeBasedFlush() {
logManager.shutdown()
val config = logConfig.copy(flushMs = 1000)
val logProps = new Properties()
logProps.put(LogConfig.FlushMsProp, 1000: java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
logManager = createLogManager()
logManager.startup
val log = logManager.createLog(TopicAndPartition(name, 0), config)

View File

@ -18,6 +18,7 @@
package kafka.log
import java.io._
import java.util.Properties
import java.util.concurrent.atomic._
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
@ -61,9 +62,12 @@ class LogTest extends JUnitSuite {
def testTimeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes())
val logProps = new Properties()
logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
// create a log
val log = new Log(logDir,
logConfig.copy(segmentMs = 1 * 60 * 60L),
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
@ -96,9 +100,12 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val maxJitter = 20 * 60L
val logProps = new Properties()
logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
logProps.put(LogConfig.SegmentJitterMsProp, maxJitter: java.lang.Long)
// create a log
val log = new Log(logDir,
logConfig.copy(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter),
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
@ -123,8 +130,10 @@ class LogTest extends JUnitSuite {
val msgPerSeg = 10
val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
// create a log
val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@ -149,7 +158,9 @@ class LogTest extends JUnitSuite {
*/
@Test
def testAppendAndReadWithSequentialOffsets() {
val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
for(i <- 0 until messages.length)
@ -168,7 +179,9 @@ class LogTest extends JUnitSuite {
*/
@Test
def testAppendAndReadWithNonSequentialOffsets() {
val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val messages = messageIds.map(id => new Message(id.toString.getBytes))
@ -191,7 +204,9 @@ class LogTest extends JUnitSuite {
*/
@Test
def testReadAtLogGap() {
val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
@ -211,7 +226,9 @@ class LogTest extends JUnitSuite {
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes)
try {
log.read(0, 1024)
@ -234,7 +251,9 @@ class LogTest extends JUnitSuite {
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
messageSets.foreach(log.append(_))
@ -263,7 +282,9 @@ class LogTest extends JUnitSuite {
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@ -286,7 +307,9 @@ class LogTest extends JUnitSuite {
for(messagesToAppend <- List(0, 1, 25)) {
logDir.mkdirs()
// first test a log segment starting at 0
val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
for(i <- 0 until messagesToAppend)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
@ -318,7 +341,9 @@ class LogTest extends JUnitSuite {
val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
// append messages to log
val configSegmentSize = messageSet.sizeInBytes - 1
val log = new Log(logDir, logConfig.copy(segmentSize = configSegmentSize), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
try {
log.append(messageSet)
@ -342,7 +367,10 @@ class LogTest extends JUnitSuite {
val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
val log = new Log(logDir, logConfig.copy(compact = true), recoveryPoint = 0L, time.scheduler, time)
val logProps = new Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time)
try {
log.append(messageSetWithUnkeyedMessage)
@ -380,7 +408,9 @@ class LogTest extends JUnitSuite {
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), recoveryPoint = 0L, time.scheduler, time = time)
val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
// should be able to append the small message
log.append(first)
@ -401,7 +431,11 @@ class LogTest extends JUnitSuite {
val messageSize = 100
val segmentSize = 7 * messageSize
val indexInterval = 3 * messageSize
val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
@ -432,7 +466,11 @@ class LogTest extends JUnitSuite {
def testIndexRebuild() {
// publish the messages and close the log
val numMessages = 200
val config = logConfig.copy(segmentSize = 200, indexInterval = 1)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
@ -460,8 +498,11 @@ class LogTest extends JUnitSuite {
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
// create a log
val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
@ -513,7 +554,9 @@ class LogTest extends JUnitSuite {
val setSize = set.sizeInBytes
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
val config = logConfig.copy(segmentSize = segmentSize)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
val config = LogConfig(logProps)
val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
@ -540,10 +583,12 @@ class LogTest extends JUnitSuite {
val bogusIndex2 = Log.indexFilename(logDir, 5)
val set = TestUtils.singleMessageSet("test".getBytes())
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = new Log(logDir,
logConfig.copy(segmentSize = set.sizeInBytes * 5,
maxIndexSize = 1000,
indexInterval = 1),
LogConfig(logProps),
recoveryPoint = 0L,
time.scheduler,
time)
@ -564,9 +609,11 @@ class LogTest extends JUnitSuite {
@Test
def testReopenThenTruncate() {
val set = TestUtils.singleMessageSet("test".getBytes())
val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
maxIndexSize = 1000,
indexInterval = 10000)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
val config = LogConfig(logProps)
// create a log
var log = new Log(logDir,
@ -596,10 +643,13 @@ class LogTest extends JUnitSuite {
def testAsyncDelete() {
val set = TestUtils.singleMessageSet("test".getBytes())
val asyncDeleteMs = 1000
val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
fileDeleteDelayMs = asyncDeleteMs,
maxIndexSize = 1000,
indexInterval = 10000)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer)
val config = LogConfig(logProps)
val log = new Log(logDir,
config,
recoveryPoint = 0L,
@ -634,7 +684,10 @@ class LogTest extends JUnitSuite {
@Test
def testOpenDeletesObsoleteFiles() {
val set = TestUtils.singleMessageSet("test".getBytes())
val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir,
config,
recoveryPoint = 0L,
@ -672,7 +725,11 @@ class LogTest extends JUnitSuite {
@Test
def testCorruptLog() {
// append some messages to create some segments
val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
val config = LogConfig(logProps)
val set = TestUtils.singleMessageSet("test".getBytes())
val recoveryPoint = 50L
for(iteration <- 0 until 50) {
@ -704,7 +761,11 @@ class LogTest extends JUnitSuite {
@Test
def testCleanShutdownFile() {
// append some messages to create some segments
val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
val set = TestUtils.singleMessageSet("test".getBytes())
val parentLogDir = logDir.getParentFile
assertTrue("Data directory %s must exist", parentLogDir.isDirectory)

View File

@ -16,6 +16,8 @@
*/
package kafka.server
import java.util.Properties
import junit.framework.Assert._
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
@ -30,16 +32,19 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testConfigChange() {
val oldVal = 100000
val newVal = 200000
val oldVal: java.lang.Long = 100000L
val newVal: java.lang.Long = 200000L
val tp = TopicAndPartition("test", 0)
AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps)
val logProps = new Properties()
logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
AdminUtils.createTopic(zkClient, tp.topic, 1, 1, logProps)
TestUtils.retry(10000) {
val logOpt = this.servers(0).logManager.getLog(tp)
assertTrue(logOpt.isDefined)
assertEquals(oldVal, logOpt.get.config.flushInterval)
}
AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps)
logProps.put(LogConfig.FlushMessagesProp, newVal.toString)
AdminUtils.changeTopicConfig(zkClient, tp.topic, logProps)
TestUtils.retry(10000) {
assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval)
}
@ -49,7 +54,9 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
def testConfigChangeOnNonExistingTopic() {
val topic = TestUtils.tempTopic
try {
AdminUtils.changeTopicConfig(zkClient, topic, LogConfig(flushInterval = 10000).toProps)
val logProps = new Properties()
logProps.put(LogConfig.FlushMessagesProp, 10000: java.lang.Integer)
AdminUtils.changeTopicConfig(zkClient, topic, logProps)
fail("Should fail with AdminOperationException for topic doesn't exist")
} catch {
case e: AdminOperationException => // expected
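With the LogConfig(flushInterval = ...) constructors gone, per-topic settings are passed straight through as a Properties keyed by the LogConfig constants; a sketch of the admin flow this test exercises (zkClient comes from the test harness, the topic name and values are placeholders):

    val topicProps = new Properties()
    topicProps.put(LogConfig.FlushMessagesProp, "100000")
    AdminUtils.createTopic(zkClient, "test", 1, 1, topicProps)    // create with an initial override
    topicProps.put(LogConfig.FlushMessagesProp, "200000")
    AdminUtils.changeTopicConfig(zkClient, "test", topicProps)    // brokers pick up the change via TopicConfigManager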

View File

@ -29,22 +29,6 @@ import scala.util.Random._
class KafkaConfigConfigDefTest extends JUnit3Suite {
@Test
def testFromPropsDefaults() {
val defaults = new Properties()
defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
// some ordinary setting
defaults.put(KafkaConfig.AdvertisedPortProp, "1818")
val props = new Properties(defaults)
val config = KafkaConfig.fromProps(props)
Assert.assertEquals(1818, config.advertisedPort)
Assert.assertEquals("KafkaConfig defaults should be retained", Defaults.ConnectionsMaxIdleMs, config.connectionsMaxIdleMs)
}
@Test
def testFromPropsEmpty() {
// only required
@ -52,7 +36,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
val actualConfig = KafkaConfig.fromProps(p)
val expectedConfig = new KafkaConfig(zkConnect = "127.0.0.1:2181")
val expectedConfig = new KafkaConfig(p)
Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect)
Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs)
@ -252,7 +236,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
}
})
val actual = KafkaConfig.fromProps(expected).toProps
val actual = KafkaConfig.fromProps(expected).originals
Assert.assertEquals(expected, actual)
}