KAFKA-14772; Add ConsumerGroupHeartbeat and ConsumerGroupDescribe API to AuthorizerIntegrationTest (#17044)

The patch adds ConsumerGroupHeartbeat and ConsumerGroupDescribe API to AuthorizerIntegrationTest.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
DL1231 2024-09-10 15:17:31 +08:00 committed by GitHub
parent d55d0e1a8f
commit f629b14890
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 84 additions and 5 deletions

View File

@ -40,7 +40,7 @@ import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartit
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
@ -203,7 +203,10 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
.transactionStates.asScala.find(_.transactionalId == transactionalId).get
.errorCode
)
})
}),
ApiKeys.CONSUMER_GROUP_HEARTBEAT -> ((resp: ConsumerGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.CONSUMER_GROUP_DESCRIBE -> ((resp: ConsumerGroupDescribeResponse) =>
Errors.forCode(resp.data.groups.asScala.find(g => group == g.groupId).head.errorCode))
)
def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
@ -257,7 +260,9 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> clusterDescribeAcl,
ApiKeys.OFFSET_DELETE -> groupReadAcl,
ApiKeys.DESCRIBE_PRODUCERS -> topicReadAcl,
ApiKeys.DESCRIBE_TRANSACTIONS -> transactionalIdDescribeAcl
ApiKeys.DESCRIBE_TRANSACTIONS -> transactionalIdDescribeAcl,
ApiKeys.CONSUMER_GROUP_HEARTBEAT -> groupReadAcl,
ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl
)
private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
@ -666,6 +671,16 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
)
).build()
private def consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(group)
.setMemberEpoch(0)).build()
private def consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder(
new ConsumerGroupDescribeRequestData()
.setGroupIds(List(group).asJava)
.setIncludeAuthorizedOperations(false)).build()
private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true,
topicNames: Map[Uuid, String] = getTopicNames()) = {
for ((key, request) <- requestKeyToRequest) {
@ -734,7 +749,10 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.DESCRIBE_TRANSACTIONS -> describeTransactionsRequest,
ApiKeys.WRITE_TXN_MARKERS -> writeTxnMarkersRequest,
)
if (!isKRaftTest()) {
if (isKRaftTest()) {
requestKeyToRequest += ApiKeys.CONSUMER_GROUP_HEARTBEAT -> consumerGroupHeartbeatRequest
requestKeyToRequest += ApiKeys.CONSUMER_GROUP_DESCRIBE -> consumerGroupDescribeRequest
} else {
// Inter-broker APIs use an invalid broker epoch, so does not affect the test case
requestKeyToRequest += ApiKeys.UPDATE_METADATA -> createUpdateMetadataRequest
requestKeyToRequest += ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest
@ -2499,7 +2517,6 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCreateAndCloseConsumerWithNoAccess(quorum: String): Unit = {
@ -2509,6 +2526,68 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
assertDoesNotThrow(closeConsumer, "Exception not expected on closing consumer")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeatWithReadAcl(quorum: String): Unit = {
addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
val request = consumerGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeatWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
val request = consumerGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeatWithoutReadAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = consumerGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource)
val request = consumerGroupDescribeRequest
val resource = Set[ResourceType](GROUP)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
val request = consumerGroupDescribeRequest
val resource = Set[ResourceType](GROUP)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithoutDescribeAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = consumerGroupDescribeRequest
val resource = Set[ResourceType](GROUP)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
private def testDescribeClusterClusterAuthorizedOperations(
version: Short,
expectedClusterAuthorizedOperations: Int