KAFKA-5638; Improve the Required ACL of ListGroups API (KIP-231) (#5352)

Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Vahid Hashemian 2018-08-12 23:59:31 -07:00 committed by Jason Gustafson
parent 69283b4038
commit e6fd99dc62
3 changed files with 54 additions and 11 deletions

View File

@ -53,6 +53,7 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Time, Utils}
@ -63,7 +64,6 @@ import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
import org.apache.kafka.common.resource.ResourcePattern
/**
* Logic to handle the various Kafka requests
@ -1238,14 +1238,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleListGroupsRequest(request: RequestChannel.Request) {
if (!authorize(request.session, Describe, Resource.ClusterResource)) {
val (error, groups) = groupCoordinator.handleListGroups()
if (authorize(request.session, Describe, Resource.ClusterResource))
// With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
sendResponseMaybeThrottle(request, requestThrottleMs =>
request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
val (error, groups) = groupCoordinator.handleListGroups()
val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
new ListGroupsResponse(requestThrottleMs, error, groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava))
else {
val filteredGroups = groups.filter(group => authorize(request.session, Describe, new Resource(Group, group.groupId, LITERAL)))
sendResponseMaybeThrottle(request, requestThrottleMs =>
new ListGroupsResponse(requestThrottleMs, error, allGroups.asJava))
new ListGroupsResponse(requestThrottleMs, error, filteredGroups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava))
}
}

View File

@ -1027,6 +1027,48 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumerGroupService.close()
}
@Test
def testListGroupApiWithAndWithoutListGroupAcls() {
// write some record to the topic
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
// use two consumers to write to two different groups
val group2 = "other group"
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Group, group2, LITERAL))
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.subscribe(Collections.singleton(topic))
consumeRecords(this.consumers.head)
val otherConsumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group2, securityProtocol = SecurityProtocol.PLAINTEXT)
otherConsumer.subscribe(Collections.singleton(topic))
consumeRecords(otherConsumer)
val adminClient = createAdminClient()
// first use cluster describe permission
removeAllAcls()
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), Resource.ClusterResource)
// it should list both groups (due to cluster describe permission)
assertEquals(Set(group, group2), adminClient.listConsumerGroups().all().get().asScala.map(_.groupId()).toSet)
// now replace cluster describe with group read permission
removeAllAcls()
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
// it should list only one group now
val groupList = adminClient.listConsumerGroups().all().get().asScala.toList
assertEquals(1, groupList.length)
assertEquals(group, groupList.head.groupId)
// now remove all acls and verify describe group access is required to list any group
removeAllAcls()
val listGroupResult = adminClient.listConsumerGroups()
assertEquals(List(), listGroupResult.errors().get().asScala.toList)
assertEquals(List(), listGroupResult.all().get().asScala.toList)
otherConsumer.close()
}
@Test
def testDeleteGroupApiWithDeleteGroupAcl() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)

View File

@ -33,12 +33,12 @@
<li>The default value for the producer's <code>retries</code> config was changed to <code>Integer.MAX_VALUE</code>, as we introduced <code>delivery.timeout.ms</code>
in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer">KIP-91</a>,
which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default,
the delivery timeout is set to 2 minutes.
</li>
the delivery timeout is set to 2 minutes.</li>
<li>By default, MirrorMaker now overrides <code>delivery.timeout.ms</code> to <code>Integer.MAX_VALUE</code> when
configuring the producer. If you have overridden the value of <code>retries</code> in order to fail faster,
you will instead need to override <code>delivery.timeout.ms</code>.
</li>
you will instead need to override <code>delivery.timeout.ms</code>.</li>
<li>The <code>ListGroup</code> API now expects, as a recommended alternative, <code>Describe Group</code> access to the groups a user should be able to list.
Even though the old <code>Describe Cluster</code> access is still supported for backward compatibility, using it for this API is not advised.</li>
</ol>