From e6fd99dc6253333c5aaa9931d8ae319e637e9a11 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Sun, 12 Aug 2018 23:59:31 -0700 Subject: [PATCH] KAFKA-5638; Improve the Required ACL of ListGroups API (KIP-231) (#5352) Reviewers: Manikumar Reddy O , Jason Gustafson --- .../main/scala/kafka/server/KafkaApis.scala | 15 +++---- .../kafka/api/AuthorizerIntegrationTest.scala | 42 +++++++++++++++++++ docs/upgrade.html | 8 ++-- 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index de9e0bf66ef..0273e3da557 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -53,6 +53,7 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.PatternType.LITERAL +import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{Time, Utils} @@ -63,7 +64,6 @@ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails -import org.apache.kafka.common.resource.ResourcePattern /** * Logic to handle the various Kafka requests @@ -1238,14 +1238,15 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleListGroupsRequest(request: RequestChannel.Request) { - if (!authorize(request.session, Describe, Resource.ClusterResource)) { + val (error, groups) = groupCoordinator.handleListGroups() + if (authorize(request.session, Describe, Resource.ClusterResource)) + // With describe cluster access all groups are returned. We keep this alternative for backward compatibility. sendResponseMaybeThrottle(request, requestThrottleMs => - request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) - } else { - val (error, groups) = groupCoordinator.handleListGroups() - val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } + new ListGroupsResponse(requestThrottleMs, error, groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava)) + else { + val filteredGroups = groups.filter(group => authorize(request.session, Describe, new Resource(Group, group.groupId, LITERAL))) sendResponseMaybeThrottle(request, requestThrottleMs => - new ListGroupsResponse(requestThrottleMs, error, allGroups.asJava)) + new ListGroupsResponse(requestThrottleMs, error, filteredGroups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava)) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index dc9ca8568ea..faad071b909 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1027,6 +1027,48 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumerGroupService.close() } + @Test + def testListGroupApiWithAndWithoutListGroupAcls() { + // write some record to the topic + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + + // use two consumers to write to two different groups + val group2 = "other group" + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Group, group2, LITERAL)) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.subscribe(Collections.singleton(topic)) + consumeRecords(this.consumers.head) + + val otherConsumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group2, securityProtocol = SecurityProtocol.PLAINTEXT) + otherConsumer.subscribe(Collections.singleton(topic)) + consumeRecords(otherConsumer) + + val adminClient = createAdminClient() + + // first use cluster describe permission + removeAllAcls() + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), Resource.ClusterResource) + // it should list both groups (due to cluster describe permission) + assertEquals(Set(group, group2), adminClient.listConsumerGroups().all().get().asScala.map(_.groupId()).toSet) + + // now replace cluster describe with group read permission + removeAllAcls() + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) + // it should list only one group now + val groupList = adminClient.listConsumerGroups().all().get().asScala.toList + assertEquals(1, groupList.length) + assertEquals(group, groupList.head.groupId) + + // now remove all acls and verify describe group access is required to list any group + removeAllAcls() + val listGroupResult = adminClient.listConsumerGroups() + assertEquals(List(), listGroupResult.errors().get().asScala.toList) + assertEquals(List(), listGroupResult.all().get().asScala.toList) + otherConsumer.close() + } + @Test def testDeleteGroupApiWithDeleteGroupAcl() { addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) diff --git a/docs/upgrade.html b/docs/upgrade.html index 264e26d6b02..979190db7b7 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -33,12 +33,12 @@
  • The default value for the producer's retries config was changed to Integer.MAX_VALUE, as we introduced delivery.timeout.ms in KIP-91, which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default, - the delivery timeout is set to 2 minutes. -
  • + the delivery timeout is set to 2 minutes.
  • By default, MirrorMaker now overrides delivery.timeout.ms to Integer.MAX_VALUE when configuring the producer. If you have overridden the value of retries in order to fail faster, - you will instead need to override delivery.timeout.ms. -
  • + you will instead need to override delivery.timeout.ms. +
  • The ListGroup API now expects, as a recommended alternative, Describe Group access to the groups a user should be able to list. + Even though the old Describe Cluster access is still supported for backward compatibility, using it for this API is not advised.