KAFKA-7007: Use JSON for /kafka-acl-extended-changes path (#5161)

Keep Literal ACLs on the old paths, using the old formats, to maintain backwards compatibility.
Have Prefixed, and any latter types, go on new paths, using JSON, (old brokers are not aware of them)
Add checks to reject any adminClient requests to add prefixed acls before the cluster is fully upgraded.

Colin Patrick McCabe <colin@cmccabe.xyz>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Andy Coates 2018-06-12 23:07:10 +01:00 committed by Jun Rao
parent 16190e9bfd
commit a592402512
13 changed files with 555 additions and 136 deletions

View File

@ -62,6 +62,12 @@ public enum ResourceNameType {
.collect(Collectors.toMap(ResourceNameType::code, Function.identity())) .collect(Collectors.toMap(ResourceNameType::code, Function.identity()))
); );
private final static Map<String, ResourceNameType> NAME_TO_VALUE =
Collections.unmodifiableMap(
Arrays.stream(ResourceNameType.values())
.collect(Collectors.toMap(ResourceNameType::name, Function.identity()))
);
private final byte code; private final byte code;
ResourceNameType(byte code) { ResourceNameType(byte code) {
@ -88,4 +94,11 @@ public enum ResourceNameType {
public static ResourceNameType fromCode(byte code) { public static ResourceNameType fromCode(byte code) {
return CODE_TO_VALUE.getOrDefault(code, UNKNOWN); return CODE_TO_VALUE.getOrDefault(code, UNKNOWN);
} }
/**
* Return the ResourceNameType with the provided name or {@link #UNKNOWN} if one cannot be found.
*/
public static ResourceNameType fromString(String name) {
return NAME_TO_VALUE.getOrDefault(name, UNKNOWN);
}
} }

View File

@ -24,6 +24,8 @@ import kafka.zk.{KafkaZkClient, StateChangeHandlers}
import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler} import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import scala.util.{Failure, Try}
/** /**
* Handle the notificationMessage. * Handle the notificationMessage.
*/ */
@ -83,12 +85,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
for (notification <- notifications) { for (notification <- notifications) {
val changeId = changeNumber(notification) val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) { if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification processNotification(notification)
val (data, _) = zkClient.getDataAndStat(changeZnode)
data match {
case Some(d) => notificationHandler.processNotification(d)
case None => warn(s"read null data from $changeZnode when processing notification $notification")
}
lastExecutedChange = changeId lastExecutedChange = changeId
} }
} }
@ -100,6 +97,18 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
} }
} }
private def processNotification(notification: String): Unit = {
val changeZnode = seqNodeRoot + "/" + notification
val (data, _) = zkClient.getDataAndStat(changeZnode)
data match {
case Some(d) => Try(notificationHandler.processNotification(d)) match {
case Failure(e) => error(s"error processing change notification from $changeZnode", e)
case _ =>
}
case None => warn(s"read null data from $changeZnode")
}
}
private def addChangeNotification(): Unit = { private def addChangeNotification(): Unit = {
if (!isClosed.get && queue.peek() == null) if (!isClosed.get && queue.peek() == null)
queue.put(new ChangeNotification) queue.put(new ChangeNotification)

View File

@ -16,6 +16,7 @@
*/ */
package kafka.security.auth package kafka.security.auth
import kafka.common.KafkaException
import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern} import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern}
object Resource { object Resource {
@ -26,16 +27,18 @@ object Resource {
val WildCardResource = "*" val WildCardResource = "*"
def fromString(str: String): Resource = { def fromString(str: String): Resource = {
ResourceNameType.values.find(nameType => str.startsWith(nameType.name)) match { ResourceType.values.find(resourceType => str.startsWith(resourceType.name + Separator)) match {
case None => throw new KafkaException("Invalid resource string: '" + str + "'")
case Some(resourceType) =>
val remaining = str.substring(resourceType.name.length + 1)
ResourceNameType.values.find(nameType => remaining.startsWith(nameType.name + Separator)) match {
case Some(nameType) => case Some(nameType) =>
str.split(Separator, 3) match { val name = remaining.substring(nameType.name.length + 1)
case Array(_, resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, nameType) Resource(resourceType, name, nameType)
case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
} case None =>
case _ => Resource(resourceType, remaining, ResourceNameType.LITERAL)
str.split(Separator, 2) match {
case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, ResourceNameType.LITERAL)
case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
} }
} }
} }
@ -74,7 +77,7 @@ case class Resource(resourceType: ResourceType, name: String, nameType: Resource
} }
override def toString: String = { override def toString: String = {
nameType + Resource.Separator + resourceType.name + Resource.Separator + name resourceType.name + Resource.Separator + nameType + Resource.Separator + name
} }
} }

View File

@ -20,13 +20,14 @@ import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import com.typesafe.scalalogging.Logger import com.typesafe.scalalogging.Logger
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener} import kafka.api.KAFKA_2_0_IV1
import kafka.network.RequestChannel.Session import kafka.network.RequestChannel.Session
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._ import kafka.utils._
import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore} import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, KafkaZkClient, ZkAclChangeStore, ZkAclStore}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.resource.ResourceNameType
import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time} import org.apache.kafka.common.utils.{SecurityUtils, Time}
@ -55,7 +56,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private var superUsers = Set.empty[KafkaPrincipal] private var superUsers = Set.empty[KafkaPrincipal]
private var shouldAllowEveryoneIfNoAclIsFound = false private var shouldAllowEveryoneIfNoAclIsFound = false
private var zkClient: KafkaZkClient = _ private var zkClient: KafkaZkClient = _
private var aclChangeListeners: Seq[ZkNodeChangeNotificationListener] = List() private var aclChangeListeners: Iterable[AclChangeSubscription] = Iterable.empty
private var extendedAclSupport: Boolean = _
@volatile @volatile
private var aclCache = new scala.collection.immutable.TreeMap[Resource, VersionedAcls]()(ResourceOrdering) private var aclCache = new scala.collection.immutable.TreeMap[Resource, VersionedAcls]()(ResourceOrdering)
@ -96,6 +98,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer") zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer")
zkClient.createAclPaths() zkClient.createAclPaths()
extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1
loadCache() loadCache()
startZkChangeListeners() startZkChangeListeners()
@ -161,6 +165,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
override def addAcls(acls: Set[Acl], resource: Resource) { override def addAcls(acls: Set[Acl], resource: Resource) {
if (acls != null && acls.nonEmpty) { if (acls != null && acls.nonEmpty) {
if (!extendedAclSupport && resource.nameType == ResourceNameType.PREFIXED) {
throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
}
inWriteLock(lock) { inWriteLock(lock) {
updateResourceAcls(resource) { currentAcls => updateResourceAcls(resource) { currentAcls =>
currentAcls ++ acls currentAcls ++ acls
@ -238,13 +247,14 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private def loadCache() { private def loadCache() {
inWriteLock(lock) { inWriteLock(lock) {
ZkAclStore.stores.foreach(store => { ZkAclStore.stores.foreach(store => {
val resourceTypes = zkClient.getResourceTypes(store.nameType) val resourceTypes = zkClient.getResourceTypes(store.patternType)
for (rType <- resourceTypes) { for (rType <- resourceTypes) {
val resourceType = ResourceType.fromString(rType) val resourceType = ResourceType.fromString(rType)
val resourceNames = zkClient.getResourceNames(store.nameType, resourceType) val resourceNames = zkClient.getResourceNames(store.patternType, resourceType)
for (resourceName <- resourceNames) { for (resourceName <- resourceNames) {
val versionedAcls = getAclsFromZk(new Resource(resourceType, resourceName, store.nameType)) val resource = new Resource(resourceType, resourceName, store.patternType)
updateCache(new Resource(resourceType, resourceName, store.nameType), versionedAcls) val versionedAcls = getAclsFromZk(resource)
updateCache(resource, versionedAcls)
} }
} }
}) })
@ -252,13 +262,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
} }
private def startZkChangeListeners(): Unit = { private def startZkChangeListeners(): Unit = {
aclChangeListeners = ZkAclStore.stores.map(store => { aclChangeListeners = ZkAclChangeStore.stores
val aclChangeListener = new ZkNodeChangeNotificationListener( .map(store => store.createListener(AclChangedNotificationHandler, zkClient))
zkClient, store.aclChangePath, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, new AclChangedNotificationHandler(store))
aclChangeListener.init()
aclChangeListener
})
} }
private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) { private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) {
@ -350,10 +355,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
retryBackoffMs + Random.nextInt(retryBackoffJitterMs) retryBackoffMs + Random.nextInt(retryBackoffJitterMs)
} }
class AclChangedNotificationHandler(store: ZkAclStore) extends NotificationHandler { object AclChangedNotificationHandler extends AclChangeNotificationHandler {
override def processNotification(notificationMessage: Array[Byte]) { override def processNotification(resource: Resource) {
val resource: Resource = store.decode(notificationMessage)
inWriteLock(lock) { inWriteLock(lock) {
val versionedAcls = getAclsFromZk(resource) val versionedAcls = getAclsFromZk(resource)
updateCache(resource, versionedAcls) updateCache(resource, versionedAcls)

View File

@ -941,14 +941,15 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
//Acl management methods //Acl management methods
/** /**
* Creates the required zk nodes for Acl storage * Creates the required zk nodes for Acl storage and Acl change storage.
*/ */
def createAclPaths(): Unit = { def createAclPaths(): Unit = {
ZkAclStore.stores.foreach(store => { ZkAclStore.stores.foreach(store => {
createRecursive(store.aclPath, throwIfPathExists = false) createRecursive(store.aclPath, throwIfPathExists = false)
createRecursive(store.aclChangePath, throwIfPathExists = false)
ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false)) ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false))
}) })
ZkAclChangeStore.stores.foreach(store => createRecursive(store.aclChangePath, throwIfPathExists = false))
} }
/** /**
@ -1005,13 +1006,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
} }
/** /**
* Creates Acl change notification message * Creates an Acl change notification message.
* @param resource resource name * @param resource resource pattern that has changed
*/ */
def createAclChangeNotification(resource: Resource): Unit = { def createAclChangeNotification(resource: Resource): Unit = {
val store = ZkAclStore(resource.nameType) val aclChange = ZkAclStore(resource.nameType).changeStore.createChangeNode(resource)
val path = store.changeSequenceZNode.createPath val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest) val createResponse = retryRequestUntilConnected(createRequest)
createResponse.maybeThrow createResponse.maybeThrow
} }
@ -1034,10 +1034,10 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
* @throws KeeperException if there is an error while deleting Acl change notifications * @throws KeeperException if there is an error while deleting Acl change notifications
*/ */
def deleteAclChangeNotifications(): Unit = { def deleteAclChangeNotifications(): Unit = {
ZkAclStore.stores.foreach(store => { ZkAclChangeStore.stores.foreach(store => {
val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath)) val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath))
if (getChildrenResponse.resultCode == Code.OK) { if (getChildrenResponse.resultCode == Code.OK) {
deleteAclChangeNotifications(store, getChildrenResponse.children) deleteAclChangeNotifications(store.aclChangePath, getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) { } else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.maybeThrow getChildrenResponse.maybeThrow
} }
@ -1046,12 +1046,13 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
/** /**
* Deletes the Acl change notifications associated with the given sequence nodes * Deletes the Acl change notifications associated with the given sequence nodes
* @param sequenceNodes *
* @param aclChangePath the root path
* @param sequenceNodes the name of the node to delete.
*/ */
private def deleteAclChangeNotifications(store: ZkAclStore, sequenceNodes: Seq[String]): Unit = { private def deleteAclChangeNotifications(aclChangePath: String, sequenceNodes: Seq[String]): Unit = {
val aclChangeNotificationSequenceZNode = store.changeSequenceZNode
val deleteRequests = sequenceNodes.map { sequenceNode => val deleteRequests = sequenceNodes.map { sequenceNode =>
DeleteRequest(aclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion) DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.NoVersion)
} }
val deleteResponses = retryRequestsUntilConnected(deleteRequests) val deleteResponses = retryRequestsUntilConnected(deleteRequests)

View File

@ -23,13 +23,15 @@ import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.core.JsonProcessingException
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr} import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint} import kafka.cluster.{Broker, EndPoint}
import kafka.common.KafkaException import kafka.common.{KafkaException, NotificationHandler, ZkNodeChangeNotificationListener}
import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch} import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
import kafka.security.auth.Resource.Separator
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.security.auth.{Acl, Resource, ResourceType} import kafka.security.auth.{Acl, Resource, ResourceType}
import kafka.server.{ConfigType, DelegationTokenManager} import kafka.server.{ConfigType, DelegationTokenManager}
import kafka.utils.Json import kafka.utils.Json
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.resource.ResourceNameType
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
@ -42,6 +44,7 @@ import scala.beans.BeanProperty
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, breakOut} import scala.collection.{Seq, breakOut}
import scala.util.{Failure, Success, Try}
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
@ -446,46 +449,178 @@ object StateChangeHandlers {
} }
/** /**
* Acls for resources are stored in ZK under a root node that is determined by the [[ResourceNameType]]. * Acls for resources are stored in ZK under two root paths:
* Under each [[ResourceNameType]] node there will be one child node per resource type (Topic, Cluster, Group, etc). * <ul>
* Under each resourceType there will be a unique child for each resource path and the data for that child will contain * <li>[[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl'.
* The format is JSON. See [[kafka.zk.ResourceZNode]] for details.</li>
* <li>All other patterns are stored under '/kafka-acl-extended/<i>pattern-type</i>'.
* The format is JSON. See [[kafka.zk.ResourceZNode]] for details.</li>
* </ul>
*
* Under each root node there will be one child node per resource type (Topic, Cluster, Group, etc).
* Under each resourceType there will be a unique child for each resource pattern and the data for that child will contain
* list of its acls as a json object. Following gives an example: * list of its acls as a json object. Following gives an example:
* *
* <pre> * <pre>
* // Literal patterns:
* /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]} * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
* /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]} * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
* /kafka-prefixed-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]} *
* // Prefixed patterns:
* /kafka-acl-extended/PREFIXED/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
* </pre> * </pre>
*
* Acl change events are also stored under two paths:
* <ul>
* <li>[[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl-changes'.
* The format is a UTF8 string in the form: &lt;resource-type&gt;:&lt;resource-name&gt;</li>
* <li>All other patterns are stored under '/kafka-acl-extended-changes'
* The format is JSON, as defined by [[kafka.zk.ExtendedAclChangeEvent]]</li>
* </ul>
*/ */
case class ZkAclStore(nameType: ResourceNameType) { sealed trait ZkAclStore {
val aclPath: String = nameType match { val patternType: ResourceNameType
case ResourceNameType.LITERAL => "/kafka-acl" val aclPath: String
case ResourceNameType.PREFIXED => "/kafka-prefixed-acl"
case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
}
val aclChangePath: String = nameType match { def path(resourceType: ResourceType): String = s"$aclPath/$resourceType"
case ResourceNameType.LITERAL => "/kafka-acl-changes"
case ResourceNameType.PREFIXED => "/kafka-prefixed-acl-changes"
case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
}
def path(resourceType: ResourceType) = s"$aclPath/$resourceType"
def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName" def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName"
def changeSequenceZNode: AclChangeNotificationSequenceZNode = AclChangeNotificationSequenceZNode(this) def changeStore: ZkAclChangeStore
def decode(notificationMessage: Array[Byte]): Resource = AclChangeNotificationSequenceZNode.decode(nameType, notificationMessage)
} }
object ZkAclStore { object ZkAclStore {
val stores: Seq[ZkAclStore] = ResourceNameType.values private val storesByType: Map[ResourceNameType, ZkAclStore] = ResourceNameType.values
.filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN) .filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN)
.map(nameType => ZkAclStore(nameType)) .map(nameType => (nameType, create(nameType)))
.toMap
val securePaths: Seq[String] = stores val stores: Iterable[ZkAclStore] = storesByType.values
.flatMap(store => List(store.aclPath, store.aclChangePath))
val securePaths: Iterable[String] = stores
.flatMap(store => Set(store.aclPath, store.changeStore.aclChangePath))
def apply(patternType: ResourceNameType): ZkAclStore = {
storesByType.get(patternType) match {
case Some(store) => store
case None => throw new KafkaException(s"Invalid pattern type: $patternType")
}
}
private def create(patternType: ResourceNameType) = {
patternType match {
case ResourceNameType.LITERAL => LiteralAclStore
case _ => new ExtendedAclStore(patternType)
}
}
}
object LiteralAclStore extends ZkAclStore {
val patternType: ResourceNameType = ResourceNameType.LITERAL
val aclPath: String = "/kafka-acl"
def changeStore: ZkAclChangeStore = LiteralAclChangeStore
}
class ExtendedAclStore(val patternType: ResourceNameType) extends ZkAclStore {
if (patternType == ResourceNameType.LITERAL)
throw new IllegalArgumentException("Literal pattern types are not supported")
val aclPath: String = s"/kafka-acl-extended/${patternType.name.toLowerCase}"
def changeStore: ZkAclChangeStore = ExtendedAclChangeStore
}
trait AclChangeNotificationHandler {
def processNotification(resource: Resource): Unit
}
trait AclChangeSubscription extends AutoCloseable {
def close(): Unit
}
case class AclChangeNode(path: String, bytes: Array[Byte])
sealed trait ZkAclChangeStore {
val aclChangePath: String
def createPath: String = s"$aclChangePath/${ZkAclChangeStore.SequenceNumberPrefix}"
def decode(bytes: Array[Byte]): Resource
protected def encode(resource: Resource): Array[Byte]
def createChangeNode(resource: Resource): AclChangeNode = AclChangeNode(createPath, encode(resource))
def createListener(handler: AclChangeNotificationHandler, zkClient: KafkaZkClient): AclChangeSubscription = {
val rawHandler: NotificationHandler = new NotificationHandler {
def processNotification(bytes: Array[Byte]): Unit =
handler.processNotification(decode(bytes))
}
val aclChangeListener = new ZkNodeChangeNotificationListener(
zkClient, aclChangePath, ZkAclChangeStore.SequenceNumberPrefix, rawHandler)
aclChangeListener.init()
new AclChangeSubscription {
def close(): Unit = aclChangeListener.close()
}
}
}
object ZkAclChangeStore {
val stores: Iterable[ZkAclChangeStore] = List(LiteralAclChangeStore, ExtendedAclChangeStore)
def SequenceNumberPrefix = "acl_changes_"
}
case object LiteralAclChangeStore extends ZkAclChangeStore {
val name = "LiteralAclChangeStore"
val aclChangePath: String = "/kafka-acl-changes"
def encode(resource: Resource): Array[Byte] = {
if (resource.nameType != ResourceNameType.LITERAL)
throw new IllegalArgumentException("Only literal resource patterns can be encoded")
val legacyName = resource.resourceType + Resource.Separator + resource.name
legacyName.getBytes(UTF_8)
}
def decode(bytes: Array[Byte]): Resource = {
val string = new String(bytes, UTF_8)
string.split(Separator, 2) match {
case Array(resourceType, resourceName, _*) => new Resource(ResourceType.fromString(resourceType), resourceName, ResourceNameType.LITERAL)
case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + string)
}
}
}
case object ExtendedAclChangeStore extends ZkAclChangeStore {
val name = "ExtendedAclChangeStore"
val aclChangePath: String = "/kafka-acl-extended-changes"
def encode(resource: Resource): Array[Byte] = {
if (resource.nameType == ResourceNameType.LITERAL)
throw new IllegalArgumentException("Literal pattern types are not supported")
Json.encodeAsBytes(ExtendedAclChangeEvent(
ExtendedAclChangeEvent.currentVersion,
resource.resourceType.name,
resource.name,
resource.nameType.name))
}
def decode(bytes: Array[Byte]): Resource = {
val changeEvent = Json.parseBytesAs[ExtendedAclChangeEvent](bytes) match {
case Right(event) => event
case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e)
}
changeEvent.toResource match {
case Success(r) => r
case Failure(e) => throw new IllegalArgumentException("Failed to convert ACL change event to resource", e)
}
}
} }
object ResourceZNode { object ResourceZNode {
@ -495,26 +630,24 @@ object ResourceZNode {
def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion) def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
} }
object AclChangeNotificationSequenceZNode { object ExtendedAclChangeEvent {
val Separator = ":" val currentVersion: Int = 1
def SequenceNumberPrefix = "acl_changes_"
def encode(resource: Resource): Array[Byte] = {
(resource.resourceType.name + Separator + resource.name).getBytes(UTF_8)
}
def decode(nameType: ResourceNameType, bytes: Array[Byte]): Resource = {
val str = new String(bytes, UTF_8)
str.split(Separator, 2) match {
case Array(resourceType, name, _*) => Resource(ResourceType.fromString(resourceType), name, nameType)
case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
}
}
} }
case class AclChangeNotificationSequenceZNode(store: ZkAclStore) { case class ExtendedAclChangeEvent(@BeanProperty @JsonProperty("version") version: Int,
def createPath = s"${store.aclChangePath}/${AclChangeNotificationSequenceZNode.SequenceNumberPrefix}" @BeanProperty @JsonProperty("resourceType") resourceType: String,
def deletePath(sequenceNode: String) = s"${store.aclChangePath}/$sequenceNode" @BeanProperty @JsonProperty("name") name: String,
@BeanProperty @JsonProperty("resourceNameType") resourceNameType: String) {
if (version > ExtendedAclChangeEvent.currentVersion)
throw new UnsupportedVersionException(s"Acl change event received for unsupported version: $version")
def toResource: Try[Resource] = {
for {
resType <- Try(ResourceType.fromString(resourceType))
nameType <- Try(ResourceNameType.fromString(resourceNameType))
resource = Resource(resType, name, nameType)
} yield resource
}
} }
object ClusterZNode { object ClusterZNode {

View File

@ -24,10 +24,15 @@ import org.junit.Assert._
class ResourceTest { class ResourceTest {
@Test(expected = classOf[KafkaException]) @Test(expected = classOf[KafkaException])
def shouldThrowTwoPartStringWithUnknownResourceType(): Unit = { def shouldThrowOnTwoPartStringWithUnknownResourceType(): Unit = {
Resource.fromString("Unknown:fred") Resource.fromString("Unknown:fred")
} }
@Test(expected = classOf[KafkaException])
def shouldThrowOnBadResourceTypeSeparator(): Unit = {
Resource.fromString("Topic-fred")
}
@Test @Test
def shouldParseOldTwoPartString(): Unit = { def shouldParseOldTwoPartString(): Unit = {
assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred")) assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred"))
@ -41,14 +46,14 @@ class ResourceTest {
@Test @Test
def shouldParseThreePartString(): Unit = { def shouldParseThreePartString(): Unit = {
assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("PREFIXED:Group:fred")) assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("Group:PREFIXED:fred"))
assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("LITERAL:Topic:t")) assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:LITERAL:t"))
} }
@Test @Test
def shouldParseThreePartWithEmbeddedSeparators(): Unit = { def shouldParseThreePartWithEmbeddedSeparators(): Unit = {
assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("PREFIXED:Group::This:is:a:weird:group:name:")) assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("Group:PREFIXED::This:is:a:weird:group:name:"))
assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("LITERAL:Group::This:is:a:weird:group:name:")) assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group:LITERAL::This:is:a:weird:group:name:"))
} }
@Test @Test

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.zk
import kafka.security.auth.{Resource, Topic}
import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
import org.junit.Assert.assertEquals
import org.junit.Test
class ExtendedAclStoreTest {
private val literalResource = Resource(Topic, "some-topic", LITERAL)
private val prefixedResource = Resource(Topic, "some-topic", PREFIXED)
private val store = new ExtendedAclStore(PREFIXED)
@Test
def shouldHaveCorrectPaths(): Unit = {
assertEquals("/kafka-acl-extended/prefixed", store.aclPath)
assertEquals("/kafka-acl-extended/prefixed/Topic", store.path(Topic))
assertEquals("/kafka-acl-extended-changes", store.changeStore.aclChangePath)
}
@Test
def shouldHaveCorrectPatternType(): Unit = {
assertEquals(PREFIXED, store.patternType)
}
@Test(expected = classOf[IllegalArgumentException])
def shouldThrowIfConstructedWithLiteral(): Unit = {
new ExtendedAclStore(LITERAL)
}
@Test(expected = classOf[IllegalArgumentException])
def shouldThrowFromEncodeOnLiteral(): Unit = {
store.changeStore.createChangeNode(literalResource)
}
@Test
def shouldWriteChangesToTheWritePath(): Unit = {
val changeNode = store.changeStore.createChangeNode(prefixedResource)
assertEquals("/kafka-acl-extended-changes/acl_changes_", changeNode.path)
}
@Test
def shouldRoundTripChangeNode(): Unit = {
val changeNode = store.changeStore.createChangeNode(prefixedResource)
val actual = store.changeStore.decode(changeNode.bytes)
assertEquals(prefixedResource, actual)
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.zk
import kafka.security.auth.{Resource, Topic}
import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
import org.junit.Assert.assertEquals
import org.junit.Test
class LiteralAclStoreTest {
private val literalResource = Resource(Topic, "some-topic", LITERAL)
private val prefixedResource = Resource(Topic, "some-topic", PREFIXED)
private val store = LiteralAclStore
@Test
def shouldHaveCorrectPaths(): Unit = {
assertEquals("/kafka-acl", store.aclPath)
assertEquals("/kafka-acl/Topic", store.path(Topic))
assertEquals("/kafka-acl-changes", store.changeStore.aclChangePath)
}
@Test
def shouldHaveCorrectPatternType(): Unit = {
assertEquals(LITERAL, store.patternType)
}
@Test(expected = classOf[IllegalArgumentException])
def shouldThrowFromEncodeOnNoneLiteral(): Unit = {
store.changeStore.createChangeNode(prefixedResource)
}
@Test
def shouldWriteChangesToTheWritePath(): Unit = {
val changeNode = store.changeStore.createChangeNode(literalResource)
assertEquals("/kafka-acl-changes/acl_changes_", changeNode.path)
}
@Test
def shouldRoundTripChangeNode(): Unit = {
val changeNode = store.changeStore.createChangeNode(literalResource)
val actual = store.changeStore.decode(changeNode.bytes)
assertEquals(literalResource, actual)
}
}

View File

@ -18,43 +18,44 @@ package kafka.common
import kafka.security.auth.{Group, Resource} import kafka.security.auth.{Group, Resource}
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.zk.{AclChangeNotificationSequenceZNode, ZkAclStore, ZooKeeperTestHarness} import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore, ZooKeeperTestHarness}
import org.apache.kafka.common.resource.ResourceNameType.LITERAL import org.apache.kafka.common.resource.ResourceNameType.LITERAL
import org.junit.{After, Test} import org.junit.{After, Before, Test}
import scala.collection.mutable.ArrayBuffer
class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
var notificationListener: ZkNodeChangeNotificationListener = _ private val changeExpirationMs = 1000
private var notificationListener: ZkNodeChangeNotificationListener = _
private var notificationHandler: TestNotificationHandler = _
@Before
override def setUp(): Unit = {
super.setUp()
zkClient.createAclPaths()
notificationHandler = new TestNotificationHandler()
}
@After @After
override def tearDown(): Unit = { override def tearDown(): Unit = {
if (notificationListener != null) { if (notificationListener != null) {
notificationListener.close() notificationListener.close()
} }
super.tearDown()
} }
@Test @Test
def testProcessNotification() { def testProcessNotification() {
@volatile var notification: Resource = null
@volatile var invocationCount = 0
val notificationHandler = new NotificationHandler {
override def processNotification(notificationMessage: Array[Byte]): Unit = {
notification = AclChangeNotificationSequenceZNode.decode(LITERAL, notificationMessage)
invocationCount += 1
}
}
zkClient.createAclPaths()
val notificationMessage1 = Resource(Group, "messageA", LITERAL) val notificationMessage1 = Resource(Group, "messageA", LITERAL)
val notificationMessage2 = Resource(Group, "messageB", LITERAL) val notificationMessage2 = Resource(Group, "messageB", LITERAL)
val changeExpirationMs = 1000
notificationListener = new ZkNodeChangeNotificationListener(zkClient, ZkAclStore(LITERAL).aclChangePath, notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralAclChangeStore.aclChangePath,
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) ZkAclChangeStore.SequenceNumberPrefix, notificationHandler, changeExpirationMs)
notificationListener.init() notificationListener.init()
zkClient.createAclChangeNotification(notificationMessage1) zkClient.createAclChangeNotification(notificationMessage1)
TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, TestUtils.waitUntilTrue(() => notificationHandler.received().size == 1 && notificationHandler.received().last == notificationMessage1,
"Failed to send/process notification message in the timeout period.") "Failed to send/process notification message in the timeout period.")
/* /*
@ -66,12 +67,43 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
*/ */
zkClient.createAclChangeNotification(notificationMessage2) zkClient.createAclChangeNotification(notificationMessage2)
TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2, TestUtils.waitUntilTrue(() => notificationHandler.received().size == 2 && notificationHandler.received().last == notificationMessage2,
"Failed to send/process notification message in the timeout period.") "Failed to send/process notification message in the timeout period.")
(3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, LITERAL))) (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, LITERAL)))
TestUtils.waitUntilTrue(() => invocationCount == 10 , TestUtils.waitUntilTrue(() => notificationHandler.received().size == 10,
s"Expected 10 invocations of processNotifications, but there were $invocationCount") s"Expected 10 invocations of processNotifications, but there were ${notificationHandler.received()}")
}
@Test
def testSwallowsProcessorException() : Unit = {
notificationHandler.setThrowSize(2)
notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralAclChangeStore.aclChangePath,
ZkAclChangeStore.SequenceNumberPrefix, notificationHandler, changeExpirationMs)
notificationListener.init()
zkClient.createAclChangeNotification(Resource(Group, "messageA", LITERAL))
zkClient.createAclChangeNotification(Resource(Group, "messageB", LITERAL))
zkClient.createAclChangeNotification(Resource(Group, "messageC", LITERAL))
TestUtils.waitUntilTrue(() => notificationHandler.received().size == 3,
s"Expected 2 invocations of processNotifications, but there were ${notificationHandler.received()}")
}
private class TestNotificationHandler extends NotificationHandler {
private val messages = ArrayBuffer.empty[Resource]
@volatile private var throwSize = Option.empty[Int]
override def processNotification(notificationMessage: Array[Byte]): Unit = {
messages += LiteralAclStore.changeStore.decode(notificationMessage)
if (throwSize.contains(messages.size))
throw new RuntimeException("Oh no, my processing failed!")
}
def received(): Seq[Resource] = messages
def setThrowSize(index: Int): Unit = throwSize = Option(index)
} }
} }

View File

@ -17,15 +17,21 @@
package kafka.security.auth package kafka.security.auth
import java.net.InetAddress import java.net.InetAddress
import java.nio.charset.StandardCharsets.UTF_8
import java.util.UUID import java.util.UUID
import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
import kafka.network.RequestChannel.Session import kafka.network.RequestChannel.Session
import kafka.security.auth.Acl.{WildCardHost, WildCardResource} import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.resource.ResourceNameType
import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.junit.Assert._ import org.junit.Assert._
import org.junit.{After, Before, Test} import org.junit.{After, Before, Test}
@ -47,7 +53,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
val username = "alice" val username = "alice"
val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val session = Session(principal, testHostName) val session = Session(principal, testHostName)
var config: KafkaConfig = null var config: KafkaConfig = _
private var zooKeeperClient: ZooKeeperClient = _
@Before @Before
override def setUp() { override def setUp() {
@ -64,12 +71,16 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
simpleAclAuthorizer.configure(config.originals) simpleAclAuthorizer.configure(config.originals)
simpleAclAuthorizer2.configure(config.originals) simpleAclAuthorizer2.configure(config.originals)
resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL) resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL)
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
Time.SYSTEM, "kafka.test", "SimpleAclAuthorizerTest")
} }
@After @After
override def tearDown(): Unit = { override def tearDown(): Unit = {
simpleAclAuthorizer.close() simpleAclAuthorizer.close()
simpleAclAuthorizer2.close() simpleAclAuthorizer2.close()
zooKeeperClient.close()
super.tearDown() super.tearDown()
} }
@ -553,6 +564,88 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
assertEquals(4, simpleAclAuthorizer.getAcls(principal).size) assertEquals(4, simpleAclAuthorizer.getAcls(principal).size)
} }
@Test(expected = classOf[UnsupportedVersionException])
def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED))
}
@Test
def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
givenAuthorizerWithProtocolVersion(Option.empty)
val resource = Resource(Topic, "z_other", PREFIXED)
val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource).bytes, UTF_8)
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
val actual = getAclChangeEventAsString(PREFIXED)
assertEquals(expected, actual)
}
@Test
def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): Unit = {
givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
val resource = Resource(Topic, "z_other", PREFIXED)
val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource).bytes, UTF_8)
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
val actual = getAclChangeEventAsString(PREFIXED)
assertEquals(expected, actual)
}
@Test
def testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions(): Unit = {
givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
val resource = Resource(Topic, "z_other", LITERAL)
val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource).bytes, UTF_8)
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
val actual = getAclChangeEventAsString(LITERAL)
assertEquals(expected, actual)
}
@Test
def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit = {
givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
val resource = Resource(Topic, "z_other", LITERAL)
val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource).bytes, UTF_8)
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
val actual = getAclChangeEventAsString(LITERAL)
assertEquals(expected, actual)
}
private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]) {
simpleAclAuthorizer.close()
val props = TestUtils.createBrokerConfig(0, zkConnect)
props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString))
config = KafkaConfig.fromProps(props)
simpleAclAuthorizer.configure(config.originals)
}
private def getAclChangeEventAsString(patternType: ResourceNameType) = {
val store = ZkAclStore(patternType)
val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath))
children.maybeThrow()
assertEquals("Expecting 1 change event", 1, children.children.size)
val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.changeStore.aclChangePath}/${children.children.head}"))
data.maybeThrow()
new String(data.data, UTF_8)
}
private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = { private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
var acls = originalAcls var acls = originalAcls

View File

@ -34,11 +34,11 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
import org.junit.Assert._ import org.junit.Assert._
import org.junit.{After, Before, Test} import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, mutable} import scala.collection.{Seq, mutable}
import scala.util.Random import scala.util.Random
import kafka.controller.LeaderIsrAndControllerEpoch import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper._ import kafka.zookeeper._
@ -426,10 +426,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test @Test
def testAclManagementMethods() { def testAclManagementMethods() {
ZkAclStore.stores.foreach(store => { ZkAclStore.stores.foreach(store => {
assertFalse(zkClient.pathExists(store.aclPath)) assertFalse(zkClient.pathExists(store.aclPath))
assertFalse(zkClient.pathExists(store.aclChangePath)) assertFalse(zkClient.pathExists(store.changeStore.aclChangePath))
ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(store.path(resource)))) ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(store.path(resource))))
}) })
@ -438,11 +437,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
ZkAclStore.stores.foreach(store => { ZkAclStore.stores.foreach(store => {
assertTrue(zkClient.pathExists(store.aclPath)) assertTrue(zkClient.pathExists(store.aclPath))
assertTrue(zkClient.pathExists(store.aclChangePath)) assertTrue(zkClient.pathExists(store.changeStore.aclChangePath))
ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(store.path(resource)))) ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(store.path(resource))))
val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.nameType) val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.patternType)
val resource2 = new Resource(Topic, UUID.randomUUID().toString, store.nameType) val resource2 = new Resource(Topic, UUID.randomUUID().toString, store.patternType)
// try getting acls for non-existing resource // try getting acls for non-existing resource
var versionedAcls = zkClient.getVersionedAclsForResource(resource1) var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
@ -472,10 +471,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(1, versionedAcls.zkVersion) assertEquals(1, versionedAcls.zkVersion)
//get resource Types //get resource Types
assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes(store.nameType).toSet) assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes(store.patternType).toSet)
//get resource name //get resource name
val resourceNames = zkClient.getResourceNames(store.nameType, Topic) val resourceNames = zkClient.getResourceNames(store.patternType, Topic)
assertEquals(2, resourceNames.size) assertEquals(2, resourceNames.size)
assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet) assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet)
@ -488,14 +487,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
//delete with valid expected zk version //delete with valid expected zk version
assertTrue(zkClient.conditionalDelete(resource2, 0)) assertTrue(zkClient.conditionalDelete(resource2, 0))
zkClient.createAclChangeNotification(Resource(Group, "resource1", store.patternType))
zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.patternType))
zkClient.createAclChangeNotification(Resource(Group, "resource1", store.nameType)) assertEquals(2, zkClient.getChildren(store.changeStore.aclChangePath).size)
zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.nameType))
assertEquals(2, zkClient.getChildren(store.aclChangePath).size)
zkClient.deleteAclChangeNotifications() zkClient.deleteAclChangeNotifications()
assertTrue(zkClient.getChildren(store.aclChangePath).isEmpty) assertTrue(zkClient.getChildren(store.changeStore.aclChangePath).isEmpty)
}) })
} }

View File

@ -64,7 +64,7 @@
<li>ACLs should not be added to prefixed resources, <li>ACLs should not be added to prefixed resources,
(added in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>), (added in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>),
until all brokers in the cluster have been updated. until all brokers in the cluster have been updated.
<p><b>NOTE:</b> any prefixed ACLs added to a cluster will be ignored should the cluster be downgraded again. <p><b>NOTE:</b> any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.
</li> </li>
</ol> </ol>