KAFKA-150 Make the error message for duplicate node ids in zk more self-explanatory.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1188255 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Edward Jay Kreps 2011-10-24 17:47:20 +00:00
parent 46b6144a8f
commit 76957e5e3a
2 changed files with 11 additions and 3 deletions

View File

@ -82,8 +82,7 @@ class KafkaServer(val config: KafkaConfig) {
}
catch {
case e =>
logger.fatal(e)
logger.fatal(Utils.stackTrace(e))
logger.fatal("Fatal error during startup.", e)
shutdown
}
}

View File

@ -21,6 +21,7 @@ import kafka.utils._
import org.apache.log4j.Logger
import kafka.cluster.Broker
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.zookeeper.Watcher.Event.KeeperState
import kafka.log.LogManager
import java.net.InetAddress
@ -52,7 +53,15 @@ class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) {
val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
val creatorId = hostName + "-" + System.currentTimeMillis
val broker = new Broker(config.brokerId, creatorId, hostName, config.port)
ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
} catch {
case e: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " +
"indicates that you either have configured a brokerid that is already in use, or " +
"else you have shutdown this broker and restarted it faster than the zookeeper " +
"timeout so it appears to be re-registering.")
}
logger.info("Registering broker " + brokerIdPath + " succeeded with " + broker)
}