diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java index b0e75bc18e7..8e98efc1cdf 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java @@ -109,7 +109,7 @@ public class KafkaETLContext { // read data from queue URI uri = _request.getURI(); - _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize); + _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize, "KafkaETLContext"); // get available offset range _offsetRange = getOffsetRange(); diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 9892fb35627..4ed071afb2d 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -33,7 +33,7 @@ object FetchRequest { val CurrentVersion = 1.shortValue() val DefaultMaxWait = 0 val DefaultMinBytes = 0 - val ReplicaFetcherClientId = "replica fetcher" + val ReplicaFetcherClientId = "replica-fetcher" val DefaultCorrelationId = 0 def readFrom(buffer: ByteBuffer): FetchRequest = { diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index aeead2d8998..e6e7200a048 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -12,14 +12,14 @@ import kafka.utils.{Utils, Logging} */ object ClientUtils extends Logging{ - def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = { + def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq) var topicMetadataResponse: TopicMetadataResponse = null var t: Throwable = null while(i < brokers.size && !fetchMetaDataSucceeded) { - val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i)) + val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i)) info("Fetching metadata for topic %s".format(topics)) try { topicMetadataResponse = producer.send(topicMetadataRequest) diff --git a/core/src/main/scala/kafka/common/InvalidClientIdException.scala b/core/src/main/scala/kafka/common/InvalidClientIdException.scala new file mode 100644 index 00000000000..edf072d4e25 --- /dev/null +++ b/core/src/main/scala/kafka/common/InvalidClientIdException.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
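For reference, a minimal sketch of how a caller passes the new client id through ClientUtils.fetchTopicMetadata after this change. The broker-list string, topic name and client id below are placeholders, and the broker-list format is whatever ClientUtils.parseBrokerList (used later in this patch) expects.

    import kafka.client.ClientUtils

    object MetadataFetchSketch {
      def main(args: Array[String]) {
        // placeholder broker list; parseBrokerList is the same helper SimpleConsumerShell uses
        val brokers = ClientUtils.parseBrokerList("0:localhost:9092")
        // the client id gets "-FetchTopicMetadata" appended when the temporary SyncProducer is created
        val response = ClientUtils.fetchTopicMetadata(Set("page_views"), "metadata-sketch", brokers)
        response.topicsMetadata.foreach(tmd =>
          println(tmd.topic + ": " + tmd.partitionsMetadata.size + " partitions"))
      }
    }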
+ */ + +package kafka.common + +class InvalidClientIdException(message: String) extends RuntimeException(message) { + def this() = this(null) +} diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index bdc020b4b1e..a80fac93fbc 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -54,7 +54,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, try { trace("Partitions without leader %s".format(noLeaderPartitionSet)) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker] topicsMetadata.foreach( tmd => { diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index fd80104435b..c5062fc8565 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -34,7 +34,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val enableShallowIterator: Boolean) + val enableShallowIterator: Boolean, + val consumerTopicStats: ConsumerTopicStats) extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) @@ -48,8 +49,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk currentTopicInfo.resetConsumeOffset(consumedOffset) val topic = currentTopicInfo.topic trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) - ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark() - ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark() + consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() + consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() item } diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala index 2c9c204e331..e69de29bb2d 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
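Both SimpleConsumer and ZookeeperConsumerConnector now call ClientId.validate on construction, so a malformed client id fails fast with the new exception. A small sketch of that behaviour, assuming only the ClientId helper and the exception introduced in this patch; the client ids are made up.

    import kafka.common.InvalidClientIdException
    import kafka.utils.ClientId

    object ClientIdValidationSketch {
      def main(args: Array[String]) {
        ClientId.validate("console-consumer_42")   // legal: ASCII alphanumerics, '_' and '-'
        try {
          ClientId.validate("bad client id!")      // space and '!' are outside the legal character set
        } catch {
          case e: InvalidClientIdException => println("rejected: " + e.getMessage)
        }
      }
    }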
- */ - -package kafka.consumer - -import kafka.utils.{Pool, threadsafe, Logging} -import java.util.concurrent.TimeUnit -import kafka.metrics.KafkaMetricsGroup - -@threadsafe -class ConsumerTopicStat(name: String) extends KafkaMetricsGroup { - val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS) - val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS) -} - -object ConsumerTopicStat extends Logging { - private val valueFactory = (k: String) => new ConsumerTopicStat(k) - private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory)) - private val allTopicStat = new ConsumerTopicStat("AllTopics") - - def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat - - def getConsumerTopicStat(topic: String): ConsumerTopicStat = { - stats.getAndMaybePut(topic + "-") - } -} diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala new file mode 100644 index 00000000000..2a9d9fbf2b8 --- /dev/null +++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
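The practical effect of replacing ConsumerTopicStat with the per-client version below is mostly visible in metric names: the old code keyed meters by topic alone, the new code prefixes them with the client id via ClientIdAndTopic (added later in this patch). An illustrative sketch with made-up names:

    import kafka.utils.ClientIdAndTopic

    object MetricNameSketch {
      def main(args: Array[String]) {
        val key = new ClientIdAndTopic("console-consumer", "page_views")
        // ConsumerTopicMetrics derives its meter names from this key's toString ("clientId-topic")
        println(key + "-MessagesPerSec")   // console-consumer-page_views-MessagesPerSec
        println(key + "-BytesPerSec")      // console-consumer-page_views-BytesPerSec
      }
    }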
+ */ + +package kafka.consumer + +import kafka.utils.{ClientIdAndTopic, Pool, threadsafe, Logging} +import java.util.concurrent.TimeUnit +import kafka.metrics.KafkaMetricsGroup + +@threadsafe +class ConsumerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup { + val messageRate = newMeter(clientIdTopic + "-MessagesPerSec", "messages", TimeUnit.SECONDS) + val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS) +} + +class ConsumerTopicStats(clientId: String) extends Logging { + private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k) + private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory)) + private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) + + def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats + + def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = { + stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic)) + } +} + diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala index 115d41a9810..569f6df1236 100644 --- a/core/src/main/scala/kafka/consumer/KafkaStream.scala +++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala @@ -26,11 +26,12 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val enableShallowIterator: Boolean) + val enableShallowIterator: Boolean, + val consumerTopicStats: ConsumerTopicStats) extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] { private val iter: ConsumerIterator[K,V] = - new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator) + new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStats) /** * Create an iterator over messages in the stream. 
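A minimal standalone sketch of the new stats holder: one ConsumerTopicStats instance is created per ZookeeperConsumerConnector (keyed by its client id) and handed down to KafkaStream, ConsumerIterator and PartitionTopicInfo, which mark it as shown below. The client id and topic are placeholders.

    import kafka.consumer.ConsumerTopicStats

    object ConsumerStatsSketch {
      def main(args: Array[String]) {
        val stats = new ConsumerTopicStats("console-consumer")
        // what ConsumerIterator does per consumed message
        stats.getConsumerTopicStats("page_views").messageRate.mark()
        stats.getConsumerAllTopicStats().messageRate.mark()
        // what PartitionTopicInfo does per fetched chunk
        stats.getConsumerTopicStats("page_views").byteRate.mark(4096)
        stats.getConsumerAllTopicStats().byteRate.mark(4096)
      }
    }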
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala index 5249ddc3b66..8c42d118115 100644 --- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala +++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala @@ -28,7 +28,8 @@ class PartitionTopicInfo(val topic: String, private val chunkQueue: BlockingQueue[FetchedDataChunk], private val consumedOffset: AtomicLong, private val fetchedOffset: AtomicLong, - private val fetchSize: AtomicInteger) extends Logging { + private val fetchSize: AtomicInteger, + private val consumerTopicStats: ConsumerTopicStats) extends Logging { debug("initial consumer offset of " + this + " is " + consumedOffset.get) debug("initial fetch offset of " + this + " is " + fetchedOffset.get) @@ -58,8 +59,8 @@ class PartitionTopicInfo(val topic: String, chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) fetchedOffset.set(next) debug("updated fetch offset of (%s) to %d".format(this, next)) - ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size) - ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size) + consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size) + consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size) } } diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index d642a670f24..e42923ae56c 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -31,12 +31,12 @@ import kafka.cluster.Broker object SimpleConsumer extends Logging { def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long, - isFromOrdinaryConsumer: Boolean): Long = { + clientId: String, isFromOrdinaryConsumer: Boolean): Long = { var simpleConsumer: SimpleConsumer = null var producedOffset: Long = -1L try { simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout, - ConsumerConfig.SocketBufferSize) + ConsumerConfig.SocketBufferSize, clientId) val topicAndPartition = TopicAndPartition(topic, partitionId) val request = if(isFromOrdinaryConsumer) new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))) @@ -56,14 +56,14 @@ object SimpleConsumer extends Logging { } def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int, - earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = { + earliestOrLatest: Long, clientId: String, isFromOrdinaryConsumer: Boolean = true): Long = { val cluster = getCluster(zkClient) val broker = cluster.getBroker(brokerId) match { case Some(b) => b case None => throw new KafkaException("Broker " + brokerId + " is unavailable. 
Cannot issue " + "getOffsetsBefore request") } - earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, isFromOrdinaryConsumer) + earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, clientId, isFromOrdinaryConsumer) } } @@ -75,10 +75,13 @@ object SimpleConsumer extends Logging { class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int, - val bufferSize: Int) extends Logging { + val bufferSize: Int, + val clientId: String) extends Logging { + ClientId.validate(clientId) private val lock = new Object() private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout) + private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port) private def connect(): BlockingChannel = { close @@ -143,12 +146,12 @@ class SimpleConsumer(val host: String, */ def fetch(request: FetchRequest): FetchResponse = { var response: Receive = null - FetchRequestAndResponseStat.requestTimer.time { + fetchRequestAndResponseStats.requestTimer.time { response = sendRequest(request) } val fetchResponse = FetchResponse.readFrom(response.buffer) val fetchedSize = fetchResponse.sizeInBytes - FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize) + fetchRequestAndResponseStats.respondSizeHist.update(fetchedSize) fetchResponse } @@ -166,7 +169,7 @@ class SimpleConsumer(val host: String, } } -object FetchRequestAndResponseStat extends KafkaMetricsGroup { - val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) - val respondSizeHist = newHistogram("FetchResponseSize") +class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup { + val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + val respondSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize") } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 227c90db805..fbb82a21694 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -35,7 +35,6 @@ import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge import kafka.api.OffsetRequest import kafka.metrics._ -import kafka.producer.ProducerConfig /** @@ -80,6 +79,8 @@ private[kafka] object ZookeeperConsumerConnector { private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only extends ConsumerConnector with Logging with KafkaMetricsGroup { + + ClientId.validate(config.clientId) private val isShuttingDown = new AtomicBoolean(false) private val rebalanceLock = new Object private var fetcher: Option[ConsumerFetcherManager] = None @@ -94,6 +95,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null + private val consumerTopicStats = new ConsumerTopicStats(config.clientId) + val consumerIdString = { var consumerUuid : String = null config.consumerId match { @@ -195,7 +198,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) val stream = new KafkaStream[K,V]( - queue, 
config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator) + queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStats) (queue, stream) }) ).flatten.toList @@ -399,7 +402,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic val consumersPerTopicMap = getConsumersPerTopic(zkClient, group) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int] topicsMetadata.foreach(m =>{ @@ -595,9 +598,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, case None => config.autoOffsetReset match { case OffsetRequest.SmallestTimeString => - SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime) + SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId) case OffsetRequest.LargestTimeString => - SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime) + SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId) case _ => throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig") } @@ -611,7 +614,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, queue, consumedOffset, fetchedOffset, - new AtomicInteger(config.fetchSize)) + new AtomicInteger(config.fetchSize), + consumerTopicStats) partTopicInfoMap.put(partition, partTopicInfo) debug(partTopicInfo + " selected new offset " + offset) } @@ -667,7 +671,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val q = e._2._1 topicThreadIdAndQueues.put(topicThreadId, q) newGauge( - config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize", + config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize", new Gauge[Int] { def getValue = q.size } @@ -714,7 +718,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.consumerTimeoutMs, keyDecoder, valueDecoder, - config.enableShallowIterator) + config.enableShallowIterator, + consumerTopicStats) (queue, stream) }).toList diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 3b1a53faf11..4840c0c38ce 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -961,7 +961,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int) case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) -object ControllerStat extends KafkaMetricsGroup { +object ControllerStats extends KafkaMetricsGroup { val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS) val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) val leaderElectionTimer = new 
KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 3e5435ee777..3eb23cd7d67 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -58,12 +58,12 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten .format(liveAssignedReplicasToThisPartition.mkString(","))) liveAssignedReplicasToThisPartition.isEmpty match { case true => - ControllerStat.offlinePartitionRate.mark() + ControllerStats.offlinePartitionRate.mark() throw new PartitionOfflineException(("No replica for partition " + "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) + " Assigned replicas are: [%s]".format(assignedReplicas)) case false => - ControllerStat.uncleanLeaderElectionRate.mark() + ControllerStats.uncleanLeaderElectionRate.mark() val newLeader = liveAssignedReplicasToThisPartition.head warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) + "There's potential data loss") @@ -78,7 +78,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten partition)) (newLeaderAndIsr, liveAssignedReplicasToThisPartition) case None => - ControllerStat.offlinePartitionRate.mark() + ControllerStats.offlinePartitionRate.mark() throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) + "replicas assigned to it") } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 17dfbee00c0..02787826a9f 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -223,7 +223,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r)) liveAssignedReplicas.size match { case 0 => - ControllerStat.offlinePartitionRate.mark() + ControllerStats.offlinePartitionRate.mark() throw new StateChangeFailedException(("During state change of partition %s from NEW to ONLINE, assigned replicas are " + "[%s], live brokers are [%s]. 
No assigned replica is alive").format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)) @@ -249,7 +249,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // read the controller epoch val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition).get - ControllerStat.offlinePartitionRate.mark() + ControllerStats.offlinePartitionRate.mark() throw new StateChangeFailedException("Error while changing partition %s's state from New to Online" .format(topicAndPartition) + " since Leader and isr path already exists with value " + "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 5c8aec5acf1..1753947124c 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -227,7 +227,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { class BrokerChangeListener() extends IZkChildListener with Logging { this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: " def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { - ControllerStat.leaderElectionTimer.time { + ControllerStats.leaderElectionTimer.time { info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(","))) if(!isShuttingDown.get()) { controllerContext.controllerLock synchronized { diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index 803ec4bd1d5..58c708114ea 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -29,8 +29,10 @@ import kafka.javaapi.OffsetRequest class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int, - val bufferSize: Int) { - private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize) + val bufferSize: Int, + val clientId: String) { + + private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize, clientId) /** * Fetch a set of messages from a topic. 
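A sketch of the five-argument SimpleConsumer in use. Host, port, topic and client id are placeholders, and the FetchRequestBuilder setters are assumed to match the builder this code base already ships; the point is only that every consumer connection now carries a client id.

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    object FetchSketch {
      def main(args: Array[String]) {
        // the extra clientId argument is validated and becomes part of the metric names
        val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "fetch-sketch")
        try {
          val request = new FetchRequestBuilder()
            .clientId("fetch-sketch")
            .addFetch("page_views", 0, 0L, 100 * 1024)
            .build()
          // this call is what the per-client FetchRequestAndResponseStats timer wraps
          val response = consumer.fetch(request)
          println("fetched " + response.sizeInBytes + " bytes")
        } finally {
          consumer.close()
        }
      }
    }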
This version of the fetch method diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 2b20aa44462..14c4c8ad086 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -16,7 +16,6 @@ */ package kafka.javaapi.consumer -import kafka.message.Message import kafka.serializer._ import kafka.consumer._ import scala.collection.JavaConversions.asList diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 4cb244550ed..c2fccec2cff 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic._ import kafka.utils._ import scala.math._ import java.text.NumberFormat -import kafka.server.BrokerTopicStat +import kafka.server.BrokerTopicStats import kafka.message._ import kafka.common._ import kafka.metrics.KafkaMetricsGroup @@ -244,8 +244,8 @@ private[kafka] class Log(val dir: File, if(messageSetInfo.count == 0) { (-1L, -1L) } else { - BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count) - BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count) + BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count) + BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count) // trim any invalid bytes or partial messages before appending it to the on-disk log var validMessages = trimInvalidBytes(messages) diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala index fda0b240334..ea9559f822c 100644 --- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala @@ -24,7 +24,6 @@ import com.yammer.metrics.Metrics import java.io.File import com.yammer.metrics.reporting.CsvReporter import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean import kafka.utils.{Utils, VerifiableProperties, Logging} diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index 27bae5ee3ab..938504acdac 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -68,11 +68,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, /** * It updates the cache by issuing a get topic metadata request to a random broker. 
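The Producer.scala hunks below make the same move on the producer side: ProducerTopicStat becomes a per-client ProducerTopicStats, created once per Producer and passed into DefaultEventHandler. A standalone sketch of the calls Producer.recordStats and the serializer path make, with placeholder names:

    import kafka.producer.ProducerTopicStats

    object ProducerStatsSketch {
      def main(args: Array[String]) {
        val stats = new ProducerTopicStats("log-shipper")
        // marked once per KeyedMessage in Producer.recordStats
        stats.getProducerTopicStats("page_views").messageRate.mark()
        stats.getProducerAllTopicStats.messageRate.mark()
        // marked with the serialized payload size in DefaultEventHandler
        stats.getProducerTopicStats("page_views").byteRate.mark(2048)
        stats.getProducerAllTopicStats.byteRate.mark(2048)
      }
    }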
- * @param topic the topic for which the metadata is to be fetched + * @param topics the topics for which the metadata is to be fetched */ - def updateInfo(topics: Set[String]) = { + def updateInfo(topics: Set[String]) { var topicsMetadata: Seq[TopicMetadata] = Nil - val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers) + val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers) topicsMetadata = topicMetadataResponse.topicsMetadata // throw partition specific exception topicsMetadata.foreach(tmd =>{ diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index e7f50e4ce54..8c321150eb8 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -20,7 +20,6 @@ package kafka.producer import scala.collection.JavaConversions._ import joptsimple._ import java.util.Properties -import java.util.regex._ import java.io._ import kafka.common._ import kafka.message._ diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala index d0a89eb5c1e..c82670e6163 100644 --- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala +++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala @@ -17,7 +17,6 @@ package kafka.producer -import kafka.utils.Utils import kafka.utils._ diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 7b8926c1e7e..3bfd563fccb 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -16,7 +16,7 @@ */ package kafka.producer -import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler} +import async.{DefaultEventHandler, ProducerSendThread, EventHandler} import kafka.utils._ import java.util.Random import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} @@ -27,8 +27,11 @@ import kafka.metrics._ class Producer[K,V](config: ProducerConfig, - private val eventHandler: EventHandler[K,V]) // for testing only -extends Logging { + private val eventHandler: EventHandler[K,V], + private val producerStats: ProducerStats, + private val producerTopicStats: ProducerTopicStats) // only for unit testing + extends Logging { + private val hasShutdown = new AtomicBoolean(false) if (config.batchSize > config.queueSize) throw new InvalidConfigException("Batch size can't be larger than queue size.") @@ -47,25 +50,38 @@ extends Logging { queue, eventHandler, config.queueTime, - config.batchSize) + config.batchSize, + config.clientId) producerSendThread.start() case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async") } KafkaMetricsReporter.startReporters(config.props) + def this(t: (ProducerConfig, EventHandler[K,V], ProducerStats, ProducerTopicStats)) = + this(t._1, t._2, t._3, t._4) + def this(config: ProducerConfig) = - this(config, - new DefaultEventHandler[K,V](config, - Utils.createObject[Partitioner[K]](config.partitionerClass, config.props), - Utils.createObject[Encoder[V]](config.serializerClass, config.props), - Utils.createObject[Encoder[K]](config.keySerializerClass, config.props), - new ProducerPool(config))) + this { + ClientId.validate(config.clientId) + val producerStats = new ProducerStats(config.clientId) + val producerTopicStats = new ProducerTopicStats(config.clientId) + (config, + new 
DefaultEventHandler[K,V](config, + Utils.createObject[Partitioner[K]](config.partitionerClass, config.props), + Utils.createObject[Encoder[V]](config.serializerClass, config.props), + Utils.createObject[Encoder[K]](config.keySerializerClass, config.props), + new ProducerPool(config), + producerStats = producerStats, + producerTopicStats = producerTopicStats), + producerStats, + producerTopicStats) + } /** * Sends the data, partitioned by key to the topic using either the * synchronous or the asynchronous producer - * @param producerData the producer data object that encapsulates the topic, key and message data + * @param messages the producer data object that encapsulates the topic, key and message data */ def send(messages: KeyedMessage[K,V]*) { if (hasShutdown.get) @@ -79,8 +95,8 @@ extends Logging { private def recordStats(messages: Seq[KeyedMessage[K,V]]) { for (message <- messages) { - ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark() - ProducerTopicStat.getProducerAllTopicStat.messageRate.mark() + producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark() + producerTopicStats.getProducerAllTopicStats.messageRate.mark() } } @@ -105,7 +121,7 @@ extends Logging { } } if(!added) { - AsyncProducerStats.droppedMessageRate.mark() + producerStats.droppedMessageRate.mark() error("Event queue is full of unsent messages, could not send event: " + message.toString) throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString) }else { @@ -131,26 +147,27 @@ extends Logging { } @threadsafe -class ProducerTopicStat(name: String) extends KafkaMetricsGroup { - val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS) - val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS) +class ProducerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup { + val messageRate = newMeter(clientIdTopic + "-MessagesPerSec", "messages", TimeUnit.SECONDS) + val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS) } -object ProducerTopicStat { - private val valueFactory = (k: String) => new ProducerTopicStat(k) - private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory)) - private val allTopicStat = new ProducerTopicStat("AllTopics") +class ProducerTopicStats(clientId: String) { + private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k) + private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory)) + private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) - def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat + def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats - def getProducerTopicStat(topic: String): ProducerTopicStat = { - stats.getAndMaybePut(topic + "-") + def getProducerTopicStats(topic: String): ProducerTopicMetrics = { + stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic)) } } -object ProducerStats extends KafkaMetricsGroup { - val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS) - val resendRate = newMeter( "ResendsPerSec", "resends", TimeUnit.SECONDS) - val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS) +class ProducerStats(clientId: String) extends KafkaMetricsGroup { + val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS) + val resendRate = newMeter(clientId + 
"-ResendsPerSec", "resends", TimeUnit.SECONDS) + val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS) + val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec", "drops", TimeUnit.SECONDS) } diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index eb8ead336df..7e78c7ee179 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -26,13 +26,26 @@ import kafka.api.TopicMetadata import kafka.common.UnavailableProducerException -object ProducerPool{ - def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): SyncProducer = { +object ProducerPool { + /** + * Used in ProducerPool to initiate a SyncProducer connection with a broker. + */ + def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = { val props = new Properties() props.put("host", broker.host) props.put("port", broker.port.toString) - if(configOpt.isDefined) - props.putAll(configOpt.get.props.props) + props.putAll(config.props.props) + new SyncProducer(new SyncProducerConfig(props)) + } + + /** + * Used in ClientUtils to send TopicMetadataRequest to a broker. + */ + def createSyncProducer(clientId: String, broker: Broker): SyncProducer = { + val props = new Properties() + props.put("host", broker.host) + props.put("port", broker.port.toString) + props.put("client.id", clientId) new SyncProducer(new SyncProducerConfig(props)) } } @@ -41,9 +54,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging { private val syncProducers = new HashMap[Int, SyncProducer] private val lock = new Object() - def updateProducer(topicMetaDatas: Seq[TopicMetadata]) { + def updateProducer(topicMetadatas: Seq[TopicMetadata]) { val newBrokers = new collection.mutable.HashSet[Broker] - topicMetaDatas.foreach(tmd => { + topicMetadatas.foreach(tmd => { tmd.partitionsMetadata.foreach(pmd => { if(pmd.leader.isDefined) newBrokers+=(pmd.leader.get) @@ -53,9 +66,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging { newBrokers.foreach(b => { if(syncProducers.contains(b.id)){ syncProducers(b.id).close() - syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b)) + syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) } else - syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b)) + syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) }) } } diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 15733bbdeab..ea03d51e7c4 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -34,14 +34,12 @@ object SyncProducer { */ @threadsafe class SyncProducer(val config: SyncProducerConfig) extends Logging { - - private val MaxConnectBackoffMs = 60000 - private var sentOnConnection = 0 private val lock = new Object() @volatile private var shutdown: Boolean = false private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.bufferSize, config.requestTimeoutMs) + val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port) trace("Instantiating Scala Sync Producer") @@ -89,9 +87,9 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { * Send a message */ def send(producerRequest: 
ProducerRequest): ProducerResponse = { - ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes) + producerRequestStats.requestSizeHist.update(producerRequest.sizeInBytes) var response: Receive = null - ProducerRequestStat.requestTimer.time { + producerRequestStats.requestTimer.time { response = doSend(producerRequest) } ProducerResponse.readFrom(response.buffer) @@ -152,7 +150,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } } -object ProducerRequestStat extends KafkaMetricsGroup { - val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) - val requestSizeHist = newHistogram("ProducerRequestSize") +class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup { + val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize") } diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index 6f6a3f36961..3e3dc498768 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -44,7 +44,7 @@ trait SyncProducerConfigShared { val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId) /* the client application sending the producer requests */ - val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId) + val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId) /* * The required acks of the producer requests - negative value means ack diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala index dd9078e48a9..e69de29bb2d 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
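The deleted AsyncProducerStats above is folded into the per-client ProducerStats that the Producer and DefaultEventHandler now share. A sketch of the meters it exposes, using a made-up client id:

    import kafka.producer.ProducerStats

    object ProducerErrorStatsSketch {
      def main(args: Array[String]) {
        val stats = new ProducerStats("log-shipper")
        stats.serializationErrorRate.mark()   // marked when an encoder throws
        stats.resendRate.mark()               // marked on each retry of outstanding requests
        stats.failedSendRate.mark()           // marked when all retries are exhausted
        stats.droppedMessageRate.mark()       // replaces AsyncProducerStats.droppedMessageRate
      }
    }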
-*/ - -package kafka.producer.async - -import kafka.metrics.KafkaMetricsGroup -import java.util.concurrent.TimeUnit - -object AsyncProducerStats extends KafkaMetricsGroup { - val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS) -} diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 9be87d0ab33..645402eeae0 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -33,7 +33,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig, private val encoder: Encoder[V], private val keyEncoder: Encoder[K], private val producerPool: ProducerPool, - private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata]) + private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata], + private val producerStats: ProducerStats, + private val producerTopicStats: ProducerTopicStats) extends EventHandler[K,V] with Logging { val isSync = ("sync" == config.producerType) @@ -48,8 +50,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig, serializedData.foreach{ keyed => val dataSize = keyed.message.payloadSize - ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize) - ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize) + producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize) + producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize) } var outstandingProduceRequests = serializedData var remainingRetries = config.producerRetries + 1 @@ -61,11 +63,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig, // get topics of the outstanding produce requests and refresh metadata for those Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet)) remainingRetries -= 1 - ProducerStats.resendRate.mark() + producerStats.resendRate.mark() } } if(outstandingProduceRequests.size > 0) { - ProducerStats.failedSendRate.mark() + producerStats.failedSendRate.mark() error("Failed to send the following requests: " + outstandingProduceRequests) throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null) } @@ -111,7 +113,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message))) } catch { case t => - ProducerStats.serializationErrorRate.mark() + producerStats.serializationErrorRate.mark() if (isSync) { throw t } else { diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index c900c4571c5..2b39cab158a 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -28,12 +28,13 @@ class ProducerSendThread[K,V](val threadName: String, val queue: BlockingQueue[KeyedMessage[K,V]], val handler: EventHandler[K,V], val queueTime: Long, - val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup { + val batchSize: Int, + val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup { private val shutdownLatch = new CountDownLatch(1) private val shutdownCommand = new 
KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V]) - newGauge("ProducerQueueSize-" + getId, + newGauge(clientId + "-ProducerQueueSize-" + getId, new Gauge[Int] { def getValue = queue.size }) diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala index 6fc3b1d0e88..54d0b935704 100644 --- a/core/src/main/scala/kafka/serializer/Decoder.scala +++ b/core/src/main/scala/kafka/serializer/Decoder.scala @@ -17,7 +17,6 @@ package kafka.serializer -import kafka.message._ import kafka.utils.VerifiableProperties /** diff --git a/core/src/main/scala/kafka/serializer/Encoder.scala b/core/src/main/scala/kafka/serializer/Encoder.scala index fa9067ffb80..020e73c72a3 100644 --- a/core/src/main/scala/kafka/serializer/Encoder.scala +++ b/core/src/main/scala/kafka/serializer/Encoder.scala @@ -18,8 +18,6 @@ package kafka.serializer import kafka.utils.VerifiableProperties -import kafka.message._ -import kafka.utils.Utils /** * An encoder is a method of turning objects into byte arrays. diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index c2cc3cbdf39..e4520a4ac1e 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -27,7 +27,7 @@ import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.util.concurrent.atomic.AtomicLong -import kafka.utils.{Pool, ShutdownableThread} +import kafka.utils.{ClientIdAndTopic, Pool, ShutdownableThread} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock @@ -38,12 +38,13 @@ import java.util.concurrent.locks.ReentrantLock abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) extends ShutdownableThread(name) { - private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map private val partitionMapLock = new ReentrantLock private val partitionMapCond = partitionMapLock.newCondition() - val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) - val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id) + val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId) + val fetcherStats = new FetcherStats(clientId) + val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id) + val fetcherLagStats = new FetcherLagStats(clientId) /* callbacks to be defined in subclass */ @@ -117,7 +118,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke case None => currentOffset.get } partitionMap.put(topicAndPartition, newOffset) - FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset + fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset fetcherMetrics.byteRate.mark(validBytes) // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread processPartitionData(topicAndPartition, currentOffset.get, partitionData) @@ -182,10 +183,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, 
sourceBroke } } -class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup { +class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends KafkaMetricsGroup { private[this] var lagVal = new AtomicLong(-1L) newGauge( - name._1 + "-" + name._2 + "-ConsumerLag", + clientIdTopicPartition + "-ConsumerLag", new Gauge[Long] { def getValue = lagVal.get } @@ -198,25 +199,29 @@ class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup { def lag = lagVal.get } -object FetcherLagMetrics { - private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k) - private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory)) +class FetcherLagStats(clientId: String) { + private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k) + private val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory)) - def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = { - stats.getAndMaybePut( (topic, partitionId) ) + def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = { + stats.getAndMaybePut(new ClientIdTopicPartition(clientId, topic, partitionId)) } } -class FetcherStat(name: String) extends KafkaMetricsGroup { - val requestRate = newMeter(name + "RequestsPerSec", "requests", TimeUnit.SECONDS) - val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS) +class FetcherMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup { + val requestRate = newMeter(clientIdTopic + "-RequestsPerSec", "requests", TimeUnit.SECONDS) + val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS) } -object FetcherStat { - private val valueFactory = (k: String) => new FetcherStat(k) - private val stats = new Pool[String, FetcherStat](Some(valueFactory)) +class FetcherStats(clientId: String) { + private val valueFactory = (k: ClientIdAndTopic) => new FetcherMetrics(k) + private val stats = new Pool[ClientIdAndTopic, FetcherMetrics](Some(valueFactory)) - def getFetcherStat(name: String): FetcherStat = { - stats.getAndMaybePut(name) + def getFetcherStats(name: String): FetcherMetrics = { + stats.getAndMaybePut(new ClientIdAndTopic(clientId, name)) } } + +case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) { + override def toString = "%s-%s-%d".format(clientId, topic, partitionId) +} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a14e0a2f49b..cc04ed5ab20 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -237,8 +237,8 @@ class KafkaApis(val requestChannel: RequestChannel, private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = { trace("Append [%s] to local log ".format(partitionAndData.toString)) partitionAndData.map {case (topicAndPartition, messages) => - BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(messages.sizeInBytes) try { val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) @@ -255,8 +255,8 @@ class KafkaApis(val requestChannel: RequestChannel, 
Runtime.getRuntime.halt(1) null case e => - BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).failedProduceRequestRate.mark() - BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark() + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark() error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic, topicAndPartition.partition), e) new ProduceResult(topicAndPartition, e) } @@ -323,8 +323,8 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionData = try { val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) - BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes) - BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes) if (!isFetchFromFollower) { new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages) } else { @@ -334,8 +334,8 @@ class KafkaApis(val requestChannel: RequestChannel, } } catch { case t: Throwable => - BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark() - BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark() + BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark() error("error when processing request " + (topic, partition, offset, fetchSize), t) new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), offset, -1L, MessageSet.Empty) diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index d94965253da..f0c05a5395f 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -79,14 +79,14 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup { val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS) } -object BrokerTopicStat extends Logging { +object BrokerTopicStats extends Logging { private val valueFactory = (k: String) => new BrokerTopicMetrics(k) private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) - private val allTopicStat = new BrokerTopicMetrics("AllTopics") + private val allTopicStats = new BrokerTopicMetrics("AllTopics") - def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat + def getBrokerAllTopicStats(): BrokerTopicMetrics = allTopicStats - def getBrokerTopicStat(topic: String): BrokerTopicMetrics = { + def getBrokerTopicStats(topic: String): BrokerTopicMetrics = { stats.getAndMaybePut(topic + "-") } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e0a86b966b9..d444d221760 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -23,7 +23,7 @@ import kafka.utils._ import java.util.concurrent._ import atomic.AtomicBoolean import org.I0Itec.zkclient.ZkClient -import kafka.controller.{ControllerStat, KafkaController} +import kafka.controller.{ControllerStats, KafkaController} /** * Represents the lifecycle of a single Kafka broker. 
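On the broker only the names change (BrokerTopicStat becomes BrokerTopicStats); it stays a singleton keyed by topic rather than by client id. A sketch of the renamed accessors used by Log and KafkaApis, with a placeholder topic:

    import kafka.server.BrokerTopicStats

    object BrokerStatsSketch {
      def main(args: Array[String]) {
        BrokerTopicStats.getBrokerTopicStats("page_views").messagesInRate.mark(10)
        BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(4096)
        BrokerTopicStats.getBrokerTopicStats("page_views").failedFetchRequestRate.mark()
      }
    }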
Handles all functionality required @@ -96,9 +96,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg * Forces some dynamic jmx beans to be registered on server startup. */ private def registerStats() { - BrokerTopicStat.getBrokerAllTopicStat() - ControllerStat.offlinePartitionRate - ControllerStat.uncleanLeaderElectionRate + BrokerTopicStats.getBrokerAllTopicStats() + ControllerStats.offlinePartitionRate + ControllerStats.uncleanLeaderElectionRate } /** diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 40afcabbe5e..34166e44b3d 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -28,7 +28,7 @@ class ReplicaFetcherThread(name:String, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager) extends AbstractFetcherThread(name = name, - clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host, sourceBroker.port) , + clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port), sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs, socketBufferSize = brokerConfig.replicaSocketBufferSize, diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 9ca4dc82f29..db9acc9f19b 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -41,7 +41,7 @@ object ConsumerOffsetChecker extends Logging { val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1 val consumer = brokerInfo match { case BrokerIpPattern(ip, port) => - Some(new SimpleConsumer(ip, port.toInt, 10000, 100000)) + Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker")) case _ => error("Could not parse broker info %s".format(brokerInfo)) None diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index e78d53d2b48..2b9438ae3a4 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -67,7 +67,7 @@ object GetOffsetShell { val partition = options.valueOf(partitionOpt).intValue var time = options.valueOf(timeOpt).longValue val nOffsets = options.valueOf(nOffsetsOpt).intValue - val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000) + val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell") val topicAndPartition = TopicAndPartition(topic, partition) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets))) val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index aa235597825..5c4b3d29d61 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,7 +17,6 @@ package kafka.tools -import kafka.message.Message import joptsimple.OptionParser import kafka.utils.{Utils, CommandLineUtils, Logging} import kafka.producer.{KeyedMessage, ProducerConfig, Producer} diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala 
b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 79ffcc5649c..db14c825af5 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -24,7 +24,7 @@ import kafka.producer.{KeyedMessage, ProducerConfig, Producer} import kafka.consumer._ import kafka.utils.{Logging, ZkUtils} import kafka.api.OffsetRequest -import kafka.message.{CompressionCodec, Message} +import kafka.message.CompressionCodec object ReplayLogProducer extends Logging { diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index e34a4326cc4..addd8db08f0 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -19,12 +19,10 @@ package kafka.tools import joptsimple._ import kafka.utils._ -import kafka.producer.ProducerConfig import kafka.consumer._ import kafka.client.ClientUtils import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} import kafka.cluster.Broker -import java.util.Properties import scala.collection.JavaConversions._ /** @@ -127,7 +125,7 @@ object SimpleConsumerShell extends Logging { // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) - val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata)) System.exit(1) @@ -167,7 +165,7 @@ object SimpleConsumerShell extends Logging { System.exit(1) } if(startingOffset < 0) - startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, false) + startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, clientId, false) // initializing formatter val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] @@ -175,7 +173,7 @@ object SimpleConsumerShell extends Logging { info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]" .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) - val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024) + val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId) val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() { def run() { var offset = startingOffset diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala index fed7aad42c3..111c9a8b94c 100644 --- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala @@ -65,7 +65,7 @@ object UpdateOffsetsInZK { ZkUtils.getBrokerInfo(zkClient, broker) match { case Some(brokerInfo) => - val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024) + val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk") val 
topicAndPartition = TopicAndPartition(topic, partition) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1))) val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head diff --git a/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala new file mode 100644 index 00000000000..780339ecd89 --- /dev/null +++ b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import kafka.common.InvalidTopicException +import kafka.common.InvalidClientIdException +import util.matching.Regex + +object ClientId { + val legalChars = "[a-zA-Z0-9_-]" + val maxNameLength = 200 // to prevent hitting filename max length limit + private val rgx = new Regex(legalChars + "*") + + def validate(clientId: String) { + if (clientId.length > maxNameLength) + throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters") + + rgx.findFirstIn(clientId) match { + case Some(t) => + if (!t.equals(clientId)) + throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -") + case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -") + } + } +} + +object Topic { + val legalChars = "[a-zA-Z0-9_-]" + val maxNameLength = 255 + private val rgx = new Regex(legalChars + "+") + + def validate(topic: String) { + if (topic.length <= 0) + throw new InvalidTopicException("topic name is illegal, can't be empty") + else if (topic.length > maxNameLength) + throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters") + + rgx.findFirstIn(topic) match { + case Some(t) => + if (!t.equals(topic)) + throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -") + case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -") + } + } +} + +case class ClientIdAndTopic(clientId: String, topic:String) { + override def toString = "%s-%s".format(clientId, topic) +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/Topic.scala b/core/src/main/scala/kafka/utils/Topic.scala index fe79adff53a..e69de29bb2d 100644 --- a/core/src/main/scala/kafka/utils/Topic.scala +++ b/core/src/main/scala/kafka/utils/Topic.scala @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
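
The new kafka.utils.ClientIdAndTopic file above folds the existing topic validation together with the new client-id validation and a combined clientId/topic key. A short usage sketch of those helpers follows; the id and topic strings are arbitrary examples.

import kafka.utils.{ClientId, Topic, ClientIdAndTopic}
import kafka.common.InvalidClientIdException

object ClientIdValidationExample {
  def main(args: Array[String]) {
    ClientId.validate("replica-fetcher-host_broker1-port_9092") // alphanumerics, '_' and '-' are legal
    Topic.validate("my_topic-1")

    try {
      ClientId.validate("bad client id") // the space is rejected
    } catch {
      case e: InvalidClientIdException => println("rejected: " + e.getMessage)
    }

    // The combined key renders as "<clientId>-<topic>"
    println(ClientIdAndTopic("console-consumer", "my_topic-1")) // console-consumer-my_topic-1
  }
}

Note that ClientId.validate accepts the empty string (its pattern uses * rather than +), which is why several tests in this patch construct consumers and stats objects with an empty clientId.
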
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import kafka.common.InvalidTopicException -import util.matching.Regex - -object Topic { - val legalChars = "[a-zA-Z0-9_-]" - val maxNameLength = 255 - private val rgx = new Regex(legalChars + "+") - - def validate(topic: String) { - if (topic.length <= 0) - throw new InvalidTopicException("topic name is illegal, can't be empty") - else if (topic.length > maxNameLength) - throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters") - - rgx.findFirstIn(topic) match { - case Some(t) => - if (!t.equals(topic)) - throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -") - case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -") - } - } -} diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala index bd09d786800..ab807a1c35d 100644 --- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala +++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala @@ -17,7 +17,6 @@ package kafka -import message.Message import org.apache.log4j.PropertyConfigurator import kafka.utils.Logging import serializer.Encoder diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala index 7d484583c03..5b72eeda35b 100644 --- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala +++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala @@ -18,7 +18,6 @@ package kafka import consumer._ -import message.Message import utils.Utils import java.util.concurrent.CountDownLatch diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 962d5f9eb88..bb39e0985e6 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -55,7 +55,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { queue, new AtomicLong(consumedOffset), new AtomicLong(0), - new AtomicInteger(0))) + new AtomicInteger(0), + new ConsumerTopicStats(""))) val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) override def setUp() { @@ -78,7 +79,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { consumerConfig.consumerTimeoutMs, new StringDecoder(), new StringDecoder(), - enableShallowIterator = false) + enableShallowIterator = false, + consumerTopicStats = new ConsumerTopicStats("")) var receivedMessages = (0 until 5).map(i => iter.next.message).toList assertFalse(iter.hasNext) diff 
--git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 8c7f7745992..d7945a510dc 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -24,7 +24,6 @@ import kafka.server._ import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite import kafka.utils.TestUtils -import kafka.message.Message import kafka.serializer._ import kafka.producer.{Producer, KeyedMessage} diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index df754cc14ea..dec04537c0d 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -23,7 +23,6 @@ import scala.collection._ import junit.framework.Assert._ import kafka.cluster._ -import kafka.message._ import kafka.server._ import org.scalatest.junit.JUnit3Suite import kafka.consumer._ @@ -50,7 +49,8 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { queue, new AtomicLong(0), new AtomicLong(0), - new AtomicInteger(0))) + new AtomicInteger(0), + new ConsumerTopicStats(""))) var fetcher: ConsumerFetcherManager = null diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala index 4411f45275f..c4866eb2064 100644 --- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala +++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala @@ -18,7 +18,7 @@ package kafka.integration import kafka.api.FetchRequestBuilder -import kafka.message.{Message, ByteBufferMessageSet} +import kafka.message.ByteBufferMessageSet import kafka.server.{KafkaRequestHandler, KafkaConfig} import org.apache.log4j.{Level, Logger} import org.junit.Assert._ diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 72902ba8b50..402fced5ea9 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -25,7 +25,6 @@ import java.util.Properties import kafka.utils.Utils import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer._ -import kafka.message.Message import kafka.utils.TestUtils import org.apache.log4j.{Level, Logger} import org.I0Itec.zkclient.ZkClient diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index fd9fae5e33f..caea8586bfe 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -21,7 +21,6 @@ import kafka.consumer.SimpleConsumer import org.scalatest.junit.JUnit3Suite import java.util.Properties import kafka.producer.{ProducerConfig, Producer} -import kafka.message.Message import kafka.utils.TestUtils import kafka.serializer._ @@ -44,10 +43,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes props.put("producer.request.required.acks", "-1") props.put("serializer.class", classOf[StringEncoder].getName.toString) producer = new Producer(new ProducerConfig(props)) - consumer = new 
SimpleConsumer(host, - port, - 1000000, - 64*1024) + consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") } override def tearDown() { diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 96648768098..9f243f0ab1e 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -29,7 +29,7 @@ import kafka.producer.KeyedMessage import kafka.javaapi.producer.Producer import kafka.utils.IntEncoder import kafka.utils.TestUtils._ -import kafka.utils.{Utils, Logging, TestUtils} +import kafka.utils.{Logging, TestUtils} import kafka.consumer.{KafkaStream, ConsumerConfig} import kafka.zk.ZooKeeperTestHarness diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index f3a272e8e3e..b6bab2d16ae 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -53,7 +53,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { logDir = new File(logDirPath) time = new MockTime() server = TestUtils.createServer(new KafkaConfig(config), time) - simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024) + simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "") } @After diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index da3c704f941..c25255fd7ae 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -24,7 +24,6 @@ import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{TestUtils, Utils, Logging} import junit.framework.Assert._ import kafka.api.FetchRequestBuilder -import kafka.message.Message import kafka.producer.async.MissingConfigException import kafka.serializer.Encoder import kafka.zk.ZooKeeperTestHarness @@ -57,7 +56,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) serverZk = TestUtils.createServer(config); - simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024) + simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "") } @After diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 19f4c3b2421..d67abe94693 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -17,7 +17,7 @@ package kafka.producer -import java.util.{LinkedList, Properties} +import java.util.Properties import java.util.concurrent.LinkedBlockingQueue import junit.framework.Assert._ import org.easymock.EasyMock @@ -68,7 +68,10 @@ class AsyncProducerTest extends JUnit3Suite { val config = new ProducerConfig(props) val produceData = getProduceData(12) - val producer = new Producer[String, String](config, mockEventHandler) + val producer = new Producer[String, String](config, + mockEventHandler, + new ProducerStats(""), + new ProducerTopicStats("")) try { // send all 10 messages, should hit the batch size and then reach broker 
producer.send(produceData: _*) @@ -118,7 +121,7 @@ class AsyncProducerTest extends JUnit3Suite { val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10) val producerSendThread = - new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5) + new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5, "") producerSendThread.start() for (producerData <- producerDataList) @@ -143,7 +146,7 @@ class AsyncProducerTest extends JUnit3Suite { val queueExpirationTime = 200 val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10) val producerSendThread = - new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5) + new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5, "") producerSendThread.start() for (producerData <- producerDataList) @@ -185,11 +188,13 @@ class AsyncProducerTest extends JUnit3Suite { val producerPool = new ProducerPool(config) val handler = new DefaultEventHandler[Int,String](config, - partitioner = intPartitioner, - encoder = null.asInstanceOf[Encoder[String]], - keyEncoder = new IntEncoder(), - producerPool = producerPool, - topicPartitionInfos) + partitioner = intPartitioner, + encoder = null.asInstanceOf[Encoder[String]], + keyEncoder = new IntEncoder(), + producerPool = producerPool, + topicPartitionInfos = topicPartitionInfos, + producerStats = new ProducerStats(""), + producerTopicStats = new ProducerTopicStats("")) val topic1Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)), @@ -228,8 +233,9 @@ class AsyncProducerTest extends JUnit3Suite { encoder = new StringEncoder, keyEncoder = new StringEncoder, producerPool = producerPool, - topicPartitionInfos - ) + topicPartitionInfos = topicPartitionInfos, + producerStats = new ProducerStats(""), + producerTopicStats = new ProducerTopicStats("")) val serializedData = handler.serialize(produceData) val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload))) @@ -257,7 +263,9 @@ class AsyncProducerTest extends JUnit3Suite { encoder = null.asInstanceOf[Encoder[String]], keyEncoder = null.asInstanceOf[Encoder[String]], producerPool = producerPool, - topicPartitionInfos) + topicPartitionInfos = topicPartitionInfos, + producerStats = new ProducerStats(""), + producerTopicStats = new ProducerTopicStats("")) try { handler.partitionAndCollate(producerDataList) fail("Should fail with UnknownTopicOrPartitionException") @@ -288,7 +296,9 @@ class AsyncProducerTest extends JUnit3Suite { encoder = new StringEncoder, keyEncoder = new StringEncoder, producerPool = producerPool, - topicPartitionInfos) + topicPartitionInfos = topicPartitionInfos, + producerStats = new ProducerStats(""), + producerTopicStats = new ProducerTopicStats("")) try { handler.handle(producerDataList) fail("Should fail with NoBrokersForPartitionException") @@ -335,7 +345,9 @@ class AsyncProducerTest extends JUnit3Suite { encoder = null.asInstanceOf[Encoder[String]], keyEncoder = null.asInstanceOf[Encoder[String]], producerPool = producerPool, - topicPartitionInfos) + topicPartitionInfos = topicPartitionInfos, + producerStats = new ProducerStats(""), + producerTopicStats = new ProducerTopicStats("")) val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]] producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes))) 
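
The test updates in this region all reflect the same design change: metric holders such as ProducerStats and ProducerTopicStats are now constructed per clientId and passed into the component, instead of being looked up from a process-wide singleton. The following compact, self-contained sketch shows that injection pattern; the class and method names are illustrative and are not Kafka's actual producer API.

import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Per-clientId stats holder; a real implementation would use a concurrent pool.
class ClientTopicStats(val clientId: String) {
  private val messagesSent = mutable.Map[String, AtomicLong]()
  def recordSend(topic: String) {
    messagesSent.getOrElseUpdate(topic, new AtomicLong(0)).incrementAndGet()
  }
  def sent(topic: String): Long = messagesSent.get(topic).map(_.get).getOrElse(0L)
}

// The handler no longer owns its metrics: the caller decides which stats instance it uses,
// so two clients with different ids never share counters.
class SendHandler(stats: ClientTopicStats) {
  def handle(topic: String, messages: Seq[String]) {
    messages.foreach(_ => stats.recordSend(topic))
  }
}

object StatsInjectionExample {
  def main(args: Array[String]) {
    val stats = new ClientTopicStats("unit-test") // the tests above simply pass "" as the clientId
    new SendHandler(stats).handle("topic1", Seq("msg1", "msg2"))
    println(stats.sent("topic1")) // 2
  }
}
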
producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes))) @@ -373,14 +385,19 @@ class AsyncProducerTest extends JUnit3Suite { val msgs = TestUtils.getMsgStrings(10) - val handler = new DefaultEventHandler[String,String]( config, - partitioner = null.asInstanceOf[Partitioner[String]], - encoder = new StringEncoder, - keyEncoder = new StringEncoder, - producerPool = producerPool, - topicPartitionInfos) + val handler = new DefaultEventHandler[String,String](config, + partitioner = null.asInstanceOf[Partitioner[String]], + encoder = new StringEncoder, + keyEncoder = new StringEncoder, + producerPool = producerPool, + topicPartitionInfos = topicPartitionInfos, + producerStats = new ProducerStats(""), + producerTopicStats = new ProducerTopicStats("")) - val producer = new Producer[String, String](config, handler) + val producer = new Producer[String, String](config, + handler, + new ProducerStats(""), + new ProducerTopicStats("")) try { // send all 10 messages, should create 2 batches and 2 syncproducer calls producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*) @@ -435,7 +452,9 @@ class AsyncProducerTest extends JUnit3Suite { encoder = new StringEncoder(), keyEncoder = new NullEncoder[Int](), producerPool = producerPool, - topicPartitionInfos) + topicPartitionInfos = topicPartitionInfos, + producerStats = new ProducerStats(""), + producerTopicStats = new ProducerTopicStats("")) val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m)) handler.handle(data) handler.close() diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 0b86777550a..48842eb1d5b 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -65,8 +65,9 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("host", "localhost") props.put("port", port1.toString) - consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024) - consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024) + consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "") + consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "") + // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index ad2158c6a97..a3afa2dd14f 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -23,8 +23,6 @@ import kafka.utils.TestUtils._ import kafka.utils.IntEncoder import kafka.utils.{Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness -import kafka.serializer._ -import kafka.message.Message import kafka.producer.{ProducerConfig, KeyedMessage, Producer} class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index fa6a64eec3e..7afbe5406da 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -20,7 +20,7 @@ import java.io.File import kafka.consumer.SimpleConsumer import org.junit.Test import 
junit.framework.Assert._ -import kafka.message.{Message, ByteBufferMessageSet} +import kafka.message.ByteBufferMessageSet import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.producer._ @@ -66,10 +66,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server.startup() producer = new Producer[Int, String](new ProducerConfig(producerConfig)) - val consumer = new SimpleConsumer(host, - port, - 1000000, - 64*1024) + val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) diff --git a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala new file mode 100644 index 00000000000..6b9315edee3 --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import junit.framework.Assert._ +import collection.mutable.ArrayBuffer +import kafka.common.InvalidClientIdException +import org.junit.Test + +class ClientIdTest { + + @Test + def testInvalidClientIds() { + val invalidclientIds = new ArrayBuffer[String]() + invalidclientIds += (".", "..") + var longName = "ATCG" + for (i <- 1 to 6) + longName += longName + invalidclientIds += longName + val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=') + for (weirdChar <- badChars) { + invalidclientIds += "Is" + weirdChar + "funny" + } + + for (i <- 0 until invalidclientIds.size) { + try { + ClientId.validate(invalidclientIds(i)) + fail("Should throw InvalidClientIdException.") + } + catch { + case e: InvalidClientIdException => "This is good." 
+ } + } + + val validClientIds = new ArrayBuffer[String]() + validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "") + for (i <- 0 until validClientIds.size) { + try { + ClientId.validate(validClientIds(i)) + } + catch { + case e: Exception => fail("Should not throw exception.") + } + } + } +} \ No newline at end of file diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java index b87d50c4644..c79192c5c19 100644 --- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java +++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java @@ -59,7 +59,8 @@ public class SimpleConsumerDemo { SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL, KafkaProperties.kafkaServerPort, KafkaProperties.connectionTimeOut, - KafkaProperties.kafkaProducerBufferSize); + KafkaProperties.kafkaProducerBufferSize, + KafkaProperties.clientId); System.out.println("Testing single fetch"); FetchRequest req = new FetchRequestBuilder() diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala index ed1e0e89471..9c9eeada1bf 100644 --- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala @@ -42,7 +42,7 @@ object SimpleConsumerPerformance { println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") } - val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize) + val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId) // reset to latest or smallest offset val topicAndPartition = TopicAndPartition(config.topic, config.partition)
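
Across the tools, tests and examples in this patch, every SimpleConsumer is now constructed with a fifth clientId argument that identifies the caller. A representative offset lookup modelled on the GetOffsetShell change above; host, port, topic and the clientId string are placeholder values.

import kafka.consumer.SimpleConsumer
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition

object OffsetLookupExample {
  def main(args: Array[String]) {
    // socket timeout 10s, buffer 100KB, and a clientId naming the tool issuing the requests
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 100000, "GetOffsetShell")
    val topicAndPartition = TopicAndPartition("my_topic-1", 0)
    // -1L requests the latest offset; ask for a single offset back
    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(-1L, 1)))
    val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
    println("latest offset: " + offsets.headOption.getOrElse(-1L))
    consumer.close()
  }
}
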