diff --git a/config/consumer.properties b/config/consumer.properties index a067ac0a27a..1c43bf96311 100644 --- a/config/consumer.properties +++ b/config/consumer.properties @@ -23,7 +23,7 @@ zk.connect=127.0.0.1:2181 zk.connectiontimeout.ms=1000000 #consumer group id -groupid=test-consumer-group +group.id=test-consumer-group #consumer timeout #consumer.timeout.ms=5000 diff --git a/config/producer.properties b/config/producer.properties index eb366912078..a1c8cb21889 100644 --- a/config/producer.properties +++ b/config/producer.properties @@ -36,35 +36,18 @@ serializer.class=kafka.serializer.StringEncoder # allow topic level compression #compressed.topics= -# max message size; messages larger than that size are discarded; default is 1000000 -#max.message.size= - - ############################# Async Producer ############################# # maximum time, in milliseconds, for buffering data on the producer queue -#queue.time= +#queue.buffering.max.ms= # the maximum size of the blocking queue for buffering on the producer -#queue.size= +#queue.buffering.max.messages= # Timeout for event enqueue: # 0: events will be enqueued immediately or dropped if the queue is full # -ve: enqueue will block indefinitely if the queue is full # +ve: enqueue will block up to this many milliseconds if the queue is full -#queue.enqueueTimeout.ms= +#queue.enqueue.timeout.ms= # the number of messages batched at the producer -#batch.size= - -# the callback handler for one or multiple events -#callback.handler= - -# properties required to initialize the callback handler -#callback.handler.props= - -# the handler for events -#event.handler= - -# properties required to initialize the event handler -#event.handler.props= - +#batch.num.messages= diff --git a/config/server.properties b/config/server.properties index f4521fb5aca..9a9cd063e72 100644 --- a/config/server.properties +++ b/config/server.properties @@ -17,7 +17,7 @@ ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. -brokerid=0 +broker.id=0 ############################# Socket Server Settings ############################# @@ -27,22 +27,22 @@ port=9092 # Hostname the broker will bind to and advertise to producers and consumers. # If not set, the server will bind to all interfaces and advertise the value returned from # from java.net.InetAddress.getCanonicalHostName(). -#hostname=localhost +#host.name=localhost # The number of threads handling network requests -network.threads=2 +num.network.threads=2 # The number of threads doing disk I/O -io.threads=2 +num.io.threads=2 # The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # The maximum size of a request that the socket server will accept (protection against OOM) -max.socket.request.bytes=104857600 +socket.request.max.bytes=104857600 ############################# Log Basics ############################# @@ -54,9 +54,6 @@ log.dir=/tmp/kafka-logs # for consumption, but also mean more files. num.partitions=1 -# Overrides for for the default given by num.partitions on a per-topic basis -#topic.partition.count.map=topic1:3, topic2:4 - ############################# Log Flush Policy ############################# # The following configurations control the flush of data to disk. This is the most @@ -69,16 +66,13 @@ num.partitions=1 # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk -log.flush.interval=10000 +log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 -# Per-topic overrides for log.default.flush.interval.ms -#topic.flush.intervals.ms=topic1:1000, topic2:3000 - -# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. -log.default.flush.scheduler.interval.ms=1000 +# Per-topic overrides for log.flush.interval.ms +#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000 ############################# Log Retention Policy ############################# @@ -91,11 +85,11 @@ log.default.flush.scheduler.interval.ms=1000 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.size. -#log.retention.size=1073741824 +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. -log.file.size=536870912 +log.segment.bytes=536870912 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java index 7f70f9e431d..df17978b0e6 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java @@ -71,7 +71,7 @@ public class DataGenerator { System.out.println("server uri:" + _uri.toString()); Properties producerProps = new Properties(); producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort())); - producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE)); + producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE)); producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT)); producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL)); diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java index 9a1c3595815..2fd203509bd 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java @@ -119,10 +119,9 @@ public class KafkaOutputFormat extends OutputFormat - val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes) + val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages) if(outOfSyncReplicas.size > 0) { val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas assert(newInSyncReplicas.size > 0) @@ -281,12 +281,12 @@ class Partition(val topic: String, } } - def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = { + def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = { /** * there are two cases that need to be handled here - * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated * for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR - * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the + * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the * follower is not catching up and should be removed from the ISR **/ val leaderLogEndOffset = leaderReplica.logEndOffset @@ -298,7 +298,7 @@ class Partition(val topic: String, val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs)) debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above - val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncBytes) + val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages) debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) stuckReplicas ++ slowReplicas } diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index b857d145496..5dffa7ed891 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -144,14 +144,14 @@ object ConsoleConsumer extends Logging { } val props = new Properties() - props.put("groupid", options.valueOf(groupIdOpt)) - props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString) - props.put("fetch.size", options.valueOf(fetchSizeOpt).toString) - props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString) - props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString) - props.put("autocommit.enable", "true") - props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) - props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") + props.put("group.id", options.valueOf(groupIdOpt)) + props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) + props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) + props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString) + props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString) + props.put("auto.commit.enable", "true") + props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) + props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") props.put("zk.connect", options.valueOf(zkConnectOpt)) props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString) val config = new ConsumerConfig(props) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index b379c9dd713..45db07b00f1 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -52,11 +52,11 @@ object ConsumerConfig extends Config { } def validateClientId(clientId: String) { - validateChars("clientid", clientId) + validateChars("client.id", clientId) } def validateGroupId(groupId: String) { - validateChars("groupid", groupId) + validateChars("group.id", groupId) } def validateAutoOffsetReset(autoOffsetReset: String) { @@ -77,38 +77,38 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( } /** a string that uniquely identifies a set of consumers within the same consumer group */ - val groupId = props.getString("groupid") + val groupId = props.getString("group.id") /** consumer id: generated automatically if not set. * Set this explicitly for only testing purpose. */ - val consumerId: Option[String] = Option(props.getString("consumerid", null)) + val consumerId: Option[String] = Option(props.getString("consumer.id", null)) /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */ val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout) /** the socket receive buffer for network requests */ - val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize) + val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize) /** the number of byes of messages to attempt to fetch */ - val fetchSize = props.getInt("fetch.size", FetchSize) + val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize) /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */ - val autoCommit = props.getBoolean("autocommit.enable", AutoCommit) + val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit) /** the frequency in ms that the consumer offsets are committed to zookeeper */ - val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval) + val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval) /** max number of messages buffered for consumption */ - val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks) + val queuedMaxMessages = props.getInt("queued.max.messages", MaxQueuedChunks) /** max number of retries during rebalance */ - val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries) + val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries) /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */ - val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes) + val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes) - /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */ - val maxFetchWaitMs = props.getInt("max.fetch.wait.ms", MaxFetchWaitMs) + /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */ + val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs) /** backoff time between retries during rebalance */ val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs) @@ -120,7 +120,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset anything else: throw exception to the consumer */ - val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset) + val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset) /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */ val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs) @@ -129,12 +129,12 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the * overhead of decompression. * */ - val enableShallowIterator = props.getBoolean("shallowiterator.enable", false) + val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false) /** * Client id is specified by the kafka consumer client, used to distinguish different clients */ - val clientId = props.getString("clientid", groupId) + val clientId = props.getString("client.id", groupId) validate(this) } diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 2ce024c8943..713c7c99097 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -33,11 +33,11 @@ class ConsumerFetcherThread(name: String, clientId = config.clientId + "-" + name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs, - socketBufferSize = config.socketBufferSize, - fetchSize = config.fetchSize, + socketBufferSize = config.socketReceiveBufferBytes, + fetchSize = config.fetchMessageMaxBytes, fetcherBrokerId = Request.OrdinaryConsumerId, - maxWait = config.maxFetchWaitMs, - minBytes = config.minFetchBytes) { + maxWait = config.fetchWaitMaxMs, + minBytes = config.fetchMinBytes) { // process fetched data def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index aee9293bfe7..42a962893a8 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -112,7 +112,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, connectZk() createFetcher() - if (config.autoCommit) { + if (config.autoCommitEnable) { scheduler.startup info("starting auto committer every " + config.autoCommitIntervalMs + " ms") scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs, @@ -160,14 +160,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (wildcardTopicWatcher != null) wildcardTopicWatcher.shutdown() try { - if (config.autoCommit) + if (config.autoCommitEnable) scheduler.shutdownNow() fetcher match { case Some(f) => f.shutdown case None => } sendShutdownToAllQueues() - if (config.autoCommit) + if (config.autoCommitEnable) commitOffsets() if (zkClient != null) { zkClient.close() @@ -194,9 +194,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // make a list of (queue,stream) pairs, one pair for each threadId val queuesAndStreams = topicThreadIds.values.map(threadIdSet => threadIdSet.map(_ => { - val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) + val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( - queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, config.clientId) + queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId) (queue, stream) }) ).flatten.toList @@ -365,7 +365,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def syncedRebalance() { rebalanceLock synchronized { - for (i <- 0 until config.maxRebalanceRetries) { + for (i <- 0 until config.rebalanceMaxRetries) { info("begin rebalancing consumer " + consumerIdString + " try #" + i) var done = false val cluster = getCluster(zkClient) @@ -393,7 +393,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries") + throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries") } private def rebalance(cluster: Cluster): Boolean = { @@ -610,7 +610,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, queue, consumedOffset, fetchedOffset, - new AtomicInteger(config.fetchSize), + new AtomicInteger(config.fetchMessageMaxBytes), config.clientId) partTopicInfoMap.put(partition, partTopicInfo) debug(partTopicInfo + " selected new offset " + offset) @@ -709,12 +709,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val wildcardQueuesAndStreams = (1 to numStreams) .map(e => { - val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) + val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V](queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, - config.enableShallowIterator, + config.shallowIteratorEnable, config.clientId) (queue, stream) }).toList diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 5f0148c482c..497cfddb872 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -43,15 +43,15 @@ private[kafka] class LogManager(val config: KafkaConfig, val CleanShutdownFile = ".kafka_cleanshutdown" val LockFile = ".lock" val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray - private val logFileSizeMap = config.logFileSizeMap - private val logFlushInterval = config.flushInterval - private val logFlushIntervals = config.flushIntervalMap + private val logFileSizeMap = config.logSegmentBytesPerTopicMap + private val logFlushInterval = config.logFlushIntervalMessages + private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap private val logCreationLock = new Object - private val logRetentionSizeMap = config.logRetentionSizeMap - private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms - private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) + private val logRetentionSizeMap = config.logRetentionBytesPerTopicMap + private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms + private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours - private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes + private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours this.logIdent = "[Log Manager on Broker " + config.brokerId + "] " @@ -111,14 +111,14 @@ private[kafka] class LogManager(val config: KafkaConfig, info("Loading log '" + dir.getName + "'") val topicPartition = parseTopicPartitionName(dir.getName) val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize) + val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentBytes) val log = new Log(dir, maxLogFileSize, - config.maxMessageSize, + config.messageMaxBytes, logFlushInterval, rollIntervalMs, needsRecovery, - config.logIndexMaxSizeBytes, + config.logIndexSizeMaxBytes, config.logIndexIntervalBytes, time, config.brokerId) @@ -139,10 +139,10 @@ private[kafka] class LogManager(val config: KafkaConfig, if(scheduler != null) { info("Starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false) - info("Starting log flusher every " + config.flushSchedulerThreadRate + + info("Starting log flusher every " + config.logFlushSchedulerIntervalMs + " ms with the following overrides " + logFlushIntervals) scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-", - config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false) + config.logFlushSchedulerIntervalMs, config.logFlushSchedulerIntervalMs, false) } } @@ -186,14 +186,14 @@ private[kafka] class LogManager(val config: KafkaConfig, val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition) dir.mkdirs() val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize) + val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes) log = new Log(dir, maxLogFileSize, - config.maxMessageSize, + config.messageMaxBytes, logFlushInterval, rollIntervalMs, needsRecovery = false, - config.logIndexMaxSizeBytes, + config.logIndexSizeMaxBytes, config.logIndexIntervalBytes, time, config.brokerId) @@ -249,7 +249,7 @@ private[kafka] class LogManager(val config: KafkaConfig, */ private def cleanupSegmentsToMaintainSize(log: Log): Int = { val topic = parseTopicPartitionName(log.dir.getName).topic - val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize) + val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionBytes) if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0 var diff = log.size - maxLogRetentionSize def shouldDelete(segment: LogSegment) = { @@ -310,7 +310,7 @@ private[kafka] class LogManager(val config: KafkaConfig, for (log <- allLogs) { try { val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime - var logFlushInterval = config.defaultFlushIntervalMs + var logFlushInterval = config.logFlushIntervalMs if(logFlushIntervals.contains(log.topicName)) logFlushInterval = logFlushIntervals(log.topicName) debug(log.topicName + " flush interval " + logFlushInterval + diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 4e2f2af03f0..1a98174065e 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -125,12 +125,12 @@ object ConsoleProducer { props.put("compression.codec", codec.toString) props.put("producer.type", if(sync) "sync" else "async") if(options.has(batchSizeOpt)) - props.put("batch.size", batchSize.toString) - props.put("queue.time", sendTimeout.toString) - props.put("queue.size", queueSize.toString) + props.put("batch.num.messages", batchSize.toString) + props.put("queue.buffering.max.ms", sendTimeout.toString) + props.put("queue.buffering.max.messages", queueSize.toString) props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString) - props.put("producer.request.required.acks", requestRequiredAcks.toString) - props.put("producer.request.timeout.ms", requestTimeoutMs.toString) + props.put("request.required.acks", requestRequiredAcks.toString) + props.put("request.timeout.ms", requestTimeoutMs.toString) props.put("key.serializer.class", keyEncoderClass) props.put("serializer.class", valueEncoderClass) diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index a7c101a2868..af077e0ef37 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -73,8 +73,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified if(producerType != null) props.put("producer.type", producerType) if(compressionCodec != null) props.put("compression.codec", compressionCodec) - if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout) - if(queueSize != null) props.put("queue.size", queueSize) + if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout) + if(queueSize != null) props.put("queue.buffering.max.messages", queueSize) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + config.brokerList) diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index a1835257435..66638f233ee 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -31,7 +31,7 @@ class Producer[K,V](config: ProducerConfig, extends Logging { private val hasShutdown = new AtomicBoolean(false) - private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize) + private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages) private val random = new Random private var sync: Boolean = true @@ -44,8 +44,8 @@ class Producer[K,V](config: ProducerConfig, producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue, eventHandler, - config.queueTime, - config.batchSize, + config.queueBufferingMaxMs, + config.batchNumMessages, config.clientId) producerSendThread.start() } @@ -87,17 +87,17 @@ class Producer[K,V](config: ProducerConfig, private def asyncSend(messages: Seq[KeyedMessage[K,V]]) { for (message <- messages) { - val added = config.enqueueTimeoutMs match { + val added = config.queueEnqueueTimeoutMs match { case 0 => queue.offer(message) case _ => try { - config.enqueueTimeoutMs < 0 match { + config.queueEnqueueTimeoutMs < 0 match { case true => queue.put(message) true case _ => - queue.offer(message, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS) + queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS) } } catch { diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 235b228c895..e27ec44a83f 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -26,12 +26,12 @@ import kafka.common.{InvalidConfigException, Config} object ProducerConfig extends Config { def validate(config: ProducerConfig) { validateClientId(config.clientId) - validateBatchSize(config.batchSize, config.queueSize) + validateBatchSize(config.batchNumMessages, config.queueBufferingMaxMessages) validateProducerType(config.producerType) } def validateClientId(clientId: String) { - validateChars("clientid", clientId) + validateChars("client.id", clientId) } def validateBatchSize(batchSize: Int, queueSize: Int) { @@ -101,17 +101,16 @@ class ProducerConfig private (val props: VerifiableProperties) */ val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null)) - /** - * The producer using the zookeeper software load balancer maintains a ZK cache that gets - * updated by the zookeeper watcher listeners. During some events like a broker bounce, the - * producer ZK cache can get into an inconsistent state, for a small time period. In this time - * period, it could end up picking a broker partition that is unavailable. When this happens, the - * ZK cache needs to be updated. - * This parameter specifies the number of times the producer attempts to refresh this ZK cache. - */ - val producerRetries = props.getInt("producer.num.retries", 3) + /** The leader may be unavailable transiently, which can fail the sending of a message. + * This property specifies the number of retries when such failures occur. + */ + val messageSendMaxRetries = props.getInt("message.send.max.retries", 3) - val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100) + /** Before each retry, the producer refreshes the metadata of relevant topics. Since leader + * election takes a bit of time, this property specifies the amount of time that the producer + * waits before refreshing the metadata. + */ + val retryBackoffMs = props.getInt("retry.backoff.ms", 100) /** * The producer generally refreshes the topic metadata from brokers when there is a failure @@ -121,7 +120,7 @@ class ProducerConfig private (val props: VerifiableProperties) * Important note: the refresh happen only AFTER the message is sent, so if the producer never sends * a message the metadata is never refreshed */ - val topicMetadataRefreshIntervalMs = props.getInt("producer.metadata.refresh.interval.ms", 600000) + val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000) validate(this) } diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 0ef320b9d14..0469a391c66 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -36,7 +36,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { private val lock = new Object() @volatile private var shutdown: Boolean = false private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, - config.bufferSize, config.requestTimeoutMs) + config.sendBufferBytes, config.requestTimeoutMs) val brokerInfo = "host_%s-port_%s".format(config.host, config.port) val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId) diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index 5ebd29af996..ef326203426 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -36,24 +36,22 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP trait SyncProducerConfigShared { val props: VerifiableProperties - val bufferSize = props.getInt("buffer.size", 100*1024) - - val maxMessageSize = props.getInt("max.message.size", 1000000) + val sendBufferBytes = props.getInt("send.buffer.bytes", 100*1024) /* the client application sending the producer requests */ - val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId) + val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId) /* * The required acks of the producer requests - negative value means ack * after the replicas in ISR have caught up to the leader's offset * corresponding to this produce request. */ - val requiredAcks = props.getShort("producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks) + val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks) /* * The ack timeout of the producer requests. Value must be non-negative and non-zero */ - val requestTimeoutMs = props.getIntInRange("producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs, + val requestTimeoutMs = props.getIntInRange("request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs, (1, Integer.MAX_VALUE)) } diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala index 07935d7ccc7..973fa08c9af 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala @@ -22,10 +22,10 @@ trait AsyncProducerConfig { val props: VerifiableProperties /* maximum time, in milliseconds, for buffering data on the producer queue */ - val queueTime = props.getInt("queue.time", 5000) + val queueBufferingMaxMs = props.getInt("queue.buffering.max.ms", 5000) /** the maximum size of the blocking queue for buffering on the producer */ - val queueSize = props.getInt("queue.size", 10000) + val queueBufferingMaxMessages = props.getInt("queue.buffering.max.messages", 10000) /** * Timeout for event enqueue: @@ -33,10 +33,10 @@ trait AsyncProducerConfig { * -ve: enqueue will block indefinitely if the queue is full * +ve: enqueue will block up to this many milliseconds if the queue is full */ - val enqueueTimeoutMs = props.getInt("queue.enqueueTimeout.ms", 0) + val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", 0) /** the number of messages batched at the producer */ - val batchSize = props.getInt("batch.size", 200) + val batchNumMessages = props.getInt("batch.num.messages", 200) /** the serializer class for values */ val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder") diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 58f582f92b1..9a4e4bc0371 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -59,7 +59,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize) } var outstandingProduceRequests = serializedData - var remainingRetries = config.producerRetries + 1 + var remainingRetries = config.messageSendMaxRetries + 1 val correlationIdStart = correlationId.get() while (remainingRetries > 0 && outstandingProduceRequests.size > 0) { topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) @@ -72,7 +72,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests) if (outstandingProduceRequests.size > 0) { // back off and update the topic metadata cache before attempting another send operation - Thread.sleep(config.producerRetryBackoffMs) + Thread.sleep(config.retryBackoffMs) // get topics of the outstanding produce requests and refresh metadata for those Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)) remainingRetries -= 1 @@ -81,9 +81,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } if(outstandingProduceRequests.size > 0) { producerStats.failedSendRate.mark() + val correlationIdEnd = correlationId.get() error("Failed to send the following requests with correlation ids in [%d,%d]: %s".format(correlationIdStart, correlationIdEnd-1, outstandingProduceRequests)) - throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null) + throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null) } } } @@ -231,7 +232,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, messagesPerTopic.keys.toSeq } else if(messagesPerTopic.size > 0) { val currentCorrelationId = correlationId.getAndIncrement - val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requiredAcks, + val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks, config.requestTimeoutMs, messagesPerTopic) var failedTopicPartitions = Seq.empty[TopicAndPartition] try { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index e2dfb3e09ad..60752fb2fdf 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -41,9 +41,9 @@ class KafkaApis(val requestChannel: RequestChannel, brokerId: Int) extends Logging { private val producerRequestPurgatory = - new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval) + new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests) private val fetchRequestPurgatory = - new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval) + new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) private val delayedRequestMetrics = new DelayedRequestMetrics private val requestLogger = Logger.getLogger("kafka.request.logger") @@ -442,7 +442,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ErrorMapping.UnknownTopicOrPartitionCode => try { /* check if auto creation of topics is turned on */ - if (config.autoCreateTopics) { + if (config.autoCreateTopicsEnable) { try { CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 962b65f1a3e..f65db338793 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -36,37 +36,37 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /*********** General Configuration ***********/ /* the broker id for this server */ - val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue)) + val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue)) /* the maximum size of message that the server can receive */ - val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue)) + val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue)) /* the number of network threads that the server uses for handling network requests */ - val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue)) + val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue)) /* the number of io threads that the server uses for carrying out network requests */ - val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue)) + val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue)) /* the number of queued requests allowed before blocking the network threads */ - val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue)) + val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue)) /*********** Socket Server Configuration ***********/ /* the port to listen and accept connections on */ val port: Int = props.getInt("port", 6667) - /* hostname of broker. If this is set, it will only bind to this address. If this is not set, + /* hostname of broker. If this is set, it will only bind to this address. If this is not set, * it will bind to all interfaces, and publish one to ZK */ - val hostName: String = props.getString("hostname", null) + val hostName: String = props.getString("host.name", null) /* the SO_SNDBUFF buffer of the socket sever sockets */ - val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024) + val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) /* the SO_RCVBUFF buffer of the socket sever sockets */ - val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024) + val socketReceiveBufferBytes: Int = props.getInt("socket.receive.buffer.bytes", 100*1024) /* the maximum number of bytes in a socket request */ - val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue)) + val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue)) /*********** Log Configuration ***********/ @@ -74,56 +74,56 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) /* the directories in which the log data is kept */ - val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", ""))) + val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "/tmp/kafka-logs"))) require(logDirs.size > 0) /* the maximum size of a single log file */ - val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) + val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) /* the maximum size of a single log file for some specific topic */ - val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt) + val logSegmentBytesPerTopicMap = props.getMap("log.segment.bytes.per.topic", _.toInt > 0).mapValues(_.toInt) /* the maximum time before a new log segment is rolled out */ val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue)) /* the number of hours before rolling out a new log segment for some specific topic */ - val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt) + val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt) /* the number of hours to keep a log file before deleting it */ val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue)) /* the number of hours to keep a log file before deleting it for some specific topic*/ - val logRetentionHoursMap = props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt) + val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt) /* the maximum size of the log before deleting it */ - val logRetentionSize = props.getLong("log.retention.size", -1) + val logRetentionBytes = props.getLong("log.retention.bytes", -1) /* the maximum size of the log for some specific topic before deleting it */ - val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong) + val logRetentionBytesPerTopicMap = props.getMap("log.retention.bytes.per.topic", _.toLong > 0).mapValues(_.toLong) /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ - val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue)) + val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue)) /* the maximum size in bytes of the offset index */ - val logIndexMaxSizeBytes = props.getIntInRange("log.index.max.size", 10*1024*1024, (4, Int.MaxValue)) + val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue)) /* the interval with which we add an entry to the offset index */ val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue)) /* the number of messages accumulated on a log partition before messages are flushed to disk */ - val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue)) + val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue)) /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ - val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt) + val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt) /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */ - val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000) + val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms", 3000) /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */ - val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate) + val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs) /* enable auto creation of topic on the server */ - val autoCreateTopics = props.getBoolean("auto.create.topics", true) + val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true) /*********** Replication configuration ***********/ @@ -136,36 +136,38 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* default replication factors for automatically created topics */ val defaultReplicationFactor = props.getInt("default.replication.factor", 1) - val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000) + /* If a follower hasn't sent any fetch requests during this time, the leader will remove the follower from isr */ + val replicaLagTimeMaxMs = props.getLong("replica.lag.time.max.ms", 10000) - val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000) + /* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */ + val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000) /* the socket timeout for network requests */ val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout) /* the socket receive buffer for network requests */ - val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize) + val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize) /* the number of byes of messages to attempt to fetch */ - val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize) + val replicaFetchMaxBytes = props.getInt("replica.fetch.max.bytes", ConsumerConfig.FetchSize) /* max wait time for each fetcher request issued by follower replicas*/ - val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500) + val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500) /* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */ - val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 1) + val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1) /* number of fetcher threads used to replicate messages from a source broker. * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ - val numReplicaFetchers = props.getInt("replica.fetchers", 1) + val numReplicaFetchers = props.getInt("num.replica.fetchers", 1) - /* the frequency with which the highwater mark is saved out to disk */ - val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L) + /* the frequency with which the high watermark is saved out to disk */ + val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L) /* the purge interval (in number of requests) of the fetch request purgatory */ - val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000) + val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000) /* the purge interval (in number of requests) of the producer request purgatory */ - val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000) + val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000) } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index ae35e4feed0..1fe1ca9ddf8 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -65,8 +65,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.hostName, config.port, config.numNetworkThreads, - config.numQueuedRequests, - config.maxSocketRequestSize) + config.queuedMaxRequests, + config.socketRequestMaxBytes) socketServer.startup diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index c1d323510dd..6ae601ed7db 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -31,11 +31,11 @@ class ReplicaFetcherThread(name:String, clientId = FetchRequest.ReplicaFetcherClientId, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs, - socketBufferSize = brokerConfig.replicaSocketBufferSize, - fetchSize = brokerConfig.replicaFetchSize, + socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes, + fetchSize = brokerConfig.replicaFetchMaxBytes, fetcherBrokerId = brokerConfig.brokerId, - maxWait = brokerConfig.replicaMaxWaitTimeMs, - minBytes = brokerConfig.replicaMinBytes) { + maxWait = brokerConfig.replicaFetchWaitMaxMs, + minBytes = brokerConfig.replicaFetchMinBytes) { // process fetched data def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 42068cad3a2..064af6bd826 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -72,7 +72,7 @@ class ReplicaManager(val config: KafkaConfig, def startHighWaterMarksCheckPointThread() = { if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) - kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs) + kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.replicaHighWatermarkCheckpointIntervalMs) } /** @@ -91,7 +91,7 @@ class ReplicaManager(val config: KafkaConfig, def startup() { // start ISR expiration thread - kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) + kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs) } def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { @@ -244,7 +244,7 @@ class ReplicaManager(val config: KafkaConfig, private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") leaderPartitionsLock synchronized { - leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)) + leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages)) } } diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index 36a119bce7d..1f5c7ba9235 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -182,9 +182,9 @@ public class KafkaMigrationTool Properties kafkaConsumerProperties_07 = new Properties(); kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07)); /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/ - if(kafkaConsumerProperties_07.getProperty("shallowiterator.enable", "").equals("true")){ + if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")){ logger.warn("Shallow iterator should not be used in the migration tool"); - kafkaConsumerProperties_07.setProperty("shallowiterator.enable", "false"); + kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false"); } Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07); diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index db14c825af5..d744a78a78b 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -42,12 +42,12 @@ object ReplayLogProducer extends Logging { // consumer properties val consumerProps = new Properties - consumerProps.put("groupid", GroupId) + consumerProps.put("group.id", GroupId) consumerProps.put("zk.connect", config.zkConnect) consumerProps.put("consumer.timeout.ms", "10000") - consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString) - consumerProps.put("fetch.size", (1024*1024).toString) - consumerProps.put("socket.buffer.size", (2 * 1024 * 1024).toString) + consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString) + consumerProps.put("fetch.message.max.bytes", (1024*1024).toString) + consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString) val consumerConfig = new ConsumerConfig(consumerProps) val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads)) @@ -141,10 +141,10 @@ object ReplayLogProducer extends Logging { val props = new Properties() props.put("broker.list", config.brokerList) props.put("reconnect.interval", Integer.MAX_VALUE.toString) - props.put("buffer.size", (64*1024).toString) + props.put("send.buffer.bytes", (64*1024).toString) props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("batch.size", config.batchSize.toString) - props.put("queue.enqueueTimeout.ms", "-1") + props.put("batch.num.messages", config.batchSize.toString) + props.put("queue.enqueue.timeout.ms", "-1") if(config.isAsync) props.put("producer.type", "async") diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 5ba5938c897..f59440449ec 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -785,11 +785,11 @@ class ZKConfig(props: VerifiableProperties) { val zkConnect = props.getString("zk.connect", null) /** zookeeper session timeout */ - val zkSessionTimeoutMs = props.getInt("zk.sessiontimeout.ms", 6000) + val zkSessionTimeoutMs = props.getInt("zk.session.timeout.ms", 6000) /** the max time that the client waits to establish a connection to zookeeper */ - val zkConnectionTimeoutMs = props.getInt("zk.connectiontimeout.ms",zkSessionTimeoutMs) + val zkConnectionTimeoutMs = props.getInt("zk.connection.timeout.ms",zkSessionTimeoutMs) /** how far a ZK follower can be behind a ZK leader */ - val zkSyncTimeMs = props.getInt("zk.synctime.ms", 2000) + val zkSyncTimeMs = props.getInt("zk.sync.time.ms", 2000) } diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala index 5be4f4e7c57..98c12b79467 100644 --- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala +++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala @@ -35,9 +35,9 @@ object TestEndToEndLatency { val topic = "test" val consumerProps = new Properties() - consumerProps.put("groupid", topic) + consumerProps.put("group.id", topic) consumerProps.put("auto.commit", "true") - consumerProps.put("autooffset.reset", "largest") + consumerProps.put("auto.offset.reset", "largest") consumerProps.put("zk.connect", zkConnect) consumerProps.put("socket.timeout.ms", 1201000.toString) diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala index 75c33e0345a..9f3bb40728f 100644 --- a/core/src/test/scala/other/kafka/TestLogPerformance.scala +++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala @@ -33,7 +33,7 @@ object TestLogPerformance { val props = TestUtils.createBrokerConfig(0, -1) val config = new KafkaConfig(props) val dir = TestUtils.tempDir() - val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime) + val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime) val bytes = new Array[Byte](messageSize) new java.util.Random().nextBytes(bytes) val message = new Message(bytes) diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala index 5b72eeda35b..31534ca303e 100644 --- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala +++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala @@ -31,7 +31,7 @@ object TestZKConsumerOffsets { val topic = args(1) val autoOffsetReset = args(2) val props = Utils.loadProps(args(0)) - props.put("autooffset.reset", "largest") + props.put("auto.offset.reset", "largest") val config = new ConsumerConfig(props) val consumerConnector: ConsumerConnector = Consumer.create(config) diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index d7945a510dc..4c646f0803e 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -78,9 +78,9 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L // update offset in zookeeper for consumer to jump "forward" in time val dirs = new ZKGroupTopicDirs(group, topic) var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer) - consumerProps.put("autooffset.reset", resetTo) + consumerProps.put("auto.offset.reset", resetTo) consumerProps.put("consumer.timeout.ms", "2000") - consumerProps.put("max.fetch.wait.ms", "0") + consumerProps.put("fetch.wait.max.ms", "0") val consumerConfig = new ConsumerConfig(consumerProps) TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", offset) diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index caea8586bfe..0fde254daf0 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -35,12 +35,12 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes val props = new Properties() props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("buffer.size", "65536") + props.put("send.buffer.bytes", "65536") props.put("connect.timeout.ms", "100000") props.put("reconnect.interval", "10000") - props.put("producer.retry.backoff.ms", "1000") - props.put("producer.num.retries", "3") - props.put("producer.request.required.acks", "-1") + props.put("retry.backoff.ms", "1000") + props.put("message.send.max.retries", "3") + props.put("request.required.acks", "-1") props.put("serializer.class", classOf[StringEncoder].getName.toString) producer = new Producer(new ProducerConfig(props)) consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index b06d8128805..ce893bfe842 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -40,8 +40,8 @@ class LogManagerTest extends JUnit3Suite { override def setUp() { super.setUp() config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) { - override val logFileSize = 1024 - override val flushInterval = 10000 + override val logSegmentBytes = 1024 + override val logFlushIntervalMessages = 10000 override val logRetentionHours = maxLogAgeHours } scheduler.startup @@ -114,10 +114,10 @@ class LogManagerTest extends JUnit3Suite { val props = TestUtils.createBrokerConfig(0, -1) logManager.shutdown() config = new KafkaConfig(props) { - override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages - override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] + override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages + override val logRetentionBytes = (5 * 10 * setSize + 10).asInstanceOf[Long] override val logRetentionHours = retentionHours - override val flushInterval = 100 + override val logFlushIntervalMessages = 100 override val logRollHours = maxRollInterval } logManager = new LogManager(config, scheduler, time) @@ -158,11 +158,11 @@ class LogManagerTest extends JUnit3Suite { val props = TestUtils.createBrokerConfig(0, -1) logManager.shutdown() config = new KafkaConfig(props) { - override val logFileSize = 1024 *1024 *1024 - override val flushSchedulerThreadRate = 50 - override val flushInterval = Int.MaxValue + override val logSegmentBytes = 1024 *1024 *1024 + override val logFlushSchedulerIntervalMs = 50 + override val logFlushIntervalMessages = Int.MaxValue override val logRollHours = maxRollInterval - override val flushIntervalMap = Map("timebasedflush" -> 100) + override val logFlushIntervalMsPerTopicMap = Map("timebasedflush" -> 100) } logManager = new LogManager(config, scheduler, time) logManager.startup @@ -173,7 +173,7 @@ class LogManagerTest extends JUnit3Suite { } val ellapsed = System.currentTimeMillis - log.getLastFlushedTime assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed), - ellapsed < 2*config.flushSchedulerThreadRate) + ellapsed < 2*config.logFlushSchedulerIntervalMs) } @Test @@ -183,7 +183,7 @@ class LogManagerTest extends JUnit3Suite { val dirs = Seq(TestUtils.tempDir().getAbsolutePath, TestUtils.tempDir().getAbsolutePath, TestUtils.tempDir().getAbsolutePath) - props.put("log.directories", dirs.mkString(",")) + props.put("log.dirs", dirs.mkString(",")) logManager.shutdown() logManager = new LogManager(new KafkaConfig(props), scheduler, time) diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index c6ea3b61009..b343d98232b 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -198,15 +198,15 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { private def createBrokerConfig(nodeId: Int, port: Int): Properties = { val props = new Properties - props.put("brokerid", nodeId.toString) + props.put("broker.id", nodeId.toString) props.put("port", port.toString) props.put("log.dir", getLogDir.getAbsolutePath) - props.put("log.flush.interval", "1") + props.put("log.flush.interval.messages", "1") props.put("enable.zookeeper", "false") props.put("num.partitions", "20") props.put("log.retention.hours", "10") props.put("log.cleanup.interval.mins", "5") - props.put("log.file.size", logSize.toString) + props.put("log.segment.bytes", logSize.toString) props.put("zk.connect", zkConnect.toString) props } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 900d0e2f33b..786ae038d51 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -62,7 +62,7 @@ class LogTest extends JUnitSuite { val time: MockTime = new MockTime() // create a log - val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time) + val log = new Log(logDir, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time) time.currentMs += rollMs + 1 // segment age is less than its limit @@ -96,7 +96,7 @@ class LogTest extends JUnitSuite { val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log - val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) + val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -109,12 +109,12 @@ class LogTest extends JUnitSuite { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) } @Test def testAppendAndRead() { - val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 10) log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) @@ -131,7 +131,7 @@ class LogTest extends JUnitSuite { @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) - val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes) try { log.read(0, 1024) @@ -151,7 +151,7 @@ class LogTest extends JUnitSuite { @Test def testLogRolls() { /* create a multipart log with 100 messages */ - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) val offsets = messageSets.map(log.append(_)._1) @@ -173,7 +173,7 @@ class LogTest extends JUnitSuite { @Test def testCompressedMessages() { /* this log should roll after every messageset */ - val log = new Log(logDir, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) @@ -206,7 +206,7 @@ class LogTest extends JUnitSuite { @Test def testEdgeLogRollsStartingAtZero() { // first test a log segment starting at 0 - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val curOffset = log.logEndOffset assertEquals(curOffset, 0) @@ -221,7 +221,7 @@ class LogTest extends JUnitSuite { @Test def testEdgeLogRollsStartingAtNonZero() { // second test an empty log segment starting at non-zero - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val numMessages = 1 for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(i.toString.getBytes)) @@ -269,7 +269,7 @@ class LogTest extends JUnitSuite { val messageSize = 100 val segmentSize = 7 * messageSize val indexInterval = 3 * messageSize - var log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) + var log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize))) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) @@ -278,14 +278,14 @@ class LogTest extends JUnitSuite { log.close() // test non-recovery case - log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) + log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) log.close() // test - log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) + log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) @@ -300,7 +300,7 @@ class LogTest extends JUnitSuite { val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log - val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) + val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -349,7 +349,7 @@ class LogTest extends JUnitSuite { val setSize = set.sizeInBytes val msgPerSeg = 10 val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages - val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) + val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) log.append(set) @@ -373,7 +373,7 @@ class LogTest extends JUnitSuite { logDir.mkdir() var log = new Log(logDir, maxLogFileSize = 64*1024, - maxMessageSize = config.maxMessageSize, + maxMessageSize = config.messageMaxBytes, maxIndexSize = 1000, indexIntervalBytes = 10000, needsRecovery = true) @@ -403,7 +403,7 @@ class LogTest extends JUnitSuite { val set = TestUtils.singleMessageSet("test".getBytes()) val log = new Log(logDir, maxLogFileSize = set.sizeInBytes * 5, - maxMessageSize = config.maxMessageSize, + maxMessageSize = config.messageMaxBytes, maxIndexSize = 1000, indexIntervalBytes = 1, needsRecovery = false) @@ -425,7 +425,7 @@ class LogTest extends JUnitSuite { // create a log var log = new Log(logDir, maxLogFileSize = set.sizeInBytes * 5, - maxMessageSize = config.maxMessageSize, + maxMessageSize = config.messageMaxBytes, maxIndexSize = 1000, indexIntervalBytes = 10000, needsRecovery = true) @@ -436,7 +436,7 @@ class LogTest extends JUnitSuite { log.close() log = new Log(logDir, maxLogFileSize = set.sizeInBytes * 5, - maxMessageSize = config.maxMessageSize, + maxMessageSize = config.messageMaxBytes, maxIndexSize = 1000, indexIntervalBytes = 10000, needsRecovery = true) diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 3e46dd76898..beb63a4de95 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -63,8 +63,8 @@ class AsyncProducerTest extends JUnit3Suite { props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("producer.type", "async") - props.put("queue.size", "10") - props.put("batch.size", "1") + props.put("queue.buffering.max.messages", "10") + props.put("batch.num.messages", "1") val config = new ProducerConfig(props) val produceData = getProduceData(12) @@ -87,7 +87,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("producer.type", "async") - props.put("batch.size", "1") + props.put("batch.num.messages", "1") val config = new ProducerConfig(props) val produceData = getProduceData(10) @@ -358,7 +358,7 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("producer.type", "async") - props.put("batch.size", "5") + props.put("batch.num.messages", "5") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 48842eb1d5b..792919ba970 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -140,13 +140,13 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props1.put("serializer.class", "kafka.serializer.StringEncoder") props1.put("partitioner.class", "kafka.utils.StaticPartitioner") props1.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - props1.put("producer.request.required.acks", "2") - props1.put("producer.request.timeout.ms", "1000") + props1.put("request.required.acks", "2") + props1.put("request.timeout.ms", "1000") val props2 = new util.Properties() props2.putAll(props1) - props2.put("producer.request.required.acks", "3") - props2.put("producer.request.timeout.ms", "1000") + props2.put("request.required.acks", "3") + props2.put("request.timeout.ms", "1000") val producerConfig1 = new ProducerConfig(props1) val producerConfig2 = new ProducerConfig(props2) @@ -198,8 +198,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", "kafka.utils.StaticPartitioner") - props.put("producer.request.timeout.ms", "2000") -// props.put("producer.request.required.acks", "-1") + props.put("request.timeout.ms", "2000") +// props.put("request.required.acks", "-1") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) // create topic @@ -256,7 +256,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", "kafka.utils.StaticPartitioner") - props.put("producer.request.timeout.ms", String.valueOf(timeoutMs)) + props.put("request.timeout.ms", String.valueOf(timeoutMs)) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) val config = new ProducerConfig(props) @@ -300,7 +300,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // make sure we don't wait fewer than numRetries*timeoutMs milliseconds // we do this because the DefaultEventHandler retries a number of times - assertTrue((t2-t1) >= timeoutMs*config.producerRetries) + assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries) } } diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index b289dda56de..89ba9444061 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -41,7 +41,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = new Properties() props.put("host", "localhost") props.put("port", server.socketServer.port.toString) - props.put("buffer.size", "102400") + props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "500") props.put("reconnect.interval", "1000") val producer = new SyncProducer(new SyncProducerConfig(props)) @@ -77,10 +77,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = new Properties() props.put("host", "localhost") props.put("port", server.socketServer.port.toString) - props.put("buffer.size", "102400") + props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "300") props.put("reconnect.interval", "500") - props.put("max.message.size", "100") val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs @@ -98,12 +97,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = new Properties() props.put("host", "localhost") props.put("port", server.socketServer.port.toString) - props.put("max.message.size", 50000.toString) val producer = new SyncProducer(new SyncProducerConfig(props)) CreateTopicCommand.createTopic(zkClient, "test", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) - val message1 = new Message(new Array[Byte](configs(0).maxMessageSize + 1)) + val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1)) @@ -111,7 +109,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error) Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset) - val safeSize = configs(0).maxMessageSize - Message.MessageOverhead - MessageSet.LogOverhead - 1 + val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1 val message2 = new Message(new Array[Byte](safeSize)) val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2) val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2)) @@ -127,10 +125,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = new Properties() props.put("host", "localhost") props.put("port", server.socketServer.port.toString) - props.put("buffer.size", "102400") + props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "300") props.put("reconnect.interval", "500") - props.put("max.message.size", "100") val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -179,8 +176,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = new Properties() props.put("host", "localhost") props.put("port", server.socketServer.port.toString) - props.put("buffer.size", "102400") - props.put("producer.request.timeout.ms", String.valueOf(timeoutMs)) + props.put("send.buffer.bytes", "102400") + props.put("request.timeout.ms", String.valueOf(timeoutMs)) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 3dfb406b8f4..6184f42751f 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -29,8 +29,8 @@ class IsrExpirationTest extends JUnit3Suite { var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { - override val replicaMaxLagTimeMs = 100L - override val replicaMaxLagBytes = 10L + override val replicaLagTimeMaxMs = 100L + override val replicaLagMaxMessages = 10L }) val topic = "foo" @@ -45,7 +45,7 @@ class IsrExpirationTest extends JUnit3Suite { // let the follower catch up to 10 (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10) - var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) + var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) // let some time pass @@ -53,7 +53,7 @@ class IsrExpirationTest extends JUnit3Suite { // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck - partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) + partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) EasyMock.verify(log) } @@ -71,7 +71,7 @@ class IsrExpirationTest extends JUnit3Suite { // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap it larger than // replicaMaxLagBytes, the follower is out of sync. - val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) + val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) EasyMock.verify(log) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index a3afa2dd14f..cd724a337f8 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -28,9 +28,9 @@ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { - override val replicaMaxLagTimeMs = 5000L - override val replicaMaxLagBytes = 10L - override val replicaMinBytes = 20 + override val replicaLagTimeMaxMs = 5000L + override val replicaLagMaxMessages = 10L + override val replicaFetchMinBytes = 20 }) val topic = "new-topic" val partitionId = 0 @@ -50,7 +50,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString) - producerProps.put("producer.request.required.acks", "-1") + producerProps.put("request.required.acks", "-1") def testHWCheckpointNoFailuresSingleLogSegment { diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 0377e0805f9..1557047977f 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -33,8 +33,8 @@ import kafka.common.TopicAndPartition class SimpleFetchTest extends JUnit3Suite { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { - override val replicaMaxLagTimeMs = 100L - override val replicaMaxLagBytes = 10L + override val replicaLagTimeMaxMs = 100L + override val replicaLagMaxMessages = 10L }) val topic = "foo" val partitionId = 0 diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a508895cb45..9400328ea55 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -123,11 +123,11 @@ object TestUtils extends Logging { */ def createBrokerConfig(nodeId: Int, port: Int): Properties = { val props = new Properties - props.put("brokerid", nodeId.toString) - props.put("hostname", "localhost") + props.put("broker.id", nodeId.toString) + props.put("host.name", "localhost") props.put("port", port.toString) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) - props.put("log.flush.interval", "1") + props.put("log.flush.interval.messages", "1") props.put("zk.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") props @@ -140,13 +140,13 @@ object TestUtils extends Logging { consumerTimeout: Long = -1): Properties = { val props = new Properties props.put("zk.connect", zkConnect) - props.put("groupid", groupId) - props.put("consumerid", consumerId) + props.put("group.id", groupId) + props.put("consumer.id", consumerId) props.put("consumer.timeout.ms", consumerTimeout.toString) - props.put("zk.sessiontimeout.ms", "400") - props.put("zk.synctime.ms", "200") - props.put("autocommit.interval.ms", "1000") - props.put("rebalance.retries.max", "4") + props.put("zk.session.timeout.ms", "400") + props.put("zk.sync.time.ms", "200") + props.put("auto.commit.interval.ms", "1000") + props.put("rebalance.max.retries", "4") props } @@ -293,7 +293,7 @@ object TestUtils extends Logging { keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = { val props = new Properties() props.put("broker.list", brokerList) - props.put("buffer.size", "65536") + props.put("send.buffer.bytes", "65536") props.put("connect.timeout.ms", "100000") props.put("reconnect.interval", "10000") props.put("serializer.class", encoder.getClass.getCanonicalName) @@ -307,10 +307,10 @@ object TestUtils extends Logging { props.put("producer.type", "sync") props.put("broker.list", brokerList) props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") - props.put("buffer.size", bufferSize.toString) + props.put("send.buffer.bytes", bufferSize.toString) props.put("connect.timeout.ms", connectTimeout.toString) props.put("reconnect.interval", reconnectInterval.toString) - props.put("producer.request.timeout.ms", 30000.toString) + props.put("request.timeout.ms", 30000.toString) props.put("serializer.class", classOf[StringEncoder].getName.toString) props } diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 2b875600ab0..3460d36adaa 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -44,10 +44,10 @@ public class Consumer extends Thread { Properties props = new Properties(); props.put("zk.connect", KafkaProperties.zkConnect); - props.put("groupid", KafkaProperties.groupId); - props.put("zk.sessiontimeout.ms", "400"); - props.put("zk.synctime.ms", "200"); - props.put("autocommit.interval.ms", "1000"); + props.put("group.id", KafkaProperties.groupId); + props.put("zk.session.timeout.ms", "400"); + props.put("zk.sync.time.ms", "200"); + props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index a720ced44a3..ee2ce95766f 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -74,7 +74,7 @@ object ConsumerPerformance { if(!config.showDetailedStats) { val totalMBRead = (totalBytesRead.get*1.0)/(1024*1024) println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), - config.consumerConfig.fetchSize, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get, + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get, totalMessagesRead.get/elapsedSecs)) } System.exit(0) @@ -124,10 +124,10 @@ object ConsumerPerformance { } val props = new Properties - props.put("groupid", options.valueOf(groupIdOpt)) - props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString) - props.put("fetch.size", options.valueOf(fetchSizeOpt).toString) - props.put("autooffset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest") + props.put("group.id", options.valueOf(groupIdOpt)) + props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) + props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) + props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest") props.put("zk.connect", options.valueOf(zkConnectOpt)) props.put("consumer.timeout.ms", "5000") val consumerConfig = new ConsumerConfig(props) @@ -190,7 +190,7 @@ object ConsumerPerformance { val totalMBRead = (bytesRead*1.0)/(1024*1024) val mbRead = ((bytesRead - lastBytesRead)*1.0)/(1024*1024) println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id, - config.consumerConfig.fetchSize, totalMBRead, + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, 1000.0*(mbRead/elapsedMs), messagesRead, ((messagesRead - lastMessagesRead)/elapsedMs)*1000.0)) } diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index 0367af21c17..4822a7ecd60 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -191,17 +191,17 @@ object ProducerPerformance extends Logging { props.put("broker.list", config.brokerList) props.put("compression.codec", config.compressionCodec.codec.toString) props.put("reconnect.interval", Integer.MAX_VALUE.toString) - props.put("buffer.size", (64*1024).toString) + props.put("send.buffer.bytes", (64*1024).toString) if(!config.isSync) { props.put("producer.type","async") - props.put("batch.size", config.batchSize.toString) - props.put("queue.enqueueTimeout.ms", "-1") + props.put("batch.num.messages", config.batchSize.toString) + props.put("queue.enqueue.timeout.ms", "-1") } - props.put("clientid", "ProducerPerformance") - props.put("producer.request.required.acks", config.producerRequestRequiredAcks.toString) - props.put("producer.request.timeout.ms", config.producerRequestTimeoutMs.toString) - props.put("producer.num.retries", config.producerNumRetries.toString) - props.put("producer.retry.backoff.ms", config.producerRetryBackoffMs.toString) + props.put("client.id", "ProducerPerformance") + props.put("request.required.acks", config.producerRequestRequiredAcks.toString) + props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) + props.put("message.send.max.retries", config.producerNumRetries.toString) + props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) props.put("serializer.class", classOf[DefaultEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString) diff --git a/system_test/broker_failure/config/mirror_producer.properties b/system_test/broker_failure/config/mirror_producer.properties index 9ea68d059c4..7f80a1e1064 100644 --- a/system_test/broker_failure/config/mirror_producer.properties +++ b/system_test/broker_failure/config/mirror_producer.properties @@ -18,10 +18,10 @@ zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 producer.type=async # to avoid dropping events if the queue is full, wait indefinitely -queue.enqueueTimeout.ms=-1 +queue.enqueue.timeout.ms=-1 diff --git a/system_test/broker_failure/config/mirror_producer1.properties b/system_test/broker_failure/config/mirror_producer1.properties index 7f37db3db38..81dae76f86d 100644 --- a/system_test/broker_failure/config/mirror_producer1.properties +++ b/system_test/broker_failure/config/mirror_producer1.properties @@ -19,10 +19,10 @@ zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 producer.type=async # to avoid dropping events if the queue is full, wait indefinitely -queue.enqueueTimeout.ms=-1 +queue.enqueue.timeout.ms=-1 diff --git a/system_test/broker_failure/config/mirror_producer2.properties b/system_test/broker_failure/config/mirror_producer2.properties index 047f8404e1d..714b95df9b1 100644 --- a/system_test/broker_failure/config/mirror_producer2.properties +++ b/system_test/broker_failure/config/mirror_producer2.properties @@ -19,10 +19,10 @@ zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 producer.type=async # to avoid dropping events if the queue is full, wait indefinitely -queue.enqueueTimeout.ms=-1 +queue.enqueue.timeout.ms=-1 diff --git a/system_test/broker_failure/config/mirror_producer3.properties b/system_test/broker_failure/config/mirror_producer3.properties index 5e8b7dcf789..e8fa72db4d2 100644 --- a/system_test/broker_failure/config/mirror_producer3.properties +++ b/system_test/broker_failure/config/mirror_producer3.properties @@ -19,10 +19,10 @@ zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 producer.type=async # to avoid dropping events if the queue is full, wait indefinitely -queue.enqueueTimeout.ms=-1 +queue.enqueue.timeout.ms=-1 diff --git a/system_test/broker_failure/config/server_source1.properties b/system_test/broker_failure/config/server_source1.properties index 1a16c2cef0a..bbf288ebfec 100644 --- a/system_test/broker_failure/config/server_source1.properties +++ b/system_test/broker_failure/config/server_source1.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=1 +broker.id=1 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-source1-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=10000000 +log.segment.bytes=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,14 +63,14 @@ enable.zookeeper=true zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 diff --git a/system_test/broker_failure/config/server_source2.properties b/system_test/broker_failure/config/server_source2.properties index 032bbcc8a9f..570bafc6b92 100644 --- a/system_test/broker_failure/config/server_source2.properties +++ b/system_test/broker_failure/config/server_source2.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=2 +broker.id=2 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-source2-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=10000000 +log.segment.bytes=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,14 +63,14 @@ enable.zookeeper=true zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 diff --git a/system_test/broker_failure/config/server_source3.properties b/system_test/broker_failure/config/server_source3.properties index 05b3a97c7b2..df8ff6a6357 100644 --- a/system_test/broker_failure/config/server_source3.properties +++ b/system_test/broker_failure/config/server_source3.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=3 +broker.id=3 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-source3-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=10000000 +log.segment.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,14 +63,14 @@ enable.zookeeper=true zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 diff --git a/system_test/broker_failure/config/server_source4.properties b/system_test/broker_failure/config/server_source4.properties index c94204d1317..ee9c7fd3bee 100644 --- a/system_test/broker_failure/config/server_source4.properties +++ b/system_test/broker_failure/config/server_source4.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=4 +broker.id=4 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-source4-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=10000000 +log.segment.bytes=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,14 +63,14 @@ enable.zookeeper=true zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 diff --git a/system_test/broker_failure/config/server_target1.properties b/system_test/broker_failure/config/server_target1.properties index e9cc0384fba..7f776bd3c2b 100644 --- a/system_test/broker_failure/config/server_target1.properties +++ b/system_test/broker_failure/config/server_target1.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=1 +broker.id=1 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-target1-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=10000000 +log.segment.bytes=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,16 +63,16 @@ enable.zookeeper=true zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 # topic partition count map # topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/broker_failure/config/server_target2.properties b/system_test/broker_failure/config/server_target2.properties index 6007fa88011..6d997dcbdaa 100644 --- a/system_test/broker_failure/config/server_target2.properties +++ b/system_test/broker_failure/config/server_target2.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=2 +broker.id=2 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-target2-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=10000000 +log.segment.bytes=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,16 +63,16 @@ enable.zookeeper=true zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 # topic partition count map # topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/broker_failure/config/server_target3.properties b/system_test/broker_failure/config/server_target3.properties index 9ac0b06603e..0d3a9aead1a 100644 --- a/system_test/broker_failure/config/server_target3.properties +++ b/system_test/broker_failure/config/server_target3.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=3 +broker.id=3 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-target3-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=10000000 +log.segment.bytes=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,16 +63,16 @@ enable.zookeeper=true zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 # topic partition count map # topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/broker_failure/config/whitelisttest.consumer.properties b/system_test/broker_failure/config/whitelisttest.consumer.properties index aaa3f7c07f1..dd91bd3049c 100644 --- a/system_test/broker_failure/config/whitelisttest.consumer.properties +++ b/system_test/broker_failure/config/whitelisttest.consumer.properties @@ -20,10 +20,10 @@ zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 #consumer group id -groupid=group1 +group.id=group1 mirror.topics.whitelist=test_1,test_2 -autooffset.reset=smallest +auto.offset.reset=smallest diff --git a/system_test/common/util.sh b/system_test/common/util.sh index d3ee607433c..e3d10c68061 100644 --- a/system_test/common/util.sh +++ b/system_test/common/util.sh @@ -72,7 +72,7 @@ kill_child_processes() { # from the settings in config/server.properties while the brokerid and # server port will be incremented accordingly # 3. to generate properties files with non-default values such as -# "socket.send.buffer=2097152", simply add the property with new value +# "socket.send.buffer.bytes=2097152", simply add the property with new value # to the array variable kafka_properties_to_replace as shown below # ========================================================================= generate_kafka_properties_files() { @@ -103,10 +103,10 @@ generate_kafka_properties_files() { # values. Other kafka properties can be added # in a similar fashion. # ============================================= - # kafka_properties_to_replace[1]="socket.send.buffer=2097152" - # kafka_properties_to_replace[2]="socket.receive.buffer=2097152" + # kafka_properties_to_replace[1]="socket.send.buffer.bytes=2097152" + # kafka_properties_to_replace[2]="socket.receive.buffer.bytes=2097152" # kafka_properties_to_replace[3]="num.partitions=3" - # kafka_properties_to_replace[4]="max.socket.request.bytes=10485760" + # kafka_properties_to_replace[4]="socket.request.max.bytes=10485760" server_properties=`cat ${this_config_dir}/server.properties` diff --git a/system_test/migration_tool_testsuite/config/migration_producer.properties b/system_test/migration_tool_testsuite/config/migration_producer.properties index a923ee3e6a0..af080ae7ccd 100644 --- a/system_test/migration_tool_testsuite/config/migration_producer.properties +++ b/system_test/migration_tool_testsuite/config/migration_producer.properties @@ -26,10 +26,10 @@ broker.list=localhost:9094,localhost:9095,localhost:9096 #zk.connect= # zookeeper session timeout; default is 6000 -#zk.sessiontimeout.ms= +#zk.session.timeout.ms= # the max time that the client waits to establish a connection to zookeeper; default is 6000 -#zk.connectiontimeout.ms +#zk.connection.timeout.ms # name of the partitioner class for partitioning events; default partition spreads data randomly #partitioner.class= @@ -46,35 +46,18 @@ serializer.class=kafka.serializer.DefaultEncoder # allow topic level compression #compressed.topics= -# max message size; messages larger than that size are discarded; default is 1000000 -#max.message.size= - - ############################# Async Producer ############################# # maximum time, in milliseconds, for buffering data on the producer queue -#queue.time= +#queue.buffering.max.ms= # the maximum size of the blocking queue for buffering on the producer -#queue.size= +#queue.buffering.max.messages= # Timeout for event enqueue: # 0: events will be enqueued immediately or dropped if the queue is full # -ve: enqueue will block indefinitely if the queue is full # +ve: enqueue will block up to this many milliseconds if the queue is full -#queue.enqueueTimeout.ms= +#queue.enqueue.timeout.ms= # the number of messages batched at the producer -#batch.size= - -# the callback handler for one or multiple events -#callback.handler= - -# properties required to initialize the callback handler -#callback.handler.props= - -# the handler for events -#event.handler= - -# properties required to initialize the event handler -#event.handler.props= - +#batch.num.messages= diff --git a/system_test/migration_tool_testsuite/config/server.properties b/system_test/migration_tool_testsuite/config/server.properties index 6976869d08a..d231d4c89be 100644 --- a/system_test/migration_tool_testsuite/config/server.properties +++ b/system_test/migration_tool_testsuite/config/server.properties @@ -17,12 +17,12 @@ ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. -brokerid=0 +broker.id=0 # Hostname the broker will advertise to consumers. If not set, kafka will use the value returned # from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost # may not be what you want. -#hostname= +#host.name= ############################# Socket Server Settings ############################# @@ -31,19 +31,19 @@ brokerid=0 port=9091 # The number of threads handling network requests -network.threads=2 +num.network.threads=2 # The number of threads doing disk I/O -io.threads=2 +num.io.threads=2 # The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # The maximum size of a request that the socket server will accept (protection against OOM) -max.socket.request.bytes=104857600 +socket.request.max.bytes=104857600 ############################# Log Basics ############################# @@ -70,16 +70,16 @@ num.partitions=5 # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk -log.flush.interval=10000 +log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 -# Per-topic overrides for log.default.flush.interval.ms -#topic.flush.intervals.ms=topic1:1000, topic2:3000 +# Per-topic overrides for log.flush.interval.ms +#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000 # The interval (in ms) at which logs are checked to see if they need to be flushed to disk. -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 ############################# Log Retention Policy ############################# @@ -92,13 +92,13 @@ log.default.flush.scheduler.interval.ms=1000 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.size. -#log.retention.size=1073741824 +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. -#log.file.size=536870912 -#log.file.size=102400 -log.file.size=128 +#log.segment.bytes=536870912 +#log.segment.bytes=102400 +log.segment.bytes=128 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies @@ -117,6 +117,6 @@ enable.zookeeper=true zk.connect=localhost:2181 # Timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 monitoring.period.secs=1 diff --git a/system_test/mirror_maker/config/blacklisttest.consumer.properties b/system_test/mirror_maker/config/blacklisttest.consumer.properties index 6ea85ecb9a1..ff1201582ff 100644 --- a/system_test/mirror_maker/config/blacklisttest.consumer.properties +++ b/system_test/mirror_maker/config/blacklisttest.consumer.properties @@ -20,9 +20,9 @@ zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 #consumer group id -groupid=group1 -shallowiterator.enable=true +group.id=group1 +shallow.iterator.enable=true diff --git a/system_test/mirror_maker/config/mirror_producer.properties b/system_test/mirror_maker/config/mirror_producer.properties index b74c631d1d2..aa8be6504a7 100644 --- a/system_test/mirror_maker/config/mirror_producer.properties +++ b/system_test/mirror_maker/config/mirror_producer.properties @@ -19,12 +19,12 @@ zk.connect=localhost:2183 # broker.list=1:localhost:9094,2:localhost:9095 # timeout in ms for connecting to zookeeper -# zk.connectiontimeout.ms=1000000 +# zk.connection.timeout.ms=1000000 producer.type=async # to avoid dropping events if the queue is full, wait indefinitely -queue.enqueueTimeout.ms=-1 +queue.enqueue.timeout.ms=-1 num.producers.per.broker=2 diff --git a/system_test/mirror_maker/config/server_source_1_1.properties b/system_test/mirror_maker/config/server_source_1_1.properties index d89c4fbe0e8..2f070a74e8b 100644 --- a/system_test/mirror_maker/config/server_source_1_1.properties +++ b/system_test/mirror_maker/config/server_source_1_1.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=1 +broker.id=1 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-source-1-1-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=10000000 +log.segment.bytes=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,14 +63,14 @@ enable.zookeeper=true zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 diff --git a/system_test/mirror_maker/config/server_source_1_2.properties b/system_test/mirror_maker/config/server_source_1_2.properties index 063d68b0113..f9353e8934d 100644 --- a/system_test/mirror_maker/config/server_source_1_2.properties +++ b/system_test/mirror_maker/config/server_source_1_2.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=2 +broker.id=2 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-source-1-2-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.segment.bytes=536870912 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,14 +63,14 @@ enable.zookeeper=true zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 diff --git a/system_test/mirror_maker/config/server_source_2_1.properties b/system_test/mirror_maker/config/server_source_2_1.properties index 998b460b944..daa01ad463e 100644 --- a/system_test/mirror_maker/config/server_source_2_1.properties +++ b/system_test/mirror_maker/config/server_source_2_1.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=1 +broker.id=1 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-source-2-1-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.segment.bytes=536870912 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,14 +63,14 @@ enable.zookeeper=true zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 diff --git a/system_test/mirror_maker/config/server_source_2_2.properties b/system_test/mirror_maker/config/server_source_2_2.properties index 81427aee926..be6fdfcc11d 100644 --- a/system_test/mirror_maker/config/server_source_2_2.properties +++ b/system_test/mirror_maker/config/server_source_2_2.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=2 +broker.id=2 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-source-2-2-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.segment.bytes=536870912 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,14 +63,14 @@ enable.zookeeper=true zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 diff --git a/system_test/mirror_maker/config/server_target_1_1.properties b/system_test/mirror_maker/config/server_target_1_1.properties index 0265f4efc40..d37955a66dc 100644 --- a/system_test/mirror_maker/config/server_target_1_1.properties +++ b/system_test/mirror_maker/config/server_target_1_1.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=1 +broker.id=1 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-target-1-1-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.segment.bytes=536870912 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,16 +63,16 @@ enable.zookeeper=true zk.connect=localhost:2183 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 # topic partition count map # topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/mirror_maker/config/server_target_1_2.properties b/system_test/mirror_maker/config/server_target_1_2.properties index a31e9ca1762..aa7546cfc8f 100644 --- a/system_test/mirror_maker/config/server_target_1_2.properties +++ b/system_test/mirror_maker/config/server_target_1_2.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=2 +broker.id=2 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-target-1-2-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.segment.bytes=536870912 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,16 +63,16 @@ enable.zookeeper=true zk.connect=localhost:2183 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 # topic partition count map # topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties b/system_test/mirror_maker/config/whitelisttest_1.consumer.properties index 6ea85ecb9a1..ff1201582ff 100644 --- a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties +++ b/system_test/mirror_maker/config/whitelisttest_1.consumer.properties @@ -20,9 +20,9 @@ zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 #consumer group id -groupid=group1 -shallowiterator.enable=true +group.id=group1 +shallow.iterator.enable=true diff --git a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties b/system_test/mirror_maker/config/whitelisttest_2.consumer.properties index e11112fdc00..f1a902b100a 100644 --- a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties +++ b/system_test/mirror_maker/config/whitelisttest_2.consumer.properties @@ -20,9 +20,9 @@ zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 #consumer group id -groupid=group1 -shallowiterator.enable=true +group.id=group1 +shallow.iterator.enable=true diff --git a/system_test/mirror_maker_testsuite/config/mirror_consumer.properties b/system_test/mirror_maker_testsuite/config/mirror_consumer.properties index 651797670c0..ea415e6712f 100644 --- a/system_test/mirror_maker_testsuite/config/mirror_consumer.properties +++ b/system_test/mirror_maker_testsuite/config/mirror_consumer.properties @@ -1,12 +1,12 @@ zk.connect=localhost:2108 -zk.connectiontimeout.ms=1000000 -groupid=mm_regtest_grp -autocommit.interval.ms=120000 -autooffset.reset=smallest -#fetch.size=1048576 -#rebalance.retries.max=4 +zk.connection.timeout.ms=1000000 +group.id=mm_regtest_grp +auto.commit.interval.ms=120000 +auto.offset.reset=smallest +#fetch.message.max.bytes=1048576 +#rebalance.max.retries=4 #rebalance.backoff.ms=2000 -socket.buffersize=1048576 -fetch.size=1048576 -zk.synctime.ms=15000 -shallowiterator.enable=true +socket.receive.buffer.bytes=1048576 +fetch.message.max.bytes=1048576 +zk.sync.time.ms=15000 +shallow.iterator.enable=true diff --git a/system_test/mirror_maker_testsuite/config/mirror_producer.properties b/system_test/mirror_maker_testsuite/config/mirror_producer.properties index 3bb5a7ba60c..7db5bfcad39 100644 --- a/system_test/mirror_maker_testsuite/config/mirror_producer.properties +++ b/system_test/mirror_maker_testsuite/config/mirror_producer.properties @@ -1,5 +1,5 @@ producer.type=async -queue.enqueueTimeout.ms=-1 +queue.enqueue.timeout.ms=-1 broker.list=localhost:9094 compression.codec=0 diff --git a/system_test/mirror_maker_testsuite/config/server.properties b/system_test/mirror_maker_testsuite/config/server.properties index 8ef65ba133b..dacf158431c 100644 --- a/system_test/mirror_maker_testsuite/config/server.properties +++ b/system_test/mirror_maker_testsuite/config/server.properties @@ -17,12 +17,12 @@ ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. -brokerid=0 +broker.id=0 # Hostname the broker will advertise to consumers. If not set, kafka will use the value returned # from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost # may not be what you want. -#hostname= +#host.name= ############################# Socket Server Settings ############################# @@ -31,19 +31,19 @@ brokerid=0 port=9091 # The number of threads handling network requests -network.threads=2 +num.network.threads=2 # The number of threads doing disk I/O -io.threads=2 +num.io.threads=2 # The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # The maximum size of a request that the socket server will accept (protection against OOM) -max.socket.request.bytes=104857600 +socket.request.max.bytes=104857600 ############################# Log Basics ############################# @@ -70,16 +70,16 @@ num.partitions=5 # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk -log.flush.interval=10000 +log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 -# Per-topic overrides for log.default.flush.interval.ms -#topic.flush.intervals.ms=topic1:1000, topic2:3000 +# Per-topic overrides for log.flush.interval.ms +#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000 # The interval (in ms) at which logs are checked to see if they need to be flushed to disk. -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 ############################# Log Retention Policy ############################# @@ -92,13 +92,13 @@ log.default.flush.scheduler.interval.ms=1000 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.size. -#log.retention.size=1073741824 -log.retention.size=-1 +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 +log.retention.bytes=-1 # The maximum size of a log segment file. When this size is reached a new log segment will be created. -#log.file.size=536870912 -log.file.size=102400 +#log.segment.size=536870912 +log.segment.bytes=102400 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies @@ -117,23 +117,23 @@ enable.zookeeper=true zk.connect=localhost:2181 # Timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 monitoring.period.secs=1 -max.message.size=1000000 -max.queued.requests=500 +message.max.bytes=1000000 +queued.max.requests=500 log.roll.hours=168 -log.index.max.size=10485760 +log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 -auto.create.topics=true +auto.create.topics.enable=true controller.socket.timeout.ms=30000 controller.message.queue.size=10 default.replication.factor=1 -replica.max.lag.time.ms=10000 -replica.max.lag.bytes=4000 +replica.lag.time.max.ms=10000 +replica.lag.max.messages=4000 replica.socket.timeout.ms=30000 -replica.socket.buffersize=65536 -replica.fetch.size=1048576 -replica.fetch.wait.time.ms=500 +replica.socket.receive.buffer.bytes=65536 +replica.fetch.max.bytes=1048576 +replica.fetch.wait.max.ms=500 replica.fetch.min.bytes=4096 -replica.fetchers=1 +num.replica.fetchers=1 diff --git a/system_test/producer_perf/config/server.properties b/system_test/producer_perf/config/server.properties index abd076566ff..9f8a633ab2f 100644 --- a/system_test/producer_perf/config/server.properties +++ b/system_test/producer_perf/config/server.properties @@ -15,12 +15,12 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=0 +broker.id=0 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost # may not be what you want. -# hostname= +# host.name= # number of logical partitions on this broker num.partitions=1 @@ -35,13 +35,13 @@ num.threads=8 log.dir=/tmp/kafka-logs # the send buffer used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.segment.bytes=536870912 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 @@ -50,7 +50,7 @@ log.cleanup.interval.mins=1 log.retention.hours=168 #the number of messages to accept without flushing the log to disk -log.flush.interval=600 +log.flush.interval.messages=600 #set the following properties to use zookeeper @@ -63,16 +63,16 @@ enable.zookeeper=true zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 +#log.flush.intervals.ms.per.topic=topic:1000 # default time based flush interval in ms -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 # time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 # topic partition count map # topic.partition.count.map=topic1:3, topic2:4 diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties index 8ef65ba133b..dacf158431c 100644 --- a/system_test/replication_testsuite/config/server.properties +++ b/system_test/replication_testsuite/config/server.properties @@ -17,12 +17,12 @@ ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. -brokerid=0 +broker.id=0 # Hostname the broker will advertise to consumers. If not set, kafka will use the value returned # from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost # may not be what you want. -#hostname= +#host.name= ############################# Socket Server Settings ############################# @@ -31,19 +31,19 @@ brokerid=0 port=9091 # The number of threads handling network requests -network.threads=2 +num.network.threads=2 # The number of threads doing disk I/O -io.threads=2 +num.io.threads=2 # The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=1048576 # The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=1048576 # The maximum size of a request that the socket server will accept (protection against OOM) -max.socket.request.bytes=104857600 +socket.request.max.bytes=104857600 ############################# Log Basics ############################# @@ -70,16 +70,16 @@ num.partitions=5 # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk -log.flush.interval=10000 +log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush -log.default.flush.interval.ms=1000 +log.flush.interval.ms=1000 -# Per-topic overrides for log.default.flush.interval.ms -#topic.flush.intervals.ms=topic1:1000, topic2:3000 +# Per-topic overrides for log.flush.interval.ms +#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000 # The interval (in ms) at which logs are checked to see if they need to be flushed to disk. -log.default.flush.scheduler.interval.ms=1000 +log.flush.scheduler.interval.ms=1000 ############################# Log Retention Policy ############################# @@ -92,13 +92,13 @@ log.default.flush.scheduler.interval.ms=1000 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.size. -#log.retention.size=1073741824 -log.retention.size=-1 +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 +log.retention.bytes=-1 # The maximum size of a log segment file. When this size is reached a new log segment will be created. -#log.file.size=536870912 -log.file.size=102400 +#log.segment.size=536870912 +log.segment.bytes=102400 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies @@ -117,23 +117,23 @@ enable.zookeeper=true zk.connect=localhost:2181 # Timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 +zk.connection.timeout.ms=1000000 monitoring.period.secs=1 -max.message.size=1000000 -max.queued.requests=500 +message.max.bytes=1000000 +queued.max.requests=500 log.roll.hours=168 -log.index.max.size=10485760 +log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 -auto.create.topics=true +auto.create.topics.enable=true controller.socket.timeout.ms=30000 controller.message.queue.size=10 default.replication.factor=1 -replica.max.lag.time.ms=10000 -replica.max.lag.bytes=4000 +replica.lag.time.max.ms=10000 +replica.lag.max.messages=4000 replica.socket.timeout.ms=30000 -replica.socket.buffersize=65536 -replica.fetch.size=1048576 -replica.fetch.wait.time.ms=500 +replica.socket.receive.buffer.bytes=65536 +replica.fetch.max.bytes=1048576 +replica.fetch.wait.max.ms=500 replica.fetch.min.bytes=4096 -replica.fetchers=1 +num.replica.fetchers=1