From 88a23dab3ea6f76cc32066066149ce7843bf24ab Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Fri, 28 Feb 2025 01:30:51 +0800 Subject: [PATCH] KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997) In https://github.com/apache/kafka/pull/16848, we added `kraft.version` to finalized features and got finalized features outside controller event handling thread. This may make finalized features stale when processing `registerBroker` event. Also, some cases like `QuorumControllerTest.testBalancePartitionLeaders` become flaky cause of outdated MV. This PR moves finalized features back to controller event handling thread to avoid the error. Reviewers: Ismael Juma , Jun Rao , Colin P. McCabe , Chia-Ping Tsai --- .../kafka/controller/QuorumController.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index bad7c8fbb84..fc5f99358f2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -2004,14 +2004,17 @@ public final class QuorumController implements Controller { ControllerRequestContext context, BrokerRegistrationRequestData request ) { - // populate finalized features map with latest known kraft version for validation - Map controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap()); - controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel()); return appendWriteEvent("registerBroker", context.deadlineNs(), - () -> clusterControl. - registerBroker(request, offsetControl.nextWriteOffset(), - new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE), - context.requestHeader().requestApiVersion() >= 3), + () -> { + // Read and write data in the controller event handling thread to avoid stale information. + Map controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap()); + // Populate finalized features map with latest known kraft version for validation. + controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel()); + return clusterControl. + registerBroker(request, offsetControl.nextWriteOffset(), + new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE), + context.requestHeader().requestApiVersion() >= 3); + }, EnumSet.noneOf(ControllerOperationFlag.class)); }