diff --git a/bin/kafka-consumer-offset-checker.sh b/bin/kafka-consumer-offset-checker.sh
deleted file mode 100755
index 599334511aa..00000000000
--- a/bin/kafka-consumer-offset-checker.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker "$@"
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
deleted file mode 100644
index 87147dcc70b..00000000000
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-
-import joptsimple._
-import kafka.utils._
-import kafka.consumer.SimpleConsumer
-import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest}
-import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
-import org.apache.kafka.common.errors.BrokerNotAvailableException
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.security.JaasUtils
-
-import scala.collection._
-import kafka.client.ClientUtils
-import kafka.network.BlockingChannel
-import kafka.api.PartitionOffsetRequestInfo
-import org.I0Itec.zkclient.exception.ZkNoNodeException
-import org.apache.kafka.common.network.ListenerName
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
-object ConsumerOffsetChecker extends Logging {
-
-  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
-  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
-  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()
-
-  private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = {
-    try {
-      zkUtils.getBrokerInfo(bid)
-        .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
-        .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerOffsetChecker"))
-        .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)))
-    } catch {
-      case t: Throwable =>
-        println("Could not parse broker info due to " + t.getCause)
-        None
-    }
-  }
-
-  private def processPartition(zkUtils: ZkUtils,
-                               group: String, topic: String, producerId: Int) {
-    val topicPartition = TopicAndPartition(topic, producerId)
-    val offsetOpt = offsetMap.get(topicPartition)
-    val groupDirs = new ZKGroupTopicDirs(group, topic)
-    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(producerId))._1
-    zkUtils.getLeaderForPartition(topic, producerId) match {
-      case Some(bid) =>
-        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
-        consumerOpt.foreach { consumer =>
-          val topicAndPartition = TopicAndPartition(topic, producerId)
-          val request =
-            OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-          val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-
-          val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
-          println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
-            owner match {case Some(ownerStr) => ownerStr case None => "none"}))
-        }
-      case None =>
-        println("No broker for partition %s - %s".format(topic, producerId))
-    }
-  }
-
-  private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
-    topicPidMap.get(topic).foreach { producerIds =>
-      producerIds.sorted.foreach {
-        producerId => processPartition(zkUtils, group, topic, producerId)
-      }
-    }
-  }
-
-  private def printBrokerInfo() {
-    println("BROKER INFO")
-    for ((bid, consumerOpt) <- consumerMap)
-      consumerOpt.foreach { consumer =>
-        println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
-      }
-  }
-
-  def main(args: Array[String]) {
-    warn("WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead.")
-
-    val parser = new OptionParser(false)
-
-    val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string.").
-      withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
-    val topicsOpt = parser.accepts("topic",
-      "Comma-separated list of consumer topics (all topics if absent).").
-      withRequiredArg().ofType(classOf[String])
-    val groupOpt = parser.accepts("group", "Consumer group.").
-      withRequiredArg().ofType(classOf[String])
-    val channelSocketTimeoutMsOpt = parser.accepts("socket.timeout.ms", "Socket timeout to use when querying for offsets.").
-      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000)
-    val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries.").
-      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000)
-
-    parser.accepts("broker-info", "Print broker info")
-    parser.accepts("help", "Print this message.")
-
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.")
-
-    val options = parser.parse(args : _*)
-
-    if (options.has("help")) {
-      parser.printHelpOn(System.out)
-      Exit.exit(0)
-    }
-
-    CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt)
-
-    val zkConnect = options.valueOf(zkConnectOpt)
-
-    val group = options.valueOf(groupOpt)
-    val groupDirs = new ZKGroupDirs(group)
-
-    val channelSocketTimeoutMs = options.valueOf(channelSocketTimeoutMsOpt).intValue()
-    val channelRetryBackoffMs = options.valueOf(channelRetryBackoffMsOpt).intValue()
-
-    val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) else None
-
-    var zkUtils: ZkUtils = null
-    var channel: BlockingChannel = null
-    try {
-      zkUtils = ZkUtils(zkConnect,
-                        30000,
-                        30000,
-                        JaasUtils.isZkSecurityEnabled())
-
-      val topicList = topics match {
-        case Some(x) => x.split(",").view.toList
-        case None => zkUtils.getChildren(groupDirs.consumerGroupDir + "/owners").toList
-      }
-
-      topicPidMap = immutable.Map(zkUtils.getPartitionsForTopics(topicList).toSeq:_*)
-      val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
-      channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
-
-      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
-      channel.send(OffsetFetchRequest(group, topicPartitions))
-      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
-      debug("Received offset fetch response %s.".format(offsetFetchResponse))
-
-      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
-        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
-          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
-          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
-          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
-          try {
-            val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
-            offsetMap.put(topicAndPartition, offset)
-          } catch {
-            case z: ZkNoNodeException =>
-              if(zkUtils.pathExists(topicDirs.consumerOffsetDir))
-                offsetMap.put(topicAndPartition,-1)
-              else
-                throw z
-          }
-        }
-        else if (offsetAndMetadata.error == Errors.NONE)
-          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
-        else {
-          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, offsetAndMetadata.error.exception))
-        }
-      }
-      channel.disconnect()
-      channel = null
-
-      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
-      topicList.sorted.foreach {
-        topic => processTopic(zkUtils, group, topic)
-      }
-
-      if (options.has("broker-info"))
-        printBrokerInfo()
-
-      consumerMap.values.flatten.foreach(_.close())
-    }
-    catch {
-      case t: Throwable =>
-        println("Exiting due to: %s.".format(t.getMessage))
-    }
-    finally {
-      consumerMap.values.flatten.foreach(_.close())
-      if (zkUtils != null)
-        zkUtils.close()
-
-      if (channel != null)
-        channel.disconnect()
-    }
-  }
-}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index a98bdeaaeae..ce750ea072d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -66,6 +66,7 @@
         This was only intended for use on the broker, but it is no longer in use and the implementations have not been maintained.
         A stub implementation has been retained for binary compatibility.</li>
+    <li><code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li>
     <li><code>GroupCoordinator</code> will delay the initial consumer rebalance. The rebalance will be further delayed by the value of <code>group.initial.rebalance.delay.ms</code> as new members join the group, up to a maximum of <code>max.poll.interval.ms</code>. The default value for this is 3 seconds.
-        During development and testing it might be desirable to set this to 0 inorder to not delay test execution time.</li>
+        During development and testing it might be desirable to set this to 0 in order to not delay test execution time.</li>
     <li><code>org.apache.kafka.common.Cluster#partitionsForTopic</code>, <code>partitionsForNode</code> and <code>availablePartitionsForTopic</code> methods
         will return an empty list instead of <code>null</code> (which is considered a bad practice) in case the metadata for the required topic does not exist.</li>
@@ -205,7 +206,7 @@
     tool.</li>
     <li> MessageFormatter interface was changed from <code>def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)</code> to <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)</code> </li>
     <li> MessageReader interface was changed from <code>def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]</code> to <code>def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]</code> </li>
     <li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
     <li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
     <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
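For scripts built on the removed tool, the direct replacement is `kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group <group>`, which reports the committed offset, log-end offset and lag per partition. The same numbers can also be fetched programmatically; below is a minimal sketch using the Java consumer from Scala, assuming a broker at localhost:9092 and placeholder group/topic names ("my-group", "my-topic"). Lag is computed as log-end offset minus committed offset, the same arithmetic the deleted processPartition performed through SimpleConsumer.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object GroupLagSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("group.id", "my-group")                // placeholder group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // Partitions of the topic under inspection (placeholder name).
      val partitions = consumer.partitionsFor("my-topic").asScala
        .map(p => new TopicPartition(p.topic, p.partition))

      // Log-end offsets for all partitions, fetched in one round trip.
      val endOffsets = consumer.endOffsets(partitions.asJava).asScala

      partitions.sortBy(_.partition).foreach { tp =>
        // committed() returns null when the group has no offset for the partition.
        val committed = Option(consumer.committed(tp)).map(_.offset)
        val logEnd: Long = endOffsets(tp)
        val lag = committed.map(logEnd - _)
        println(f"${tp.topic}%-30s ${tp.partition}%-3d offset=${committed.getOrElse("unknown")}%-12s logEnd=$logEnd%-12d lag=${lag.getOrElse("unknown")}")
      }
    } finally consumer.close()
  }
}
```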
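The group.initial.rebalance.delay.ms note above concerns broker-side configuration. As a hedged illustration of the test-time override it recommends, here is a Properties fragment of the kind one might feed to an embedded broker in an integration test; broker.id, zookeeper.connect and listeners are illustrative placeholders, and only the last key is the setting under discussion.

```scala
import java.util.Properties

object TestBrokerConfigSketch {
  // Broker overrides for a hypothetical integration-test harness.
  def brokerProps: Properties = {
    val props = new Properties()
    props.put("broker.id", "0")                          // placeholder
    props.put("zookeeper.connect", "localhost:2181")     // placeholder
    props.put("listeners", "PLAINTEXT://localhost:9092") // placeholder
    // Default is 3000 ms (3 s); 0 makes the coordinator start the first
    // rebalance immediately so test runs are not delayed.
    props.put("group.initial.rebalance.delay.ms", "0")
    props
  }
}
```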
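The Cluster change above turns a caller-side null check into an emptiness check. A small sketch of that adjustment, with the same caveat about illustrative names (partitionCount is an invented helper, not part of the Kafka API):

```scala
import org.apache.kafka.common.{Cluster, PartitionInfo}

object ClusterLookupSketch {
  // partitionsForTopic now returns an empty list (never null) when no
  // metadata exists for the topic, so the guard is isEmpty rather than
  // a comparison against null.
  def partitionCount(cluster: Cluster, topic: String): Option[Int] = {
    val partitions: java.util.List[PartitionInfo] = cluster.partitionsForTopic(topic)
    if (partitions.isEmpty) None // previously: if (partitions == null) None
    else Some(partitions.size)
  }
}
```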