mirror of https://github.com/apache/kafka.git
KAFKA-16513; Add test for WriteTxnMarkers with AlterCluster permission
In #15837, we introduced the change to allow calling the WriteTxnMarkers API with AlterCluster permissions. This PR proposes 2 enhancements: - When a WriteTxnMarkers request is received, it is first authorized against the alter cluster permission. If the user does not have this permission, a 'deny' will be logged. However, if the user does have the cluster action permission, the request will be successfully authorized. Don't log the first deny to avoid confusion. - Add a `WriteTxnMarkersRequest` to be called from the test `testAuthorizationWithTopicExisting`, so that the request can be exercised and verified with both possible permissions. Author: Nikhil Ramakrishnan <nikrmk@amazon.com> Reviewers: Christo Lolov <lolovc@amazon.com> Closes #15952 from nikramakrishnan/kip1037-addTest
This commit is contained in:
parent
52b4596dae
commit
b5a013e456
|
@ -2364,7 +2364,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
ensureInterBrokerVersion(IBP_0_11_0_IV0)
|
||||
// We are checking for AlterCluster permissions first. If it is not present, we are authorizing cluster operation
|
||||
// The latter will throw an exception if it is denied.
|
||||
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
|
||||
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
|
||||
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
|
||||
}
|
||||
val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartit
|
|||
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
|
||||
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
|
||||
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
|
||||
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData}
|
||||
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
|
||||
import org.apache.kafka.common.network.ListenerName
|
||||
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
||||
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
|
||||
|
@ -60,6 +60,7 @@ import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
|
|||
|
||||
import java.util.Collections.singletonList
|
||||
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
|
||||
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
|
||||
import org.junit.jupiter.api.function.Executable
|
||||
|
||||
import scala.annotation.nowarn
|
||||
|
@ -617,6 +618,22 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
)
|
||||
).build()
|
||||
|
||||
private def writeTxnMarkersRequest: WriteTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
|
||||
new WriteTxnMarkersRequestData()
|
||||
.setMarkers(
|
||||
List(new WritableTxnMarker()
|
||||
.setProducerId(producerId)
|
||||
.setProducerEpoch(1)
|
||||
.setTransactionResult(false)
|
||||
.setTopics(List(new WritableTxnMarkerTopic()
|
||||
.setName(tp.topic())
|
||||
.setPartitionIndexes(List(Integer.valueOf(tp.partition())).asJava)
|
||||
).asJava)
|
||||
.setCoordinatorEpoch(1)
|
||||
).asJava
|
||||
)
|
||||
).build()
|
||||
|
||||
private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true,
|
||||
topicNames: Map[Uuid, String] = getTopicNames()) = {
|
||||
for ((key, request) <- requestKeyToRequest) {
|
||||
|
@ -683,6 +700,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> listPartitionReassignmentsRequest,
|
||||
ApiKeys.DESCRIBE_PRODUCERS -> describeProducersRequest,
|
||||
ApiKeys.DESCRIBE_TRANSACTIONS -> describeTransactionsRequest,
|
||||
ApiKeys.WRITE_TXN_MARKERS -> writeTxnMarkersRequest,
|
||||
)
|
||||
if (!isKRaftTest()) {
|
||||
// Inter-broker APIs use an invalid broker epoch, so does not affect the test case
|
||||
|
|
|
@ -2830,7 +2830,7 @@ class KafkaApisTest extends Logging {
|
|||
|
||||
val authorizer: Authorizer = mock(classOf[Authorizer])
|
||||
val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
|
||||
val alterActions = Collections.singletonList(new Action(AclOperation.ALTER, clusterResource, 1, true, true))
|
||||
val alterActions = Collections.singletonList(new Action(AclOperation.ALTER, clusterResource, 1, true, false))
|
||||
val clusterActions = Collections.singletonList(new Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
|
||||
val deniedList = Collections.singletonList(AuthorizationResult.DENIED)
|
||||
when(authorizer.authorize(
|
||||
|
@ -3074,7 +3074,13 @@ class KafkaApisTest extends Logging {
|
|||
// Allowing WriteTxnMarkers API with the help of allowedAclOperation parameter.
|
||||
val authorizer: Authorizer = mock(classOf[Authorizer])
|
||||
val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
|
||||
val allowedAction = Collections.singletonList(new Action(AclOperation.fromString(allowedAclOperation), clusterResource, 1, true, true))
|
||||
val allowedAction = Collections.singletonList(new Action(
|
||||
AclOperation.fromString(allowedAclOperation),
|
||||
clusterResource,
|
||||
1,
|
||||
true,
|
||||
allowedAclOperation.equals("CLUSTER_ACTION")
|
||||
))
|
||||
val deniedList = Collections.singletonList(AuthorizationResult.DENIED)
|
||||
val allowedList = Collections.singletonList(AuthorizationResult.ALLOWED)
|
||||
when(authorizer.authorize(
|
||||
|
|
Loading…
Reference in New Issue