KAFKA-622 Create mbeans per client; patched by Swapnil; reviewed by Neha Narkhede

git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1415021 13f79535-47bb-0310-9956-ffa450edef68
Neha Narkhede 2012-11-29 01:31:18 +00:00
parent e556063520
commit d7c71c0949
63 changed files with 440 additions and 310 deletions
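The change threads a client id through every consumer and producer code path so that each client registers its own mbeans instead of sharing process-wide singleton stats objects. The standalone Scala sketch below illustrates only the naming pattern behind that (metric keys and names prefixed with the client id); the class and metric names in it are simplified stand-ins, not the Kafka classes touched in the diffs that follow.

import scala.collection.concurrent.TrieMap

// Simplified stand-in for the per-client stats holders added in this commit:
// each (clientId, topic) pair gets its own metrics object, and the client id
// becomes part of the metric name, so two clients never share an mbean.
case class ClientIdAndTopic(clientId: String, topic: String) {
  override def toString = clientId + "-" + topic
}

class TopicMetrics(key: ClientIdAndTopic) {
  val messageRateName = key + "-MessagesPerSec" // e.g. "client-a-orders-MessagesPerSec"
  var messages = 0L
  def mark(): Unit = messages += 1
}

class ClientTopicStats(clientId: String) {
  private val stats = TrieMap.empty[ClientIdAndTopic, TopicMetrics]
  def forTopic(topic: String): TopicMetrics = {
    val key = ClientIdAndTopic(clientId, topic)
    stats.getOrElseUpdate(key, new TopicMetrics(key))
  }
}

object PerClientMetricsDemo extends App {
  val statsA = new ClientTopicStats("client-a")
  val statsB = new ClientTopicStats("client-b")
  statsA.forTopic("orders").mark()
  statsB.forTopic("orders").mark()
  // Same topic, two separate metrics, because the client id is part of the key and the name.
  println(statsA.forTopic("orders").messageRateName) // client-a-orders-MessagesPerSec
  println(statsB.forTopic("orders").messageRateName) // client-b-orders-MessagesPerSec
}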


@@ -109,7 +109,7 @@ public class KafkaETLContext {
 // read data from queue
 URI uri = _request.getURI();
-_consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize);
+_consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize, "KafkaETLContext");
 // get available offset range
 _offsetRange = getOffsetRange();


@@ -33,7 +33,7 @@ object FetchRequest {
 val CurrentVersion = 1.shortValue()
 val DefaultMaxWait = 0
 val DefaultMinBytes = 0
-val ReplicaFetcherClientId = "replica fetcher"
+val ReplicaFetcherClientId = "replica-fetcher"
 val DefaultCorrelationId = 0
 def readFrom(buffer: ByteBuffer): FetchRequest = {


@@ -12,14 +12,14 @@ import kafka.utils.{Utils, Logging}
 */
 object ClientUtils extends Logging{
-def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
+def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = {
 var fetchMetaDataSucceeded: Boolean = false
 var i: Int = 0
 val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
 var topicMetadataResponse: TopicMetadataResponse = null
 var t: Throwable = null
 while(i < brokers.size && !fetchMetaDataSucceeded) {
-val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
+val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i))
 info("Fetching metadata for topic %s".format(topics))
 try {
 topicMetadataResponse = producer.send(topicMetadataRequest)


@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
class InvalidClientIdException(message: String) extends RuntimeException(message) {
def this() = this(null)
}


@@ -54,7 +54,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 try {
 trace("Partitions without leader %s".format(noLeaderPartitionSet))
 val brokers = getAllBrokersInCluster(zkClient)
-val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
+val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata
 val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
 topicsMetadata.foreach(
 tmd => {


@@ -34,7 +34,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
 consumerTimeoutMs: Int,
 private val keyDecoder: Decoder[K],
 private val valueDecoder: Decoder[V],
-val enableShallowIterator: Boolean)
+val enableShallowIterator: Boolean,
+val consumerTopicStats: ConsumerTopicStats)
 extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
@@ -48,8 +49,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
 currentTopicInfo.resetConsumeOffset(consumedOffset)
 val topic = currentTopicInfo.topic
 trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
-ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
-ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
+consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
+consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
 item
 }


@@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import kafka.utils.{Pool, threadsafe, Logging}
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
@threadsafe
class ConsumerTopicStat(name: String) extends KafkaMetricsGroup {
val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS)
val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
}
object ConsumerTopicStat extends Logging {
private val valueFactory = (k: String) => new ConsumerTopicStat(k)
private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory))
private val allTopicStat = new ConsumerTopicStat("AllTopics")
def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
stats.getAndMaybePut(topic + "-")
}
}


@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import kafka.utils.{ClientIdAndTopic, Pool, threadsafe, Logging}
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
@threadsafe
class ConsumerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
val messageRate = newMeter(clientIdTopic + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
}
class ConsumerTopicStats(clientId: String) extends Logging {
private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
}
}


@@ -26,11 +26,12 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
 consumerTimeoutMs: Int,
 private val keyDecoder: Decoder[K],
 private val valueDecoder: Decoder[V],
-val enableShallowIterator: Boolean)
+val enableShallowIterator: Boolean,
+val consumerTopicStats: ConsumerTopicStats)
 extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 private val iter: ConsumerIterator[K,V] =
-new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
+new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStats)
 /**
 * Create an iterator over messages in the stream.


@@ -28,7 +28,8 @@ class PartitionTopicInfo(val topic: String,
 private val chunkQueue: BlockingQueue[FetchedDataChunk],
 private val consumedOffset: AtomicLong,
 private val fetchedOffset: AtomicLong,
-private val fetchSize: AtomicInteger) extends Logging {
+private val fetchSize: AtomicInteger,
+private val consumerTopicStats: ConsumerTopicStats) extends Logging {
 debug("initial consumer offset of " + this + " is " + consumedOffset.get)
 debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
@@ -58,8 +59,8 @@ class PartitionTopicInfo(val topic: String,
 chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
 fetchedOffset.set(next)
 debug("updated fetch offset of (%s) to %d".format(this, next))
-ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
-ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
+consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
+consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
 }
 }


@@ -31,12 +31,12 @@ import kafka.cluster.Broker
 object SimpleConsumer extends Logging {
 def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
-isFromOrdinaryConsumer: Boolean): Long = {
+clientId: String, isFromOrdinaryConsumer: Boolean): Long = {
 var simpleConsumer: SimpleConsumer = null
 var producedOffset: Long = -1L
 try {
 simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
-ConsumerConfig.SocketBufferSize)
+ConsumerConfig.SocketBufferSize, clientId)
 val topicAndPartition = TopicAndPartition(topic, partitionId)
 val request = if(isFromOrdinaryConsumer)
 new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
@@ -56,14 +56,14 @@ object SimpleConsumer extends Logging {
 }
 def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int,
-earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
+earliestOrLatest: Long, clientId: String, isFromOrdinaryConsumer: Boolean = true): Long = {
 val cluster = getCluster(zkClient)
 val broker = cluster.getBroker(brokerId) match {
 case Some(b) => b
 case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
 "getOffsetsBefore request")
 }
-earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, isFromOrdinaryConsumer)
+earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, clientId, isFromOrdinaryConsumer)
 }
 }
@@ -75,10 +75,13 @@ object SimpleConsumer extends Logging {
 class SimpleConsumer(val host: String,
 val port: Int,
 val soTimeout: Int,
-val bufferSize: Int) extends Logging {
+val bufferSize: Int,
+val clientId: String) extends Logging {
+ClientId.validate(clientId)
 private val lock = new Object()
 private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
+private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port)
 private def connect(): BlockingChannel = {
 close
@@ -143,12 +146,12 @@ class SimpleConsumer(val host: String,
 */
 def fetch(request: FetchRequest): FetchResponse = {
 var response: Receive = null
-FetchRequestAndResponseStat.requestTimer.time {
+fetchRequestAndResponseStats.requestTimer.time {
 response = sendRequest(request)
 }
 val fetchResponse = FetchResponse.readFrom(response.buffer)
 val fetchedSize = fetchResponse.sizeInBytes
-FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
+fetchRequestAndResponseStats.respondSizeHist.update(fetchedSize)
 fetchResponse
 }
@@ -166,7 +169,7 @@ class SimpleConsumer(val host: String,
 }
 }
-object FetchRequestAndResponseStat extends KafkaMetricsGroup {
-val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-val respondSizeHist = newHistogram("FetchResponseSize")
+class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
+val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+val respondSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize")
 }
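The new FetchRequestAndResponseStats keys its timer and histogram on both the client id and the broker endpoint, so the same client fetching from two brokers gets two separate sets of mbeans. A small sketch of the resulting metric name, using made-up client and broker values:

object FetchMetricNameDemo extends App {
  // Mirrors the name construction above: clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs",
  // where brokerInfo is "host_" + host + "-port_" + port. The values below are hypothetical.
  val clientId = "log-aggregator"
  val brokerInfo = "host_" + "broker1.example.com" + "-port_" + 9092
  println(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs")
  // prints: log-aggregator-host_broker1.example.com-port_9092-FetchRequestRateAndTimeMs
}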


@@ -35,7 +35,6 @@ import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
 import kafka.api.OffsetRequest
 import kafka.metrics._
-import kafka.producer.ProducerConfig
 /**
@@ -80,6 +79,8 @@ private[kafka] object ZookeeperConsumerConnector {
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 val enableFetcher: Boolean) // for testing only
 extends ConsumerConnector with Logging with KafkaMetricsGroup {
+ClientId.validate(config.clientId)
 private val isShuttingDown = new AtomicBoolean(false)
 private val rebalanceLock = new Object
 private var fetcher: Option[ConsumerFetcherManager] = None
@@ -94,6 +95,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+private val consumerTopicStats = new ConsumerTopicStats(config.clientId)
 val consumerIdString = {
 var consumerUuid : String = null
 config.consumerId match {
@@ -195,7 +198,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 threadIdSet.map(_ => {
 val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
 val stream = new KafkaStream[K,V](
-queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator)
+queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStats)
 (queue, stream)
 })
 ).flatten.toList
@@ -399,7 +402,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
 val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
 val brokers = getAllBrokersInCluster(zkClient)
-val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
+val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata
 val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
 val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
 topicsMetadata.foreach(m =>{
@@ -595,9 +598,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 case None =>
 config.autoOffsetReset match {
 case OffsetRequest.SmallestTimeString =>
-SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime)
+SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
 case OffsetRequest.LargestTimeString =>
-SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime)
+SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
 case _ =>
 throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
 }
@@ -611,7 +614,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 queue,
 consumedOffset,
 fetchedOffset,
-new AtomicInteger(config.fetchSize))
+new AtomicInteger(config.fetchSize),
+consumerTopicStats)
 partTopicInfoMap.put(partition, partTopicInfo)
 debug(partTopicInfo + " selected new offset " + offset)
 }
@@ -667,7 +671,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 val q = e._2._1
 topicThreadIdAndQueues.put(topicThreadId, q)
 newGauge(
-config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
 new Gauge[Int] {
 def getValue = q.size
 }
@@ -714,7 +718,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 config.consumerTimeoutMs,
 keyDecoder,
 valueDecoder,
-config.enableShallowIterator)
+config.enableShallowIterator,
+consumerTopicStats)
 (queue, stream)
 }).toList


@@ -961,7 +961,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
-object ControllerStat extends KafkaMetricsGroup {
+object ControllerStats extends KafkaMetricsGroup {
 val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS)
 val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
 val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))


@@ -58,12 +58,12 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
 .format(liveAssignedReplicasToThisPartition.mkString(",")))
 liveAssignedReplicasToThisPartition.isEmpty match {
 case true =>
-ControllerStat.offlinePartitionRate.mark()
+ControllerStats.offlinePartitionRate.mark()
 throw new PartitionOfflineException(("No replica for partition " +
 "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
 " Assigned replicas are: [%s]".format(assignedReplicas))
 case false =>
-ControllerStat.uncleanLeaderElectionRate.mark()
+ControllerStats.uncleanLeaderElectionRate.mark()
 val newLeader = liveAssignedReplicasToThisPartition.head
 warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
 "There's potential data loss")
@@ -78,7 +78,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
 partition))
 (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
 case None =>
-ControllerStat.offlinePartitionRate.mark()
+ControllerStats.offlinePartitionRate.mark()
 throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
 "replicas assigned to it")
 }


@@ -223,7 +223,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
 liveAssignedReplicas.size match {
 case 0 =>
-ControllerStat.offlinePartitionRate.mark()
+ControllerStats.offlinePartitionRate.mark()
 throw new StateChangeFailedException(("During state change of partition %s from NEW to ONLINE, assigned replicas are " +
 "[%s], live brokers are [%s]. No assigned replica is alive").format(topicAndPartition,
 replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
@@ -249,7 +249,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 // read the controller epoch
 val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
 topicAndPartition.partition).get
-ControllerStat.offlinePartitionRate.mark()
+ControllerStats.offlinePartitionRate.mark()
 throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
 .format(topicAndPartition) + " since Leader and isr path already exists with value " +
 "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))


@@ -227,7 +227,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
 class BrokerChangeListener() extends IZkChildListener with Logging {
 this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
 def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-ControllerStat.leaderElectionTimer.time {
+ControllerStats.leaderElectionTimer.time {
 info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
 if(!isShuttingDown.get()) {
 controllerContext.controllerLock synchronized {


@@ -29,8 +29,10 @@ import kafka.javaapi.OffsetRequest
 class SimpleConsumer(val host: String,
 val port: Int,
 val soTimeout: Int,
-val bufferSize: Int) {
-private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
+val bufferSize: Int,
+val clientId: String) {
+private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize, clientId)
 /**
 * Fetch a set of messages from a topic. This version of the fetch method


@@ -16,7 +16,6 @@
 */
 package kafka.javaapi.consumer
-import kafka.message.Message
 import kafka.serializer._
 import kafka.consumer._
 import scala.collection.JavaConversions.asList


@@ -24,7 +24,7 @@ import java.util.concurrent.atomic._
 import kafka.utils._
 import scala.math._
 import java.text.NumberFormat
-import kafka.server.BrokerTopicStat
+import kafka.server.BrokerTopicStats
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
@@ -244,8 +244,8 @@ private[kafka] class Log(val dir: File,
 if(messageSetInfo.count == 0) {
 (-1L, -1L)
 } else {
-BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count)
-BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count)
+BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
+BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count)
 // trim any invalid bytes or partial messages before appending it to the on-disk log
 var validMessages = trimInvalidBytes(messages)


@@ -24,7 +24,6 @@ import com.yammer.metrics.Metrics
 import java.io.File
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils.{Utils, VerifiableProperties, Logging}


@@ -68,11 +68,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
 /**
 * It updates the cache by issuing a get topic metadata request to a random broker.
-* @param topic the topic for which the metadata is to be fetched
+* @param topics the topics for which the metadata is to be fetched
 */
-def updateInfo(topics: Set[String]) = {
+def updateInfo(topics: Set[String]) {
 var topicsMetadata: Seq[TopicMetadata] = Nil
-val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers)
+val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers)
 topicsMetadata = topicMetadataResponse.topicsMetadata
 // throw partition specific exception
 topicsMetadata.foreach(tmd =>{


@@ -20,7 +20,6 @@ package kafka.producer
 import scala.collection.JavaConversions._
 import joptsimple._
 import java.util.Properties
-import java.util.regex._
 import java.io._
 import kafka.common._
 import kafka.message._


@@ -17,7 +17,6 @@
 package kafka.producer
-import kafka.utils.Utils
 import kafka.utils._


@@ -16,7 +16,7 @@
 */
 package kafka.producer
-import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler}
+import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
 import kafka.utils._
 import java.util.Random
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
@@ -27,8 +27,11 @@ import kafka.metrics._
 class Producer[K,V](config: ProducerConfig,
-private val eventHandler: EventHandler[K,V]) // for testing only
+private val eventHandler: EventHandler[K,V],
+private val producerStats: ProducerStats,
+private val producerTopicStats: ProducerTopicStats) // only for unit testing
 extends Logging {
 private val hasShutdown = new AtomicBoolean(false)
 if (config.batchSize > config.queueSize)
 throw new InvalidConfigException("Batch size can't be larger than queue size.")
@@ -47,25 +50,38 @@
 queue,
 eventHandler,
 config.queueTime,
-config.batchSize)
+config.batchSize,
+config.clientId)
 producerSendThread.start()
 case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
 }
 KafkaMetricsReporter.startReporters(config.props)
+def this(t: (ProducerConfig, EventHandler[K,V], ProducerStats, ProducerTopicStats)) =
+this(t._1, t._2, t._3, t._4)
 def this(config: ProducerConfig) =
-this(config,
-new DefaultEventHandler[K,V](config,
-Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
-Utils.createObject[Encoder[V]](config.serializerClass, config.props),
-Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
-new ProducerPool(config)))
+this {
+ClientId.validate(config.clientId)
+val producerStats = new ProducerStats(config.clientId)
+val producerTopicStats = new ProducerTopicStats(config.clientId)
+(config,
+new DefaultEventHandler[K,V](config,
+Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
+new ProducerPool(config),
+producerStats = producerStats,
+producerTopicStats = producerTopicStats),
+producerStats,
+producerTopicStats)
+}
 /**
 * Sends the data, partitioned by key to the topic using either the
 * synchronous or the asynchronous producer
-* @param producerData the producer data object that encapsulates the topic, key and message data
+* @param messages the producer data object that encapsulates the topic, key and message data
 */
 def send(messages: KeyedMessage[K,V]*) {
 if (hasShutdown.get)
@@ -79,8 +95,8 @@ extends Logging {
 private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
 for (message <- messages) {
-ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
-ProducerTopicStat.getProducerAllTopicStat.messageRate.mark()
+producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
+producerTopicStats.getProducerAllTopicStats.messageRate.mark()
 }
 }
@@ -105,7 +121,7 @@ extends Logging {
 }
 }
 if(!added) {
-AsyncProducerStats.droppedMessageRate.mark()
+producerStats.droppedMessageRate.mark()
 error("Event queue is full of unsent messages, could not send event: " + message.toString)
 throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
 }else {
@@ -131,26 +147,27 @@
 }
 @threadsafe
-class ProducerTopicStat(name: String) extends KafkaMetricsGroup {
-val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS)
-val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+class ProducerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+val messageRate = newMeter(clientIdTopic + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
+val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
 }
-object ProducerTopicStat {
-private val valueFactory = (k: String) => new ProducerTopicStat(k)
-private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory))
-private val allTopicStat = new ProducerTopicStat("AllTopics")
-def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat
-def getProducerTopicStat(topic: String): ProducerTopicStat = {
-stats.getAndMaybePut(topic + "-")
+class ProducerTopicStats(clientId: String) {
+private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
+private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
+private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
+def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
+def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
+stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
 }
 }
-object ProducerStats extends KafkaMetricsGroup {
-val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
-val resendRate = newMeter( "ResendsPerSec", "resends", TimeUnit.SECONDS)
-val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
+class ProducerStats(clientId: String) extends KafkaMetricsGroup {
+val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
+val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS)
+val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
+val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
 }


@@ -26,13 +26,26 @@ import kafka.api.TopicMetadata
 import kafka.common.UnavailableProducerException
-object ProducerPool{
-def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): SyncProducer = {
+object ProducerPool {
+/**
+* Used in ProducerPool to initiate a SyncProducer connection with a broker.
+*/
+def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
 val props = new Properties()
 props.put("host", broker.host)
 props.put("port", broker.port.toString)
-if(configOpt.isDefined)
-props.putAll(configOpt.get.props.props)
+props.putAll(config.props.props)
+new SyncProducer(new SyncProducerConfig(props))
+}
+/**
+* Used in ClientUtils to send TopicMetadataRequest to a broker.
+*/
+def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
+val props = new Properties()
+props.put("host", broker.host)
+props.put("port", broker.port.toString)
+props.put("client.id", clientId)
 new SyncProducer(new SyncProducerConfig(props))
 }
 }
@@ -41,9 +54,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
 private val syncProducers = new HashMap[Int, SyncProducer]
 private val lock = new Object()
-def updateProducer(topicMetaDatas: Seq[TopicMetadata]) {
+def updateProducer(topicMetadatas: Seq[TopicMetadata]) {
 val newBrokers = new collection.mutable.HashSet[Broker]
-topicMetaDatas.foreach(tmd => {
+topicMetadatas.foreach(tmd => {
 tmd.partitionsMetadata.foreach(pmd => {
 if(pmd.leader.isDefined)
 newBrokers+=(pmd.leader.get)
@@ -53,9 +66,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
 newBrokers.foreach(b => {
 if(syncProducers.contains(b.id)){
 syncProducers(b.id).close()
-syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
 } else
-syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
 })
 }
 }


@@ -34,14 +34,12 @@ object SyncProducer {
 */
 @threadsafe
 class SyncProducer(val config: SyncProducerConfig) extends Logging {
-private val MaxConnectBackoffMs = 60000
-private var sentOnConnection = 0
 private val lock = new Object()
 @volatile private var shutdown: Boolean = false
 private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
 config.bufferSize, config.requestTimeoutMs)
+val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port)
 trace("Instantiating Scala Sync Producer")
@@ -89,9 +87,9 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
 * Send a message
 */
 def send(producerRequest: ProducerRequest): ProducerResponse = {
-ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
+producerRequestStats.requestSizeHist.update(producerRequest.sizeInBytes)
 var response: Receive = null
-ProducerRequestStat.requestTimer.time {
+producerRequestStats.requestTimer.time {
 response = doSend(producerRequest)
 }
 ProducerResponse.readFrom(response.buffer)
@@ -152,7 +150,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
 }
 }
-object ProducerRequestStat extends KafkaMetricsGroup {
-val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-val requestSizeHist = newHistogram("ProducerRequestSize")
+class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
+val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize")
 }


@@ -44,7 +44,7 @@ trait SyncProducerConfigShared {
 val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
 /* the client application sending the producer requests */
-val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId)
+val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId)
 /*
 * The required acks of the producer requests - negative value means ack


@@ -1,25 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.producer.async
import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
object AsyncProducerStats extends KafkaMetricsGroup {
val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
}


@@ -33,7 +33,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 private val encoder: Encoder[V],
 private val keyEncoder: Encoder[K],
 private val producerPool: ProducerPool,
-private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
+private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
+private val producerStats: ProducerStats,
+private val producerTopicStats: ProducerTopicStats)
 extends EventHandler[K,V] with Logging {
 val isSync = ("sync" == config.producerType)
@@ -48,8 +50,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 serializedData.foreach{
 keyed =>
 val dataSize = keyed.message.payloadSize
-ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
-ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
+producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
 }
 var outstandingProduceRequests = serializedData
 var remainingRetries = config.producerRetries + 1
@@ -61,11 +63,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 // get topics of the outstanding produce requests and refresh metadata for those
 Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
 remainingRetries -= 1
-ProducerStats.resendRate.mark()
+producerStats.resendRate.mark()
 }
 }
 if(outstandingProduceRequests.size > 0) {
-ProducerStats.failedSendRate.mark()
+producerStats.failedSendRate.mark()
 error("Failed to send the following requests: " + outstandingProduceRequests)
 throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
 }
@@ -111,7 +113,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
 } catch {
 case t =>
-ProducerStats.serializationErrorRate.mark()
+producerStats.serializationErrorRate.mark()
 if (isSync) {
 throw t
 } else {


@@ -28,12 +28,13 @@ class ProducerSendThread[K,V](val threadName: String,
 val queue: BlockingQueue[KeyedMessage[K,V]],
 val handler: EventHandler[K,V],
 val queueTime: Long,
-val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
+val batchSize: Int,
+val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup {
 private val shutdownLatch = new CountDownLatch(1)
 private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
-newGauge("ProducerQueueSize-" + getId,
+newGauge(clientId + "-ProducerQueueSize-" + getId,
 new Gauge[Int] {
 def getValue = queue.size
 })


@@ -17,7 +17,6 @@
 package kafka.serializer
-import kafka.message._
 import kafka.utils.VerifiableProperties
 /**


@@ -18,8 +18,6 @@
 package kafka.serializer
 import kafka.utils.VerifiableProperties
-import kafka.message._
-import kafka.utils.Utils
 /**
 * An encoder is a method of turning objects into byte arrays.


@@ -27,7 +27,7 @@ import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, ShutdownableThread}
+import kafka.utils.{ClientIdAndTopic, Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
@@ -38,12 +38,13 @@ import java.util.concurrent.locks.ReentrantLock
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
 fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
 extends ShutdownableThread(name) {
 private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
 private val partitionMapLock = new ReentrantLock
 private val partitionMapCond = partitionMapLock.newCondition()
-val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
-val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
+val fetcherStats = new FetcherStats(clientId)
+val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
+val fetcherLagStats = new FetcherLagStats(clientId)
 /* callbacks to be defined in subclass */
@@ -117,7 +118,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 case None => currentOffset.get
 }
 partitionMap.put(topicAndPartition, newOffset)
-FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
+fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
 fetcherMetrics.byteRate.mark(validBytes)
 // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
 processPartitionData(topicAndPartition, currentOffset.get, partitionData)
@@ -182,10 +183,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 }
 }
-class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
+class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends KafkaMetricsGroup {
 private[this] var lagVal = new AtomicLong(-1L)
 newGauge(
-name._1 + "-" + name._2 + "-ConsumerLag",
+clientIdTopicPartition + "-ConsumerLag",
 new Gauge[Long] {
 def getValue = lagVal.get
 }
@@ -198,25 +199,29 @@ class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
 def lag = lagVal.get
 }
-object FetcherLagMetrics {
-private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k)
-private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory))
-def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = {
-stats.getAndMaybePut( (topic, partitionId) )
+class FetcherLagStats(clientId: String) {
+private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
+private val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
+def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
+stats.getAndMaybePut(new ClientIdTopicPartition(clientId, topic, partitionId))
 }
 }
-class FetcherStat(name: String) extends KafkaMetricsGroup {
-val requestRate = newMeter(name + "RequestsPerSec", "requests", TimeUnit.SECONDS)
-val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+class FetcherMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+val requestRate = newMeter(clientIdTopic + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
 }
-object FetcherStat {
-private val valueFactory = (k: String) => new FetcherStat(k)
-private val stats = new Pool[String, FetcherStat](Some(valueFactory))
-def getFetcherStat(name: String): FetcherStat = {
-stats.getAndMaybePut(name)
+class FetcherStats(clientId: String) {
+private val valueFactory = (k: ClientIdAndTopic) => new FetcherMetrics(k)
+private val stats = new Pool[ClientIdAndTopic, FetcherMetrics](Some(valueFactory))
+def getFetcherStats(name: String): FetcherMetrics = {
+stats.getAndMaybePut(new ClientIdAndTopic(clientId, name))
 }
 }
+case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
+override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
+}
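The ClientIdTopicPartition case class added above folds the client id into the per-partition lag gauge name via its toString. A quick sketch of the name that comes out, with hypothetical client, topic, and partition values:

// Same shape as the case class added above; only the example values are invented.
case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
  override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
}

object LagMetricNameDemo extends App {
  val key = ClientIdTopicPartition("mirror-maker", "orders", 3)
  // FetcherLagMetrics registers its gauge as <key>-ConsumerLag.
  println(key + "-ConsumerLag") // mirror-maker-orders-3-ConsumerLag
}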


@ -237,8 +237,8 @@ class KafkaApis(val requestChannel: RequestChannel,
private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = { private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = {
trace("Append [%s] to local log ".format(partitionAndData.toString)) trace("Append [%s] to local log ".format(partitionAndData.toString))
partitionAndData.map {case (topicAndPartition, messages) => partitionAndData.map {case (topicAndPartition, messages) =>
BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(messages.sizeInBytes)
try { try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -255,8 +255,8 @@ class KafkaApis(val requestChannel: RequestChannel,
Runtime.getRuntime.halt(1) Runtime.getRuntime.halt(1)
null null
case e => case e =>
BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark()
error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic, topicAndPartition.partition), e) error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic, topicAndPartition.partition), e)
new ProduceResult(topicAndPartition, e) new ProduceResult(topicAndPartition, e)
} }
@@ -323,8 +323,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val partitionData = val partitionData =
try { try {
val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
if (!isFetchFromFollower) { if (!isFetchFromFollower) {
new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages) new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
} else { } else {
@@ -334,8 +334,8 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
} catch { } catch {
case t: Throwable => case t: Throwable =>
BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), t) error("error when processing request " + (topic, partition, offset, fetchSize), t)
new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
offset, -1L, MessageSet.Empty) offset, -1L, MessageSet.Empty)


@@ -79,14 +79,14 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS) val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS)
} }
object BrokerTopicStat extends Logging { object BrokerTopicStats extends Logging {
private val valueFactory = (k: String) => new BrokerTopicMetrics(k) private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
private val allTopicStat = new BrokerTopicMetrics("AllTopics") private val allTopicStats = new BrokerTopicMetrics("AllTopics")
def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat def getBrokerAllTopicStats(): BrokerTopicMetrics = allTopicStats
def getBrokerTopicStat(topic: String): BrokerTopicMetrics = { def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
stats.getAndMaybePut(topic + "-") stats.getAndMaybePut(topic + "-")
} }
} }


@@ -23,7 +23,7 @@ import kafka.utils._ import kafka.utils._
import java.util.concurrent._ import java.util.concurrent._
import atomic.AtomicBoolean import atomic.AtomicBoolean
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStat, KafkaController} import kafka.controller.{ControllerStats, KafkaController}
/** /**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -96,9 +96,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
* Forces some dynamic jmx beans to be registered on server startup. * Forces some dynamic jmx beans to be registered on server startup.
*/ */
private def registerStats() { private def registerStats() {
BrokerTopicStat.getBrokerAllTopicStat() BrokerTopicStats.getBrokerAllTopicStats()
ControllerStat.offlinePartitionRate ControllerStats.offlinePartitionRate
ControllerStat.uncleanLeaderElectionRate ControllerStats.uncleanLeaderElectionRate
} }
/** /**


@@ -28,7 +28,7 @@ class ReplicaFetcherThread(name:String,
brokerConfig: KafkaConfig, brokerConfig: KafkaConfig,
replicaMgr: ReplicaManager) replicaMgr: ReplicaManager)
extends AbstractFetcherThread(name = name, extends AbstractFetcherThread(name = name,
clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host, sourceBroker.port) , clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
sourceBroker = sourceBroker, sourceBroker = sourceBroker,
socketTimeout = brokerConfig.replicaSocketTimeoutMs, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
socketBufferSize = brokerConfig.replicaSocketBufferSize, socketBufferSize = brokerConfig.replicaSocketBufferSize,
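For the replica fetcher, the clientId that previously embedded a space and a colon is now built only from characters that are safe in an mbean/metric name. A quick illustration of the resulting string, using placeholder host and port values:

    // Placeholder broker address; only the string formatting mirrors the patch.
    object ReplicaFetcherClientIdExample extends App {
      val (host, port) = ("localhost", 9092)
      val clientId = "replica-fetcher" + "-host_%s-port_%d".format(host, port)
      println(clientId)   // replica-fetcher-host_localhost-port_9092
    }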


@@ -41,7 +41,7 @@ object ConsumerOffsetChecker extends Logging {
val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1 val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
val consumer = brokerInfo match { val consumer = brokerInfo match {
case BrokerIpPattern(ip, port) => case BrokerIpPattern(ip, port) =>
Some(new SimpleConsumer(ip, port.toInt, 10000, 100000)) Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
case _ => case _ =>
error("Could not parse broker info %s".format(brokerInfo)) error("Could not parse broker info %s".format(brokerInfo))
None None


@@ -67,7 +67,7 @@ object GetOffsetShell {
val partition = options.valueOf(partitionOpt).intValue val partition = options.valueOf(partitionOpt).intValue
var time = options.valueOf(timeOpt).longValue var time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue val nOffsets = options.valueOf(nOffsetsOpt).intValue
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000) val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
val topicAndPartition = TopicAndPartition(topic, partition) val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets))) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets


@@ -17,7 +17,6 @@
package kafka.tools package kafka.tools
import kafka.message.Message
import joptsimple.OptionParser import joptsimple.OptionParser
import kafka.utils.{Utils, CommandLineUtils, Logging} import kafka.utils.{Utils, CommandLineUtils, Logging}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer} import kafka.producer.{KeyedMessage, ProducerConfig, Producer}


@@ -24,7 +24,7 @@ import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.consumer._ import kafka.consumer._
import kafka.utils.{Logging, ZkUtils} import kafka.utils.{Logging, ZkUtils}
import kafka.api.OffsetRequest import kafka.api.OffsetRequest
import kafka.message.{CompressionCodec, Message} import kafka.message.CompressionCodec
object ReplayLogProducer extends Logging { object ReplayLogProducer extends Logging {


@@ -19,12 +19,10 @@ package kafka.tools
import joptsimple._ import joptsimple._
import kafka.utils._ import kafka.utils._
import kafka.producer.ProducerConfig
import kafka.consumer._ import kafka.consumer._
import kafka.client.ClientUtils import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker import kafka.cluster.Broker
import java.util.Properties
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
/** /**
@@ -127,7 +125,7 @@ object SimpleConsumerShell extends Logging {
// getting topic metadata // getting topic metadata
info("Getting topic metatdata...") info("Getting topic metatdata...")
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata)) System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
System.exit(1) System.exit(1)
@@ -167,7 +165,7 @@ object SimpleConsumerShell extends Logging {
System.exit(1) System.exit(1)
} }
if(startingOffset < 0) if(startingOffset < 0)
startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, false) startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, clientId, false)
// initializing formatter // initializing formatter
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
@@ -175,7 +173,7 @@ object SimpleConsumerShell extends Logging {
info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]" info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
.format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024) val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() { val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
def run() { def run() {
var offset = startingOffset var offset = startingOffset


@@ -65,7 +65,7 @@ object UpdateOffsetsInZK {
ZkUtils.getBrokerInfo(zkClient, broker) match { ZkUtils.getBrokerInfo(zkClient, broker) match {
case Some(brokerInfo) => case Some(brokerInfo) =>
val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024) val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
val topicAndPartition = TopicAndPartition(topic, partition) val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1))) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head


@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import kafka.common.InvalidTopicException
import kafka.common.InvalidClientIdException
import util.matching.Regex
object ClientId {
val legalChars = "[a-zA-Z0-9_-]"
val maxNameLength = 200 // to prevent hitting filename max length limit
private val rgx = new Regex(legalChars + "*")
def validate(clientId: String) {
if (clientId.length > maxNameLength)
throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters")
rgx.findFirstIn(clientId) match {
case Some(t) =>
if (!t.equals(clientId))
throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
}
}
}
object Topic {
val legalChars = "[a-zA-Z0-9_-]"
val maxNameLength = 255
private val rgx = new Regex(legalChars + "+")
def validate(topic: String) {
if (topic.length <= 0)
throw new InvalidTopicException("topic name is illegal, can't be empty")
else if (topic.length > maxNameLength)
throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
rgx.findFirstIn(topic) match {
case Some(t) =>
if (!t.equals(topic))
throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
}
}
}
case class ClientIdAndTopic(clientId: String, topic:String) {
override def toString = "%s-%s".format(clientId, topic)
}
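The new ClientId validator applies the same character rule as Topic but differs in two ways: its regex uses '*' rather than '+', so an empty clientId is accepted, and the length cap is 200 rather than 255. A small standalone sketch of the same check follows; the names are illustrative, and the real code throws InvalidClientIdException instead of returning a Boolean.

    object ClientIdCheck {
      private val Legal = "[a-zA-Z0-9_-]*".r
      private val MaxLen = 200
      def isValid(id: String): Boolean =
        id.length <= MaxLen && (Legal.findFirstIn(id) match {
          case Some(m) => m == id   // the whole string must match, not just a prefix
          case None    => false
        })
    }
    // ClientIdCheck.isValid("console-consumer-1")  == true
    // ClientIdCheck.isValid("has spaces")          == false
    // ClientIdCheck.isValid("")                    == true  (empty clientId is allowed)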


@@ -1,41 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import kafka.common.InvalidTopicException
import util.matching.Regex
object Topic {
val legalChars = "[a-zA-Z0-9_-]"
val maxNameLength = 255
private val rgx = new Regex(legalChars + "+")
def validate(topic: String) {
if (topic.length <= 0)
throw new InvalidTopicException("topic name is illegal, can't be empty")
else if (topic.length > maxNameLength)
throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
rgx.findFirstIn(topic) match {
case Some(t) =>
if (!t.equals(topic))
throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
}
}
}


@@ -17,7 +17,6 @@
package kafka package kafka
import message.Message
import org.apache.log4j.PropertyConfigurator import org.apache.log4j.PropertyConfigurator
import kafka.utils.Logging import kafka.utils.Logging
import serializer.Encoder import serializer.Encoder


@@ -18,7 +18,6 @@
package kafka package kafka
import consumer._ import consumer._
import message.Message
import utils.Utils import utils.Utils
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch


@@ -55,7 +55,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
queue, queue,
new AtomicLong(consumedOffset), new AtomicLong(consumedOffset),
new AtomicLong(0), new AtomicLong(0),
new AtomicInteger(0))) new AtomicInteger(0),
new ConsumerTopicStats("")))
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
override def setUp() { override def setUp() {
@@ -78,7 +79,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
consumerConfig.consumerTimeoutMs, consumerConfig.consumerTimeoutMs,
new StringDecoder(), new StringDecoder(),
new StringDecoder(), new StringDecoder(),
enableShallowIterator = false) enableShallowIterator = false,
consumerTopicStats = new ConsumerTopicStats(""))
var receivedMessages = (0 until 5).map(i => iter.next.message).toList var receivedMessages = (0 until 5).map(i => iter.next.message).toList
assertFalse(iter.hasNext) assertFalse(iter.hasNext)


@@ -24,7 +24,6 @@ import kafka.server._ import kafka.server._
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.message.Message
import kafka.serializer._ import kafka.serializer._
import kafka.producer.{Producer, KeyedMessage} import kafka.producer.{Producer, KeyedMessage}


@@ -23,7 +23,6 @@ import scala.collection._ import scala.collection._
import junit.framework.Assert._ import junit.framework.Assert._
import kafka.cluster._ import kafka.cluster._
import kafka.message._
import kafka.server._ import kafka.server._
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import kafka.consumer._ import kafka.consumer._
@@ -50,7 +49,8 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
queue, queue,
new AtomicLong(0), new AtomicLong(0),
new AtomicLong(0), new AtomicLong(0),
new AtomicInteger(0))) new AtomicInteger(0),
new ConsumerTopicStats("")))
var fetcher: ConsumerFetcherManager = null var fetcher: ConsumerFetcherManager = null


@@ -18,7 +18,7 @@
package kafka.integration package kafka.integration
import kafka.api.FetchRequestBuilder import kafka.api.FetchRequestBuilder
import kafka.message.{Message, ByteBufferMessageSet} import kafka.message.ByteBufferMessageSet
import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.server.{KafkaRequestHandler, KafkaConfig}
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
import org.junit.Assert._ import org.junit.Assert._


@@ -25,7 +25,6 @@ import java.util.Properties
import kafka.utils.Utils import kafka.utils.Utils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer._ import kafka.serializer._
import kafka.message.Message
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient


@@ -21,7 +21,6 @@ import kafka.consumer.SimpleConsumer
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import java.util.Properties import java.util.Properties
import kafka.producer.{ProducerConfig, Producer} import kafka.producer.{ProducerConfig, Producer}
import kafka.message.Message
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.serializer._ import kafka.serializer._
@@ -44,10 +43,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
props.put("producer.request.required.acks", "-1") props.put("producer.request.required.acks", "-1")
props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("serializer.class", classOf[StringEncoder].getName.toString)
producer = new Producer(new ProducerConfig(props)) producer = new Producer(new ProducerConfig(props))
consumer = new SimpleConsumer(host, consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
port,
1000000,
64*1024)
} }
override def tearDown() { override def tearDown() {


@@ -29,7 +29,7 @@ import kafka.producer.KeyedMessage
import kafka.javaapi.producer.Producer import kafka.javaapi.producer.Producer
import kafka.utils.IntEncoder import kafka.utils.IntEncoder
import kafka.utils.TestUtils._ import kafka.utils.TestUtils._
import kafka.utils.{Utils, Logging, TestUtils} import kafka.utils.{Logging, TestUtils}
import kafka.consumer.{KafkaStream, ConsumerConfig} import kafka.consumer.{KafkaStream, ConsumerConfig}
import kafka.zk.ZooKeeperTestHarness import kafka.zk.ZooKeeperTestHarness


@@ -53,7 +53,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
logDir = new File(logDirPath) logDir = new File(logDirPath)
time = new MockTime() time = new MockTime()
server = TestUtils.createServer(new KafkaConfig(config), time) server = TestUtils.createServer(new KafkaConfig(config), time)
simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024) simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
} }
@After @After


@@ -24,7 +24,6 @@ import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{TestUtils, Utils, Logging} import kafka.utils.{TestUtils, Utils, Logging}
import junit.framework.Assert._ import junit.framework.Assert._
import kafka.api.FetchRequestBuilder import kafka.api.FetchRequestBuilder
import kafka.message.Message
import kafka.producer.async.MissingConfigException import kafka.producer.async.MissingConfigException
import kafka.serializer.Encoder import kafka.serializer.Encoder
import kafka.zk.ZooKeeperTestHarness import kafka.zk.ZooKeeperTestHarness
@@ -57,7 +56,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
logDirZk = new File(logDirZkPath) logDirZk = new File(logDirZkPath)
config = new KafkaConfig(propsZk) config = new KafkaConfig(propsZk)
serverZk = TestUtils.createServer(config); serverZk = TestUtils.createServer(config);
simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024) simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
} }
@After @After


@@ -17,7 +17,7 @@
package kafka.producer package kafka.producer
import java.util.{LinkedList, Properties} import java.util.Properties
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import junit.framework.Assert._ import junit.framework.Assert._
import org.easymock.EasyMock import org.easymock.EasyMock
@@ -68,7 +68,10 @@ class AsyncProducerTest extends JUnit3Suite {
val config = new ProducerConfig(props) val config = new ProducerConfig(props)
val produceData = getProduceData(12) val produceData = getProduceData(12)
val producer = new Producer[String, String](config, mockEventHandler) val producer = new Producer[String, String](config,
mockEventHandler,
new ProducerStats(""),
new ProducerTopicStats(""))
try { try {
// send all 10 messages, should hit the batch size and then reach broker // send all 10 messages, should hit the batch size and then reach broker
producer.send(produceData: _*) producer.send(produceData: _*)
@@ -118,7 +121,7 @@ class AsyncProducerTest extends JUnit3Suite {
val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10) val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
val producerSendThread = val producerSendThread =
new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5) new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5, "")
producerSendThread.start() producerSendThread.start()
for (producerData <- producerDataList) for (producerData <- producerDataList)
@@ -143,7 +146,7 @@ class AsyncProducerTest extends JUnit3Suite {
val queueExpirationTime = 200 val queueExpirationTime = 200
val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10) val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
val producerSendThread = val producerSendThread =
new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5) new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5, "")
producerSendThread.start() producerSendThread.start()
for (producerData <- producerDataList) for (producerData <- producerDataList)
@@ -185,11 +188,13 @@ class AsyncProducerTest extends JUnit3Suite {
val producerPool = new ProducerPool(config) val producerPool = new ProducerPool(config)
val handler = new DefaultEventHandler[Int,String](config, val handler = new DefaultEventHandler[Int,String](config,
partitioner = intPartitioner, partitioner = intPartitioner,
encoder = null.asInstanceOf[Encoder[String]], encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = new IntEncoder(), keyEncoder = new IntEncoder(),
producerPool = producerPool, producerPool = producerPool,
topicPartitionInfos) topicPartitionInfos = topicPartitionInfos,
producerStats = new ProducerStats(""),
producerTopicStats = new ProducerTopicStats(""))
val topic1Broker1Data = val topic1Broker1Data =
ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)), ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -228,8 +233,9 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = new StringEncoder, encoder = new StringEncoder,
keyEncoder = new StringEncoder, keyEncoder = new StringEncoder,
producerPool = producerPool, producerPool = producerPool,
topicPartitionInfos topicPartitionInfos = topicPartitionInfos,
) producerStats = new ProducerStats(""),
producerTopicStats = new ProducerTopicStats(""))
val serializedData = handler.serialize(produceData) val serializedData = handler.serialize(produceData)
val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload))) val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
@@ -257,7 +263,9 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = null.asInstanceOf[Encoder[String]], encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = null.asInstanceOf[Encoder[String]], keyEncoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool, producerPool = producerPool,
topicPartitionInfos) topicPartitionInfos = topicPartitionInfos,
producerStats = new ProducerStats(""),
producerTopicStats = new ProducerTopicStats(""))
try { try {
handler.partitionAndCollate(producerDataList) handler.partitionAndCollate(producerDataList)
fail("Should fail with UnknownTopicOrPartitionException") fail("Should fail with UnknownTopicOrPartitionException")
@@ -288,7 +296,9 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = new StringEncoder, encoder = new StringEncoder,
keyEncoder = new StringEncoder, keyEncoder = new StringEncoder,
producerPool = producerPool, producerPool = producerPool,
topicPartitionInfos) topicPartitionInfos = topicPartitionInfos,
producerStats = new ProducerStats(""),
producerTopicStats = new ProducerTopicStats(""))
try { try {
handler.handle(producerDataList) handler.handle(producerDataList)
fail("Should fail with NoBrokersForPartitionException") fail("Should fail with NoBrokersForPartitionException")
@@ -335,7 +345,9 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = null.asInstanceOf[Encoder[String]], encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = null.asInstanceOf[Encoder[String]], keyEncoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool, producerPool = producerPool,
topicPartitionInfos) topicPartitionInfos = topicPartitionInfos,
producerStats = new ProducerStats(""),
producerTopicStats = new ProducerTopicStats(""))
val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]] val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes))) producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes))) producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -373,14 +385,19 @@ class AsyncProducerTest extends JUnit3Suite {
val msgs = TestUtils.getMsgStrings(10) val msgs = TestUtils.getMsgStrings(10)
val handler = new DefaultEventHandler[String,String]( config, val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]], partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder, encoder = new StringEncoder,
keyEncoder = new StringEncoder, keyEncoder = new StringEncoder,
producerPool = producerPool, producerPool = producerPool,
topicPartitionInfos) topicPartitionInfos = topicPartitionInfos,
producerStats = new ProducerStats(""),
producerTopicStats = new ProducerTopicStats(""))
val producer = new Producer[String, String](config, handler) val producer = new Producer[String, String](config,
handler,
new ProducerStats(""),
new ProducerTopicStats(""))
try { try {
// send all 10 messages, should create 2 batches and 2 syncproducer calls // send all 10 messages, should create 2 batches and 2 syncproducer calls
producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*) producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
@@ -435,7 +452,9 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = new StringEncoder(), encoder = new StringEncoder(),
keyEncoder = new NullEncoder[Int](), keyEncoder = new NullEncoder[Int](),
producerPool = producerPool, producerPool = producerPool,
topicPartitionInfos) topicPartitionInfos = topicPartitionInfos,
producerStats = new ProducerStats(""),
producerTopicStats = new ProducerTopicStats(""))
val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m)) val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
handler.handle(data) handler.handle(data)
handler.close() handler.close()
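The test updates above show the other half of the patch: ProducerStats and ProducerTopicStats (and their consumer-side counterparts) are now constructed per clientId and passed into the handler and producer, rather than looked up from a process-wide singleton, which is what allows one set of mbeans per client. Below is a minimal sketch of that injection style, with illustrative names rather than the patch's classes.

    object StatsInjectionDemo extends App {
      // Illustrative only: stats scoped to one clientId, handed to the component that uses them.
      class SendStats(clientId: String) {
        private var sent = 0L
        def mark(): Unit = { sent += 1 }
        override def toString = "%s-MessagesPerSec(total=%d)".format(clientId, sent)
      }

      class EventHandler(stats: SendStats) {
        def handle(batch: Seq[String]): Unit = batch.foreach(_ => stats.mark())
      }

      val stats = new SendStats("console-producer")
      new EventHandler(stats).handle(Seq("m1", "m2", "m3"))
      println(stats)   // console-producer-MessagesPerSec(total=3)
    }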


@@ -65,8 +65,9 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
props.put("host", "localhost") props.put("host", "localhost")
props.put("port", port1.toString) props.put("port", port1.toString)
consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024) consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "")
consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024) consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "")
// temporarily set request handler logger to a higher level // temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL) requestHandlerLogger.setLevel(Level.FATAL)


@@ -23,8 +23,6 @@ import kafka.utils.TestUtils._ import kafka.utils.TestUtils._
import kafka.utils.IntEncoder import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils} import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness import kafka.zk.ZooKeeperTestHarness
import kafka.serializer._
import kafka.message.Message
import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {


@@ -20,7 +20,7 @@ import java.io.File
import kafka.consumer.SimpleConsumer import kafka.consumer.SimpleConsumer
import org.junit.Test import org.junit.Test
import junit.framework.Assert._ import junit.framework.Assert._
import kafka.message.{Message, ByteBufferMessageSet} import kafka.message.ByteBufferMessageSet
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness import kafka.zk.ZooKeeperTestHarness
import kafka.producer._ import kafka.producer._
@@ -66,10 +66,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
server.startup() server.startup()
producer = new Producer[Int, String](new ProducerConfig(producerConfig)) producer = new Producer[Int, String](new ProducerConfig(producerConfig))
val consumer = new SimpleConsumer(host, val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
port,
1000000,
64*1024)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)


@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import junit.framework.Assert._
import collection.mutable.ArrayBuffer
import kafka.common.InvalidClientIdException
import org.junit.Test
class ClientIdTest {
@Test
def testInvalidClientIds() {
val invalidclientIds = new ArrayBuffer[String]()
invalidclientIds += (".", "..")
var longName = "ATCG"
for (i <- 1 to 6)
longName += longName
invalidclientIds += longName
val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
for (weirdChar <- badChars) {
invalidclientIds += "Is" + weirdChar + "funny"
}
for (i <- 0 until invalidclientIds.size) {
try {
ClientId.validate(invalidclientIds(i))
fail("Should throw InvalidClientIdException.")
}
catch {
case e: InvalidClientIdException => "This is good."
}
}
val validClientIds = new ArrayBuffer[String]()
validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "")
for (i <- 0 until validClientIds.size) {
try {
ClientId.validate(validClientIds(i))
}
catch {
case e: Exception => fail("Should not throw exception.")
}
}
}
}


@@ -59,7 +59,8 @@ public class SimpleConsumerDemo {
SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL, SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
KafkaProperties.kafkaServerPort, KafkaProperties.kafkaServerPort,
KafkaProperties.connectionTimeOut, KafkaProperties.connectionTimeOut,
KafkaProperties.kafkaProducerBufferSize); KafkaProperties.kafkaProducerBufferSize,
KafkaProperties.clientId);
System.out.println("Testing single fetch"); System.out.println("Testing single fetch");
FetchRequest req = new FetchRequestBuilder() FetchRequest req = new FetchRequestBuilder()
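Every SimpleConsumer construction site in the patch now supplies a clientId as the fifth argument (the command-line tools pass their tool name, tests pass an empty string). A short usage sketch of the new constructor, assuming the 0.8 kafka jar on the classpath and placeholder connection settings:

    object ClientAwareConsumerExample extends App {
      import kafka.consumer.SimpleConsumer
      // Placeholder host/port/timeout/buffer values; the added clientId argument is the point.
      val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "my-admin-tool")
      consumer.close()
    }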


@@ -42,7 +42,7 @@ object SimpleConsumerPerformance {
println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
} }
val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize) val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
// reset to latest or smallest offset // reset to latest or smallest offset
val topicAndPartition = TopicAndPartition(config.topic, config.partition) val topicAndPartition = TopicAndPartition(config.topic, config.partition)