mirror of https://github.com/apache/kafka.git
While migrating from ZK mode to KRaft mode, the broker passes through a "hybrid" phase, in which it receives LeaderAndIsrRequest (LAIR) and UpdateMetadataRequest (UMR) RPCs from the KRaft controller. For the most part, these RPCs can be handled just like their traditional equivalents from a ZK-based controller. However, there is one thing that is different: the way topic deletions are handled.

In ZK mode, there is a "deleting" state which topics enter prior to being completely removed. Partitions stay in this state until they are removed from the disks of all replicas. Partitions associated with these deleting topics show up in the UMR and LAIR as having a leader of -2 (which is not a valid broker ID, of course, because it's negative). When brokers receive these RPCs, they know to remove the associated partitions from their metadata caches and disks. When a full UMR or LAIR is sent, deleting partitions are included as well.

In hybrid mode, in contrast, there is no "deleting" state. Topic deletion happens immediately. We can do this because we know that we have topic IDs that are never reused. This means that we can always tell the difference between a broker that had an old version of some topic, and a broker that has a new version that was re-created with the same name. To make this work, when handling a full UMR or LAIR, hybrid brokers must compare the full state that was sent over the wire to their own local state, and adjust accordingly.

Prior to this PR, the code for handling those adjustments had several major flaws. The biggest flaw is that it did not correctly handle the "re-creation" case where a topic named FOO appears in the RPC, but with a different ID than the broker's local FOO. Another flaw is that a problem with a single partition would prevent handling the whole request.

In ZkMetadataCache.scala, we handle full UMR requests from KRaft controllers by rewriting the UMR so that it contains the implied deletions.
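
As a rough illustration of what "implied deletions" means here, the following is a minimal sketch and not the actual ZkMetadataCache code. The class `FullUmrRewriteSketch`, the record `TopicState`, and the method `withImpliedDeletions` are hypothetical names invented for this example, and topic state is collapsed to one entry per topic for brevity. It assumes the broker knows its local topic name to topic ID mapping and compares it against the topics listed in a full UMR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; none of these types are Kafka's real classes.
final class FullUmrRewriteSketch {
    // Leader value that marks a partition of a deleted topic, as described above.
    static final int LEADER_DURING_DELETE = -2;

    // One entry per topic for brevity; a real request carries per-partition state.
    record TopicState(String name, String topicId, int leader) {}

    // Returns a NEW list: implied deletions first, then the requested states.
    // The requested list itself is never modified.
    static List<TopicState> withImpliedDeletions(Map<String, String> localTopicIds,
                                                 List<TopicState> requested) {
        Map<String, String> requestedIds = new HashMap<>();
        for (TopicState t : requested) {
            requestedIds.put(t.name(), t.topicId());
        }
        List<TopicState> rewritten = new ArrayList<>();
        for (Map.Entry<String, String> local : localTopicIds.entrySet()) {
            String requestedId = requestedIds.get(local.getKey());
            // A local topic is implicitly deleted if the request omits its name
            // or lists the same name with a different topic ID (re-creation).
            if (!local.getValue().equals(requestedId)) {
                rewritten.add(new TopicState(local.getKey(), local.getValue(),
                                             LEADER_DURING_DELETE));
            }
        }
        // Deletions go first so an old topic is removed before a same-named
        // replacement is added. Existing entries are referenced, not copied.
        rewritten.addAll(requested);
        return rewritten;
    }
}
```

With a local FOO whose ID differs from the FOO in the request, the sketch emits a deletion entry for the old ID followed by the new FOO, so one pass over the list can both remove and re-add the topic.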
I fixed the rewriting code so that deletions always appear at the start of the list of topic states. This is important for the re-creation case since it means that a single request can both delete the old FOO and add a new FOO to the cache. Also, rather than modifying the request in-place, as the previous code did, I build a whole new request with the desired list of topic states. This is much safer because it avoids unforeseen interactions with other parts of the code that deal with requests (like request logging). While this new copy may sound expensive, it should not be: it is a "shallow copy" which references the previous list's topic state entries.

I also reworked ZkMetadataCache.updateMetadata so that if a partition is re-created, it does not appear in the returned set of deleted TopicPartitions. Since this set is used only by the group manager, this seemed appropriate. (If I was in the consumer group for the previous iteration of FOO, I should still be in the consumer group for the new iteration.)

On the ReplicaManager.scala side, we handle full LAIR requests by treating anything which does not appear in them as a "stray replica." (But we do not rewrite the request objects as we do with UMR.) I moved the logic for finding stray replicas from ReplicaManager into LogManager. It makes more sense there, since the information about what is on-disk is managed in LogManager. Also, the stray replica detection logic for KRaft mode is there, so it makes sense to put the stray replica detection logic for hybrid mode there as well.

Since the stray replica detection is now in LogManager, I moved the unit tests there as well. Previously some of those tests had been in BrokerMetadataPublisherTest for historical reasons.

The main advantage of the new LAIR logic is that it takes topic ID into account. A replica can be a stray even if the LAIR contains a topic of the given name, but a different ID.
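
The topic-ID-aware stray check can be sketched roughly as follows. This is an illustrative model under stated assumptions, not the LogManager implementation: `StrayReplicaSketch`, `Replica`, and `findStrays` are hypothetical names, and every on-disk log is assumed to have a known topic ID.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model; none of these types are Kafka's real classes.
final class StrayReplicaSketch {
    // Record equality covers name, topic ID, and partition, so a matching
    // name alone is not enough for two replicas to be considered the same.
    record Replica(String topic, String topicId, int partition) {}

    // Returns the on-disk replicas that a full LAIR does not account for.
    static List<Replica> findStrays(List<Replica> onDisk, List<Replica> inFullLair) {
        Set<Replica> expected = new HashSet<>(inFullLair);
        List<Replica> strays = new ArrayList<>();
        for (Replica r : onDisk) {
            // Missing from the request, or present under a different topic ID.
            if (!expected.contains(r)) {
                strays.add(r);
            }
        }
        return strays;
    }
}
```

In this model, an on-disk replica of FOO with the old topic ID is reported as a stray even when the request lists a FOO partition with the same number, because the IDs differ.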
I also moved the stray replica handling earlier in the becomeLeaderOrFollower function, so that we could correctly handle the "delete and re-create FOO" case.

Reviewers: David Arthur <mumrah@gmail.com>