Allowing WriteTxnMarkers API to run with AlterCluster permissions (#15837)

https://issues.apache.org/jira/browse/KAFKA-16513

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission

Reviewers: Christo Lolov <christo_lolov@yahoo.com>,  Luke Chen <showuon@gmail.com>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Sid Yagnik 2024-05-10 23:30:57 +01:00 committed by GitHub
parent 4e4f7d3231
commit ef7b48e66a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 61 additions and 9 deletions

View File

@ -2362,7 +2362,11 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleWriteTxnMarkersRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { def handleWriteTxnMarkersRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
ensureInterBrokerVersion(IBP_0_11_0_IV0) ensureInterBrokerVersion(IBP_0_11_0_IV0)
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) // We are checking for AlterCluster permissions first. If it is not present, we are authorizing cluster operation
// The latter will throw an exception if it is denied.
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
}
val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest] val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
val errors = new ConcurrentHashMap[java.lang.Long, util.Map[TopicPartition, Errors]]() val errors = new ConcurrentHashMap[java.lang.Long, util.Map[TopicPartition, Errors]]()
val markers = writeTxnMarkersRequest.markers val markers = writeTxnMarkersRequest.markers

View File

@ -225,7 +225,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl, ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl,
ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl, ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl,
ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++ clusterIdempotentWriteAcl), ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++ clusterIdempotentWriteAcl),
ApiKeys.WRITE_TXN_MARKERS -> clusterAcl, ApiKeys.WRITE_TXN_MARKERS -> (clusterAcl ++ clusterAlterAcl),
ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl), ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl),
ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl), ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl),
ApiKeys.END_TXN -> transactionIdWriteAcl, ApiKeys.END_TXN -> transactionIdWriteAcl,

View File

@ -29,10 +29,11 @@ import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common._
import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER} import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic} import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
@ -71,15 +72,14 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils} import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common._
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.config.{ConfigType, Defaults, KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.common.{Features, MetadataVersion} import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.config._
import org.apache.kafka.server.metrics.ClientMetricsTestUtils import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.{FutureUtils, MockTime} import org.apache.kafka.server.util.{FutureUtils, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig} import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
@ -2822,6 +2822,31 @@ class KafkaApisTest extends Logging {
() => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching)) () => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
} }
@Test
def requiredAclsNotPresentWriteTxnMarkersThrowsAuthorizationException(): Unit = {
// Here we need to use AuthHelperTest.matchSameElements instead of EasyMock.eq since the order of the request is unknown
val topicPartition = new TopicPartition("t", 0)
val (_, request) = createWriteTxnMarkersRequest(asList(topicPartition))
val authorizer: Authorizer = mock(classOf[Authorizer])
val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val alterActions = Collections.singletonList(new Action(AclOperation.ALTER, clusterResource, 1, true, true))
val clusterActions = Collections.singletonList(new Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
val deniedList = Collections.singletonList(AuthorizationResult.DENIED)
when(authorizer.authorize(
request.context,
alterActions
)).thenReturn(deniedList)
when(authorizer.authorize(
request.context,
clusterActions
)).thenReturn(deniedList)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
assertThrows(classOf[ClusterAuthorizationException],
() => kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching))
}
@Test @Test
def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = { def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = {
val topicPartition = new TopicPartition("t", 0) val topicPartition = new TopicPartition("t", 0)
@ -3036,15 +3061,32 @@ class KafkaApisTest extends Logging {
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L)) assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
} }
@Test @ParameterizedTest
def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = { @ValueSource(strings = Array("ALTER", "CLUSTER_ACTION"))
def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(allowedAclOperation: String): Unit = {
val topicPartition = new TopicPartition("t", 0) val topicPartition = new TopicPartition("t", 0)
val request = createWriteTxnMarkersRequest(asList(topicPartition))._2 val request = createWriteTxnMarkersRequest(asList(topicPartition))._2
when(replicaManager.getMagic(topicPartition)) when(replicaManager.getMagic(topicPartition))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
val requestLocal = RequestLocal.withThreadConfinedCaching val requestLocal = RequestLocal.withThreadConfinedCaching
kafkaApis = createKafkaApis()
// Allowing WriteTxnMarkers API with the help of allowedAclOperation parameter.
val authorizer: Authorizer = mock(classOf[Authorizer])
val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val allowedAction = Collections.singletonList(new Action(AclOperation.fromString(allowedAclOperation), clusterResource, 1, true, true))
val deniedList = Collections.singletonList(AuthorizationResult.DENIED)
val allowedList = Collections.singletonList(AuthorizationResult.ALLOWED)
when(authorizer.authorize(
ArgumentMatchers.eq(request.context),
any()
)).thenReturn(deniedList)
when(authorizer.authorize(
request.context,
allowedAction
)).thenReturn(allowedList)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal) kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal)
verify(replicaManager).appendRecords(anyLong, verify(replicaManager).appendRecords(anyLong,
anyShort, anyShort,

View File

@ -1914,6 +1914,12 @@ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminc
<td>TransactionalId</td> <td>TransactionalId</td>
<td></td> <td></td>
</tr> </tr>
<tr>
<td>WRITE_TXN_MARKERS (27)</td>
<td>Alter</td>
<td>Cluster</td>
<td></td>
</tr>
<tr> <tr>
<td>WRITE_TXN_MARKERS (27)</td> <td>WRITE_TXN_MARKERS (27)</td>
<td>ClusterAction</td> <td>ClusterAction</td>