KAFKA-19047: Allow quickly re-registering brokers that are in controlled shutdown (#19296)

Allow re-registration of brokers with active sessions if the previous broker registration was in controlled shutdown.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>, David Mao <dmao@confluent.io>
This commit is contained in:
Alyssa Huang 2025-04-08 13:39:04 -07:00 committed by GitHub
parent 434b0d39ae
commit 6e446f0b05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 130 additions and 3 deletions

View File

@ -353,10 +353,10 @@ public class ClusterControlManager {
if (existing != null) {
prevIncarnationId = existing.incarnationId();
storedBrokerEpoch = existing.epoch();
if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) && !existing.inControlledShutdown()) {
if (!request.incarnationId().equals(prevIncarnationId)) {
throw new DuplicateBrokerRegistrationException("Another broker is " +
"registered with that broker id.");
throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id. If the broker " +
"was recently restarted this should self-resolve once the heartbeat manager expires the broker's session.");
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
@ -70,6 +71,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ -978,6 +980,131 @@ public class ClusterControlManagerTest {
contactTime(new BrokerIdAndEpoch(2, 100)));
}
/**
 * Checks how a registration with a different incarnation id is treated when the broker
 * already registered under the same id is "active", i.e. the heartbeat manager last
 * recorded it as unfenced and not in controlled shutdown: the request is rejected while
 * the old session is still valid and accepted once that session has timed out.
 */
@Test
public void testDuplicateBrokerRegistrationWithActiveOldBroker() {
    final String clusterId = "pjvUwj3ZTEeSVQmUiH3IJw";
    final long sessionTimeoutMs = 1000;
    MockTime mockTime = new MockTime(0L, 20L, 1000L);
    FinalizedControllerFeatures controllerFeatures = new FinalizedControllerFeatures(
        Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
    ClusterControlManager manager = new ClusterControlManager.Builder().
        setClusterId(clusterId).
        setFeatureControlManager(createFeatureControlManager()).
        setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
        setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(sessionTimeoutMs)).
        setTime(mockTime).
        build();

    // Seed the existing registration: broker 0, epoch 100, unfenced.
    RegisterBrokerRecord existingRegistration = new RegisterBrokerRecord().
        setBrokerEpoch(100).
        setBrokerId(0).
        setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
        setFenced(false);
    manager.replay(existingRegistration, 10002);
    manager.activate();
    OptionalLong lastContact = manager.heartbeatManager().tracker().
        contactTime(new BrokerIdAndEpoch(0, 100));
    assertEquals(OptionalLong.of(1000L), lastContact);

    // Halfway through the session timeout the old session is still valid, so a request
    // carrying another incarnation id must be refused.
    mockTime.sleep(sessionTimeoutMs / 2);
    BrokerRegistrationRequestData requestDuringSession = new BrokerRegistrationRequestData().
        setClusterId(clusterId).
        setBrokerId(0).
        setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
        setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
            Set.of(new BrokerRegistrationRequestData.Feature().
                setName(MetadataVersion.FEATURE_NAME).
                setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
                setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
        setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg"));
    assertThrows(DuplicateBrokerRegistrationException.class, () ->
        manager.registerBroker(requestDuringSession, 101L, controllerFeatures, false));

    // After a further full timeout the session has expired; the same kind of request is
    // now expected to go through even though the old broker was active.
    mockTime.sleep(sessionTimeoutMs);
    BrokerRegistrationRequestData requestAfterExpiry = new BrokerRegistrationRequestData().
        setClusterId(clusterId).
        setBrokerId(0).
        setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
        setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
            Set.of(new BrokerRegistrationRequestData.Feature().
                setName(MetadataVersion.FEATURE_NAME).
                setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
                setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
        setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg"));
    manager.registerBroker(requestAfterExpiry, 101L, controllerFeatures, false);
}
/**
 * Checks how a registration with a different incarnation id is treated when the broker
 * already registered under the same id is "inactive", i.e. the heartbeat manager last
 * recorded it as fenced or in controlled shutdown. With both sessions still valid, the
 * fenced broker's id is still protected, whereas the id of the broker in controlled
 * shutdown can be re-registered right away.
 */
@Test
public void testDuplicateBrokerRegistrationWithInactiveBroker() {
    final String clusterId = "pjvUwj3ZTEeSVQmUiH3IJw";
    final long sessionTimeoutMs = 1000;
    MockTime mockTime = new MockTime(0L, 20L, 1000L);
    FinalizedControllerFeatures controllerFeatures = new FinalizedControllerFeatures(
        Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
    ClusterControlManager manager = new ClusterControlManager.Builder().
        setClusterId(clusterId).
        setFeatureControlManager(createFeatureControlManager()).
        setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
        setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(sessionTimeoutMs)).
        setTime(mockTime).
        build();

    // Broker 0: fenced, not in controlled shutdown.
    RegisterBrokerRecord fencedBroker = new RegisterBrokerRecord().
        setBrokerEpoch(100).
        setBrokerId(0).
        setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
        setFenced(true).
        setInControlledShutdown(false);
    manager.replay(fencedBroker, 10002);

    // Broker 1: unfenced, in controlled shutdown.
    RegisterBrokerRecord shuttingDownBroker = new RegisterBrokerRecord().
        setBrokerEpoch(200).
        setBrokerId(1).
        setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
        setFenced(false).
        setInControlledShutdown(true);
    manager.replay(shuttingDownBroker, 20002);

    manager.activate();
    manager.heartbeatManager().maybeUpdateControlledShutdownOffset(1, 20002);

    OptionalLong fencedBrokerContact = manager.heartbeatManager().tracker().
        contactTime(new BrokerIdAndEpoch(0, 100));
    assertEquals(OptionalLong.of(1000L), fencedBrokerContact);
    OptionalLong shuttingDownBrokerContact = manager.heartbeatManager().tracker().
        contactTime(new BrokerIdAndEpoch(1, 200));
    assertEquals(OptionalLong.of(1000L), shuttingDownBrokerContact);

    // Only half of the session timeout elapses, so both sessions remain valid below.
    mockTime.sleep(sessionTimeoutMs / 2);

    // Fenced broker: a request with another incarnation id is still refused.
    BrokerRegistrationRequestData requestForFencedId = new BrokerRegistrationRequestData().
        setClusterId(clusterId).
        setBrokerId(0).
        setLogDirs(List.of(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"))).
        setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
            Set.of(new BrokerRegistrationRequestData.Feature().
                setName(MetadataVersion.FEATURE_NAME).
                setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
                setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
        setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg"));
    assertThrows(DuplicateBrokerRegistrationException.class, () ->
        manager.registerBroker(requestForFencedId, 101L, controllerFeatures, false));

    // Broker in controlled shutdown: the new registration should succeed immediately,
    // even though the last heartbeat was within the session timeout.
    BrokerRegistrationRequestData requestForShuttingDownId = new BrokerRegistrationRequestData().
        setClusterId(clusterId).
        setBrokerId(1).
        setLogDirs(List.of(Uuid.fromString("b66ybsWIQoygs01vdjH07A"))).
        setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
            Set.of(new BrokerRegistrationRequestData.Feature().
                setName(MetadataVersion.FEATURE_NAME).
                setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
                setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
        setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"));
    manager.registerBroker(requestForShuttingDownId, 201L, controllerFeatures, false);
}
private FeatureControlManager createFeatureControlManager() {
FeatureControlManager featureControlManager = new FeatureControlManager.Builder().build();
featureControlManager.replay(new FeatureLevelRecord().