From 431c00d80241506ea34ea8a00f1b67034956b53d Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Mon, 12 Aug 2024 12:33:51 -0700 Subject: [PATCH] KAFKA-17190: AssignmentsManager gets stuck retrying on deleted topics (#16672) In MetadataVersion 3.7-IV2 and above, the broker's AssignmentsManager sends an RPC to the controller informing it about which directory we have chosen to place each new replica on. Unfortunately, the code does not check to see if the topic still exists in the MetadataImage before sending the RPC. It will also retry infinitely. Therefore, after a topic is created and deleted in rapid succession, we can get stuck including the now-defunct replica in our subsequent AssignReplicasToDirsRequests forever. Reviewers: Igor Soarez , Ron Dagostino Conflicts: the original PR in trunk and 3.9 was large and fixed some other issues, like batching. In order to avoid too much disruption to this older branch, this cherry-pick is minimal and just stops retrying if the AssignmentsManager receives Errors.UNKNOWN_TOPIC_ID from the controller. --- .../main/java/org/apache/kafka/server/AssignmentsManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java index 237cb8a25bf..11c86e0c4a3 100644 --- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java +++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java @@ -426,6 +426,8 @@ public class AssignmentsManager { Errors error = Errors.forCode(partition.errorCode()); if (error == Errors.NOT_LEADER_OR_FOLLOWER) { log.info("Dropping late directory assignment for partition {} into directory {} because this broker is no longer a replica", partition, event.dirId); + } else if (error == Errors.UNKNOWN_TOPIC_ID) { + log.info("Dropping late directory assignment for partition {} into directory {} because this topic no longer exists.", partition, event.dirId); } else if (error != Errors.NONE) { log.error("Controller returned error {} for assignment of partition {} into directory {}", error.name(), partition, event.dirId);