mirror of https://github.com/apache/kafka.git
KAFKA-3356: Remove ConsumerOffsetChecker
Author: Mickael Maison <mickael.maison@gmail.com> Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io> Closes #3036 from mimaison/KAFKA-3356
This commit is contained in:
parent
271f6b5aec
commit
a0f533266a
|
@ -1,17 +0,0 @@
|
|||
#!/bin/bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker "$@"
|
|
@ -1,209 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.tools
|
||||
|
||||
|
||||
import joptsimple._
|
||||
import kafka.utils._
|
||||
import kafka.consumer.SimpleConsumer
|
||||
import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest}
|
||||
import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
|
||||
import org.apache.kafka.common.errors.BrokerNotAvailableException
|
||||
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
|
||||
import org.apache.kafka.common.security.JaasUtils
|
||||
|
||||
import scala.collection._
|
||||
import kafka.client.ClientUtils
|
||||
import kafka.network.BlockingChannel
|
||||
import kafka.api.PartitionOffsetRequestInfo
|
||||
import org.I0Itec.zkclient.exception.ZkNoNodeException
|
||||
import org.apache.kafka.common.network.ListenerName
|
||||
|
||||
@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
|
||||
object ConsumerOffsetChecker extends Logging {
|
||||
|
||||
private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
|
||||
private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
|
||||
private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()
|
||||
|
||||
private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = {
|
||||
try {
|
||||
zkUtils.getBrokerInfo(bid)
|
||||
.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
|
||||
.map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerOffsetChecker"))
|
||||
.orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)))
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
println("Could not parse broker info due to " + t.getCause)
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
private def processPartition(zkUtils: ZkUtils,
|
||||
group: String, topic: String, producerId: Int) {
|
||||
val topicPartition = TopicAndPartition(topic, producerId)
|
||||
val offsetOpt = offsetMap.get(topicPartition)
|
||||
val groupDirs = new ZKGroupTopicDirs(group, topic)
|
||||
val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(producerId))._1
|
||||
zkUtils.getLeaderForPartition(topic, producerId) match {
|
||||
case Some(bid) =>
|
||||
val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
|
||||
consumerOpt.foreach { consumer =>
|
||||
val topicAndPartition = TopicAndPartition(topic, producerId)
|
||||
val request =
|
||||
OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
|
||||
val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
|
||||
|
||||
val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
|
||||
println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
|
||||
owner match {case Some(ownerStr) => ownerStr case None => "none"}))
|
||||
}
|
||||
case None =>
|
||||
println("No broker for partition %s - %s".format(topic, producerId))
|
||||
}
|
||||
}
|
||||
|
||||
private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
|
||||
topicPidMap.get(topic).foreach { producerIds =>
|
||||
producerIds.sorted.foreach {
|
||||
producerId => processPartition(zkUtils, group, topic, producerId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def printBrokerInfo() {
|
||||
println("BROKER INFO")
|
||||
for ((bid, consumerOpt) <- consumerMap)
|
||||
consumerOpt.foreach { consumer =>
|
||||
println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
|
||||
}
|
||||
}
|
||||
|
||||
def main(args: Array[String]) {
|
||||
warn("WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead.")
|
||||
|
||||
val parser = new OptionParser(false)
|
||||
|
||||
val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string.").
|
||||
withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
|
||||
val topicsOpt = parser.accepts("topic",
|
||||
"Comma-separated list of consumer topics (all topics if absent).").
|
||||
withRequiredArg().ofType(classOf[String])
|
||||
val groupOpt = parser.accepts("group", "Consumer group.").
|
||||
withRequiredArg().ofType(classOf[String])
|
||||
val channelSocketTimeoutMsOpt = parser.accepts("socket.timeout.ms", "Socket timeout to use when querying for offsets.").
|
||||
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000)
|
||||
val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries.").
|
||||
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000)
|
||||
|
||||
parser.accepts("broker-info", "Print broker info")
|
||||
parser.accepts("help", "Print this message.")
|
||||
|
||||
if(args.length == 0)
|
||||
CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.")
|
||||
|
||||
val options = parser.parse(args : _*)
|
||||
|
||||
if (options.has("help")) {
|
||||
parser.printHelpOn(System.out)
|
||||
Exit.exit(0)
|
||||
}
|
||||
|
||||
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt)
|
||||
|
||||
val zkConnect = options.valueOf(zkConnectOpt)
|
||||
|
||||
val group = options.valueOf(groupOpt)
|
||||
val groupDirs = new ZKGroupDirs(group)
|
||||
|
||||
val channelSocketTimeoutMs = options.valueOf(channelSocketTimeoutMsOpt).intValue()
|
||||
val channelRetryBackoffMs = options.valueOf(channelRetryBackoffMsOpt).intValue()
|
||||
|
||||
val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) else None
|
||||
|
||||
var zkUtils: ZkUtils = null
|
||||
var channel: BlockingChannel = null
|
||||
try {
|
||||
zkUtils = ZkUtils(zkConnect,
|
||||
30000,
|
||||
30000,
|
||||
JaasUtils.isZkSecurityEnabled())
|
||||
|
||||
val topicList = topics match {
|
||||
case Some(x) => x.split(",").view.toList
|
||||
case None => zkUtils.getChildren(groupDirs.consumerGroupDir + "/owners").toList
|
||||
}
|
||||
|
||||
topicPidMap = immutable.Map(zkUtils.getPartitionsForTopics(topicList).toSeq:_*)
|
||||
val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
|
||||
channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
|
||||
|
||||
debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
|
||||
channel.send(OffsetFetchRequest(group, topicPartitions))
|
||||
val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
|
||||
debug("Received offset fetch response %s.".format(offsetFetchResponse))
|
||||
|
||||
offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
|
||||
if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
|
||||
val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
|
||||
// this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
|
||||
// (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
|
||||
try {
|
||||
val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
|
||||
offsetMap.put(topicAndPartition, offset)
|
||||
} catch {
|
||||
case z: ZkNoNodeException =>
|
||||
if(zkUtils.pathExists(topicDirs.consumerOffsetDir))
|
||||
offsetMap.put(topicAndPartition,-1)
|
||||
else
|
||||
throw z
|
||||
}
|
||||
}
|
||||
else if (offsetAndMetadata.error == Errors.NONE)
|
||||
offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
|
||||
else {
|
||||
println("Could not fetch offset for %s due to %s.".format(topicAndPartition, offsetAndMetadata.error.exception))
|
||||
}
|
||||
}
|
||||
channel.disconnect()
|
||||
channel = null
|
||||
|
||||
println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
|
||||
topicList.sorted.foreach {
|
||||
topic => processTopic(zkUtils, group, topic)
|
||||
}
|
||||
|
||||
if (options.has("broker-info"))
|
||||
printBrokerInfo()
|
||||
|
||||
consumerMap.values.flatten.foreach(_.close())
|
||||
}
|
||||
catch {
|
||||
case t: Throwable =>
|
||||
println("Exiting due to: %s.".format(t.getMessage))
|
||||
}
|
||||
finally {
|
||||
consumerMap.values.flatten.foreach(_.close())
|
||||
if (zkUtils != null)
|
||||
zkUtils.close()
|
||||
|
||||
if (channel != null)
|
||||
channel.disconnect()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -66,6 +66,7 @@
|
|||
This was only intended for use on the broker, but it is no longer in use and the implementations have not been maintained.
|
||||
A stub implementation has been retained for binary compatibility.</li>
|
||||
<li>The Java clients and tools now accept any string as a client-id.</li>
|
||||
<li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li>
|
||||
</ul>
|
||||
|
||||
<h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>
|
||||
|
@ -205,7 +206,7 @@
|
|||
tool.</li>
|
||||
<li>EoS in Kafka introduces new request APIs and modifies several existing ones. See
|
||||
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-RPCProtocolSummary">KIP-98</a>
|
||||
for the full details</code></li>
|
||||
for the full details</li>
|
||||
</ol>
|
||||
|
||||
<h5><a id="upgrade_11_message_format" href="#upgrade_11_message_format">Notes on the new message format in 0.11.0</a></h5>
|
||||
|
@ -236,7 +237,7 @@
|
|||
is already not possible in that case. In order to avoid the cost of down-conversion, you should ensure that consumer applications
|
||||
are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0, it does not support
|
||||
the new message format. You must upgrade to use the new consumer to use the new message format without the cost of down-conversion.
|
||||
Note that 0.11.0 consumers support backwards compability with brokers 0.10.0 brokers and upward, so it is possible to upgrade the
|
||||
Note that 0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers and upward, so it is possible to upgrade the
|
||||
clients first before the brokers.
|
||||
</p>
|
||||
|
||||
|
@ -483,7 +484,6 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
|
|||
<code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)</code> </li>
|
||||
<li> MessageReader interface was changed from <code>def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]</code> to
|
||||
<code>def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]</code> </li>
|
||||
</li>
|
||||
<li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
|
||||
<li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
|
||||
<li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
|
||||
|
|
Loading…
Reference in New Issue