KAFKA-6726: Fine Grained ACL for CreateTopics (KIP-277) (#4795)

- CreateTopicsRequest now requires Create auth on Topic resource
or Create on Cluster resource.
- AclCommand --producer option adjusted
- Existing unit and Integration tests adjusted accordingly and
new tests added.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Edoardo Comar 2018-06-06 11:36:52 +01:00 committed by Ismael Juma
parent be8808dd4b
commit 0c035c46b4
9 changed files with 155 additions and 101 deletions

View File

@ -73,7 +73,7 @@ public class CreateTopicsResponse extends AbstractResponse {
* *
* REQUEST_TIMED_OUT(7) * REQUEST_TIMED_OUT(7)
* INVALID_TOPIC_EXCEPTION(17) * INVALID_TOPIC_EXCEPTION(17)
* CLUSTER_AUTHORIZATION_FAILED(31) * TOPIC_AUTHORIZATION_FAILED(29)
* TOPIC_ALREADY_EXISTS(36) * TOPIC_ALREADY_EXISTS(36)
* INVALID_PARTITIONS(37) * INVALID_PARTITIONS(37)
* INVALID_REPLICATION_FACTOR(38) * INVALID_REPLICATION_FACTOR(38)
@ -81,6 +81,7 @@ public class CreateTopicsResponse extends AbstractResponse {
* INVALID_CONFIG(40) * INVALID_CONFIG(40)
* NOT_CONTROLLER(41) * NOT_CONTROLLER(41)
* INVALID_REQUEST(42) * INVALID_REQUEST(42)
* POLICY_VIOLATION(44)
*/ */
private final Map<String, ApiError> errors; private final Map<String, ApiError> errors;

View File

@ -31,7 +31,7 @@ object AclCommand extends Logging {
val Newline = scala.util.Properties.lineSeparator val Newline = scala.util.Properties.lineSeparator
val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All), Topic -> Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs, All),
Group -> Set(Read, Describe, Delete, All), Group -> Set(Read, Describe, Delete, All),
Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All), Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
TransactionalId -> Set(Describe, Write, All), TransactionalId -> Set(Describe, Write, All),
@ -153,13 +153,16 @@ object AclCommand extends Logging {
val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId) val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId)
val enableIdempotence = opts.options.has(opts.idempotentOpt) val enableIdempotence = opts.options.has(opts.idempotentOpt)
val acls = getAcl(opts, Set(Write, Describe)) val topicAcls = getAcl(opts, Set(Write, Describe, Create))
val transactionalIdAcls = getAcl(opts, Set(Write, Describe))
//Write, Describe permission on topics, Create permission on cluster, Write, Describe on transactionalIds //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ topics.map(_ -> topicAcls).toMap ++
transactionalIds.map(_ -> acls).toMap[Resource, Set[Acl]] + transactionalIds.map(_ -> transactionalIdAcls).toMap ++
(Resource.ClusterResource -> (getAcl(opts, Set(Create)) ++ (if (enableIdempotence)
(if (enableIdempotence) getAcl(opts, Set(IdempotentWrite)) else Set.empty[Acl]))) Map(Resource.ClusterResource -> getAcl(opts, Set(IdempotentWrite)))
else
Map.empty)
} }
private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@ -168,12 +171,12 @@ object AclCommand extends Logging {
val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic) val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
val groups: Set[Resource] = resources.filter(_.resourceType == Group) val groups: Set[Resource] = resources.filter(_.resourceType == Group)
//Read,Describe on topic, Read on consumerGroup + Create on cluster //Read, Describe on topic, Read on consumerGroup
val acls = getAcl(opts, Set(Read, Describe)) val acls = getAcl(opts, Set(Read, Describe))
topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ topics.map(_ -> acls).toMap ++
groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]] groups.map(_ -> getAcl(opts, Set(Read))).toMap
} }
private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@ -355,7 +358,7 @@ object AclCommand extends Logging {
.ofType(classOf[String]) .ofType(classOf[String])
val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " + val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
"This will generate ACLs that allows WRITE,DESCRIBE on topic and CREATE on cluster. ") "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.")
val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " + val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
"This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.") "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.")

View File

@ -62,6 +62,7 @@ import scala.collection.JavaConverters._
import scala.collection._ import scala.collection._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
/** /**
* Logic to handle the various Kafka requests * Logic to handle the various Kafka requests
@ -1040,8 +1041,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authorize(request.session, Create, Resource.ClusterResource)) { if (!authorize(request.session, Create, Resource.ClusterResource)) {
authorizedTopics --= nonExistingTopics unauthorizedForCreateTopics = nonExistingTopics.filter { topic =>
unauthorizedForCreateTopics ++= nonExistingTopics !authorize(request.session, Create, new Resource(Topic, topic))
}
authorizedTopics --= unauthorizedForCreateTopics
} }
} }
} }
@ -1424,16 +1427,20 @@ class KafkaApis(val requestChannel: RequestChannel,
(topic, new ApiError(Errors.NOT_CONTROLLER, null)) (topic, new ApiError(Errors.NOT_CONTROLLER, null))
} }
sendResponseCallback(results) sendResponseCallback(results)
} else if (!authorize(request.session, Create, Resource.ClusterResource)) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
(topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
}
sendResponseCallback(results)
} else { } else {
val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) => val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) =>
!createTopicsRequest.duplicateTopics.contains(topic) !createTopicsRequest.duplicateTopics.contains(topic)
} }
val (authorizedTopics, unauthorizedTopics) =
if (authorize(request.session, Create, Resource.ClusterResource)) {
(validTopics, Map[String, TopicDetails]())
} else {
validTopics.partition { case (topic, _) =>
authorize(request.session, Create, new Resource(Topic, topic))
}
}
// Special handling to add duplicate topics to the response // Special handling to add duplicate topics to the response
def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = { def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = {
@ -1447,14 +1454,15 @@ class KafkaApis(val requestChannel: RequestChannel,
duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
} else Map.empty } else Map.empty
val completeResults = results ++ duplicatedTopicsResults val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults
sendResponseCallback(completeResults) sendResponseCallback(completeResults)
} }
adminManager.createTopics( adminManager.createTopics(
createTopicsRequest.timeout, createTopicsRequest.timeout,
createTopicsRequest.validateOnly, createTopicsRequest.validateOnly,
validTopics, authorizedTopics,
sendResponseWithDuplicatesCallback sendResponseWithDuplicatesCallback
) )
} }

View File

@ -17,6 +17,7 @@ import java.util
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.regex.Pattern import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Properties} import java.util.{ArrayList, Collections, Properties}
import java.time.Duration
import kafka.admin.AdminClient import kafka.admin.AdminClient
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService} import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
@ -73,6 +74,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val groupResource = new Resource(Group, group) val groupResource = new Resource(Group, group)
val deleteTopicResource = new Resource(Topic, deleteTopic) val deleteTopicResource = new Resource(Topic, deleteTopic)
val transactionalIdResource = new Resource(TransactionalId, transactionalId) val transactionalIdResource = new Resource(TransactionalId, transactionalId)
val createTopicResource = new Resource(Topic, createTopic)
val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
@ -82,6 +84,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter))) val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite))) val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)))
val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write))) val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)))
val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
@ -207,7 +210,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.LEADER_AND_ISR -> clusterAcl, ApiKeys.LEADER_AND_ISR -> clusterAcl,
ApiKeys.STOP_REPLICA -> clusterAcl, ApiKeys.STOP_REPLICA -> clusterAcl,
ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl, ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl,
ApiKeys.CREATE_TOPICS -> clusterCreateAcl, ApiKeys.CREATE_TOPICS -> topicCreateAcl,
ApiKeys.DELETE_TOPICS -> topicDeleteAcl, ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
ApiKeys.DELETE_RECORDS -> topicDeleteAcl, ApiKeys.DELETE_RECORDS -> topicDeleteAcl,
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
@ -492,6 +495,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
} }
} }
@Test
def testCreateTopicAuthorizationWithClusterCreate() {
removeAllAcls()
val resources = Set[ResourceType](Topic)
sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = false)
for ((resource, acls) <- clusterCreateAcl)
addAndVerifyAcls(acls, resource)
sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = true)
}
@Test @Test
def testFetchFollowerRequest() { def testFetchFollowerRequest() {
val key = ApiKeys.FETCH val key = ApiKeys.FETCH
@ -551,18 +566,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
} }
@Test @Test
def testCreatePermissionNeededForWritingToNonExistentTopic() { def testCreatePermissionOnTopicToWriteToNonExistentTopic() {
val newTopic = "newTopic" testCreatePermissionNeededToWriteToNonExistentTopic(Topic)
val topicPartition = new TopicPartition(newTopic, 0) }
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
@Test
def testCreatePermissionOnClusterToWriteToNonExistentTopic() {
testCreatePermissionNeededToWriteToNonExistentTopic(Cluster)
}
private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) {
val topicPartition = new TopicPartition(createTopic, 0)
val newTopicResource = new Resource(Topic, createTopic)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
try { try {
sendRecords(numRecords, topicPartition) sendRecords(numRecords, topicPartition)
Assert.fail("should have thrown exception") Assert.fail("should have thrown exception")
} catch { } catch {
case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(createTopic), e.unauthorizedTopics())
} }
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), resource)
sendRecords(numRecords, topicPartition) sendRecords(numRecords, topicPartition)
} }
@ -800,27 +827,37 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
} }
@Test @Test
def testCreatePermissionNeededToReadFromNonExistentTopic() { def testCreatePermissionOnTopicToReadFromNonExistentTopic() {
val newTopic = "newTopic" testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)),
Topic)
}
@Test
def testCreatePermissionOnClusterToReadFromNonExistentTopic() {
testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)),
Cluster)
}
private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) {
val topicPartition = new TopicPartition(newTopic, 0) val topicPartition = new TopicPartition(newTopic, 0)
val newTopicResource = new Resource(Topic, newTopic) val newTopicResource = new Resource(Topic, newTopic)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource) addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
addAndVerifyAcls(groupReadAcl(groupResource), groupResource) addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource) this.consumers.head.assign(List(topicPartition).asJava)
try { val unauthorizedTopics = intercept[TopicAuthorizationException] {
this.consumers.head.assign(List(topicPartition).asJava) (0 until 10).foreach(_ => consumers.head.poll(Duration.ofMillis(50L)))
consumeRecords(this.consumers.head) }.unauthorizedTopics
Assert.fail("should have thrown exception") assertEquals(Collections.singleton(newTopic), unauthorizedTopics)
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
}
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) addAndVerifyAcls(acls, resource)
sendRecords(numRecords, topicPartition) TestUtils.waitUntilTrue(() => {
consumeRecords(this.consumers.head, topic = newTopic, part = 0) this.consumers.head.poll(Duration.ofMillis(50L))
this.zkClient.topicExists(newTopic)
}, "Expected topic was not created")
} }
@Test(expected = classOf[AuthorizationException]) @Test(expected = classOf[AuthorizationException])

View File

@ -60,7 +60,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
override val serverCount = 3 override val serverCount = 3
override def configureSecurityBeforeServersStart() { override def configureSecurityBeforeServersStart() {
AclCommand.main(clusterAclArgs) AclCommand.main(clusterActionArgs)
AclCommand.main(topicBrokerReadAclArgs) AclCommand.main(topicBrokerReadAclArgs)
} }
@ -82,23 +82,20 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val wildcardTopicResource = new Resource(Topic, wildcard) val wildcardTopicResource = new Resource(Topic, wildcard)
val wildcardGroupResource = new Resource(Group, wildcard) val wildcardGroupResource = new Resource(Group, wildcard)
// Arguments to AclCommand to set ACLs. There are three definitions here: // Arguments to AclCommand to set ACLs.
// 1- Provides read and write access to topic def clusterActionArgs: Array[String] = Array("--authorizer-properties",
// 2- Provides only write access to topic s"zookeeper.connect=$zkConnect",
// 3- Provides read access to consumer group s"--add",
def clusterAclArgs: Array[String] = Array("--authorizer-properties", s"--cluster",
s"zookeeper.connect=$zkConnect", s"--operation=ClusterAction",
s"--add", s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
s"--cluster",
s"--operation=ClusterAction",
s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect", s"zookeeper.connect=$zkConnect",
s"--add", s"--add",
s"--topic=$wildcard", s"--topic=$wildcard",
s"--operation=Read", s"--operation=Read",
s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
def produceAclArgs: Array[String] = Array("--authorizer-properties", def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect", s"zookeeper.connect=$zkConnect",
s"--add", s"--add",
s"--topic=$topic", s"--topic=$topic",
@ -124,13 +121,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
s"--topic=$topic", s"--topic=$topic",
s"--operation=Write", s"--operation=Write",
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
def consumeAclArgs: Array[String] = Array("--authorizer-properties", def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect", s"zookeeper.connect=$zkConnect",
s"--add", s"--add",
s"--topic=$topic", s"--topic=$topic",
s"--group=$group", s"--group=$group",
s"--consumer", s"--consumer",
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
def groupAclArgs: Array[String] = Array("--authorizer-properties", def groupAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect", s"zookeeper.connect=$zkConnect",
s"--add", s"--add",
@ -138,13 +135,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
s"--operation=Read", s"--operation=Read",
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect", s"zookeeper.connect=$zkConnect",
s"--add", s"--add",
s"--topic=$wildcard", s"--topic=$wildcard",
s"--group=$wildcard", s"--group=$wildcard",
s"--consumer", s"--consumer",
s"--producer", s"--producer",
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read)) def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read))
@ -152,6 +149,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read))
def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write)) def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write))
def TopicDescribeAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Describe)) def TopicDescribeAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Describe))
def TopicCreateAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Create))
// The next two configuration parameters enable ZooKeeper secure ACLs // The next two configuration parameters enable ZooKeeper secure ACLs
// and sets the Kafka authorizer, both necessary to enable security. // and sets the Kafka authorizer, both necessary to enable security.
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
@ -160,6 +158,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3")
this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3") this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3")
this.serverConfig.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
/** /**
@ -200,14 +199,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
*/ */
@Test @Test
def testProduceConsumeViaAssign(): Unit = { def testProduceConsumeViaAssign(): Unit = {
setAclsAndProduce() setAclsAndProduce(tp)
consumers.head.assign(List(tp).asJava) consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head, numRecords) consumeRecords(this.consumers.head, numRecords)
} }
@Test @Test
def testProduceConsumeViaSubscribe(): Unit = { def testProduceConsumeViaSubscribe(): Unit = {
setAclsAndProduce() setAclsAndProduce(tp)
consumers.head.subscribe(List(topic).asJava) consumers.head.subscribe(List(topic).asJava)
consumeRecords(this.consumers.head, numRecords) consumeRecords(this.consumers.head, numRecords)
} }
@ -223,16 +222,25 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
private def setWildcardResourceAcls() { private def setWildcardResourceAcls() {
AclCommand.main(produceConsumeWildcardAclArgs) AclCommand.main(produceConsumeWildcardAclArgs)
servers.foreach { s => servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
} }
} }
protected def setAclsAndProduce() { @Test
AclCommand.main(produceAclArgs) def testProduceConsumeTopicAutoCreateTopicCreateAcl(): Unit = {
AclCommand.main(consumeAclArgs) // topic2 is not created on setup()
val tp2 = new TopicPartition("topic2", 0)
setAclsAndProduce(tp2)
consumers.head.assign(List(tp2).asJava)
consumeRecords(this.consumers.head, numRecords, topic = tp2.topic)
}
protected def setAclsAndProduce(tp: TopicPartition) {
AclCommand.main(produceAclArgs(tp.topic))
AclCommand.main(consumeAclArgs(tp.topic))
servers.foreach { s => servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, new Resource(Topic, tp.topic))
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
} }
sendRecords(numRecords, tp) sendRecords(numRecords, tp)
@ -283,10 +291,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
} }
private def noConsumeWithoutDescribeAclSetup(): Unit = { private def noConsumeWithoutDescribeAclSetup(): Unit = {
AclCommand.main(produceAclArgs) AclCommand.main(produceAclArgs(tp.topic))
AclCommand.main(groupAclArgs) AclCommand.main(groupAclArgs)
servers.foreach { s => servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
} }
@ -328,10 +336,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
} }
private def noConsumeWithDescribeAclSetup(): Unit = { private def noConsumeWithDescribeAclSetup(): Unit = {
AclCommand.main(produceAclArgs) AclCommand.main(produceAclArgs(tp.topic))
AclCommand.main(groupAclArgs) AclCommand.main(groupAclArgs)
servers.foreach { s => servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
} }
sendRecords(numRecords, tp) sendRecords(numRecords, tp)
@ -343,9 +351,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
*/ */
@Test @Test
def testNoGroupAcl(): Unit = { def testNoGroupAcl(): Unit = {
AclCommand.main(produceAclArgs) AclCommand.main(produceAclArgs(tp.topic))
servers.foreach { s => servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
} }
sendRecords(numRecords, tp) sendRecords(numRecords, tp)
consumers.head.assign(List(tp).asJava) consumers.head.assign(List(tp).asJava)

View File

@ -56,7 +56,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
*/ */
@Test(timeout = 15000) @Test(timeout = 15000)
def testTwoConsumersWithDifferentSaslCredentials(): Unit = { def testTwoConsumersWithDifferentSaslCredentials(): Unit = {
setAclsAndProduce() setAclsAndProduce(tp)
val consumer1 = consumers.head val consumer1 = consumers.head
val consumer2Config = new Properties val consumer2Config = new Properties

View File

@ -49,8 +49,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
) )
private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])]( private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs), TopicResources -> (Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs),
Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete", Array("--operation", "Read" , "--operation", "Write", "--operation", "Create", "--operation", "Describe", "--operation", "Delete",
"--operation", "DescribeConfigs", "--operation", "AlterConfigs")), "--operation", "DescribeConfigs", "--operation", "AlterConfigs")),
Set(Resource.ClusterResource) -> (Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite), Set(Resource.ClusterResource) -> (Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite),
Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs", Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs",
@ -61,10 +61,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
) )
private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]]( private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]](
TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe, Create), Hosts),
TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts),
Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Some(Create), Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow,
if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts) Set(if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts)
) )
private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]]( private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]](

View File

@ -1133,7 +1133,7 @@
<tr> <tr>
<td>--producer</td> <td>--producer</td>
<td> Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, <td> Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE,
DESCRIBE on topic and CREATE on cluster.</td> DESCRIBE and CREATE on topic.</td>
<td></td> <td></td>
<td>Convenience</td> <td>Convenience</td>
</tr> </tr>

View File

@ -98,13 +98,10 @@
will be removed in a future version.</li> will be removed in a future version.</li>
<li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li> <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
<li>The tool kafka.tools.ReplayLogProducer has been removed.</li> <li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> finally removes <li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>
the <code>--new-consumer</code> option for all consumer based tools as <code>kafka-console-consumer</code>, <code>kafka-consumer-perf-test</code> <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> removes
and <code>kafka-consumer-groups</code>. the <code>--new-consumer</code> option for all consumer based tools. This option is redundant since the new consumer is automatically
The new consumer is automatically used if the bootstrap servers list is provided on the command line used if --bootstrap-server is defined.
otherwise, when the zookeeper connection is provided, the old consumer is used.
The <code>--new-consumer</code> option had already been ignored as the way of selecting the consumer since Kafka 1.0.0,
this KIP just removes the option.
</li> </li>
</ul> </ul>