KAFKA-1737; Enforce ZKSerializer while creating ZkClient; reviewed by Guozhang Wang

Authored by Vivek Madani on 2015-05-24 12:02:32 -07:00; committed by Guozhang Wang
parent 467736c7ad
commit 43b92f8b1c
21 changed files with 50 additions and 56 deletions
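
The change replaces direct construction of ZkClient at every call site with a single factory, ZkUtils.createZkClient, so that ZKStringSerializer is always applied. A minimal sketch of the before/after pattern (the connection string and timeouts below are illustrative, not taken from the commit):

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    object CreateZkClientExample {
      def main(args: Array[String]): Unit = {
        // Before: each call site had to remember the serializer:
        //   new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
        // After: the factory supplies ZKStringSerializer internally.
        val zkClient: ZkClient = ZkUtils.createZkClient("localhost:2181", 30000, 30000)
        zkClient.close()
      }
    }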

View File

@ -48,7 +48,7 @@ object ConsumerGroupCommand {
opts.checkArgs()
val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
try {
if (opts.options.has(opts.listOpt))

View File

@ -53,7 +53,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
val partitionsForPreferredReplicaElection =
if (!options.has(jsonFileOpt))
ZkUtils.getAllPartitions(zkClient)

View File

@ -38,7 +38,7 @@ object ReassignPartitionsCommand extends Logging {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
var zkClient: ZkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
try {
if(opts.options.has(opts.verifyOpt))
verifyAssignment(zkClient, opts)

View File

@ -47,7 +47,7 @@ object TopicCommand {
opts.checkArgs()
val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
try {
if(opts.options.has(opts.createOpt))

View File

@ -178,7 +178,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def connectZk() {
info("Connecting to zookeeper instance at " + config.zkConnect)
zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
}
// Blocks until the offset manager is located and a channel is established to it.

View File

@ -18,7 +18,7 @@
package kafka.consumer
import scala.collection.JavaConversions._
import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import kafka.utils.{ZkUtils, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState

View File

@ -196,13 +196,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
if (chroot.length > 1) {
val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
val zkClientForChrootCreation = ZkUtils.createZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
info("Created zookeeper path " + chroot)
zkClientForChrootCreation.close()
}
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}
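
The second client above exists because the configured connect string may carry a chroot suffix that does not exist yet; the chroot node has to be created through a connection to the bare host list first. A tiny illustration of how the host list is split off (the connect string is hypothetical):

    object ChrootSplitExample extends App {
      val zkConnect = "zk1:2181,zk2:2181/kafka"                  // hypothetical value
      val idx = zkConnect.indexOf("/")
      val zkConnForChrootCreation = zkConnect.substring(0, idx)  // "zk1:2181,zk2:2181"
      val chroot = zkConnect.substring(idx)                      // "/kafka"
      println(s"connect to $zkConnForChrootCreation, then create path $chroot")
    }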

View File

@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging {
def checkZkPathExists(zkUrl: String, path: String): Boolean = {
try {
val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer)
val zk = ZkUtils.createZkClient(zkUrl, 30*1000,30*1000);
zk.exists(path)
} catch {
case _: Throwable => false

View File

@ -149,7 +149,7 @@ object ConsumerOffsetChecker extends Logging {
var zkClient: ZkClient = null
var channel: BlockingChannel = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
val topicList = topics match {
case Some(x) => x.split(",").view.toList

View File

@ -19,7 +19,7 @@ package kafka.tools
import java.io.FileWriter
import joptsimple._
import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs, CommandLineUtils}
import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils}
import org.I0Itec.zkclient.ZkClient
@ -76,7 +76,7 @@ object ExportZkOffsets extends Logging {
val fileWriter : FileWriter = new FileWriter(outfile)
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
var consumerGroups: Seq[String] = null

View File

@ -20,7 +20,7 @@ package kafka.tools
import java.io.BufferedReader
import java.io.FileReader
import joptsimple._
import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils}
import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
import org.I0Itec.zkclient.ZkClient
@ -68,7 +68,7 @@ object ImportZkOffsets extends Logging {
val zkConnect = options.valueOf(zkConnectOpt)
val partitionOffsetFile = options.valueOf(inFileOpt)
val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
val zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)
updateZkOffsets(zkClient, partitionOffsets)

View File

@ -21,7 +21,7 @@ import org.I0Itec.zkclient.ZkClient
import kafka.consumer.{SimpleConsumer, ConsumerConfig}
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
import kafka.common.{TopicAndPartition, KafkaException}
import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CoreUtils}
import kafka.utils.{ZKGroupTopicDirs, ZkUtils, CoreUtils}
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.utils.Utils
@ -36,8 +36,8 @@ object UpdateOffsetsInZK {
if(args.length < 3)
usage
val config = new ConsumerConfig(Utils.loadProps(args(1)))
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, ZKStringSerializer)
val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs)
args(0) match {
case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))

View File

@ -19,7 +19,7 @@ package kafka.tools
import joptsimple.OptionParser
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CommandLineUtils}
import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, CommandLineUtils}
object VerifyConsumerRebalance extends Logging {
def main(args: Array[String]) {
@ -48,7 +48,7 @@ object VerifyConsumerRebalance extends Logging {
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
debug("zkConnect = %s; group = %s".format(zkConnect, group))

View File

@ -460,7 +460,7 @@ object ZkUtils extends Logging {
def maybeDeletePath(zkUrl: String, dir: String) {
try {
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
val zk = createZkClient(zkUrl, 30*1000, 30*1000)
zk.deleteRecursive(dir)
zk.close()
} catch {
@ -781,9 +781,14 @@ object ZkUtils extends Logging {
}
}
}
def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
zkClient
}
}
object ZKStringSerializer extends ZkSerializer {
private object ZKStringSerializer extends ZkSerializer {
@throws(classOf[ZkMarshallingError])
def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
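
Taken together, the two hunks above are the heart of the change: createZkClient centralizes ZkClient construction, and making ZKStringSerializer package-private forces callers outside kafka.utils through the factory. A trimmed, self-contained sketch of the post-commit shape (the deserialize body is not shown in the hunk and is assumed to mirror the UTF-8 encoding used by serialize):

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkMarshallingError
    import org.I0Itec.zkclient.serialize.ZkSerializer

    object ZkUtilsSketch {
      // Every client now carries ZKStringSerializer without the caller naming it.
      def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient =
        new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
    }

    // Package-private in the real code, so it can no longer be passed to
    // new ZkClient(...) from outside kafka.utils.
    private object ZKStringSerializer extends ZkSerializer {
      @throws(classOf[ZkMarshallingError])
      def serialize(data: Object): Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")

      @throws(classOf[ZkMarshallingError])
      def deserialize(bytes: Array[Byte]): Object =              // assumed counterpart
        if (bytes == null) null else new String(bytes, "UTF-8")
    }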

View File

@ -18,7 +18,7 @@
package kafka
import consumer.ConsumerConfig
import utils.{ZKStringSerializer, ZkUtils}
import utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.utils.Utils
@ -32,8 +32,7 @@ object DeleteZKPath {
val config = new ConsumerConfig(Utils.loadProps(args(0)))
val zkPath = args(1)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkUtils.deletePathRecursive(zkClient, zkPath);

View File

@ -2,7 +2,7 @@ package other.kafka
import org.I0Itec.zkclient.ZkClient
import kafka.api._
import kafka.utils.{ShutdownableThread, ZKStringSerializer}
import kafka.utils.{ZkUtils, ShutdownableThread}
import org.apache.kafka.common.protocol.SecurityProtocol
import scala.collection._
import kafka.client.ClientUtils
@ -238,7 +238,7 @@ object TestOffsetManager {
var fetchThread: FetchThread = null
var statsThread: StatsThread = null
try {
zkClient = new ZkClient(zookeeper, 6000, 2000, ZKStringSerializer)
zkClient = ZkUtils.createZkClient(zookeeper, 6000, 2000)
commitThreads = (0 to (threadCount-1)).map { threadId =>
new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient)
}

View File

@ -316,7 +316,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
}
def testLeaderSelectionForPartition() {
val zkClient = new ZkClient(zkConnect, 6000, 30000, ZKStringSerializer)
val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000)
// create topic topic1 with 1 partition on broker 0
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)

View File

@ -448,7 +448,7 @@ object TestUtils extends Logging {
}
def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
}

View File

@ -19,7 +19,7 @@ package kafka.zk
import kafka.consumer.ConsumerConfig
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, ZKStringSerializer}
import kafka.utils.ZkUtils
import kafka.utils.TestUtils
import org.junit.Assert
import org.scalatest.junit.JUnit3Suite
@ -29,8 +29,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
def testEphemeralNodeCleanup = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
@ -42,8 +41,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1
Assert.assertNotNull(testData)
zkClient.close
zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
Assert.assertFalse(nodeExists)
}

View File

@ -19,7 +19,7 @@ package unit.kafka.zk
import junit.framework.Assert
import kafka.consumer.ConsumerConfig
import kafka.utils.{ZkPath, TestUtils, ZKStringSerializer, ZkUtils}
import kafka.utils.{ZkPath, TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.config.ConfigException
@ -34,9 +34,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreatePersistentPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createPersistentPath(zkClient, path)
@ -49,8 +48,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreatePersistentPath {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createPersistentPath(zkClient, path)
@ -64,9 +62,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testMakeSurePersistsPathExistsThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.makeSurePersistentPathExists(zkClient, path)
@ -79,8 +76,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testMakeSurePersistsPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.makeSurePersistentPathExists(zkClient, path)
@ -94,9 +90,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreateEphemeralPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
@ -109,8 +104,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreateEphemeralPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
@ -124,9 +118,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreatePersistentSequentialThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createSequentialPersistentPath(zkClient, path)
@ -139,8 +132,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreatePersistentSequentialExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
var actualPath: String = ""
try {

View File

@ -19,7 +19,7 @@ package kafka.zk
import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, CoreUtils}
import kafka.utils.{ZkUtils, CoreUtils}
trait ZooKeeperTestHarness extends JUnit3Suite {
var zkPort: Int = -1
@ -34,7 +34,7 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
super.setUp
zookeeper = new EmbeddedZookeeper()
zkPort = zookeeper.port
zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
}
override def tearDown() {