mirror of https://github.com/apache/kafka.git
KAFKA-5547; Return TOPIC_AUTHORIZATION_FAILED error if no describe access for topics
Author: Manikumar Reddy <manikumar.reddy@gmail.com> Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io> Closes #3924 from omkreddy/KAFKA-5547-TOPIC-AUTHRO
This commit is contained in:
parent
fb6ca658d6
commit
10cd98cc89
|
@ -786,7 +786,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
|||
future.raise(new CommitFailedException());
|
||||
return;
|
||||
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
|
||||
future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
|
||||
future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
|
||||
return;
|
||||
} else {
|
||||
future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
|
||||
|
@ -857,8 +857,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
|||
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
|
||||
|
||||
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
|
||||
future.raise(new KafkaException("Partition " + tp + " may not exist or the user may not have " +
|
||||
"Describe access to the topic"));
|
||||
future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
|
||||
} else {
|
||||
future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
|
||||
}
|
||||
|
|
|
@ -85,6 +85,8 @@ public class OffsetFetchResponse extends AbstractResponse {
|
|||
public static final String NO_METADATA = "";
|
||||
public static final PartitionData UNKNOWN_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA,
|
||||
Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
||||
public static final PartitionData UNAUTHORIZED_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA,
|
||||
Errors.TOPIC_AUTHORIZATION_FAILED);
|
||||
|
||||
/**
|
||||
* Possible error codes:
|
||||
|
|
|
@ -264,25 +264,25 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}.toMap
|
||||
sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
|
||||
} else {
|
||||
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
|
||||
case (topicPartition, _) =>
|
||||
val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
|
||||
val exists = metadataCache.contains(topicPartition.topic)
|
||||
if (!authorizedForDescribe && exists)
|
||||
debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
|
||||
s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
|
||||
authorizedForDescribe && exists
|
||||
}
|
||||
|
||||
val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
|
||||
case (topicPartition, _) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
|
||||
var unauthorizedTopics = Set[TopicPartition]()
|
||||
var nonExistingTopics = Set[TopicPartition]()
|
||||
var authorizedTopics = mutable.Map[TopicPartition, OffsetCommitRequest.PartitionData]()
|
||||
|
||||
for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala.toMap) {
|
||||
if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
|
||||
unauthorizedTopics += topicPartition
|
||||
else if (!metadataCache.contains(topicPartition.topic))
|
||||
nonExistingTopics += topicPartition
|
||||
else
|
||||
authorizedTopics += (topicPartition -> partitionData)
|
||||
}
|
||||
|
||||
// the callback for sending an offset commit response
|
||||
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
|
||||
val combinedCommitStatus = commitStatus ++
|
||||
unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
|
||||
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
|
||||
unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
|
||||
nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
|
||||
|
||||
if (isDebugEnabled)
|
||||
combinedCommitStatus.foreach { case (topicPartition, error) =>
|
||||
|
@ -313,7 +313,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
case e: Throwable => (topicPartition, Errors.forException(e))
|
||||
}
|
||||
}
|
||||
sendResponseCallback(responseInfo)
|
||||
sendResponseCallback(responseInfo.toMap)
|
||||
} else {
|
||||
// for version 1 and beyond store offsets in offset manager
|
||||
|
||||
|
@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
offsetCommitRequest.groupId,
|
||||
offsetCommitRequest.memberId,
|
||||
offsetCommitRequest.generationId,
|
||||
partitionData,
|
||||
partitionData.toMap,
|
||||
sendResponseCallback)
|
||||
}
|
||||
}
|
||||
|
@ -381,21 +381,25 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
return
|
||||
}
|
||||
|
||||
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
|
||||
produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
|
||||
authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
|
||||
}
|
||||
var unauthorizedTopics = Set[TopicPartition]()
|
||||
var nonExistingTopics = Set[TopicPartition]()
|
||||
var authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
|
||||
|
||||
val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
|
||||
case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic))
|
||||
for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
|
||||
if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
|
||||
unauthorizedTopics += topicPartition
|
||||
else if (!metadataCache.contains(topicPartition.topic))
|
||||
nonExistingTopics += topicPartition
|
||||
else
|
||||
authorizedRequestInfo += (topicPartition -> memoryRecords)
|
||||
}
|
||||
|
||||
// the callback for sending a produce response
|
||||
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
|
||||
|
||||
val mergedResponseStatus = responseStatus ++
|
||||
unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
|
||||
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
|
||||
unauthorizedTopics.map(_ -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
|
||||
nonExistingTopics.map(_ -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
|
||||
|
||||
var errorInResponse = false
|
||||
|
||||
|
@ -479,21 +483,26 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
val versionId = request.header.apiVersion
|
||||
val clientId = request.header.clientId
|
||||
|
||||
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition {
|
||||
case (tp, _) => authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
|
||||
var unauthorizedTopics = Set[TopicPartition]()
|
||||
var nonExistingTopics = Set[TopicPartition]()
|
||||
var authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
|
||||
|
||||
for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
|
||||
if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
|
||||
unauthorizedTopics += topicPartition
|
||||
else if (!metadataCache.contains(topicPartition.topic))
|
||||
nonExistingTopics += topicPartition
|
||||
else
|
||||
authorizedRequestInfo += (topicPartition -> partitionData)
|
||||
}
|
||||
|
||||
val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
|
||||
case (tp, _) => authorize(request.session, Read, new Resource(Topic, tp.topic))
|
||||
}
|
||||
|
||||
val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
|
||||
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
|
||||
val nonExistingPartitionData = nonExistingTopics.map {
|
||||
case tp => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
|
||||
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
|
||||
}
|
||||
|
||||
val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
|
||||
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
val unauthorizedForReadPartitionData = unauthorizedTopics.map {
|
||||
case tp => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
|
||||
}
|
||||
|
||||
|
@ -538,7 +547,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
}
|
||||
|
||||
val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
|
||||
val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingPartitionData
|
||||
|
||||
val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
|
||||
|
||||
|
@ -644,7 +653,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
|
||||
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
|
||||
new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, List[JLong]().asJava)
|
||||
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, List[JLong]().asJava)
|
||||
)
|
||||
|
||||
val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
|
||||
|
@ -697,7 +706,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
|
||||
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
|
||||
new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
|
||||
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
ListOffsetResponse.UNKNOWN_TIMESTAMP,
|
||||
ListOffsetResponse.UNKNOWN_OFFSET)
|
||||
})
|
||||
|
@ -957,7 +966,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
Set.empty[MetadataResponse.TopicMetadata]
|
||||
else
|
||||
unauthorizedForDescribeTopics.map(topic =>
|
||||
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()))
|
||||
new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, java.util.Collections.emptyList()))
|
||||
|
||||
// In version 0, we returned an error when brokers with replicas were unavailable,
|
||||
// while in higher versions we simply don't include the broker in the returned broker list
|
||||
|
@ -1029,7 +1038,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
}.toMap
|
||||
|
||||
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
|
||||
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
|
||||
new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
|
||||
} else {
|
||||
// versions 1 and above read offsets from Kafka
|
||||
|
@ -1050,7 +1059,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
if (error != Errors.NONE)
|
||||
offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
|
||||
else {
|
||||
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
|
||||
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
|
||||
new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
|
||||
}
|
||||
}
|
||||
|
@ -1370,11 +1379,10 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
|
||||
val (queuedForDeletion, valid) = authorized.partition { case (topic, _) =>
|
||||
controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic)
|
||||
|
||||
}
|
||||
|
||||
val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++
|
||||
unauthorized.keySet.map( topic => topic -> createPartitionsAuthorizationApiError(request.session, topic) ) ++
|
||||
unauthorized.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++
|
||||
queuedForDeletion.keySet.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
|
||||
|
||||
adminManager.createPartitions(createPartitionsRequest.timeout, valid, createPartitionsRequest.validateOnly,
|
||||
|
@ -1382,28 +1390,26 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
}
|
||||
|
||||
private def createPartitionsAuthorizationApiError(session: RequestChannel.Session, topic: String): ApiError = {
|
||||
if (authorize(session, Describe, new Resource(Topic, topic)))
|
||||
new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)
|
||||
else
|
||||
new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null)
|
||||
}
|
||||
|
||||
def handleDeleteTopicsRequest(request: RequestChannel.Request) {
|
||||
val deleteTopicRequest = request.body[DeleteTopicsRequest]
|
||||
|
||||
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition { topic =>
|
||||
authorize(request.session, Describe, new Resource(Topic, topic)) && metadataCache.contains(topic)
|
||||
}
|
||||
var unauthorizedTopics = Set[String]()
|
||||
var nonExistingTopics = Set[String]()
|
||||
var authorizedForDeleteTopics = Set[String]()
|
||||
|
||||
val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition { topic =>
|
||||
authorize(request.session, Delete, new Resource(Topic, topic))
|
||||
for (topic <- deleteTopicRequest.topics.asScala) {
|
||||
if (!authorize(request.session, Delete, new Resource(Topic, topic)))
|
||||
unauthorizedTopics += topic
|
||||
else if (!metadataCache.contains(topic))
|
||||
nonExistingTopics += topic
|
||||
else
|
||||
authorizedForDeleteTopics += topic
|
||||
}
|
||||
|
||||
def sendResponseCallback(results: Map[String, Errors]): Unit = {
|
||||
def createResponse(requestThrottleMs: Int): AbstractResponse = {
|
||||
val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
|
||||
unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
|
||||
val completeResults = unauthorizedTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++
|
||||
nonExistingTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ results
|
||||
val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
|
||||
trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
|
||||
responseBody
|
||||
|
@ -1418,12 +1424,12 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
sendResponseCallback(results)
|
||||
} else {
|
||||
// If no authorized topics return immediately
|
||||
if (authorizedTopics.isEmpty)
|
||||
if (authorizedForDeleteTopics.isEmpty)
|
||||
sendResponseCallback(Map())
|
||||
else {
|
||||
adminManager.deleteTopics(
|
||||
deleteTopicRequest.timeout.toInt,
|
||||
authorizedTopics,
|
||||
authorizedForDeleteTopics,
|
||||
sendResponseCallback
|
||||
)
|
||||
}
|
||||
|
@ -1433,21 +1439,26 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
def handleDeleteRecordsRequest(request: RequestChannel.Request) {
|
||||
val deleteRecordsRequest = request.body[DeleteRecordsRequest]
|
||||
|
||||
val (authorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteRecordsRequest.partitionOffsets.asScala.partition {
|
||||
case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
|
||||
}
|
||||
var unauthorizedTopics = Set[TopicPartition]()
|
||||
var nonExistingTopics = Set[TopicPartition]()
|
||||
var authorizedForDeleteTopics = mutable.Map[TopicPartition, Long]()
|
||||
|
||||
val (authorizedForDeleteTopics, unauthorizedForDeleteTopics) = authorizedForDescribeTopics.partition {
|
||||
case (topicPartition, _) => authorize(request.session, Delete, new Resource(Topic, topicPartition.topic))
|
||||
for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) {
|
||||
if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic)))
|
||||
unauthorizedTopics += topicPartition
|
||||
else if (!metadataCache.contains(topicPartition.topic))
|
||||
nonExistingTopics += topicPartition
|
||||
else
|
||||
authorizedForDeleteTopics += (topicPartition -> offset)
|
||||
}
|
||||
|
||||
// the callback for sending a DeleteRecordsResponse
|
||||
def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
|
||||
|
||||
val mergedResponseStatus = responseStatus ++
|
||||
unauthorizedForDeleteTopics.mapValues(_ =>
|
||||
unauthorizedTopics.map(_ ->
|
||||
new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
|
||||
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
|
||||
nonExistingTopics.map(_ ->
|
||||
new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
|
||||
|
||||
mergedResponseStatus.foreach { case (topicPartition, status) =>
|
||||
|
@ -1646,24 +1657,28 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
else {
|
||||
val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())}
|
||||
|
||||
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
|
||||
partitionsToAdd.asScala.partition { tp =>
|
||||
authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp)
|
||||
}
|
||||
var unauthorizedTopics = Set[TopicPartition]()
|
||||
var nonExistingTopics = Set[TopicPartition]()
|
||||
var authorizedPartitions = Set[TopicPartition]()
|
||||
|
||||
val (authorizedPartitions, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { tp =>
|
||||
authorize(request.session, Write, new Resource(Topic, tp.topic))
|
||||
for ( topicPartition <- partitionsToAdd.asScala) {
|
||||
if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
|
||||
unauthorizedTopics += topicPartition
|
||||
else if (!metadataCache.contains(topicPartition.topic))
|
||||
nonExistingTopics += topicPartition
|
||||
else
|
||||
authorizedPartitions += topicPartition
|
||||
}
|
||||
|
||||
if (nonExistingOrUnauthorizedForDescribeTopics.nonEmpty
|
||||
|| unauthorizedForWriteRequestInfo.nonEmpty
|
||||
if (unauthorizedTopics.nonEmpty
|
||||
|| nonExistingTopics.nonEmpty
|
||||
|| internalTopics.nonEmpty) {
|
||||
|
||||
// Any failed partition check causes the entire request to fail. We send the appropriate error codes for the
|
||||
// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded
|
||||
// the authorization check to indicate that they were not added to the transaction.
|
||||
val partitionErrors = (unauthorizedForWriteRequestInfo.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
|
||||
nonExistingOrUnauthorizedForDescribeTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
|
||||
val partitionErrors = (unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
|
||||
nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
|
||||
internalTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
|
||||
authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)).toMap
|
||||
|
||||
|
@ -1734,26 +1749,24 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId)))
|
||||
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
|
||||
else {
|
||||
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition {
|
||||
case (topicPartition, _) =>
|
||||
val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
|
||||
val exists = metadataCache.contains(topicPartition.topic)
|
||||
if (!authorizedForDescribe && exists)
|
||||
debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
|
||||
s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning " +
|
||||
s"${Errors.UNKNOWN_TOPIC_OR_PARTITION.name}")
|
||||
authorizedForDescribe && exists
|
||||
}
|
||||
var unauthorizedTopics = Set[TopicPartition]()
|
||||
var nonExistingTopics = Set[TopicPartition]()
|
||||
var authorizedTopics = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
|
||||
|
||||
val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
|
||||
case (topicPartition, _) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
|
||||
for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
|
||||
if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
|
||||
unauthorizedTopics += topicPartition
|
||||
else if (!metadataCache.contains(topicPartition.topic))
|
||||
nonExistingTopics += topicPartition
|
||||
else
|
||||
authorizedTopics += (topicPartition -> commitedOffset)
|
||||
}
|
||||
|
||||
// the callback for sending an offset commit response
|
||||
def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
|
||||
val combinedCommitStatus = commitStatus ++
|
||||
unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
|
||||
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
|
||||
unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
|
||||
nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
|
||||
|
||||
if (isDebugEnabled)
|
||||
combinedCommitStatus.foreach { case (topicPartition, error) =>
|
||||
|
@ -1769,7 +1782,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
if (authorizedTopics.isEmpty)
|
||||
sendResponseCallback(Map.empty)
|
||||
else {
|
||||
val offsetMetadata = convertTxnOffsets(authorizedTopics)
|
||||
val offsetMetadata = convertTxnOffsets(authorizedTopics.toMap)
|
||||
groupCoordinator.handleTxnCommitOffsets(
|
||||
txnOffsetCommitRequest.consumerGroupId,
|
||||
txnOffsetCommitRequest.producerId,
|
||||
|
@ -1942,12 +1955,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = {
|
||||
val error = resource.`type` match {
|
||||
case RResourceType.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED
|
||||
case RResourceType.TOPIC =>
|
||||
// Don't leak topic name unless the user has describe topic permission
|
||||
if (authorize(session, Describe, new Resource(Topic, resource.name)))
|
||||
Errors.TOPIC_AUTHORIZATION_FAILED
|
||||
else
|
||||
Errors.UNKNOWN_TOPIC_OR_PARTITION
|
||||
case RResourceType.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
|
||||
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
|
||||
}
|
||||
new ApiError(error, null)
|
||||
|
|
|
@ -382,6 +382,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
|
||||
private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build()
|
||||
|
||||
private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()
|
||||
|
||||
private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
|
||||
|
||||
|
||||
@Test
|
||||
def testAuthorizationWithTopicExisting() {
|
||||
|
@ -413,26 +417,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
|
||||
ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest,
|
||||
ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
|
||||
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
|
||||
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
|
||||
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
|
||||
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest
|
||||
)
|
||||
|
||||
for ((key, request) <- requestKeyToRequest) {
|
||||
removeAllAcls()
|
||||
val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
|
||||
|
||||
val resourceToAcls = requestKeysToAcls(key)
|
||||
resourceToAcls.get(topicResource).foreach { acls =>
|
||||
val describeAcls = topicDescribeAcl(topicResource)
|
||||
val isAuthorized = describeAcls == acls
|
||||
addAndVerifyAcls(describeAcls, topicResource)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized)
|
||||
removeAllAcls()
|
||||
}
|
||||
|
||||
for ((resource, acls) <- resourceToAcls)
|
||||
addAndVerifyAcls(acls, resource)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -447,7 +453,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
|
||||
|
||||
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
|
||||
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
|
||||
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false),
|
||||
ApiKeys.PRODUCE -> createProduceRequest,
|
||||
ApiKeys.FETCH -> createFetchRequest,
|
||||
|
@ -455,26 +460,29 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
|
||||
ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
|
||||
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
|
||||
ApiKeys.DELETE_RECORDS -> deleteRecordsRequest
|
||||
ApiKeys.DELETE_RECORDS -> deleteRecordsRequest,
|
||||
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
|
||||
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
|
||||
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
|
||||
)
|
||||
|
||||
for ((key, request) <- requestKeyToRequest) {
|
||||
removeAllAcls()
|
||||
val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, topicExists = false)
|
||||
|
||||
val resourceToAcls = requestKeysToAcls(key)
|
||||
resourceToAcls.get(topicResource).foreach { acls =>
|
||||
val describeAcls = topicDescribeAcl(topicResource)
|
||||
val isAuthorized = describeAcls == acls
|
||||
addAndVerifyAcls(describeAcls, topicResource)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, topicExists = false)
|
||||
removeAllAcls()
|
||||
}
|
||||
|
||||
for ((resource, acls) <- resourceToAcls)
|
||||
addAndVerifyAcls(acls, resource)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false)
|
||||
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, topicExists = false)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -484,7 +492,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
sendRecords(numRecords, tp)
|
||||
fail("should have thrown exception")
|
||||
} catch {
|
||||
case _: TimeoutException => //expected
|
||||
case _: TopicAuthorizationException => //expected
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -534,8 +542,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
sendRecords(numRecords, topicPartition)
|
||||
}
|
||||
|
||||
@Test(expected = classOf[GroupAuthorizationException])
|
||||
def testConsumeWithNoAccess(): Unit = {
|
||||
@Test(expected = classOf[TopicAuthorizationException])
|
||||
def testConsumeUsingAssignWithNoAccess(): Unit = {
|
||||
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
|
||||
sendRecords(1, tp)
|
||||
removeAllAcls()
|
||||
|
@ -893,10 +901,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
this.consumers.head.position(tp)
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(expected = classOf[TopicAuthorizationException])
|
||||
def testListOffsetsWithNoTopicAccess() {
|
||||
val partitionInfos = this.consumers.head.partitionsFor(topic)
|
||||
assertNull(partitionInfos)
|
||||
this.consumers.head.partitionsFor(topic)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -935,7 +942,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
|
||||
val version = ApiKeys.DELETE_TOPICS.latestVersion
|
||||
val deleteResponse = DeleteTopicsResponse.parse(response, version)
|
||||
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2)
|
||||
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -963,7 +970,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
|
||||
val version = ApiKeys.DELETE_RECORDS.latestVersion
|
||||
val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
|
||||
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteRecordsResponse.responses.asScala.head._2.error)
|
||||
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteRecordsResponse.responses.asScala.head._2.error)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -990,7 +997,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
|
||||
val version = ApiKeys.CREATE_PARTITIONS.latestVersion
|
||||
val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
|
||||
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, createPartitionsResponse.errors.asScala.head._2.error)
|
||||
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, createPartitionsResponse.errors.asScala.head._2.error)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1240,7 +1247,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
request: AbstractRequest,
|
||||
resources: Set[ResourceType],
|
||||
isAuthorized: Boolean,
|
||||
isAuthorizedTopicDescribe: Boolean,
|
||||
topicExists: Boolean = true): AbstractResponse = {
|
||||
val resp = connectAndSend(request, apiKey)
|
||||
val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
|
||||
|
@ -1251,8 +1257,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
if (resourceType == Topic) {
|
||||
if (isAuthorized)
|
||||
Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.error)
|
||||
else if (!isAuthorizedTopicDescribe)
|
||||
Set(Errors.UNKNOWN_TOPIC_OR_PARTITION)
|
||||
else
|
||||
Set(Topic.error)
|
||||
} else {
|
||||
|
@ -1266,9 +1270,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
|
|||
else
|
||||
assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors", authorizationErrors.contains(error))
|
||||
else if (resources == Set(Topic))
|
||||
assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
|
||||
else
|
||||
assertNotEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
|
||||
if (isAuthorized)
|
||||
assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
|
||||
else
|
||||
assertEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
|
||||
|
||||
response
|
||||
}
|
||||
|
|
|
@ -214,7 +214,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
* Tests that a producer fails to publish messages when the appropriate ACL
|
||||
* isn't set.
|
||||
*/
|
||||
@Test(expected = classOf[TimeoutException])
|
||||
@Test(expected = classOf[TopicAuthorizationException])
|
||||
def testNoProduceWithoutDescribeAcl(): Unit = {
|
||||
sendRecords(numRecords, tp)
|
||||
}
|
||||
|
@ -246,7 +246,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
consumeRecords(this.consumers.head)
|
||||
}
|
||||
|
||||
@Test(expected = classOf[TimeoutException])
|
||||
@Test(expected = classOf[TopicAuthorizationException])
|
||||
def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
|
||||
noConsumeWithoutDescribeAclSetup()
|
||||
consumers.head.subscribe(List(topic).asJava)
|
||||
|
|
|
@ -21,8 +21,8 @@ import java.util.Properties
|
|||
import kafka.utils.TestUtils
|
||||
import kafka.utils.Implicits._
|
||||
import org.apache.kafka.common.config.SaslConfigs
|
||||
import org.apache.kafka.common.errors.GroupAuthorizationException
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.common.errors.TopicAuthorizationException
|
||||
import org.junit.{Before, Test}
|
||||
|
||||
import scala.collection.immutable.List
|
||||
|
@ -77,9 +77,9 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
|
|||
|
||||
try {
|
||||
consumeRecords(consumer2)
|
||||
fail("Expected exception as consumer2 has no access to group")
|
||||
fail("Expected exception as consumer2 has no access to topic")
|
||||
} catch {
|
||||
case _: GroupAuthorizationException => //expected
|
||||
case _: TopicAuthorizationException => //expected
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,6 +86,10 @@
|
|||
inversion bug, it was previously enabled by default and disabled if <code>kafka_mx4jenable</code> was set to <code>true</code>.</li>
|
||||
<li>The package <code>org.apache.kafka.common.security.auth</code> in the clients jar has been made public and added to the javadocs.
|
||||
Internal classes which had previously been located in this package have been moved elsewhere.</li>
|
||||
<li>When using an Authorizer and a user doesn't have required permissions on a topic, the broker
|
||||
will return TOPIC_AUTHORIZATION_FAILED errors to requests irrespective of topic existence on broker.
|
||||
If the user have required permissions and the topic doesn't exists, then the UNKNOWN_TOPIC_OR_PARTITION
|
||||
error code will be returned. </li>
|
||||
</ul>
|
||||
|
||||
<h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>
|
||||
|
|
Loading…
Reference in New Issue