MINOR: Add maybeThrow method to ZooKeeperClient AsyncResponse

* Add maybeThrow method to AsyncResponse
* Update KafkaZkClient to use newly introduced maybeThrow
* Change AsyncResponse from trait to abstract class for
more readable stacktraces (there's no benefit in using a
trait here)

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4266 from omkreddy/KAFKAZKCLEINT_EXCEPTION_CLEANUP
This commit is contained in:
Manikumar Reddy 2017-12-06 16:25:25 +02:00 committed by Ismael Juma
parent 078fd21365
commit fd8f182cc4
3 changed files with 30 additions and 26 deletions

View File

@ -53,7 +53,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
def createSequentialPersistentPath(path: String, data: String = ""): String = {
val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.resultException.foreach(e => throw e)
createResponse.maybeThrow
createResponse.path
}
@ -229,7 +229,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
val setDataResponse = set(configData)
setDataResponse.resultCode match {
case Code.NONODE => create(configData)
case _ => setDataResponse.resultException.foreach(e => throw e)
case _ => setDataResponse.maybeThrow
}
}
@ -251,9 +251,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
val path = ConfigEntityChangeNotificationSequenceZNode.createPath
val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
if (createResponse.resultCode != Code.OK) {
createResponse.resultException.foreach(e => throw e)
}
createResponse.maybeThrow
}
/**
@ -324,9 +322,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
*/
def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
val setDataResponse = setTopicAssignmentRaw(topic, assignment)
if (setDataResponse.resultCode != Code.OK) {
setDataResponse.resultException.foreach(e => throw e)
}
setDataResponse.maybeThrow
}
/**
@ -379,7 +375,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteLogDirEventNotifications(getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.resultException.foreach(e => throw e)
getChildrenResponse.maybeThrow
}
}
@ -644,8 +640,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
setDataResponse.resultCode match {
case Code.NONODE =>
val createDataResponse = create(reassignmentData)
createDataResponse.resultException.foreach(e => throw e)
case _ => setDataResponse.resultException.foreach(e => throw e)
createDataResponse.maybeThrow
case _ => setDataResponse.maybeThrow
}
}
@ -658,10 +654,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
val createResponse = retryRequestUntilConnected(createRequest)
if (createResponse.resultCode != Code.OK) {
throw createResponse.resultException.get
}
createResponse.maybeThrow
}
/**
@ -766,7 +759,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.resultException.foreach(e => throw e)
getChildrenResponse.maybeThrow
}
}
@ -930,7 +923,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
val path = AclChangeNotificationSequenceZNode.createPath
val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resourceName), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.resultException.foreach(e => throw e)
createResponse.maybeThrow
}
def propagateLogDirEvent(brokerId: Int) {
@ -949,7 +942,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteAclChangeNotifications(getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.resultException.foreach(e => throw e)
getChildrenResponse.maybeThrow
}
}
@ -964,8 +957,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
val deleteResponses = retryRequestsUntilConnected(deleteRequests)
deleteResponses.foreach { deleteResponse =>
if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
deleteResponse.resultException.foreach(e => throw e)
if (deleteResponse.resultCode != Code.NONODE) {
deleteResponse.maybeThrow
}
}
}
@ -1130,7 +1123,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (setDataResponse.resultCode == Code.NONODE) {
createConsumerOffset(group, topicPartition, offset)
} else {
setDataResponse.resultException.foreach(e => throw e)
setDataResponse.maybeThrow
}
}
@ -1202,13 +1195,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
var createResponse = retryRequestUntilConnected(createRequest)
if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
createResponse.resultException.foreach(e => throw e)
createResponse.maybeThrow
} else if (createResponse.resultCode == Code.NONODE) {
createRecursive0(parentPath(path))
createResponse = retryRequestUntilConnected(createRequest)
createResponse.resultException.foreach(e => throw e)
createResponse.maybeThrow
} else if (createResponse.resultCode != Code.NODEEXISTS)
createResponse.resultException.foreach(e => throw e)
createResponse.maybeThrow
}

View File

@ -388,7 +388,7 @@ case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends Asy
type Response = GetChildrenResponse
}
sealed trait AsyncResponse {
sealed abstract class AsyncResponse {
def resultCode: Code
def path: String
def ctx: Option[Any]
@ -396,6 +396,14 @@ sealed trait AsyncResponse {
/** Return None if the result code is OK and KeeperException otherwise. */
def resultException: Option[KeeperException] =
if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path))
/**
* Throw KeeperException if the result code is not OK.
*/
def maybeThrow(): Unit = {
if (resultCode != Code.OK)
throw KeeperException.create(resultCode, path)
}
}
case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse
case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse

View File

@ -25,7 +25,7 @@ import javax.security.auth.login.Configuration
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
import org.apache.zookeeper.{CreateMode, ZooDefs}
import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
import org.junit.{After, Test}
@ -60,6 +60,9 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
def testDeleteNonExistentZNode(): Unit = {
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
intercept[NoNodeException] {
deleteResponse.maybeThrow()
}
}
@Test