Use uniform convention for naming properties keys; kafka-648; patched by Sriram Subramanian; reviewed by Jun Rao

This commit is contained in:
Jun Rao 2013-01-11 16:12:57 -08:00
parent dbe87f6df3
commit a40953196e
76 changed files with 541 additions and 585 deletions

View File

@ -23,7 +23,7 @@ zk.connect=127.0.0.1:2181
zk.connectiontimeout.ms=1000000
#consumer group id
groupid=test-consumer-group
group.id=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000

View File

@ -36,35 +36,18 @@ serializer.class=kafka.serializer.StringEncoder
# allow topic level compression
#compressed.topics=
# max message size; messages larger than that size are discarded; default is 1000000
#max.message.size=
############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.time=
#queue.buffering.max.ms=
# the maximum size of the blocking queue for buffering on the producer
#queue.size=
#queue.buffering.max.messages=
# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueueTimeout.ms=
#queue.enqueue.timeout.ms=
# the number of messages batched at the producer
#batch.size=
# the callback handler for one or multiple events
#callback.handler=
# properties required to initialize the callback handler
#callback.handler.props=
# the handler for events
#event.handler=
# properties required to initialize the event handler
#event.handler.props=
#batch.num.messages=

View File

@ -17,7 +17,7 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
brokerid=0
broker.id=0
############################# Socket Server Settings #############################
@ -27,22 +27,22 @@ port=9092
# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
#hostname=localhost
#host.name=localhost
# The number of threads handling network requests
network.threads=2
num.network.threads=2
# The number of threads doing disk I/O
io.threads=2
num.io.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=104857600
socket.request.max.bytes=104857600
############################# Log Basics #############################
@ -54,9 +54,6 @@ log.dir=/tmp/kafka-logs
# for consumption, but also mean more files.
num.partitions=1
# Overrides for for the default given by num.partitions on a per-topic basis
#topic.partition.count.map=topic1:3, topic2:4
############################# Log Flush Policy #############################
# The following configurations control the flush of data to disk. This is the most
@ -69,16 +66,13 @@ num.partitions=1
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=10000
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000
# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
log.default.flush.scheduler.interval.ms=1000
# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
############################# Log Retention Policy #############################
@ -91,11 +85,11 @@ log.default.flush.scheduler.interval.ms=1000
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.size.
#log.retention.size=1073741824
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.file.size=536870912
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies

View File

@ -71,7 +71,7 @@ public class DataGenerator {
System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties();
producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));

View File

@ -119,10 +119,9 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
job.setInt("kafka.output.compression_codec", compressionCodec);
props.setProperty("producer.type", producerType);
props.setProperty("buffer.size", Integer.toString(bufSize));
props.setProperty("send.buffer.bytes", Integer.toString(bufSize));
props.setProperty("connect.timeout.ms", Integer.toString(timeout));
props.setProperty("reconnect.interval", Integer.toString(interval));
props.setProperty("max.message.size", Integer.toString(maxSize));
props.setProperty("compression.codec", Integer.toString(compressionCodec));
if (uri.getScheme().equals("kafka")) {

View File

@ -61,7 +61,7 @@ object ClientUtils extends Logging{
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
val props = new Properties()
props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
props.put("clientid", clientId)
props.put("client.id", clientId)
val producerConfig = new ProducerConfig(props)
fetchTopicMetadata(topics, brokers, producerConfig, 0)
}

View File

@ -261,11 +261,11 @@ class Partition(val topic: String,
.format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) {
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes)
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.size > 0)
@ -281,12 +281,12 @@ class Partition(val topic: String,
}
}
def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = {
def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
/**
* there are two cases that need to be handled here -
* 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
* for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
* 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
* 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
* follower is not catching up and should be removed from the ISR
**/
val leaderLogEndOffset = leaderReplica.logEndOffset
@ -298,7 +298,7 @@ class Partition(val topic: String,
val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
// Case 2 above
val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncBytes)
val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
stuckReplicas ++ slowReplicas
}

View File

@ -144,14 +144,14 @@ object ConsoleConsumer extends Logging {
}
val props = new Properties()
props.put("groupid", options.valueOf(groupIdOpt))
props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
props.put("autocommit.enable", "true")
props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
props.put("group.id", options.valueOf(groupIdOpt))
props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString)
props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString)
props.put("auto.commit.enable", "true")
props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
props.put("zk.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
val config = new ConsumerConfig(props)

View File

@ -52,11 +52,11 @@ object ConsumerConfig extends Config {
}
def validateClientId(clientId: String) {
validateChars("clientid", clientId)
validateChars("client.id", clientId)
}
def validateGroupId(groupId: String) {
validateChars("groupid", groupId)
validateChars("group.id", groupId)
}
def validateAutoOffsetReset(autoOffsetReset: String) {
@ -77,38 +77,38 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
}
/** a string that uniquely identifies a set of consumers within the same consumer group */
val groupId = props.getString("groupid")
val groupId = props.getString("group.id")
/** consumer id: generated automatically if not set.
* Set this explicitly for only testing purpose. */
val consumerId: Option[String] = Option(props.getString("consumerid", null))
val consumerId: Option[String] = Option(props.getString("consumer.id", null))
/** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
/** the socket receive buffer for network requests */
val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)
val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
/** the number of byes of messages to attempt to fetch */
val fetchSize = props.getInt("fetch.size", FetchSize)
val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
/** the frequency in ms that the consumer offsets are committed to zookeeper */
val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval)
val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
/** max number of messages buffered for consumption */
val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks)
val queuedMaxMessages = props.getInt("queued.max.messages", MaxQueuedChunks)
/** max number of retries during rebalance */
val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries)
val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes)
val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */
val maxFetchWaitMs = props.getInt("max.fetch.wait.ms", MaxFetchWaitMs)
/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
/** backoff time between retries during rebalance */
val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
@ -120,7 +120,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer */
val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset)
val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
@ -129,12 +129,12 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
* Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
* overhead of decompression.
* */
val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false)
/**
* Client id is specified by the kafka consumer client, used to distinguish different clients
*/
val clientId = props.getString("clientid", groupId)
val clientId = props.getString("client.id", groupId)
validate(this)
}

View File

@ -33,11 +33,11 @@ class ConsumerFetcherThread(name: String,
clientId = config.clientId + "-" + name,
sourceBroker = sourceBroker,
socketTimeout = config.socketTimeoutMs,
socketBufferSize = config.socketBufferSize,
fetchSize = config.fetchSize,
socketBufferSize = config.socketReceiveBufferBytes,
fetchSize = config.fetchMessageMaxBytes,
fetcherBrokerId = Request.OrdinaryConsumerId,
maxWait = config.maxFetchWaitMs,
minBytes = config.minFetchBytes) {
maxWait = config.fetchWaitMaxMs,
minBytes = config.fetchMinBytes) {
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {

View File

@ -112,7 +112,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
connectZk()
createFetcher()
if (config.autoCommit) {
if (config.autoCommitEnable) {
scheduler.startup
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
@ -160,14 +160,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (wildcardTopicWatcher != null)
wildcardTopicWatcher.shutdown()
try {
if (config.autoCommit)
if (config.autoCommitEnable)
scheduler.shutdownNow()
fetcher match {
case Some(f) => f.shutdown
case None =>
}
sendShutdownToAllQueues()
if (config.autoCommit)
if (config.autoCommitEnable)
commitOffsets()
if (zkClient != null) {
zkClient.close()
@ -194,9 +194,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](
queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, config.clientId)
queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId)
(queue, stream)
})
).flatten.toList
@ -365,7 +365,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def syncedRebalance() {
rebalanceLock synchronized {
for (i <- 0 until config.maxRebalanceRetries) {
for (i <- 0 until config.rebalanceMaxRetries) {
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
val cluster = getCluster(zkClient)
@ -393,7 +393,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
}
private def rebalance(cluster: Cluster): Boolean = {
@ -610,7 +610,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
queue,
consumedOffset,
fetchedOffset,
new AtomicInteger(config.fetchSize),
new AtomicInteger(config.fetchMessageMaxBytes),
config.clientId)
partTopicInfoMap.put(partition, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
@ -709,12 +709,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val wildcardQueuesAndStreams = (1 to numStreams)
.map(e => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](queue,
config.consumerTimeoutMs,
keyDecoder,
valueDecoder,
config.enableShallowIterator,
config.shallowIteratorEnable,
config.clientId)
(queue, stream)
}).toList

View File

@ -43,15 +43,15 @@ private[kafka] class LogManager(val config: KafkaConfig,
val CleanShutdownFile = ".kafka_cleanshutdown"
val LockFile = ".lock"
val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
private val logFileSizeMap = config.logFileSizeMap
private val logFlushInterval = config.flushInterval
private val logFlushIntervals = config.flushIntervalMap
private val logFileSizeMap = config.logSegmentBytesPerTopicMap
private val logFlushInterval = config.logFlushIntervalMessages
private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap
private val logCreationLock = new Object
private val logRetentionSizeMap = config.logRetentionSizeMap
private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
private val logRetentionSizeMap = config.logRetentionBytesPerTopicMap
private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
@ -111,14 +111,14 @@ private[kafka] class LogManager(val config: KafkaConfig,
info("Loading log '" + dir.getName + "'")
val topicPartition = parseTopicPartitionName(dir.getName)
val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentBytes)
val log = new Log(dir,
maxLogFileSize,
config.maxMessageSize,
config.messageMaxBytes,
logFlushInterval,
rollIntervalMs,
needsRecovery,
config.logIndexMaxSizeBytes,
config.logIndexSizeMaxBytes,
config.logIndexIntervalBytes,
time,
config.brokerId)
@ -139,10 +139,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
if(scheduler != null) {
info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
info("Starting log flusher every " + config.flushSchedulerThreadRate +
info("Starting log flusher every " + config.logFlushSchedulerIntervalMs +
" ms with the following overrides " + logFlushIntervals)
scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
config.logFlushSchedulerIntervalMs, config.logFlushSchedulerIntervalMs, false)
}
}
@ -186,14 +186,14 @@ private[kafka] class LogManager(val config: KafkaConfig,
val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
dir.mkdirs()
val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes)
log = new Log(dir,
maxLogFileSize,
config.maxMessageSize,
config.messageMaxBytes,
logFlushInterval,
rollIntervalMs,
needsRecovery = false,
config.logIndexMaxSizeBytes,
config.logIndexSizeMaxBytes,
config.logIndexIntervalBytes,
time,
config.brokerId)
@ -249,7 +249,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
val topic = parseTopicPartitionName(log.dir.getName).topic
val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionBytes)
if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
var diff = log.size - maxLogRetentionSize
def shouldDelete(segment: LogSegment) = {
@ -310,7 +310,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
for (log <- allLogs) {
try {
val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
var logFlushInterval = config.defaultFlushIntervalMs
var logFlushInterval = config.logFlushIntervalMs
if(logFlushIntervals.contains(log.topicName))
logFlushInterval = logFlushIntervals(log.topicName)
debug(log.topicName + " flush interval " + logFlushInterval +

View File

@ -125,12 +125,12 @@ object ConsoleProducer {
props.put("compression.codec", codec.toString)
props.put("producer.type", if(sync) "sync" else "async")
if(options.has(batchSizeOpt))
props.put("batch.size", batchSize.toString)
props.put("queue.time", sendTimeout.toString)
props.put("queue.size", queueSize.toString)
props.put("batch.num.messages", batchSize.toString)
props.put("queue.buffering.max.ms", sendTimeout.toString)
props.put("queue.buffering.max.messages", queueSize.toString)
props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
props.put("producer.request.required.acks", requestRequiredAcks.toString)
props.put("producer.request.timeout.ms", requestTimeoutMs.toString)
props.put("request.required.acks", requestRequiredAcks.toString)
props.put("request.timeout.ms", requestTimeoutMs.toString)
props.put("key.serializer.class", keyEncoderClass)
props.put("serializer.class", valueEncoderClass)

View File

@ -73,8 +73,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
//These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
if(producerType != null) props.put("producer.type", producerType)
if(compressionCodec != null) props.put("compression.codec", compressionCodec)
if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout)
if(queueSize != null) props.put("queue.size", queueSize)
if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout)
if(queueSize != null) props.put("queue.buffering.max.messages", queueSize)
val config : ProducerConfig = new ProducerConfig(props)
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + config.brokerList)

View File

@ -31,7 +31,7 @@ class Producer[K,V](config: ProducerConfig,
extends Logging {
private val hasShutdown = new AtomicBoolean(false)
private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
private val random = new Random
private var sync: Boolean = true
@ -44,8 +44,8 @@ class Producer[K,V](config: ProducerConfig,
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID,
queue,
eventHandler,
config.queueTime,
config.batchSize,
config.queueBufferingMaxMs,
config.batchNumMessages,
config.clientId)
producerSendThread.start()
}
@ -87,17 +87,17 @@ class Producer[K,V](config: ProducerConfig,
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
for (message <- messages) {
val added = config.enqueueTimeoutMs match {
val added = config.queueEnqueueTimeoutMs match {
case 0 =>
queue.offer(message)
case _ =>
try {
config.enqueueTimeoutMs < 0 match {
config.queueEnqueueTimeoutMs < 0 match {
case true =>
queue.put(message)
true
case _ =>
queue.offer(message, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
}
}
catch {

View File

@ -26,12 +26,12 @@ import kafka.common.{InvalidConfigException, Config}
object ProducerConfig extends Config {
def validate(config: ProducerConfig) {
validateClientId(config.clientId)
validateBatchSize(config.batchSize, config.queueSize)
validateBatchSize(config.batchNumMessages, config.queueBufferingMaxMessages)
validateProducerType(config.producerType)
}
def validateClientId(clientId: String) {
validateChars("clientid", clientId)
validateChars("client.id", clientId)
}
def validateBatchSize(batchSize: Int, queueSize: Int) {
@ -101,17 +101,16 @@ class ProducerConfig private (val props: VerifiableProperties)
*/
val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))
/**
* The producer using the zookeeper software load balancer maintains a ZK cache that gets
* updated by the zookeeper watcher listeners. During some events like a broker bounce, the
* producer ZK cache can get into an inconsistent state, for a small time period. In this time
* period, it could end up picking a broker partition that is unavailable. When this happens, the
* ZK cache needs to be updated.
* This parameter specifies the number of times the producer attempts to refresh this ZK cache.
*/
val producerRetries = props.getInt("producer.num.retries", 3)
/** The leader may be unavailable transiently, which can fail the sending of a message.
* This property specifies the number of retries when such failures occur.
*/
val messageSendMaxRetries = props.getInt("message.send.max.retries", 3)
val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
/** Before each retry, the producer refreshes the metadata of relevant topics. Since leader
* election takes a bit of time, this property specifies the amount of time that the producer
* waits before refreshing the metadata.
*/
val retryBackoffMs = props.getInt("retry.backoff.ms", 100)
/**
* The producer generally refreshes the topic metadata from brokers when there is a failure
@ -121,7 +120,7 @@ class ProducerConfig private (val props: VerifiableProperties)
* Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
* a message the metadata is never refreshed
*/
val topicMetadataRefreshIntervalMs = props.getInt("producer.metadata.refresh.interval.ms", 600000)
val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)
validate(this)
}

View File

@ -36,7 +36,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
private val lock = new Object()
@volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
config.bufferSize, config.requestTimeoutMs)
config.sendBufferBytes, config.requestTimeoutMs)
val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)

View File

@ -36,24 +36,22 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP
trait SyncProducerConfigShared {
val props: VerifiableProperties
val bufferSize = props.getInt("buffer.size", 100*1024)
val maxMessageSize = props.getInt("max.message.size", 1000000)
val sendBufferBytes = props.getInt("send.buffer.bytes", 100*1024)
/* the client application sending the producer requests */
val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId)
val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
/*
* The required acks of the producer requests - negative value means ack
* after the replicas in ISR have caught up to the leader's offset
* corresponding to this produce request.
*/
val requiredAcks = props.getShort("producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
/*
* The ack timeout of the producer requests. Value must be non-negative and non-zero
*/
val requestTimeoutMs = props.getIntInRange("producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
val requestTimeoutMs = props.getIntInRange("request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
(1, Integer.MAX_VALUE))
}

View File

@ -22,10 +22,10 @@ trait AsyncProducerConfig {
val props: VerifiableProperties
/* maximum time, in milliseconds, for buffering data on the producer queue */
val queueTime = props.getInt("queue.time", 5000)
val queueBufferingMaxMs = props.getInt("queue.buffering.max.ms", 5000)
/** the maximum size of the blocking queue for buffering on the producer */
val queueSize = props.getInt("queue.size", 10000)
val queueBufferingMaxMessages = props.getInt("queue.buffering.max.messages", 10000)
/**
* Timeout for event enqueue:
@ -33,10 +33,10 @@ trait AsyncProducerConfig {
* -ve: enqueue will block indefinitely if the queue is full
* +ve: enqueue will block up to this many milliseconds if the queue is full
*/
val enqueueTimeoutMs = props.getInt("queue.enqueueTimeout.ms", 0)
val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", 0)
/** the number of messages batched at the producer */
val batchSize = props.getInt("batch.size", 200)
val batchNumMessages = props.getInt("batch.num.messages", 200)
/** the serializer class for values */
val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")

View File

@ -59,7 +59,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries + 1
var remainingRetries = config.messageSendMaxRetries + 1
val correlationIdStart = correlationId.get()
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
@ -72,7 +72,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
if (outstandingProduceRequests.size > 0) {
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.producerRetryBackoffMs)
Thread.sleep(config.retryBackoffMs)
// get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
remainingRetries -= 1
@ -81,9 +81,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
val correlationIdEnd = correlationId.get()
error("Failed to send the following requests with correlation ids in [%d,%d]: %s".format(correlationIdStart, correlationIdEnd-1, outstandingProduceRequests))
throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
}
}
}
@ -231,7 +232,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
val currentCorrelationId = correlationId.getAndIncrement
val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requiredAcks,
val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
config.requestTimeoutMs, messagesPerTopic)
var failedTopicPartitions = Seq.empty[TopicAndPartition]
try {

View File

@ -41,9 +41,9 @@ class KafkaApis(val requestChannel: RequestChannel,
brokerId: Int) extends Logging {
private val producerRequestPurgatory =
new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
private val fetchRequestPurgatory =
new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval)
new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
private val delayedRequestMetrics = new DelayedRequestMetrics
private val requestLogger = Logger.getLogger("kafka.request.logger")
@ -442,7 +442,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ErrorMapping.UnknownTopicOrPartitionCode =>
try {
/* check if auto creation of topics is turned on */
if (config.autoCreateTopics) {
if (config.autoCreateTopicsEnable) {
try {
CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"

View File

@ -36,37 +36,37 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/*********** General Configuration ***********/
/* the broker id for this server */
val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
/* the maximum size of message that the server can receive */
val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue))
/* the number of network threads that the server uses for handling network requests */
val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
/* the number of io threads that the server uses for carrying out network requests */
val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
/* the number of queued requests allowed before blocking the network threads */
val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
/*********** Socket Server Configuration ***********/
/* the port to listen and accept connections on */
val port: Int = props.getInt("port", 6667)
/* hostname of broker. If this is set, it will only bind to this address. If this is not set,
/* hostname of broker. If this is set, it will only bind to this address. If this is not set,
* it will bind to all interfaces, and publish one to ZK */
val hostName: String = props.getString("hostname", null)
val hostName: String = props.getString("host.name", null)
/* the SO_SNDBUFF buffer of the socket sever sockets */
val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024)
/* the SO_RCVBUFF buffer of the socket sever sockets */
val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024)
val socketReceiveBufferBytes: Int = props.getInt("socket.receive.buffer.bytes", 100*1024)
/* the maximum number of bytes in a socket request */
val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
/*********** Log Configuration ***********/
@ -74,56 +74,56 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
/* the directories in which the log data is kept */
val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", "")))
val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "/tmp/kafka-logs")))
require(logDirs.size > 0)
/* the maximum size of a single log file */
val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
/* the maximum size of a single log file for some specific topic */
val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt)
val logSegmentBytesPerTopicMap = props.getMap("log.segment.bytes.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the maximum time before a new log segment is rolled out */
val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
/* the number of hours before rolling out a new log segment for some specific topic */
val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt)
val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the number of hours to keep a log file before deleting it */
val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt)
val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the maximum size of the log before deleting it */
val logRetentionSize = props.getLong("log.retention.size", -1)
val logRetentionBytes = props.getLong("log.retention.bytes", -1)
/* the maximum size of the log for some specific topic before deleting it */
val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong)
val logRetentionBytesPerTopicMap = props.getMap("log.retention.bytes.per.topic", _.toLong > 0).mapValues(_.toLong)
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
/* the maximum size in bytes of the offset index */
val logIndexMaxSizeBytes = props.getIntInRange("log.index.max.size", 10*1024*1024, (4, Int.MaxValue))
val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue))
/* the interval with which we add an entry to the offset index */
val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
/* the number of messages accumulated on a log partition before messages are flushed to disk */
val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000)
val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms", 3000)
/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs)
/* enable auto creation of topic on the server */
val autoCreateTopics = props.getBoolean("auto.create.topics", true)
val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
/*********** Replication configuration ***********/
@ -136,36 +136,38 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* default replication factors for automatically created topics */
val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000)
/* If a follower hasn't sent any fetch requests during this time, the leader will remove the follower from isr */
val replicaLagTimeMaxMs = props.getLong("replica.lag.time.max.ms", 10000)
val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
/* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */
val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000)
/* the socket timeout for network requests */
val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
/* the socket receive buffer for network requests */
val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)
/* the number of byes of messages to attempt to fetch */
val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
val replicaFetchMaxBytes = props.getInt("replica.fetch.max.bytes", ConsumerConfig.FetchSize)
/* max wait time for each fetcher request issued by follower replicas*/
val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500)
/* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 1)
val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1)
/* number of fetcher threads used to replicate messages from a source broker.
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
val numReplicaFetchers = props.getInt("replica.fetchers", 1)
val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
/* the frequency with which the highwater mark is saved out to disk */
val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
/* the frequency with which the high watermark is saved out to disk */
val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
/* the purge interval (in number of requests) of the fetch request purgatory */
val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
/* the purge interval (in number of requests) of the producer request purgatory */
val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
}

View File

@ -65,8 +65,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
config.hostName,
config.port,
config.numNetworkThreads,
config.numQueuedRequests,
config.maxSocketRequestSize)
config.queuedMaxRequests,
config.socketRequestMaxBytes)
socketServer.startup

View File

@ -31,11 +31,11 @@ class ReplicaFetcherThread(name:String,
clientId = FetchRequest.ReplicaFetcherClientId,
sourceBroker = sourceBroker,
socketTimeout = brokerConfig.replicaSocketTimeoutMs,
socketBufferSize = brokerConfig.replicaSocketBufferSize,
fetchSize = brokerConfig.replicaFetchSize,
socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,
fetchSize = brokerConfig.replicaFetchMaxBytes,
fetcherBrokerId = brokerConfig.brokerId,
maxWait = brokerConfig.replicaMaxWaitTimeMs,
minBytes = brokerConfig.replicaMinBytes) {
maxWait = brokerConfig.replicaFetchWaitMaxMs,
minBytes = brokerConfig.replicaFetchMinBytes) {
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {

View File

@ -72,7 +72,7 @@ class ReplicaManager(val config: KafkaConfig,
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.replicaHighWatermarkCheckpointIntervalMs)
}
/**
@ -91,7 +91,7 @@ class ReplicaManager(val config: KafkaConfig,
def startup() {
// start ISR expiration thread
kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs)
}
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
@ -244,7 +244,7 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
leaderPartitionsLock synchronized {
leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes))
leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
}
}

View File

@ -182,9 +182,9 @@ public class KafkaMigrationTool
Properties kafkaConsumerProperties_07 = new Properties();
kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
/** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
if(kafkaConsumerProperties_07.getProperty("shallowiterator.enable", "").equals("true")){
if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")){
logger.warn("Shallow iterator should not be used in the migration tool");
kafkaConsumerProperties_07.setProperty("shallowiterator.enable", "false");
kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
}
Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);

View File

@ -42,12 +42,12 @@ object ReplayLogProducer extends Logging {
// consumer properties
val consumerProps = new Properties
consumerProps.put("groupid", GroupId)
consumerProps.put("group.id", GroupId)
consumerProps.put("zk.connect", config.zkConnect)
consumerProps.put("consumer.timeout.ms", "10000")
consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString)
consumerProps.put("fetch.size", (1024*1024).toString)
consumerProps.put("socket.buffer.size", (2 * 1024 * 1024).toString)
consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
val consumerConfig = new ConsumerConfig(consumerProps)
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
@ -141,10 +141,10 @@ object ReplayLogProducer extends Logging {
val props = new Properties()
props.put("broker.list", config.brokerList)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
props.put("send.buffer.bytes", (64*1024).toString)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("batch.size", config.batchSize.toString)
props.put("queue.enqueueTimeout.ms", "-1")
props.put("batch.num.messages", config.batchSize.toString)
props.put("queue.enqueue.timeout.ms", "-1")
if(config.isAsync)
props.put("producer.type", "async")

View File

@ -785,11 +785,11 @@ class ZKConfig(props: VerifiableProperties) {
val zkConnect = props.getString("zk.connect", null)
/** zookeeper session timeout */
val zkSessionTimeoutMs = props.getInt("zk.sessiontimeout.ms", 6000)
val zkSessionTimeoutMs = props.getInt("zk.session.timeout.ms", 6000)
/** the max time that the client waits to establish a connection to zookeeper */
val zkConnectionTimeoutMs = props.getInt("zk.connectiontimeout.ms",zkSessionTimeoutMs)
val zkConnectionTimeoutMs = props.getInt("zk.connection.timeout.ms",zkSessionTimeoutMs)
/** how far a ZK follower can be behind a ZK leader */
val zkSyncTimeMs = props.getInt("zk.synctime.ms", 2000)
val zkSyncTimeMs = props.getInt("zk.sync.time.ms", 2000)
}

View File

@ -35,9 +35,9 @@ object TestEndToEndLatency {
val topic = "test"
val consumerProps = new Properties()
consumerProps.put("groupid", topic)
consumerProps.put("group.id", topic)
consumerProps.put("auto.commit", "true")
consumerProps.put("autooffset.reset", "largest")
consumerProps.put("auto.offset.reset", "largest")
consumerProps.put("zk.connect", zkConnect)
consumerProps.put("socket.timeout.ms", 1201000.toString)

View File

@ -33,7 +33,7 @@ object TestLogPerformance {
val props = TestUtils.createBrokerConfig(0, -1)
val config = new KafkaConfig(props)
val dir = TestUtils.tempDir()
val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)

View File

@ -31,7 +31,7 @@ object TestZKConsumerOffsets {
val topic = args(1)
val autoOffsetReset = args(2)
val props = Utils.loadProps(args(0))
props.put("autooffset.reset", "largest")
props.put("auto.offset.reset", "largest")
val config = new ConsumerConfig(props)
val consumerConnector: ConsumerConnector = Consumer.create(config)

View File

@ -78,9 +78,9 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
// update offset in zookeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
consumerProps.put("autooffset.reset", resetTo)
consumerProps.put("auto.offset.reset", resetTo)
consumerProps.put("consumer.timeout.ms", "2000")
consumerProps.put("max.fetch.wait.ms", "0")
consumerProps.put("fetch.wait.max.ms", "0")
val consumerConfig = new ConsumerConfig(consumerProps)
TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", offset)

View File

@ -35,12 +35,12 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
val props = new Properties()
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("buffer.size", "65536")
props.put("send.buffer.bytes", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
props.put("producer.retry.backoff.ms", "1000")
props.put("producer.num.retries", "3")
props.put("producer.request.required.acks", "-1")
props.put("retry.backoff.ms", "1000")
props.put("message.send.max.retries", "3")
props.put("request.required.acks", "-1")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
producer = new Producer(new ProducerConfig(props))
consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")

View File

@ -40,8 +40,8 @@ class LogManagerTest extends JUnit3Suite {
override def setUp() {
super.setUp()
config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
override val logFileSize = 1024
override val flushInterval = 10000
override val logSegmentBytes = 1024
override val logFlushIntervalMessages = 10000
override val logRetentionHours = maxLogAgeHours
}
scheduler.startup
@ -114,10 +114,10 @@ class LogManagerTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages
override val logRetentionBytes = (5 * 10 * setSize + 10).asInstanceOf[Long]
override val logRetentionHours = retentionHours
override val flushInterval = 100
override val logFlushIntervalMessages = 100
override val logRollHours = maxRollInterval
}
logManager = new LogManager(config, scheduler, time)
@ -158,11 +158,11 @@ class LogManagerTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
override val logFileSize = 1024 *1024 *1024
override val flushSchedulerThreadRate = 50
override val flushInterval = Int.MaxValue
override val logSegmentBytes = 1024 *1024 *1024
override val logFlushSchedulerIntervalMs = 50
override val logFlushIntervalMessages = Int.MaxValue
override val logRollHours = maxRollInterval
override val flushIntervalMap = Map("timebasedflush" -> 100)
override val logFlushIntervalMsPerTopicMap = Map("timebasedflush" -> 100)
}
logManager = new LogManager(config, scheduler, time)
logManager.startup
@ -173,7 +173,7 @@ class LogManagerTest extends JUnit3Suite {
}
val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
ellapsed < 2*config.flushSchedulerThreadRate)
ellapsed < 2*config.logFlushSchedulerIntervalMs)
}
@Test
@ -183,7 +183,7 @@ class LogManagerTest extends JUnit3Suite {
val dirs = Seq(TestUtils.tempDir().getAbsolutePath,
TestUtils.tempDir().getAbsolutePath,
TestUtils.tempDir().getAbsolutePath)
props.put("log.directories", dirs.mkString(","))
props.put("log.dirs", dirs.mkString(","))
logManager.shutdown()
logManager = new LogManager(new KafkaConfig(props), scheduler, time)

View File

@ -198,15 +198,15 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
val props = new Properties
props.put("brokerid", nodeId.toString)
props.put("broker.id", nodeId.toString)
props.put("port", port.toString)
props.put("log.dir", getLogDir.getAbsolutePath)
props.put("log.flush.interval", "1")
props.put("log.flush.interval.messages", "1")
props.put("enable.zookeeper", "false")
props.put("num.partitions", "20")
props.put("log.retention.hours", "10")
props.put("log.cleanup.interval.mins", "5")
props.put("log.file.size", logSize.toString)
props.put("log.segment.bytes", logSize.toString)
props.put("zk.connect", zkConnect.toString)
props
}

View File

@ -62,7 +62,7 @@ class LogTest extends JUnitSuite {
val time: MockTime = new MockTime()
// create a log
val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time)
val log = new Log(logDir, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time)
time.currentMs += rollMs + 1
// segment age is less than its limit
@ -96,7 +96,7 @@ class LogTest extends JUnitSuite {
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@ -109,12 +109,12 @@ class LogTest extends JUnitSuite {
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
}
@Test
def testAppendAndRead() {
val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val log = new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 10)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@ -131,7 +131,7 @@ class LogTest extends JUnitSuite {
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val log = new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
try {
log.read(0, 1024)
@ -151,7 +151,7 @@ class LogTest extends JUnitSuite {
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
val offsets = messageSets.map(log.append(_)._1)
@ -173,7 +173,7 @@ class LogTest extends JUnitSuite {
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
val log = new Log(logDir, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@ -206,7 +206,7 @@ class LogTest extends JUnitSuite {
@Test
def testEdgeLogRollsStartingAtZero() {
// first test a log segment starting at 0
val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val curOffset = log.logEndOffset
assertEquals(curOffset, 0)
@ -221,7 +221,7 @@ class LogTest extends JUnitSuite {
@Test
def testEdgeLogRollsStartingAtNonZero() {
// second test an empty log segment starting at non-zero
val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val numMessages = 1
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
@ -269,7 +269,7 @@ class LogTest extends JUnitSuite {
val messageSize = 100
val segmentSize = 7 * messageSize
val indexInterval = 3 * messageSize
var log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
var log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
@ -278,14 +278,14 @@ class LogTest extends JUnitSuite {
log.close()
// test non-recovery case
log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
log.close()
// test
log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
@ -300,7 +300,7 @@ class LogTest extends JUnitSuite {
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
@ -349,7 +349,7 @@ class LogTest extends JUnitSuite {
val setSize = set.sizeInBytes
val msgPerSeg = 10
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
log.append(set)
@ -373,7 +373,7 @@ class LogTest extends JUnitSuite {
logDir.mkdir()
var log = new Log(logDir,
maxLogFileSize = 64*1024,
maxMessageSize = config.maxMessageSize,
maxMessageSize = config.messageMaxBytes,
maxIndexSize = 1000,
indexIntervalBytes = 10000,
needsRecovery = true)
@ -403,7 +403,7 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val log = new Log(logDir,
maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxMessageSize = config.messageMaxBytes,
maxIndexSize = 1000,
indexIntervalBytes = 1,
needsRecovery = false)
@ -425,7 +425,7 @@ class LogTest extends JUnitSuite {
// create a log
var log = new Log(logDir,
maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxMessageSize = config.messageMaxBytes,
maxIndexSize = 1000,
indexIntervalBytes = 10000,
needsRecovery = true)
@ -436,7 +436,7 @@ class LogTest extends JUnitSuite {
log.close()
log = new Log(logDir,
maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxMessageSize = config.messageMaxBytes,
maxIndexSize = 1000,
indexIntervalBytes = 10000,
needsRecovery = true)

View File

@ -63,8 +63,8 @@ class AsyncProducerTest extends JUnit3Suite {
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("producer.type", "async")
props.put("queue.size", "10")
props.put("batch.size", "1")
props.put("queue.buffering.max.messages", "10")
props.put("batch.num.messages", "1")
val config = new ProducerConfig(props)
val produceData = getProduceData(12)
@ -87,7 +87,7 @@ class AsyncProducerTest extends JUnit3Suite {
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("producer.type", "async")
props.put("batch.size", "1")
props.put("batch.num.messages", "1")
val config = new ProducerConfig(props)
val produceData = getProduceData(10)
@ -358,7 +358,7 @@ class AsyncProducerTest extends JUnit3Suite {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")
props.put("batch.size", "5")
props.put("batch.num.messages", "5")
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)

View File

@ -140,13 +140,13 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
props1.put("serializer.class", "kafka.serializer.StringEncoder")
props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
props1.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
props1.put("producer.request.required.acks", "2")
props1.put("producer.request.timeout.ms", "1000")
props1.put("request.required.acks", "2")
props1.put("request.timeout.ms", "1000")
val props2 = new util.Properties()
props2.putAll(props1)
props2.put("producer.request.required.acks", "3")
props2.put("producer.request.timeout.ms", "1000")
props2.put("request.required.acks", "3")
props2.put("request.timeout.ms", "1000")
val producerConfig1 = new ProducerConfig(props1)
val producerConfig2 = new ProducerConfig(props2)
@ -198,8 +198,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("producer.request.timeout.ms", "2000")
// props.put("producer.request.required.acks", "-1")
props.put("request.timeout.ms", "2000")
// props.put("request.required.acks", "-1")
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
// create topic
@ -256,7 +256,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("producer.request.timeout.ms", String.valueOf(timeoutMs))
props.put("request.timeout.ms", String.valueOf(timeoutMs))
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
val config = new ProducerConfig(props)
@ -300,7 +300,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// make sure we don't wait fewer than numRetries*timeoutMs milliseconds
// we do this because the DefaultEventHandler retries a number of times
assertTrue((t2-t1) >= timeoutMs*config.producerRetries)
assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries)
}
}

View File

@ -41,7 +41,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val props = new Properties()
props.put("host", "localhost")
props.put("port", server.socketServer.port.toString)
props.put("buffer.size", "102400")
props.put("send.buffer.bytes", "102400")
props.put("connect.timeout.ms", "500")
props.put("reconnect.interval", "1000")
val producer = new SyncProducer(new SyncProducerConfig(props))
@ -77,10 +77,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val props = new Properties()
props.put("host", "localhost")
props.put("port", server.socketServer.port.toString)
props.put("buffer.size", "102400")
props.put("send.buffer.bytes", "102400")
props.put("connect.timeout.ms", "300")
props.put("reconnect.interval", "500")
props.put("max.message.size", "100")
val correlationId = 0
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
@ -98,12 +97,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val props = new Properties()
props.put("host", "localhost")
props.put("port", server.socketServer.port.toString)
props.put("max.message.size", 50000.toString)
val producer = new SyncProducer(new SyncProducerConfig(props))
CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
val message1 = new Message(new Array[Byte](configs(0).maxMessageSize + 1))
val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
@ -111,7 +109,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error)
Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
val safeSize = configs(0).maxMessageSize - Message.MessageOverhead - MessageSet.LogOverhead - 1
val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1
val message2 = new Message(new Array[Byte](safeSize))
val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
@ -127,10 +125,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val props = new Properties()
props.put("host", "localhost")
props.put("port", server.socketServer.port.toString)
props.put("buffer.size", "102400")
props.put("send.buffer.bytes", "102400")
props.put("connect.timeout.ms", "300")
props.put("reconnect.interval", "500")
props.put("max.message.size", "100")
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@ -179,8 +176,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val props = new Properties()
props.put("host", "localhost")
props.put("port", server.socketServer.port.toString)
props.put("buffer.size", "102400")
props.put("producer.request.timeout.ms", String.valueOf(timeoutMs))
props.put("send.buffer.bytes", "102400")
props.put("request.timeout.ms", String.valueOf(timeoutMs))
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))

View File

@ -29,8 +29,8 @@ class IsrExpirationTest extends JUnit3Suite {
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaMaxLagTimeMs = 100L
override val replicaMaxLagBytes = 10L
override val replicaLagTimeMaxMs = 100L
override val replicaLagMaxMessages = 10L
})
val topic = "foo"
@ -45,7 +45,7 @@ class IsrExpirationTest extends JUnit3Suite {
// let the follower catch up to 10
(partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10)
var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages)
assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
// let some time pass
@ -53,7 +53,7 @@ class IsrExpirationTest extends JUnit3Suite {
// now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't
// pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
EasyMock.verify(log)
}
@ -71,7 +71,7 @@ class IsrExpirationTest extends JUnit3Suite {
// now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap it larger than
// replicaMaxLagBytes, the follower is out of sync.
val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
EasyMock.verify(log)

View File

@ -28,9 +28,9 @@ import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaMaxLagTimeMs = 5000L
override val replicaMaxLagBytes = 10L
override val replicaMinBytes = 20
override val replicaLagTimeMaxMs = 5000L
override val replicaLagMaxMessages = 10L
override val replicaFetchMinBytes = 20
})
val topic = "new-topic"
val partitionId = 0
@ -50,7 +50,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
producerProps.put("producer.request.required.acks", "-1")
producerProps.put("request.required.acks", "-1")
def testHWCheckpointNoFailuresSingleLogSegment {

View File

@ -33,8 +33,8 @@ import kafka.common.TopicAndPartition
class SimpleFetchTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaMaxLagTimeMs = 100L
override val replicaMaxLagBytes = 10L
override val replicaLagTimeMaxMs = 100L
override val replicaLagMaxMessages = 10L
})
val topic = "foo"
val partitionId = 0

View File

@ -123,11 +123,11 @@ object TestUtils extends Logging {
*/
def createBrokerConfig(nodeId: Int, port: Int): Properties = {
val props = new Properties
props.put("brokerid", nodeId.toString)
props.put("hostname", "localhost")
props.put("broker.id", nodeId.toString)
props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
props.put("log.flush.interval", "1")
props.put("log.flush.interval.messages", "1")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("replica.socket.timeout.ms", "1500")
props
@ -140,13 +140,13 @@ object TestUtils extends Logging {
consumerTimeout: Long = -1): Properties = {
val props = new Properties
props.put("zk.connect", zkConnect)
props.put("groupid", groupId)
props.put("consumerid", consumerId)
props.put("group.id", groupId)
props.put("consumer.id", consumerId)
props.put("consumer.timeout.ms", consumerTimeout.toString)
props.put("zk.sessiontimeout.ms", "400")
props.put("zk.synctime.ms", "200")
props.put("autocommit.interval.ms", "1000")
props.put("rebalance.retries.max", "4")
props.put("zk.session.timeout.ms", "400")
props.put("zk.sync.time.ms", "200")
props.put("auto.commit.interval.ms", "1000")
props.put("rebalance.max.retries", "4")
props
}
@ -293,7 +293,7 @@ object TestUtils extends Logging {
keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = {
val props = new Properties()
props.put("broker.list", brokerList)
props.put("buffer.size", "65536")
props.put("send.buffer.bytes", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
props.put("serializer.class", encoder.getClass.getCanonicalName)
@ -307,10 +307,10 @@ object TestUtils extends Logging {
props.put("producer.type", "sync")
props.put("broker.list", brokerList)
props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
props.put("buffer.size", bufferSize.toString)
props.put("send.buffer.bytes", bufferSize.toString)
props.put("connect.timeout.ms", connectTimeout.toString)
props.put("reconnect.interval", reconnectInterval.toString)
props.put("producer.request.timeout.ms", 30000.toString)
props.put("request.timeout.ms", 30000.toString)
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props
}

View File

@ -44,10 +44,10 @@ public class Consumer extends Thread
{
Properties props = new Properties();
props.put("zk.connect", KafkaProperties.zkConnect);
props.put("groupid", KafkaProperties.groupId);
props.put("zk.sessiontimeout.ms", "400");
props.put("zk.synctime.ms", "200");
props.put("autocommit.interval.ms", "1000");
props.put("group.id", KafkaProperties.groupId);
props.put("zk.session.timeout.ms", "400");
props.put("zk.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);

View File

@ -74,7 +74,7 @@ object ConsumerPerformance {
if(!config.showDetailedStats) {
val totalMBRead = (totalBytesRead.get*1.0)/(1024*1024)
println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
config.consumerConfig.fetchSize, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get,
config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get,
totalMessagesRead.get/elapsedSecs))
}
System.exit(0)
@ -124,10 +124,10 @@ object ConsumerPerformance {
}
val props = new Properties
props.put("groupid", options.valueOf(groupIdOpt))
props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString)
props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
props.put("autooffset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
props.put("group.id", options.valueOf(groupIdOpt))
props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
props.put("zk.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", "5000")
val consumerConfig = new ConsumerConfig(props)
@ -190,7 +190,7 @@ object ConsumerPerformance {
val totalMBRead = (bytesRead*1.0)/(1024*1024)
val mbRead = ((bytesRead - lastBytesRead)*1.0)/(1024*1024)
println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
config.consumerConfig.fetchSize, totalMBRead,
config.consumerConfig.fetchMessageMaxBytes, totalMBRead,
1000.0*(mbRead/elapsedMs), messagesRead, ((messagesRead - lastMessagesRead)/elapsedMs)*1000.0))
}

View File

@ -191,17 +191,17 @@ object ProducerPerformance extends Logging {
props.put("broker.list", config.brokerList)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
props.put("send.buffer.bytes", (64*1024).toString)
if(!config.isSync) {
props.put("producer.type","async")
props.put("batch.size", config.batchSize.toString)
props.put("queue.enqueueTimeout.ms", "-1")
props.put("batch.num.messages", config.batchSize.toString)
props.put("queue.enqueue.timeout.ms", "-1")
}
props.put("clientid", "ProducerPerformance")
props.put("producer.request.required.acks", config.producerRequestRequiredAcks.toString)
props.put("producer.request.timeout.ms", config.producerRequestTimeoutMs.toString)
props.put("producer.num.retries", config.producerNumRetries.toString)
props.put("producer.retry.backoff.ms", config.producerRetryBackoffMs.toString)
props.put("client.id", "ProducerPerformance")
props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
props.put("message.send.max.retries", config.producerNumRetries.toString)
props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
props.put("serializer.class", classOf[DefaultEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString)

View File

@ -18,10 +18,10 @@
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
producer.type=async
# to avoid dropping events if the queue is full, wait indefinitely
queue.enqueueTimeout.ms=-1
queue.enqueue.timeout.ms=-1

View File

@ -19,10 +19,10 @@
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
producer.type=async
# to avoid dropping events if the queue is full, wait indefinitely
queue.enqueueTimeout.ms=-1
queue.enqueue.timeout.ms=-1

View File

@ -19,10 +19,10 @@
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
producer.type=async
# to avoid dropping events if the queue is full, wait indefinitely
queue.enqueueTimeout.ms=-1
queue.enqueue.timeout.ms=-1

View File

@ -19,10 +19,10 @@
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
producer.type=async
# to avoid dropping events if the queue is full, wait indefinitely
queue.enqueueTimeout.ms=-1
queue.enqueue.timeout.ms=-1

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=1
broker.id=1
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-source1-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=10000000
log.segment.bytes=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,14 +63,14 @@ enable.zookeeper=true
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=2
broker.id=2
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-source2-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=10000000
log.segment.bytes=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,14 +63,14 @@ enable.zookeeper=true
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=3
broker.id=3
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-source3-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=10000000
log.segment.size=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,14 +63,14 @@ enable.zookeeper=true
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=4
broker.id=4
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-source4-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=10000000
log.segment.bytes=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,14 +63,14 @@ enable.zookeeper=true
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=1
broker.id=1
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-target1-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=10000000
log.segment.bytes=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,16 +63,16 @@ enable.zookeeper=true
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=2
broker.id=2
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-target2-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=10000000
log.segment.bytes=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,16 +63,16 @@ enable.zookeeper=true
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=3
broker.id=3
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-target3-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=10000000
log.segment.bytes=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,16 +63,16 @@ enable.zookeeper=true
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -20,10 +20,10 @@
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
#consumer group id
groupid=group1
group.id=group1
mirror.topics.whitelist=test_1,test_2
autooffset.reset=smallest
auto.offset.reset=smallest

View File

@ -72,7 +72,7 @@ kill_child_processes() {
# from the settings in config/server.properties while the brokerid and
# server port will be incremented accordingly
# 3. to generate properties files with non-default values such as
# "socket.send.buffer=2097152", simply add the property with new value
# "socket.send.buffer.bytes=2097152", simply add the property with new value
# to the array variable kafka_properties_to_replace as shown below
# =========================================================================
generate_kafka_properties_files() {
@ -103,10 +103,10 @@ generate_kafka_properties_files() {
# values. Other kafka properties can be added
# in a similar fashion.
# =============================================
# kafka_properties_to_replace[1]="socket.send.buffer=2097152"
# kafka_properties_to_replace[2]="socket.receive.buffer=2097152"
# kafka_properties_to_replace[1]="socket.send.buffer.bytes=2097152"
# kafka_properties_to_replace[2]="socket.receive.buffer.bytes=2097152"
# kafka_properties_to_replace[3]="num.partitions=3"
# kafka_properties_to_replace[4]="max.socket.request.bytes=10485760"
# kafka_properties_to_replace[4]="socket.request.max.bytes=10485760"
server_properties=`cat ${this_config_dir}/server.properties`

View File

@ -26,10 +26,10 @@ broker.list=localhost:9094,localhost:9095,localhost:9096
#zk.connect=
# zookeeper session timeout; default is 6000
#zk.sessiontimeout.ms=
#zk.session.timeout.ms=
# the max time that the client waits to establish a connection to zookeeper; default is 6000
#zk.connectiontimeout.ms
#zk.connection.timeout.ms
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
@ -46,35 +46,18 @@ serializer.class=kafka.serializer.DefaultEncoder
# allow topic level compression
#compressed.topics=
# max message size; messages larger than that size are discarded; default is 1000000
#max.message.size=
############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.time=
#queue.buffering.max.ms=
# the maximum size of the blocking queue for buffering on the producer
#queue.size=
#queue.buffering.max.messages=
# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueueTimeout.ms=
#queue.enqueue.timeout.ms=
# the number of messages batched at the producer
#batch.size=
# the callback handler for one or multiple events
#callback.handler=
# properties required to initialize the callback handler
#callback.handler.props=
# the handler for events
#event.handler=
# properties required to initialize the event handler
#event.handler.props=
#batch.num.messages=

View File

@ -17,12 +17,12 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
brokerid=0
broker.id=0
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
#hostname=
#host.name=
############################# Socket Server Settings #############################
@ -31,19 +31,19 @@ brokerid=0
port=9091
# The number of threads handling network requests
network.threads=2
num.network.threads=2
# The number of threads doing disk I/O
io.threads=2
num.io.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=104857600
socket.request.max.bytes=104857600
############################# Log Basics #############################
@ -70,16 +70,16 @@ num.partitions=5
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=10000
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000
# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
############################# Log Retention Policy #############################
@ -92,13 +92,13 @@ log.default.flush.scheduler.interval.ms=1000
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.size.
#log.retention.size=1073741824
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.file.size=536870912
#log.file.size=102400
log.file.size=128
#log.segment.bytes=536870912
#log.segment.bytes=102400
log.segment.bytes=128
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
@ -117,6 +117,6 @@ enable.zookeeper=true
zk.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
monitoring.period.secs=1

View File

@ -20,9 +20,9 @@
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
#consumer group id
groupid=group1
shallowiterator.enable=true
group.id=group1
shallow.iterator.enable=true

View File

@ -19,12 +19,12 @@ zk.connect=localhost:2183
# broker.list=1:localhost:9094,2:localhost:9095
# timeout in ms for connecting to zookeeper
# zk.connectiontimeout.ms=1000000
# zk.connection.timeout.ms=1000000
producer.type=async
# to avoid dropping events if the queue is full, wait indefinitely
queue.enqueueTimeout.ms=-1
queue.enqueue.timeout.ms=-1
num.producers.per.broker=2

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=1
broker.id=1
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-source-1-1-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=10000000
log.segment.bytes=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,14 +63,14 @@ enable.zookeeper=true
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=2
broker.id=2
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-source-1-2-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=536870912
log.segment.bytes=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,14 +63,14 @@ enable.zookeeper=true
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=1
broker.id=1
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-source-2-1-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=536870912
log.segment.bytes=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,14 +63,14 @@ enable.zookeeper=true
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=2
broker.id=2
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-source-2-2-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=536870912
log.segment.bytes=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,14 +63,14 @@ enable.zookeeper=true
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=1
broker.id=1
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-target-1-1-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=536870912
log.segment.bytes=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,16 +63,16 @@ enable.zookeeper=true
zk.connect=localhost:2183
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=2
broker.id=2
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-target-1-2-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=536870912
log.segment.bytes=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,16 +63,16 @@ enable.zookeeper=true
zk.connect=localhost:2183
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -20,9 +20,9 @@
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
#consumer group id
groupid=group1
shallowiterator.enable=true
group.id=group1
shallow.iterator.enable=true

View File

@ -20,9 +20,9 @@
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
#consumer group id
groupid=group1
shallowiterator.enable=true
group.id=group1
shallow.iterator.enable=true

View File

@ -1,12 +1,12 @@
zk.connect=localhost:2108
zk.connectiontimeout.ms=1000000
groupid=mm_regtest_grp
autocommit.interval.ms=120000
autooffset.reset=smallest
#fetch.size=1048576
#rebalance.retries.max=4
zk.connection.timeout.ms=1000000
group.id=mm_regtest_grp
auto.commit.interval.ms=120000
auto.offset.reset=smallest
#fetch.message.max.bytes=1048576
#rebalance.max.retries=4
#rebalance.backoff.ms=2000
socket.buffersize=1048576
fetch.size=1048576
zk.synctime.ms=15000
shallowiterator.enable=true
socket.receive.buffer.bytes=1048576
fetch.message.max.bytes=1048576
zk.sync.time.ms=15000
shallow.iterator.enable=true

View File

@ -1,5 +1,5 @@
producer.type=async
queue.enqueueTimeout.ms=-1
queue.enqueue.timeout.ms=-1
broker.list=localhost:9094
compression.codec=0

View File

@ -17,12 +17,12 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
brokerid=0
broker.id=0
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
#hostname=
#host.name=
############################# Socket Server Settings #############################
@ -31,19 +31,19 @@ brokerid=0
port=9091
# The number of threads handling network requests
network.threads=2
num.network.threads=2
# The number of threads doing disk I/O
io.threads=2
num.io.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=104857600
socket.request.max.bytes=104857600
############################# Log Basics #############################
@ -70,16 +70,16 @@ num.partitions=5
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=10000
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000
# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
############################# Log Retention Policy #############################
@ -92,13 +92,13 @@ log.default.flush.scheduler.interval.ms=1000
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.size.
#log.retention.size=1073741824
log.retention.size=-1
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
log.retention.bytes=-1
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.file.size=536870912
log.file.size=102400
#log.segment.size=536870912
log.segment.bytes=102400
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
@ -117,23 +117,23 @@ enable.zookeeper=true
zk.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
monitoring.period.secs=1
max.message.size=1000000
max.queued.requests=500
message.max.bytes=1000000
queued.max.requests=500
log.roll.hours=168
log.index.max.size=10485760
log.index.size.max.bytes=10485760
log.index.interval.bytes=4096
auto.create.topics=true
auto.create.topics.enable=true
controller.socket.timeout.ms=30000
controller.message.queue.size=10
default.replication.factor=1
replica.max.lag.time.ms=10000
replica.max.lag.bytes=4000
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
replica.socket.timeout.ms=30000
replica.socket.buffersize=65536
replica.fetch.size=1048576
replica.fetch.wait.time.ms=500
replica.socket.receive.buffer.bytes=65536
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.fetch.min.bytes=4096
replica.fetchers=1
num.replica.fetchers=1

View File

@ -15,12 +15,12 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=0
broker.id=0
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# host.name=
# number of logical partitions on this broker
num.partitions=1
@ -35,13 +35,13 @@ num.threads=8
log.dir=/tmp/kafka-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# the maximum size of a log segment
log.file.size=536870912
log.segment.bytes=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@ -50,7 +50,7 @@ log.cleanup.interval.mins=1
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
log.flush.interval.messages=600
#set the following properties to use zookeeper
@ -63,16 +63,16 @@ enable.zookeeper=true
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
#log.flush.intervals.ms.per.topic=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -17,12 +17,12 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
brokerid=0
broker.id=0
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
#hostname=
#host.name=
############################# Socket Server Settings #############################
@ -31,19 +31,19 @@ brokerid=0
port=9091
# The number of threads handling network requests
network.threads=2
num.network.threads=2
# The number of threads doing disk I/O
io.threads=2
num.io.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=104857600
socket.request.max.bytes=104857600
############################# Log Basics #############################
@ -70,16 +70,16 @@ num.partitions=5
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=10000
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
log.default.flush.interval.ms=1000
log.flush.interval.ms=1000
# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000
# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
log.default.flush.scheduler.interval.ms=1000
log.flush.scheduler.interval.ms=1000
############################# Log Retention Policy #############################
@ -92,13 +92,13 @@ log.default.flush.scheduler.interval.ms=1000
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.size.
#log.retention.size=1073741824
log.retention.size=-1
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
log.retention.bytes=-1
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.file.size=536870912
log.file.size=102400
#log.segment.size=536870912
log.segment.bytes=102400
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
@ -117,23 +117,23 @@ enable.zookeeper=true
zk.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
monitoring.period.secs=1
max.message.size=1000000
max.queued.requests=500
message.max.bytes=1000000
queued.max.requests=500
log.roll.hours=168
log.index.max.size=10485760
log.index.size.max.bytes=10485760
log.index.interval.bytes=4096
auto.create.topics=true
auto.create.topics.enable=true
controller.socket.timeout.ms=30000
controller.message.queue.size=10
default.replication.factor=1
replica.max.lag.time.ms=10000
replica.max.lag.bytes=4000
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
replica.socket.timeout.ms=30000
replica.socket.buffersize=65536
replica.fetch.size=1048576
replica.fetch.wait.time.ms=500
replica.socket.receive.buffer.bytes=65536
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.fetch.min.bytes=4096
replica.fetchers=1
num.replica.fetchers=1