KAFKA-19130: Do not add fenced brokers to BrokerRegistrationTracker on startup (#19454)

When the controller starts up (or becomes active after being inactive), we add all of the registered brokers to BrokerRegistrationTracker so that they will not be accidentally fenced the next time we are looking for a broker to fence. We do this because the state in BrokerRegistrationTracker is "soft state" (it doesn't appear in the metadata log), and the newly active controller starts off with no soft state. (Its soft state will be populated by the brokers sending heartbeat requests to it over time.)

In the case of fenced brokers, we are not worried about accidentally fencing the broker due to it being missing from
BrokerRegistrationTracker for a while (it's already fenced). Therefore, it should be reasonable to just not add fenced brokers to the tracker initially.

One case where this change will have a positive impact is for people running single-node demonstration clusters in combined KRaft mode. In that case, when the single-node cluster is taken down and restarted, it currently will have to wait about 9 seconds for the broker to come up and re-register. With this change, the broker should be able to re-register immediately (assuming the previous shutdown happened cleanly through controller shutdown.)

One possible negative impact is that if there is a controller failover, it will open a small window where a broker with the same ID as a fenced broker could re-register. However, our detection of duplicate broker IDs is best-effort (and duplicate broker IDs are an administrative mistake), so this downside seems acceptable.

Reviewers: Alyssa Huang <ahuang@confluent.io>, José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
Colin Patrick McCabe 2025-04-16 11:57:35 -07:00 committed by GitHub
parent 23e7158665
commit c465abc458
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 28 additions and 26 deletions

View File

@ -309,10 +309,12 @@ public class ClusterControlManager {
long nowNs = time.nanoseconds();
for (BrokerRegistration registration : brokerRegistrations.values()) {
heartbeatManager.register(registration.id(), registration.fenced());
if (!registration.fenced()) {
heartbeatManager.tracker().updateContactTime(
new BrokerIdAndEpoch(registration.id(), registration.epoch()), nowNs);
}
}
}
String clusterId() { // Visible for testing
return clusterId;
@ -353,7 +355,7 @@ public class ClusterControlManager {
if (existing != null) {
prevIncarnationId = existing.incarnationId();
storedBrokerEpoch = existing.epoch();
if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) && !existing.inControlledShutdown()) {
if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
if (!request.incarnationId().equals(prevIncarnationId)) {
throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id. If the broker " +
"was recently restarted this should self-resolve once the heartbeat manager expires the broker's session.");

View File

@ -968,9 +968,10 @@ public class ClusterControlManagerTest {
clusterControl.replay(new RegisterBrokerRecord().
setBrokerEpoch(123).
setBrokerId(1).
setFenced(false).
setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))), 10005);
clusterControl.activate();
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
assertEquals(OptionalLong.empty(), clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(0, 100)));
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(1, 123)));
@ -1068,13 +1069,12 @@ public class ClusterControlManagerTest {
clusterControl.activate();
clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(1, 20002);
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
assertEquals(OptionalLong.empty(), clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(0, 100)));
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(1, 200)));
time.sleep(brokerSessionTimeoutMs / 2);
assertThrows(DuplicateBrokerRegistrationException.class, () ->
clusterControl.registerBroker(new BrokerRegistrationRequestData().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setBrokerId(0).
@ -1087,9 +1087,8 @@ public class ClusterControlManagerTest {
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
101L,
finalizedFeatures,
false));
// new registration should succeed immediatelly only if the broker is in controlled shutdown,
// even if the last heartbeat was within the session timeout
false);
assertThrows(DuplicateBrokerRegistrationException.class, () -> {
clusterControl.registerBroker(new BrokerRegistrationRequestData().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setBrokerId(1).
@ -1103,6 +1102,7 @@ public class ClusterControlManagerTest {
201L,
finalizedFeatures,
false);
});
}
private FeatureControlManager createFeatureControlManager() {