mirror of https://github.com/apache/kafka.git
KAFKA-5994; Log ClusterAuthorizationException for all ClusterAction requests
Author: Manikumar Reddy <manikumar.reddy@gmail.com> Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com> Closes #5021 from omkreddy/KAFKA-5994-CLUSTER-AUTH
This commit is contained in:
parent
4154eb9f11
commit
e8959bd766
|
@ -181,10 +181,8 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
}
|
||||
|
||||
if (!isAuthorizedClusterAction(request)) {
|
||||
sendResponseMaybeThrottle(request, throttleTimeMs => leaderAndIsrRequest.getErrorResponse(throttleTimeMs,
|
||||
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
|
||||
} else if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch())) {
|
||||
authorizeClusterAction(request)
|
||||
if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch())) {
|
||||
// When the broker restarts very quickly, it is possible for this broker to receive request intended
|
||||
// for its previous generation so the broker should skip the stale request.
|
||||
info("Received LeaderAndIsr request with broker epoch " +
|
||||
|
@ -201,11 +199,8 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
|
||||
// stop serving data to clients for the topic being deleted
|
||||
val stopReplicaRequest = request.body[StopReplicaRequest]
|
||||
if (!isAuthorizedClusterAction(request)) {
|
||||
val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
|
||||
sendResponseMaybeThrottle(request, _ =>
|
||||
new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
|
||||
} else if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch())) {
|
||||
authorizeClusterAction(request)
|
||||
if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch())) {
|
||||
// When the broker restarts very quickly, it is possible for this broker to receive request intended
|
||||
// for its previous generation so the broker should skip the stale request.
|
||||
info("Received stop replica request with broker epoch " +
|
||||
|
@ -234,9 +229,8 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
val correlationId = request.header.correlationId
|
||||
val updateMetadataRequest = request.body[UpdateMetadataRequest]
|
||||
|
||||
if (!isAuthorizedClusterAction(request)) {
|
||||
sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
|
||||
} else if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch())) {
|
||||
authorizeClusterAction(request)
|
||||
if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch())) {
|
||||
// When the broker restarts very quickly, it is possible for this broker to receive request intended
|
||||
// for its previous generation so the broker should skip the stale request.
|
||||
info("Received update metadata request with broker epoch " +
|
||||
|
|
Loading…
Reference in New Issue