Propagate CLUSTER_AUTHORIZATION_FAILED

This commit is contained in:
ChangYu Huang 2025-09-01 20:46:09 -04:00
parent 96dfa85c81
commit 46949fc547
1 changed files with 3 additions and 0 deletions

View File

@ -89,6 +89,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.DisconnectException;
@ -3115,6 +3116,8 @@ public class KafkaAdminClient extends AdminClient {
// No replica info will be provided if the log directory is offline // No replica info will be provided if the log directory is offline
if (logDirInfo.error() instanceof KafkaStorageException) if (logDirInfo.error() instanceof KafkaStorageException)
continue; continue;
if (logDirInfo.error() instanceof ClusterAuthorizationException)
handleFailure(logDirInfo.error());
if (logDirInfo.error() != null) if (logDirInfo.error() != null)
handleFailure(new IllegalStateException( handleFailure(new IllegalStateException(
"The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal")); "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));