KAFKA-18764: Throttle on share state RPCs auth failure. (#18855)

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-02-11 15:24:24 +05:30 committed by GitHub
parent ece91e9247
commit 675a0889de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 101 additions and 10 deletions

View File

@@ -18,12 +18,14 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -110,4 +112,16 @@ public class DeleteShareGroupStateResponse extends AbstractResponse {
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
}
/**
 * Builds a response that mirrors the topic/partition layout of {@code request} and
 * reports {@code error} for every partition, for use when the whole request is
 * rejected up front (e.g. a cluster authorization failure).
 *
 * @param request the delete-state request whose topics and partitions are echoed back
 * @param error   the error applied to each partition result
 * @return response data carrying one error result per requested partition
 */
public static DeleteShareGroupStateResponseData toGlobalErrorResponse(DeleteShareGroupStateRequestData request, Errors error) {
    List<DeleteShareGroupStateResponseData.DeleteStateResult> results = new ArrayList<>();
    for (var topic : request.topics()) {
        List<DeleteShareGroupStateResponseData.PartitionResult> partitions = new ArrayList<>(topic.partitions().size());
        for (var partition : topic.partitions()) {
            partitions.add(toErrorResponsePartitionResult(partition.partition(), error, error.message()));
        }
        results.add(toResponseDeleteStateResult(topic.topicId(), partitions));
    }
    return new DeleteShareGroupStateResponseData().setResults(results);
}
}

View File

@@ -18,12 +18,14 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -112,4 +114,16 @@ public class ReadShareGroupStateResponse extends AbstractResponse {
.setTopicId(topicId)
.setPartitions(partitionResults);
}
/**
 * Builds a response that mirrors the topic/partition layout of {@code request} and
 * reports {@code error} for every partition, for use when the whole request is
 * rejected up front (e.g. a cluster authorization failure).
 *
 * @param request the read-state request whose topics and partitions are echoed back
 * @param error   the error applied to each partition result
 * @return response data carrying one error result per requested partition
 */
public static ReadShareGroupStateResponseData toGlobalErrorResponse(ReadShareGroupStateRequestData request, Errors error) {
    List<ReadShareGroupStateResponseData.ReadStateResult> results = new ArrayList<>();
    for (var topic : request.topics()) {
        List<ReadShareGroupStateResponseData.PartitionResult> partitions = new ArrayList<>(topic.partitions().size());
        for (var partition : topic.partitions()) {
            partitions.add(toErrorResponsePartitionResult(partition.partition(), error, error.message()));
        }
        results.add(toResponseReadStateResult(topic.topicId(), partitions));
    }
    return new ReadShareGroupStateResponseData().setResults(results);
}
}

View File

@@ -18,12 +18,14 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -121,4 +123,16 @@ public class ReadShareGroupStateSummaryResponse extends AbstractResponse {
.setTopicId(topicId)
.setPartitions(partitionResults);
}
/**
 * Builds a response that mirrors the topic/partition layout of {@code request} and
 * reports {@code error} for every partition, for use when the whole request is
 * rejected up front (e.g. a cluster authorization failure).
 *
 * @param request the read-state-summary request whose topics and partitions are echoed back
 * @param error   the error applied to each partition result
 * @return response data carrying one error result per requested partition
 */
public static ReadShareGroupStateSummaryResponseData toGlobalErrorResponse(ReadShareGroupStateSummaryRequestData request, Errors error) {
    List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> results = new ArrayList<>();
    for (var topic : request.topics()) {
        List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitions = new ArrayList<>(topic.partitions().size());
        for (var partition : topic.partitions()) {
            partitions.add(toErrorResponsePartitionResult(partition.partition(), error, error.message()));
        }
        results.add(toResponseReadStateSummaryResult(topic.topicId(), partitions));
    }
    return new ReadShareGroupStateSummaryResponseData().setResults(results);
}
}

View File

@@ -18,12 +18,14 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -107,4 +109,16 @@ public class WriteShareGroupStateResponse extends AbstractResponse {
return new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId);
}
/**
 * Builds a response that mirrors the topic/partition layout of {@code request} and
 * reports {@code error} for every partition, for use when the whole request is
 * rejected up front (e.g. a cluster authorization failure).
 *
 * @param request the write-state request whose topics and partitions are echoed back
 * @param error   the error applied to each partition result
 * @return response data carrying one error result per requested partition
 */
public static WriteShareGroupStateResponseData toGlobalErrorResponse(WriteShareGroupStateRequestData request, Errors error) {
    List<WriteShareGroupStateResponseData.WriteStateResult> results = new ArrayList<>();
    for (var topic : request.topics()) {
        List<WriteShareGroupStateResponseData.PartitionResult> partitions = new ArrayList<>(topic.partitions().size());
        for (var partition : topic.partitions()) {
            partitions.add(toErrorResponsePartitionResult(partition.partition(), error, error.message()));
        }
        results.add(toResponseWriteStateResult(topic.topicId(), partitions));
    }
    return new WriteShareGroupStateResponseData().setResults(results);
}
}

View File

@@ -3109,7 +3109,14 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleReadShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val readShareGroupStateRequest = request.body[ReadShareGroupStateRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new ReadShareGroupStateResponse(
ReadShareGroupStateResponse.toGlobalErrorResponse(
readShareGroupStateRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}
shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -3128,19 +3135,26 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleWriteShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val writeShareRequest = request.body[WriteShareGroupStateRequest]
val writeShareGroupStateRequest = request.body[WriteShareGroupStateRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new WriteShareGroupStateResponse(
WriteShareGroupStateResponse.toGlobalErrorResponse(
writeShareGroupStateRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}
shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
writeShareRequest.getErrorResponse(requestThrottleMs,
writeShareGroupStateRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not enabled.")))
CompletableFuture.completedFuture[Unit](())
case Some(coordinator) => coordinator.writeState(request.context, writeShareRequest.data)
case Some(coordinator) => coordinator.writeState(request.context, writeShareGroupStateRequest.data)
.handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, writeShareRequest.getErrorResponse(exception))
requestHelper.sendMaybeThrottle(request, writeShareGroupStateRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, new WriteShareGroupStateResponse(response))
}
@@ -3148,14 +3162,23 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleDeleteShareGroupStateRequest(request: RequestChannel.Request): Unit = {
def handleDeleteShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val deleteShareGroupStateRequest = request.body[DeleteShareGroupStateRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new DeleteShareGroupStateResponse(
DeleteShareGroupStateResponse.toGlobalErrorResponse(
deleteShareGroupStateRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}
shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
deleteShareGroupStateRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not enabled.")))
CompletableFuture.completedFuture[Unit](())
case Some(coordinator) => coordinator.deleteState(request.context, deleteShareGroupStateRequest.data)
.handle[Unit] { (response, exception) =>
@@ -3168,9 +3191,17 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleReadShareGroupStateSummaryRequest(request: RequestChannel.Request): Unit = {
def handleReadShareGroupStateSummaryRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val readShareGroupStateSummaryRequest = request.body[ReadShareGroupStateSummaryRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new ReadShareGroupStateSummaryResponse(
ReadShareGroupStateSummaryResponse.toGlobalErrorResponse(
readShareGroupStateSummaryRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}
shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -3482,6 +3513,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}
// Non-throwing authorization check: delegates to authHelper.authorize for the
// given operation on the cluster resource (CLUSTER / CLUSTER_NAME) and returns
// the result, so handlers can send a throttled error response on failure
// instead of propagating an authorization exception.
def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Boolean = {
authHelper.authorize(request.context, operation, CLUSTER, CLUSTER_NAME)
}
}
object KafkaApis {