This commit is contained in:
Chang-Yu Huang 2025-10-07 15:53:44 -04:00 committed by GitHub
commit b411e4fc11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 267 additions and 192 deletions

View File

@ -3106,37 +3106,57 @@ public class KafkaAdminClient extends AdminClient {
@Override @Override
public void handleResponse(AbstractResponse abstractResponse) { public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
for (Map.Entry<String, LogDirDescription> responseEntry : logDirDescriptions(response).entrySet()) {
Set<TopicPartition> pendingPartitions = new HashSet<>(replicaDirInfoByPartition.keySet());
Map<String, Throwable> directoryFailures = new HashMap<>();
Map<String, LogDirDescription> descriptions = logDirDescriptions(response);
if (descriptions.isEmpty()) {
Errors error = response.data().errorCode() == Errors.NONE.code()
? Errors.CLUSTER_AUTHORIZATION_FAILED
: Errors.forCode(response.data().errorCode());
handleFailure(error.exception(), pendingPartitions);
}
for (Map.Entry<String, LogDirDescription> responseEntry : descriptions.entrySet()) {
String logDir = responseEntry.getKey(); String logDir = responseEntry.getKey();
LogDirDescription logDirInfo = responseEntry.getValue(); LogDirDescription logDirInfo = responseEntry.getValue();
// No replica info will be provided if the log directory is offline // No replica info will be provided if the log directory is offline
if (logDirInfo.error() instanceof KafkaStorageException) if (logDirInfo.error() instanceof KafkaStorageException)
continue; continue;
if (logDirInfo.error() != null)
handleFailure(new IllegalStateException(
"The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { if (logDirInfo.error() == null) {
TopicPartition tp = replicaInfoEntry.getKey(); for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); TopicPartition tp = replicaInfoEntry.getKey();
ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
if (replicaLogDirInfo == null) { ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); if (replicaLogDirInfo == null) {
} else if (replicaInfo.isFuture()) { log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp);
replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), } else if (replicaInfo.isFuture()) {
replicaLogDirInfo.getCurrentReplicaOffsetLag(), replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
logDir, replicaLogDirInfo.getCurrentReplicaOffsetLag(),
replicaInfo.offsetLag())); logDir,
} else { replicaInfo.offsetLag()));
replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, } else {
replicaInfo.offsetLag(), replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir,
replicaLogDirInfo.getFutureReplicaLogDir(), replicaInfo.offsetLag(),
replicaLogDirInfo.getFutureReplicaOffsetLag())); replicaLogDirInfo.getFutureReplicaLogDir(),
replicaLogDirInfo.getFutureReplicaOffsetLag()));
}
pendingPartitions.remove(tp);
} }
} else {
directoryFailures.put(logDir, logDirInfo.error());
} }
} }
if (!pendingPartitions.isEmpty() && !directoryFailures.isEmpty()) {
List<String> errorAtDir = new ArrayList<>();
directoryFailures.forEach((k, v) -> errorAtDir.add(v.getClass().getName() + " at " + k));
Throwable error = new IllegalStateException("The error " + String.join(", ", errorAtDir) + " in the response from broker " + brokerId + " is illegal");
handleFailure(error, pendingPartitions);
}
for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry : replicaDirInfoByPartition.entrySet()) { for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry : replicaDirInfoByPartition.entrySet()) {
TopicPartition tp = entry.getKey(); TopicPartition tp = entry.getKey();
KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId)); KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
@ -3148,6 +3168,13 @@ public class KafkaAdminClient extends AdminClient {
// NOTE(review): this span is a side-by-side diff render — each code line below
// contains the old and new column concatenated; the actual method appears once.
// Purpose: broker-level failure — complete EVERY future in this call's map
// exceptionally with the given throwable (contrast with the partition-scoped
// overload added by this commit).
void handleFailure(Throwable throwable) { void handleFailure(Throwable throwable) {
completeAllExceptionally(futures.values(), throwable); completeAllExceptionally(futures.values(), throwable);
} }
// Partition-scoped failure handler (added by this commit): complete only the
// futures belonging to the given partitions exceptionally, leaving futures for
// partitions that were successfully described untouched. Called from
// handleResponse when the response carries no usable log-dir descriptions or
// when some partitions remain pending after per-directory errors.
//
// @param throwable       the error to propagate to each affected future
// @param topicPartitions the partitions (on this broker) whose futures should fail
void handleFailure(Throwable throwable, Collection<TopicPartition> topicPartitions) {
for (TopicPartition tp: topicPartitions) {
// NOTE(review): assumes futures contains an entry for every replica of a
// pending partition — topicPartitions derives from
// replicaDirInfoByPartition.keySet(), which appears to mirror the futures
// map; TODO confirm, since a missing entry would NPE on the next line.
KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
future.completeExceptionally(throwable);
}
}
}, now); }, now);
} }