KAFKA-19169: Enhance AuthorizerIntegrationTest for share group APIs (#19540)
CI / build (push) Waiting to run Details

Enhance AuthorizerIntegrationTest for share group APIs

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Lan Ding 2025-05-01 17:13:43 +08:00 committed by GitHub
parent 44d2741b4c
commit e3c456ff0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 644 additions and 6 deletions

View File

@ -80,11 +80,13 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest {
val tp = new TopicPartition(topic, part)
val logDir = "logDir"
val group = "my-group"
val shareGroup = "share-group"
val protocolType = "consumer"
val protocolName = "consumer-range"
val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL)
val topicResource = new ResourcePattern(TOPIC, topic, LITERAL)
val groupResource = new ResourcePattern(GROUP, group, LITERAL)
val shareGroupResource = new ResourcePattern(GROUP, shareGroup, LITERAL)
val transactionalIdResource = new ResourcePattern(TRANSACTIONAL_ID, transactionalId, LITERAL)
producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, OffsetFetchRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeShareGroupOffsetsRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, InitializeShareGroupStateRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, OffsetFetchRequestData, ProduceRequestData, ReadShareGroupStateRequestData, ReadShareGroupStateSummaryRequestData, ShareAcknowledgeRequestData, ShareFetchRequestData, ShareGroupDescribeRequestData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, WriteShareGroupStateRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
@ -48,7 +48,7 @@ import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException, TopicPartition, Uuid, requests}
import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException, TopicIdPartition, TopicPartition, Uuid, requests}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
@ -73,6 +73,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val groupDeleteAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)))
val groupDescribeConfigsAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW)))
val groupAlterConfigsAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
val shareGroupReadAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)))
val shareGroupDescribeAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
val shareGroupDeleteAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)))
val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW)))
val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW)))
val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)))
val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW)))
@ -199,7 +204,26 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}),
ApiKeys.CONSUMER_GROUP_HEARTBEAT -> ((resp: ConsumerGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.CONSUMER_GROUP_DESCRIBE -> ((resp: ConsumerGroupDescribeResponse) =>
Errors.forCode(resp.data.groups.asScala.find(g => group == g.groupId).head.errorCode))
Errors.forCode(resp.data.groups.asScala.find(g => group == g.groupId).head.errorCode)),
ApiKeys.SHARE_GROUP_HEARTBEAT -> ((resp: ShareGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.SHARE_GROUP_DESCRIBE -> ((resp: ShareGroupDescribeResponse) =>
Errors.forCode(resp.data.groups.asScala.find(g => shareGroup == g.groupId).head.errorCode)),
ApiKeys.SHARE_FETCH -> ((resp: ShareFetchResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.SHARE_ACKNOWLEDGE -> ((resp: ShareAcknowledgeResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> ((resp: InitializeShareGroupStateResponse) => Errors.forCode(
resp.data.results.get(0).partitions.get(0).errorCode)),
ApiKeys.READ_SHARE_GROUP_STATE -> ((resp: ReadShareGroupStateResponse) => Errors.forCode(
resp.data.results.get(0).partitions.get(0).errorCode)),
ApiKeys.WRITE_SHARE_GROUP_STATE -> ((resp: WriteShareGroupStateResponse) => Errors.forCode(
resp.data.results.get(0).partitions.get(0).errorCode)),
ApiKeys.DELETE_SHARE_GROUP_STATE -> ((resp: DeleteShareGroupStateResponse) => Errors.forCode(
resp.data.results.get(0).partitions.get(0).errorCode)),
ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> ((resp: ReadShareGroupStateSummaryResponse) => Errors.forCode(
resp.data.results.get(0).partitions.get(0).errorCode)),
ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> ((resp: DescribeShareGroupOffsetsResponse) => Errors.forCode(
resp.data.groups.asScala.find(g => shareGroup == g.groupId).head.errorCode)),
ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> ((resp: DeleteShareGroupOffsetsResponse) => Errors.forCode(
resp.data.errorCode))
)
def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
@ -255,7 +279,18 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.DESCRIBE_PRODUCERS -> topicReadAcl,
ApiKeys.DESCRIBE_TRANSACTIONS -> transactionalIdDescribeAcl,
ApiKeys.CONSUMER_GROUP_HEARTBEAT -> groupReadAcl,
ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl
ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl,
ApiKeys.SHARE_GROUP_HEARTBEAT -> (shareGroupReadAcl ++ topicDescribeAcl),
ApiKeys.SHARE_GROUP_DESCRIBE -> (shareGroupDescribeAcl ++ topicDescribeAcl),
ApiKeys.SHARE_FETCH -> (shareGroupReadAcl ++ topicReadAcl),
ApiKeys.SHARE_ACKNOWLEDGE -> (shareGroupReadAcl ++ topicReadAcl),
ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> clusterAcl,
ApiKeys.READ_SHARE_GROUP_STATE -> clusterAcl,
ApiKeys.WRITE_SHARE_GROUP_STATE -> clusterAcl,
ApiKeys.DELETE_SHARE_GROUP_STATE -> clusterAcl,
ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> clusterAcl,
ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ topicDescribeAcl),
ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl)
)
private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
@ -655,6 +690,120 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
.setGroupIds(List(group).asJava)
.setIncludeAuthorizedOperations(false)).build()
private def shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId(shareGroup)
.setMemberEpoch(0)
.setSubscribedTopicNames(List(topic).asJava)).build(ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion)
private def shareGroupDescribeRequest = new ShareGroupDescribeRequest.Builder(
new ShareGroupDescribeRequestData()
.setGroupIds(List(shareGroup).asJava)
.setIncludeAuthorizedOperations(false)).build(ApiKeys.SHARE_GROUP_DESCRIBE.latestVersion)
private def createShareFetchRequest = {
val metadata: ShareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH)
val send: Seq[TopicIdPartition] = Seq(
new TopicIdPartition(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), new TopicPartition(topic, part)))
val ackMap = new util.HashMap[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]]
requests.ShareFetchRequest.Builder.forConsumer(shareGroup, metadata, 100, 0, Int.MaxValue, 500, 500,
send.asJava, Seq.empty.asJava, ackMap).build()
}
private def shareAcknowledgeRequest = {
val shareAcknowledgeRequestData = new ShareAcknowledgeRequestData()
.setGroupId(shareGroup)
.setMemberId(Uuid.randomUuid().toString)
.setShareSessionEpoch(1)
.setTopics(List(new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
.setPartitions(List(
new ShareAcknowledgeRequestData.AcknowledgePartition()
.setPartitionIndex(part)
.setAcknowledgementBatches(List(
new ShareAcknowledgeRequestData.AcknowledgementBatch()
.setFirstOffset(0)
.setLastOffset(1)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))
).asJava)
).asJava)
).asJava)
new ShareAcknowledgeRequest.Builder(shareAcknowledgeRequestData).build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion)
}
private def initializeShareGroupStateRequest = new InitializeShareGroupStateRequest.Builder(
new InitializeShareGroupStateRequestData()
.setGroupId(shareGroup)
.setTopics(List(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
.setPartitions(List(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(part)
).asJava)
).asJava)).build()
private def readShareGroupStateRequest = new ReadShareGroupStateRequest.Builder(
new ReadShareGroupStateRequestData()
.setGroupId(shareGroup)
.setTopics(List(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
.setPartitions(List(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(part)
.setLeaderEpoch(0)
).asJava)
).asJava)).build()
private def writeShareGroupStateRequest = new WriteShareGroupStateRequest.Builder(
new WriteShareGroupStateRequestData()
.setGroupId(shareGroup)
.setTopics(List(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
.setPartitions(List(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(part)
).asJava)
).asJava)).build()
private def deleteShareGroupStateRequest = new DeleteShareGroupStateRequest.Builder(
new DeleteShareGroupStateRequestData()
.setGroupId(shareGroup)
.setTopics(List(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
.setPartitions(List(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(part)
).asJava)
).asJava)).build()
private def readShareGroupStateSummaryRequest = new ReadShareGroupStateSummaryRequest.Builder(
new ReadShareGroupStateSummaryRequestData()
.setGroupId(shareGroup)
.setTopics(List(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
.setPartitions(List(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(part)
.setLeaderEpoch(0)
).asJava)
).asJava)).build(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY.latestVersion)
private def describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequest.Builder(
new DescribeShareGroupOffsetsRequestData()
.setGroups(List(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId(shareGroup)
.setTopics(List(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(topic)
.setPartitions(List(Integer.valueOf(part)
).asJava)
).asJava)
).asJava)).build(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.latestVersion)
private def deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequest.Builder(
new DeleteShareGroupOffsetsRequestData()
.setGroupId(shareGroup)
.setTopics(List(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topic)
).asJava)).build(ApiKeys.DELETE_SHARE_GROUP_OFFSETS.latestVersion)
private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true,
topicNames: Map[Uuid, String] = getTopicNames()) = {
for ((key, request) <- requestKeyToRequest) {
@ -669,6 +818,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
// In KRaft mode, trying to delete a topic that doesn't exist but that you do have
// describe permission for will give UNKNOWN_TOPIC_OR_PARTITION.
true
} else if (resourceToAcls.size > 1) {
false
} else {
describeAcls == acls
}
@ -684,7 +835,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
@ValueSource(strings = Array("kip932"))
def testAuthorizationWithTopicExisting(quorum: String): Unit = {
//First create the topic so we have a valid topic ID
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
@ -723,6 +874,18 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.WRITE_TXN_MARKERS -> writeTxnMarkersRequest,
ApiKeys.CONSUMER_GROUP_HEARTBEAT -> consumerGroupHeartbeatRequest,
ApiKeys.CONSUMER_GROUP_DESCRIBE -> consumerGroupDescribeRequest,
ApiKeys.SHARE_GROUP_HEARTBEAT -> shareGroupHeartbeatRequest,
ApiKeys.SHARE_GROUP_DESCRIBE -> shareGroupDescribeRequest,
ApiKeys.SHARE_FETCH -> createShareFetchRequest,
ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> initializeShareGroupStateRequest,
ApiKeys.READ_SHARE_GROUP_STATE -> readShareGroupStateRequest,
ApiKeys.WRITE_SHARE_GROUP_STATE -> writeShareGroupStateRequest,
ApiKeys.DELETE_SHARE_GROUP_STATE -> deleteShareGroupStateRequest,
ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> readShareGroupStateSummaryRequest,
ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest,
ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest,
// Delete the topic last
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
)
@ -752,7 +915,10 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
ApiKeys.ELECT_LEADERS -> electLeadersRequest
ApiKeys.ELECT_LEADERS -> electLeadersRequest,
ApiKeys.SHARE_FETCH -> createShareFetchRequest,
ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest
)
sendRequests(requestKeyToRequest, false, topicNames)
@ -2653,6 +2819,476 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(0), fullRequest = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = shareGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupHeartbeatWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = shareGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = shareGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = {
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = shareGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
val request = shareGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
private def createShareGroupToDescribe(): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), shareGroupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup)
val consumer = createShareConsumer()
consumer.subscribe(Collections.singleton(topic))
consumer.poll(Duration.ofMillis(500L))
removeAllClientAcls()
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum: String): Unit = {
createShareGroupToDescribe()
addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), shareGroupResource)
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = shareGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupDescribeWithOperationAll(quorum: String): Unit = {
createShareGroupToDescribe()
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = shareGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = {
createShareGroupToDescribe()
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = shareGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum: String): Unit = {
createShareGroupToDescribe()
val request = shareGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareFetchWithGroupReadAndTopicReadAcl(quorum: String): Unit = {
addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
val request = createShareFetchRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareFetchWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = createShareFetchRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareFetchWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = createShareFetchRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareFetchWithoutGroupReadAcl(quorum: String): Unit = {
addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
val request = createShareFetchRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareFetchWithoutTopicReadAcl(quorum: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
val request = createShareFetchRequest
val response = connectAndReceive[ShareFetchResponse](request, listenerName = listenerName)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.data.responses.get(0).partitions.get(0).errorCode))
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareAcknowledgeWithGroupReadAndTopicReadAcl(quorum: String): Unit = {
addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
val request = shareAcknowledgeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareAcknowledgeWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = shareAcknowledgeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareAcknowledgeWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = shareAcknowledgeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareAcknowledgeFetchWithoutGroupReadAcl(quorum: String): Unit = {
addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
val request = shareAcknowledgeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testInitializeShareGroupStateWithClusterAcl(quorum: String): Unit = {
addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
val request = initializeShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testInitializeShareGroupStateWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
val request = initializeShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testInitializeShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = initializeShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateWithClusterAcl(quorum: String): Unit = {
addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
val request = readShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
val request = readShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = readShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testWriteShareGroupStateWithClusterAcl(quorum: String): Unit = {
addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
val request = writeShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testWriteShareGroupStateWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
val request = writeShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testWriteShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = writeShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupStateWithClusterAcl(quorum: String): Unit = {
addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
val request = deleteShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupStateWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
val request = deleteShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = deleteShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateSummaryWithClusterAcl(quorum: String): Unit = {
addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
val request = readShareGroupStateSummaryRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateSummaryWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
val request = readShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateSummaryWithoutClusterAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = readShareGroupStateRequest
val resource = Set[ResourceType](CLUSTER)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithGroupDescribeAndTopicDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), shareGroupResource)
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = describeShareGroupOffsetsRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = describeShareGroupOffsetsRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithoutGroupDescribeOrTopicDescribeAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = describeShareGroupOffsetsRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithoutGroupDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = describeShareGroupOffsetsRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithoutTopicDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), shareGroupResource)
val request = describeShareGroupOffsetsRequest
val response = connectAndReceive[DescribeShareGroupOffsetsResponse](request, listenerName = listenerName)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.data.groups.get(0).topics.get(0).partitions.get(0).errorCode))
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithGroupDeleteAndTopicReadAcl(quorum: String): Unit = {
addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), shareGroupResource)
addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
val request = deleteShareGroupOffsetsRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = deleteShareGroupOffsetsRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithoutGroupDeleteOrTopicReadAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = deleteShareGroupOffsetsRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithoutGroupDeleteAcl(quorum: String): Unit = {
addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
val request = deleteShareGroupOffsetsRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithoutTopicReadAcl(quorum: String): Unit = {
addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), shareGroupResource)
val request = deleteShareGroupOffsetsRequest
val response = connectAndReceive[DeleteShareGroupOffsetsResponse](request, listenerName = listenerName)
assertEquals(1, response.data.responses.size)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.responses.get(0).errorCode, s"Unexpected response $response")
}
private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = {
val request = new ConsumerGroupHeartbeatRequest.Builder(