KAFKA-776 Changing ZK format breaks some tools; reviewed by Neha Narkhede

This commit is contained in:
Swapnil Ghike 2013-02-27 19:17:43 -08:00 committed by Neha Narkhede
parent 89622c8e88
commit db65c95735
6 changed files with 54 additions and 35 deletions

View File

@ -25,6 +25,7 @@ import javax.management.remote.{JMXServiceURL, JMXConnectorFactory}
import javax.management.ObjectName
import kafka.controller.KafkaController
import scala.Some
import kafka.common.BrokerNotAvailableException
object ShutdownBroker extends Logging {
@ -35,15 +36,22 @@ object ShutdownBroker extends Logging {
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
val controllerBrokerId = ZkUtils.getController(zkClient)
ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + controllerBrokerId)._1 match {
case Some(controllerInfo) =>
val parsed = controllerInfo.split(":")
val controllerHost = parsed(0)
val controllerJmxPort = parsed(2)
val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi"
.format(controllerHost, controllerJmxPort))
var controllerHost: String = null
var controllerJmxPort: Int = -1
try {
Json.parseFull(controllerInfo) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
controllerHost = brokerInfo.get("host").get.toString
controllerJmxPort = brokerInfo.get("jmx_port").get.asInstanceOf[Int]
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId))
}
}
val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(controllerHost, controllerJmxPort))
info("Connecting to jmx url " + jmxUrl)
val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
val mbsc = jmxc.getMBeanServerConnection
@ -52,21 +60,17 @@ object ShutdownBroker extends Logging {
Array(params.brokerId),
Array(classOf[Int].getName)).asInstanceOf[Int]
val shutdownComplete = (leaderPartitionsRemaining == 0)
info("Shutdown status: " + (if (shutdownComplete)
"complete" else
"incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
info("Shutdown status: " +
(if (shutdownComplete) "complete" else "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
shutdownComplete
case None =>
error("Operation failed due to controller failure on %d.".format(controllerBrokerId))
false
throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId))
}
}
catch {
} catch {
case t: Throwable =>
error("Operation failed due to %s.".format(t.getMessage), t)
error("Operation failed due to controller failure", t)
false
}
finally {
} finally {
if (zkClient != null)
zkClient.close()
}

View File

@ -35,11 +35,11 @@ private[kafka] object Broker {
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.toString
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
new Broker(id, host, port)
case None =>
throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
}
} catch {
case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)

View File

@ -194,7 +194,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
}
debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(",")))
partitionsRemaining.size
}
}

View File

@ -20,10 +20,10 @@ package kafka.tools
import joptsimple._
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import kafka.utils.{Json, ZkUtils, ZKStringSerializer, Logging}
import kafka.consumer.SimpleConsumer
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
import kafka.common.TopicAndPartition
import kafka.common.{BrokerNotAvailableException, TopicAndPartition}
import scala.collection._
@ -31,19 +31,27 @@ object ConsumerOffsetChecker extends Logging {
private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
private val BrokerIpPattern = """^([^:]+):(\d+).*$""".r
// e.g., 127.0.0.1:9092:9999 (with JMX port)
private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
val consumer = brokerInfo match {
case Some(BrokerIpPattern(ip, port)) =>
Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
case _ =>
error("Could not parse broker info %s with regex %s".format(brokerInfo, BrokerIpPattern.toString()))
try {
ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
case Some(brokerInfoString) =>
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker"))
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
}
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
}
} catch {
case t: Throwable =>
error("Could not parse broker info", t)
None
}
consumer
}
private def processPartition(zkClient: ZkClient,

View File

@ -100,9 +100,13 @@ object ExportZkOffsets extends Logging {
for (bidPid <- bidPidList) {
val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1
fileWriter.write(offsetPath + ":" + offsetVal + "\n")
debug(offsetPath + " => " + offsetVal)
ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match {
case Some(offsetVal) =>
fileWriter.write(offsetPath + ":" + offsetVal + "\n")
debug(offsetPath + " => " + offsetVal)
case None =>
error("Could not retrieve offset value from " + offsetPath)
}
}
}
}

View File

@ -104,7 +104,10 @@ object VerifyConsumerRebalance extends Logging {
}
// try reading the partition owner path for see if a valid consumer id exists there
val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1
val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1 match {
case Some(m) => m
case None => null
}
if(partitionOwner == null) {
error("No owner for topic %s partition %s".format(topic, partition))
rebalanceSucceeded = false