mirror of https://github.com/apache/kafka.git
KAFKA-3525; getSequenceId should return 1 for first data node creation
ZkUtils.getSequenceId() method is used to generate broker id sequence numbers. During startup, each broker updates the data at /brokers/seqid zk path and returns stat.getVersion as next sequence id. stat.getVersion returns "1" for first data update. So ZkUtils.getSequenceId() should return "1" on first data update. Author: Manikumar reddy O <manikumar.reddy@gmail.com> Reviewers: Flavio Junqueira <fpj@apache.org>, Ismael Juma <ismael@juma.me.uk> Closes #1224 from omkreddy/KAFKA-3525
This commit is contained in:
parent
fe0335ea10
commit
3e89d2bc59
|
@ -356,7 +356,7 @@ object KafkaConfig {
|
||||||
val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
|
val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
|
||||||
val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
|
val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
|
||||||
/** ********* General Configuration ***********/
|
/** ********* General Configuration ***********/
|
||||||
val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server? When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
|
val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
|
||||||
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
|
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
|
||||||
val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." +
|
val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." +
|
||||||
"To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids" +
|
"To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids" +
|
||||||
|
|
|
@ -792,24 +792,16 @@ class ZkUtils(val zkClient: ZkClient,
|
||||||
/**
|
/**
|
||||||
* This API produces a sequence number by creating / updating given path in zookeeper
|
* This API produces a sequence number by creating / updating given path in zookeeper
|
||||||
* It uses the stat returned by the zookeeper and return the version. Every time
|
* It uses the stat returned by the zookeeper and return the version. Every time
|
||||||
* client updates the path stat.version gets incremented
|
* client updates the path stat.version gets incremented. Starting value of sequence number is 1.
|
||||||
*/
|
*/
|
||||||
def getSequenceId(path: String, acls: java.util.List[ACL] = DefaultAcls): Int = {
|
def getSequenceId(path: String, acls: java.util.List[ACL] = DefaultAcls): Int = {
|
||||||
|
def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
|
||||||
try {
|
try {
|
||||||
val stat = zkClient.writeDataReturnStat(path, "", -1)
|
writeToZk
|
||||||
stat.getVersion
|
|
||||||
} catch {
|
} catch {
|
||||||
case e: ZkNoNodeException => {
|
case e1: ZkNoNodeException =>
|
||||||
createParentPath(BrokerSequenceIdPath, acls)
|
makeSurePersistentPathExists(path)
|
||||||
try {
|
writeToZk
|
||||||
zkClient.createPersistent(BrokerSequenceIdPath, "", acls)
|
|
||||||
0
|
|
||||||
} catch {
|
|
||||||
case e: ZkNodeExistsException =>
|
|
||||||
val stat = zkClient.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
|
|
||||||
stat.getVersion
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -187,4 +187,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
|
||||||
}
|
}
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testGetSequenceIdMethod() {
|
||||||
|
val path = "/test/seqid"
|
||||||
|
(1 to 10).foreach { seqid =>
|
||||||
|
assertEquals(seqid, zkUtils.getSequenceId(path))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue