KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997)

In https://github.com/apache/kafka/pull/16848, we added `kraft.version`
to finalized features and got finalized features outside controller
event handling thread. This may make finalized features stale when
processing `registerBroker` event. Also, some cases like
`QuorumControllerTest.testBalancePartitionLeaders` become flaky cause of
outdated MV. This PR moves finalized features back to controller event
handling thread to avoid the error.

Reviewers: Ismael Juma <ijuma@apache.org>, Jun Rao <junrao@gmail.com>,
Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-02-28 01:30:51 +08:00 committed by GitHub
parent d77f44414d
commit 88a23dab3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 10 additions and 7 deletions

View File

@ -2004,14 +2004,17 @@ public final class QuorumController implements Controller {
ControllerRequestContext context,
BrokerRegistrationRequestData request
) {
// populate finalized features map with latest known kraft version for validation
Map<String, Short> controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());
return appendWriteEvent("registerBroker", context.deadlineNs(),
() -> clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE),
context.requestHeader().requestApiVersion() >= 3),
() -> {
// Read and write data in the controller event handling thread to avoid stale information.
Map<String, Short> controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
// Populate finalized features map with latest known kraft version for validation.
controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());
return clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE),
context.requestHeader().requestApiVersion() >= 3);
},
EnumSet.noneOf(ControllerOperationFlag.class));
}