mirror of https://github.com/apache/kafka.git
MINOR: Add logging for ReplicationControlManager topic deletion (#17617)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
568b9e8a6c
commit
b864a66439
|
@ -939,17 +939,39 @@ public class ReplicationControlManager {
|
|||
Map<Uuid, ApiError> results = new HashMap<>(ids.size());
|
||||
List<ApiMessageAndVersion> records =
|
||||
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP, ids.size());
|
||||
StringBuilder resultsBuilder = new StringBuilder();
|
||||
String resultsPrefix = "";
|
||||
|
||||
for (Uuid id : ids) {
|
||||
String topicName = "null";
|
||||
ApiError error;
|
||||
try {
|
||||
log.trace("Starting deletion of topic with ID {}.", id);
|
||||
deleteTopic(context, id, records);
|
||||
results.put(id, ApiError.NONE);
|
||||
error = ApiError.NONE;
|
||||
} catch (ApiException e) {
|
||||
results.put(id, ApiError.fromThrowable(e));
|
||||
error = ApiError.fromThrowable(e);
|
||||
} catch (Exception e) {
|
||||
log.error("Unexpected deleteTopics error for {}", id, e);
|
||||
results.put(id, ApiError.fromThrowable(e));
|
||||
error = ApiError.fromThrowable(e);
|
||||
}
|
||||
|
||||
results.put(id, error);
|
||||
|
||||
if (!error.isFailure() || error.error() != UNKNOWN_TOPIC_ID) {
|
||||
topicName = topics.get(id).name;
|
||||
}
|
||||
|
||||
resultsBuilder.append(resultsPrefix)
|
||||
.append("{id: ").append(id)
|
||||
.append(", name: ").append(topicName)
|
||||
.append(", result: ")
|
||||
.append(error.isFailure() ? error.error() : "SUCCESS")
|
||||
.append("}");
|
||||
resultsPrefix = ", ";
|
||||
}
|
||||
|
||||
log.info("DeleteTopics result(s): {}", resultsBuilder);
|
||||
return ControllerResult.atomicOf(records, results);
|
||||
}
|
||||
|
||||
|
@ -959,8 +981,10 @@ public class ReplicationControlManager {
|
|||
throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
|
||||
}
|
||||
int numPartitions = topic.parts.size();
|
||||
log.trace("Deleting topic {} with ID {} and {} partitions", topic.name, id, numPartitions);
|
||||
try {
|
||||
context.applyPartitionChangeQuota(numPartitions); // check controller mutation quota
|
||||
log.trace("Checked for a partition change quota on topic {} with ID {}", topic.name, id);
|
||||
} catch (ThrottlingQuotaExceededException e) {
|
||||
// log a message and rethrow the exception
|
||||
log.debug("Topic deletion of {} partitions not allowed because quota is violated. Delay time: {}",
|
||||
|
|
Loading…
Reference in New Issue