KAFKA-13661; Consistent permissions in KRaft for CreatePartitions API (#11745)

In #11649, we fixed one permission inconsistency between kraft and zk authorization for the `CreatePartitions` request. Previously kraft was requiring `CREATE` permission on the `Topic` resource when it should have required `ALTER`. A second inconsistency is that kraft was also allowing `CREATE` on the `Cluster` resource, which is not supported in zk clusters and was not documented in KIP-195: https://cwiki.apache.org/confluence/display/KAFKA/KIP-195%3A+AdminClient.createPartitions. This patch fixes this inconsistency and adds additional test coverage for both cases.

Reviewers: José Armando García Sancio <jsancio@gmail.com>
This commit is contained in:
Jason Gustafson 2022-02-11 15:01:08 -08:00 committed by GitHub
parent 40b261f082
commit e43916c148
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 20 deletions

View File

@ -105,7 +105,7 @@ class KafkaController(val config: KafkaConfig,
new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
private val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
partitionStateMachine, new ControllerDeletionClient(this, zkClient))
private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
@ -218,6 +218,10 @@ class KafkaController(val config: KafkaConfig,
}
}
def isTopicQueuedForDeletion(topic: String): Boolean = {
topicDeletionManager.isTopicQueuedUpForDeletion(topic)
}
private def state: ControllerState = eventManager.state
/**

View File

@ -687,9 +687,15 @@ class ControllerApis(val requestChannel: RequestChannel,
}
def handleCreatePartitions(request: RequestChannel.Request): Unit = {
val future = createPartitions(request.body[CreatePartitionsRequest].data,
authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
names => authHelper.filterByAuthorized(request.context, ALTER, TOPIC, names)(n => n))
def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
}
val future = createPartitions(
request.body[CreatePartitionsRequest].data,
filterAlterAuthorizedTopics
)
future.whenComplete { (responses, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
@ -704,10 +710,10 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
def createPartitions(request: CreatePartitionsRequestData,
hasClusterAuth: Boolean,
getCreatableTopics: Iterable[String] => Set[String])
: CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
def createPartitions(
request: CreatePartitionsRequestData,
getAlterAuthorizedTopics: Iterable[String] => Set[String]
): CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
val responses = new util.ArrayList[CreatePartitionsTopicResult]()
val duplicateTopicNames = new util.HashSet[String]()
@ -725,13 +731,7 @@ class ControllerApis(val requestChannel: RequestChannel,
setErrorMessage("Duplicate topic name."))
topicNames.remove(topicName)
}
val authorizedTopicNames = {
if (hasClusterAuth) {
topicNames.asScala
} else {
getCreatableTopics(topicNames.asScala)
}
}
val authorizedTopicNames = getAlterAuthorizedTopics(topicNames.asScala)
val topics = new util.ArrayList[CreatePartitionsTopic]
topicNames.forEach { topicName =>
if (authorizedTopicNames.contains(topicName)) {

View File

@ -1998,7 +1998,7 @@ class KafkaApis(val requestChannel: RequestChannel,
notDuped)(_.name)
val (queuedForDeletion, valid) = authorized.partition { topic =>
zkSupport.controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic.name)
zkSupport.controller.isTopicQueuedForDeletion(topic.name)
}
val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++

View File

@ -290,7 +290,7 @@ class ZkAdminManager(val config: KafkaConfig,
}
}
def createPartitions(timeout: Int,
def createPartitions(timeoutMs: Int,
newPartitions: Seq[CreatePartitionsTopic],
validateOnly: Boolean,
controllerMutationQuota: ControllerMutationQuota,
@ -367,7 +367,7 @@ class ZkAdminManager(val config: KafkaConfig,
}
// 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
if (timeoutMs <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
val results = metadata.map { createPartitionMetadata =>
// ignore topics that already have errors
if (createPartitionMetadata.error.isSuccess && !validateOnly) {
@ -379,7 +379,7 @@ class ZkAdminManager(val config: KafkaConfig,
callback(results)
} else {
// 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeout, metadata, this, callback)
val delayedCreate = new DelayedCreatePartitions(timeoutMs, metadata, this, callback)
val delayedCreateKeys = newPartitions.map(createPartitionTopic => TopicKey(createPartitionTopic.name))
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)

View File

@ -727,7 +727,45 @@ class ControllerApisTest {
new CreatePartitionsTopicResult().setName("baz").
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
setErrorMessage(null)),
controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
controllerApis.createPartitions(request, _ => Set("foo", "bar")).get().asScala.toSet)
}
@Test
def testCreatePartitionsAuthorization(): Unit = {
val controller = new MockController.Builder()
.newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"))
.build()
val authorizer = mock(classOf[Authorizer])
val controllerApis = createControllerApis(Some(authorizer), controller)
val requestData = new CreatePartitionsRequestData()
requestData.topics().add(new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2))
requestData.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10))
val request = new CreatePartitionsRequest.Builder(requestData).build()
val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL)
val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.LITERAL)
val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument[util.List[Action]](1).asScala
val results = actions.map { action =>
if (action == fooAction) AuthorizationResult.ALLOWED
else if (action == barAction) AuthorizationResult.DENIED
else throw new AssertionError(s"Unexpected action $action")
}
new util.ArrayList[AuthorizationResult](results.asJava)
}
val response = handleRequest[CreatePartitionsResponse](request, controllerApis)
val results = response.data.results.asScala
assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result => Errors.forCode(result.errorCode)))
assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode)))
}
@Test

View File

@ -93,6 +93,8 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
class KafkaApisTest {
private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
@ -766,6 +768,66 @@ class KafkaApisTest {
testForwardableApi(ApiKeys.CREATE_TOPICS, requestBuilder)
}
@Test
def testCreatePartitionsAuthorization(): Unit = {
val authorizer: Authorizer = mock(classOf[Authorizer])
val kafkaApis = createKafkaApis(authorizer = Some(authorizer))
val timeoutMs = 35000
val requestData = new CreatePartitionsRequestData()
.setTimeoutMs(timeoutMs)
.setValidateOnly(false)
val fooCreatePartitionsData = new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2)
val barCreatePartitionsData = new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10)
requestData.topics().add(fooCreatePartitionsData)
requestData.topics().add(barCreatePartitionsData)
val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL)
val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.LITERAL)
val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
when(authorizer.authorize(
any[RequestContext](),
any[util.List[Action]]()
)).thenAnswer { invocation =>
val actions = invocation.getArgument[util.List[Action]](1).asScala
val results = actions.map { action =>
if (action == fooAction) AuthorizationResult.ALLOWED
else if (action == barAction) AuthorizationResult.DENIED
else throw new AssertionError(s"Unexpected action $action")
}
new util.ArrayList[AuthorizationResult](results.asJava)
}
val request = buildRequest(new CreatePartitionsRequest.Builder(requestData).build())
when(controller.isActive).thenReturn(true)
when(controller.isTopicQueuedForDeletion("foo")).thenReturn(false)
when(clientControllerQuotaManager.newQuotaFor(
ArgumentMatchers.eq(request), ArgumentMatchers.anyShort())
).thenReturn(UnboundedControllerMutationQuota)
when(adminManager.createPartitions(
timeoutMs = ArgumentMatchers.eq(timeoutMs),
newPartitions = ArgumentMatchers.eq(Seq(fooCreatePartitionsData)),
validateOnly = ArgumentMatchers.eq(false),
controllerMutationQuota = ArgumentMatchers.eq(UnboundedControllerMutationQuota),
callback = ArgumentMatchers.any[Map[String, ApiError] => Unit]()
)).thenAnswer { invocation =>
val callback = invocation.getArgument[Map[String, ApiError] => Unit](4)
callback.apply(Map("foo" -> ApiError.NONE))
}
kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
val capturedResponse = verifyNoThrottling(request)
val response = capturedResponse.getValue.asInstanceOf[CreatePartitionsResponse]
val results = response.data.results.asScala
assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result => Errors.forCode(result.errorCode)))
assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode)))
}
private def createTopicAuthorization(authorizer: Authorizer,
operation: AclOperation,
authorizedTopic: String,