Fix CLUSTER_AUTHORIZATION_FAILED propagation in correct level

This commit is contained in:
ChangYu Huang 2025-09-08 00:40:18 -04:00
parent 96031a4237
commit cecbc237ec
2 changed files with 18 additions and 12 deletions

View File

@ -91,7 +91,6 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
@ -3112,16 +3111,21 @@ public class KafkaAdminClient extends AdminClient {
Set<TopicPartition> pendingPartitions = new HashSet<>(replicaDirInfoByPartition.keySet());
Map<String, Throwable> directoryFailures = new HashMap<>();
Map<String, LogDirDescription> descriptions = logDirDescriptions(response);
if (descriptions.isEmpty()) {
Errors error = response.data().errorCode() == Errors.NONE.code()
? Errors.CLUSTER_AUTHORIZATION_FAILED
: Errors.forCode(response.data().errorCode());
handleFailure(error.exception(), pendingPartitions);
}
for (Map.Entry<String, LogDirDescription> responseEntry : logDirDescriptions(response).entrySet()) {
for (Map.Entry<String, LogDirDescription> responseEntry : descriptions.entrySet()) {
String logDir = responseEntry.getKey();
LogDirDescription logDirInfo = responseEntry.getValue();
// No replica info will be provided if the log directory is offline
if (logDirInfo.error() instanceof KafkaStorageException)
continue;
if (logDirInfo.error() instanceof ClusterAuthorizationException)
handleFailure(logDirInfo.error());
if (logDirInfo.error() == null) {
for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
@ -3152,10 +3156,7 @@ public class KafkaAdminClient extends AdminClient {
List<String> errorAtDir = new ArrayList<>();
directoryFailures.forEach((k, v) -> errorAtDir.add(v.getClass().getName() + " at " + k));
Throwable error = new IllegalStateException("The error " + String.join(", ", errorAtDir) + " in the response from broker " + brokerId + " is illegal");
for (TopicPartition tp: pendingPartitions) {
KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
future.completeExceptionally(error);
}
handleFailure(error, pendingPartitions);
}
for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry : replicaDirInfoByPartition.entrySet()) {
@ -3169,6 +3170,13 @@ public class KafkaAdminClient extends AdminClient {
void handleFailure(Throwable throwable) {
completeAllExceptionally(futures.values(), throwable);
}
void handleFailure(Throwable throwable, Collection<TopicPartition> topicPartitions) {
for (TopicPartition tp: topicPartitions) {
KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
future.completeExceptionally(throwable);
}
}
}, now);
}

View File

@ -2524,10 +2524,8 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
String broker1log0 = "/var/data/kafka0";
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, broker1log0),
env.cluster().nodeById(tpr.brokerId()));
DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()));
env.kafkaClient().prepareResponseFrom(response, env.cluster().nodeById(tpr.brokerId()));
DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(List.of(tpr));
Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values();