mirror of https://github.com/apache/kafka.git
KAFKA-1086 Improve GetOffsetShell to find metadata automatically; reviwed by Jun Rao and Joel Koshy
This commit is contained in:
parent
1c36605687
commit
cd3b796993
|
@ -23,25 +23,27 @@ import joptsimple._
|
||||||
import java.net.URI
|
import java.net.URI
|
||||||
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
|
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
|
||||||
import kafka.common.TopicAndPartition
|
import kafka.common.TopicAndPartition
|
||||||
|
import kafka.client.ClientUtils
|
||||||
|
import kafka.utils.CommandLineUtils
|
||||||
|
|
||||||
|
|
||||||
object GetOffsetShell {
|
object GetOffsetShell {
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
val parser = new OptionParser
|
val parser = new OptionParser
|
||||||
val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
|
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.describedAs("kafka://hostname:port")
|
.describedAs("hostname:port,...,hostname:port")
|
||||||
.ofType(classOf[String])
|
.ofType(classOf[String])
|
||||||
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
|
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.describedAs("topic")
|
.describedAs("topic")
|
||||||
.ofType(classOf[String])
|
.ofType(classOf[String])
|
||||||
val partitionOpt = parser.accepts("partition", "partition id")
|
val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.describedAs("partition id")
|
.describedAs("partition ids")
|
||||||
.ofType(classOf[java.lang.Integer])
|
.ofType(classOf[String])
|
||||||
.defaultsTo(0)
|
.defaultsTo("")
|
||||||
val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
|
val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.describedAs("timestamp/-1(latest)/-2(earliest)")
|
.describedAs("timestamp/-1(latest)/-2(earliest)")
|
||||||
|
@ -51,28 +53,52 @@ object GetOffsetShell {
|
||||||
.describedAs("count")
|
.describedAs("count")
|
||||||
.ofType(classOf[java.lang.Integer])
|
.ofType(classOf[java.lang.Integer])
|
||||||
.defaultsTo(1)
|
.defaultsTo(1)
|
||||||
|
val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("ms")
|
||||||
|
.ofType(classOf[java.lang.Integer])
|
||||||
|
.defaultsTo(1000)
|
||||||
|
|
||||||
val options = parser.parse(args : _*)
|
val options = parser.parse(args : _*)
|
||||||
|
|
||||||
for(arg <- List(urlOpt, topicOpt, timeOpt)) {
|
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)
|
||||||
if(!options.has(arg)) {
|
|
||||||
System.err.println("Missing required argument \"" + arg + "\"")
|
|
||||||
parser.printHelpOn(System.err)
|
|
||||||
System.exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val url = new URI(options.valueOf(urlOpt))
|
val clientId = "GetOffsetShell"
|
||||||
|
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
|
||||||
val topic = options.valueOf(topicOpt)
|
val topic = options.valueOf(topicOpt)
|
||||||
val partition = options.valueOf(partitionOpt).intValue
|
var partitionList = options.valueOf(partitionOpt)
|
||||||
var time = options.valueOf(timeOpt).longValue
|
var time = options.valueOf(timeOpt).longValue
|
||||||
val nOffsets = options.valueOf(nOffsetsOpt).intValue
|
val nOffsets = options.valueOf(nOffsetsOpt).intValue
|
||||||
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
|
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
|
||||||
val topicAndPartition = TopicAndPartition(topic, partition)
|
|
||||||
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
|
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
|
||||||
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
|
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
|
||||||
println("get " + offsets.length + " results")
|
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
|
||||||
for (offset <- offsets)
|
"kafka-list-topic.sh to verify")
|
||||||
println(offset)
|
System.exit(1)
|
||||||
|
}
|
||||||
|
val partitions =
|
||||||
|
if(partitionList == "") {
|
||||||
|
topicsMetadata.head.partitionsMetadata.map(_.partitionId)
|
||||||
|
} else {
|
||||||
|
partitionList.split(",").map(_.toInt).toSeq
|
||||||
|
}
|
||||||
|
partitions.foreach { partitionId =>
|
||||||
|
val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
|
||||||
|
partitionMetadataOpt match {
|
||||||
|
case Some(metadata) =>
|
||||||
|
metadata.leader match {
|
||||||
|
case Some(leader) =>
|
||||||
|
val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
|
||||||
|
val topicAndPartition = TopicAndPartition(topic, partitionId)
|
||||||
|
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
|
||||||
|
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
|
||||||
|
|
||||||
|
println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
|
||||||
|
case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
|
||||||
|
}
|
||||||
|
case None => System.err.println("Error: partition %d does not exist".format(partitionId))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,7 @@ def main():
|
||||||
|
|
||||||
comment="Created reviewboard "
|
comment="Created reviewboard "
|
||||||
if not opt.reviewboard:
|
if not opt.reviewboard:
|
||||||
print 'Created a new reviewboard ',rb_url
|
print 'Created a new reviewboard ',rb_url,' against branch: ',opt.branch
|
||||||
else:
|
else:
|
||||||
print 'Updated reviewboard',opt.reviewboard
|
print 'Updated reviewboard',opt.reviewboard
|
||||||
comment="Updated reviewboard "
|
comment="Updated reviewboard "
|
||||||
|
|
Loading…
Reference in New Issue