KAFKA-16952: Do not bump broker epoch when re-registering the same incarnation (#16333)

* KAFKA-16952: Do not bump broker epoch when re-registering the same incarnation

As part of KIP-858 (Handle JBOD broker disk failure in KRaft), we added some code that caused the
broker to re-register itself when transitioning from a MetadataVersion that did not support broker
directory IDs, to one that did. This code was necessary because otherwise the controller would not
be aware of what directories the broker held.

However, prior to this PR, the re-registration process acted exactly like a full registration. That
is, it bumped the broker epoch (which is meant to only be bumped on broker restart). This PR fixes
the code to keep the broker epoch the same if the incarnation ID is the same.

There are some other minor improvements here:

- The previous logic relied on a complicated combination of request version and previous broker
  epoch to understand if the request came from the same broker or not. This is not needed: either
  the incarnation ID is the same and it's the same process, or it is not and it isn't.

- We now log whether we're amending a registration, registering a previously unknown broker, or
  replacing a previous registration.

- Move changes to the HeartbeatManager to the end of the function, so that we will not do them if
  any validation step fails. Log4j messages are also generated at the end, for the same reason.

Reviewers: Ron Dagostino <rndgstn@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2024-06-18 07:03:15 -07:00 committed by GitHub
parent 191b6476d7
commit 2fd00ce536
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 91 additions and 36 deletions

View File

@ -313,6 +313,10 @@ public class ClusterControlManager {
}
}
String clusterId() { // Visible for testing
return clusterId;
}
/**
* Transition this ClusterControlManager to standby.
*/
@ -340,10 +344,10 @@ public class ClusterControlManager {
* Process an incoming broker registration request.
*/
public ControllerResult<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request,
long brokerEpoch,
FinalizedControllerFeatures finalizedFeatures,
short version) {
BrokerRegistrationRequestData request,
long newBrokerEpoch,
FinalizedControllerFeatures finalizedFeatures
) {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
}
@ -354,21 +358,14 @@ public class ClusterControlManager {
int brokerId = request.brokerId();
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
log.debug("Received an unclean shutdown request");
brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records);
}
Uuid prevIncarnationId = null;
if (existing != null) {
prevIncarnationId = existing.incarnationId();
if (heartbeatManager.hasValidSession(brokerId)) {
if (!existing.incarnationId().equals(request.incarnationId())) {
if (!request.incarnationId().equals(prevIncarnationId)) {
throw new DuplicateBrokerRegistrationException("Another broker is " +
"registered with that broker id.");
}
} else {
if (!existing.incarnationId().equals(request.incarnationId())) {
// Remove any existing session for the old broker incarnation.
heartbeatManager.remove(brokerId);
}
}
}
@ -406,16 +403,9 @@ public class ClusterControlManager {
setBrokerId(brokerId).
setIsMigratingZkBroker(request.isMigratingZkBroker()).
setIncarnationId(request.incarnationId()).
setBrokerEpoch(brokerEpoch).
setRack(request.rack()).
setEndPoints(listenerInfo.toBrokerRegistrationRecord());
if (existing != null && request.incarnationId().equals(existing.incarnationId())) {
log.info("Amending registration of broker {}", request.brokerId());
record.setFenced(existing.fenced());
record.setInControlledShutdown(existing.inControlledShutdown());
}
for (BrokerRegistrationRequestData.Feature feature : request.features()) {
record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
}
@ -432,11 +422,39 @@ public class ClusterControlManager {
record.setLogDirs(request.logDirs());
}
heartbeatManager.register(brokerId, record.fenced());
if (!request.incarnationId().equals(prevIncarnationId)) {
int prevNumRecords = records.size();
brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records);
int numRecordsAdded = records.size() - prevNumRecords;
if (existing == null) {
log.info("No previous registration found for broker {}. New incarnation ID is " +
"{}. Generated {} record(s) to clean up previous incarnations. New broker " +
"epoch is {}.", brokerId, request.incarnationId(), numRecordsAdded, newBrokerEpoch);
} else {
log.info("Registering a new incarnation of broker {}. Previous incarnation ID " +
"was {}; new incarnation ID is {}. Generated {} record(s) to clean up " +
"previous incarnations. Broker epoch will become {}.", brokerId,
existing.incarnationId(), request.incarnationId(), numRecordsAdded,
newBrokerEpoch);
}
record.setBrokerEpoch(newBrokerEpoch);
} else {
log.info("Amending registration of broker {}, incarnation ID {}. Broker epoch remains {}.",
request.brokerId(), request.incarnationId(), existing.epoch());
record.setFenced(existing.fenced());
record.setInControlledShutdown(existing.inControlledShutdown());
record.setBrokerEpoch(existing.epoch());
}
records.add(new ApiMessageAndVersion(record, featureControl.metadataVersion().
registerBrokerRecordVersion()));
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
if (!request.incarnationId().equals(prevIncarnationId)) {
// Remove any existing session for the old broker incarnation.
heartbeatManager.remove(brokerId);
}
heartbeatManager.register(brokerId, record.fenced());
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(record.brokerEpoch()));
}
ControllerResult<Void> registerController(ControllerRegistrationRequestData request) {

View File

@ -2177,7 +2177,7 @@ public final class QuorumController implements Controller {
() -> {
ControllerResult<BrokerRegistrationReply> result = clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(), featureControl.
finalizedFeatures(Long.MAX_VALUE), context.requestHeader().requestApiVersion());
finalizedFeatures(Long.MAX_VALUE));
rescheduleMaybeFenceStaleBrokers();
return result;
},

View File

@ -50,7 +50,6 @@ import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
@ -61,6 +60,7 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@ -276,8 +276,7 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
(short) 1));
new FinalizedControllerFeatures(Collections.emptyMap(), 456L)));
}
private static Stream<Arguments> metadataVersions() {
@ -322,8 +321,7 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
(short) 1);
new FinalizedControllerFeatures(Collections.emptyMap(), 456L));
short expectedVersion = metadataVersion.registerBrokerRecordVersion();
@ -564,8 +562,7 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE),
(short) 1)).getMessage());
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
assertEquals("Unable to register because the broker does not support version 4 of " +
"metadata.version. It wants a version between 7 and 7, inclusive.",
@ -582,8 +579,7 @@ public class ClusterControlManagerTest {
setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE),
(short) 1)).getMessage());
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
}
@Test
@ -637,10 +633,10 @@ public class ClusterControlManagerTest {
void registerNewBrokerWithDirs(ClusterControlManager clusterControl, int brokerId, List<Uuid> dirs) {
BrokerRegistrationRequestData data = new BrokerRegistrationRequestData().setBrokerId(brokerId)
.setClusterId(TestUtils.fieldValue(clusterControl, ClusterControlManager.class, "clusterId"))
.setClusterId(clusterControl.clusterId())
.setIncarnationId(Uuid.randomUuid()).setLogDirs(dirs);
FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures(Collections.emptyMap(), 456L);
ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(data, 123L, finalizedFeatures, (short) 1);
ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(data, 123L, finalizedFeatures);
RecordTestUtils.replayAll(clusterControl, result.records());
}
@ -681,4 +677,45 @@ public class ClusterControlManagerTest {
assertEquals(DirectoryId.UNASSIGNED, clusterControl.defaultDir(3));
assertEquals(DirectoryId.UNASSIGNED, clusterControl.defaultDir(4));
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
build();
clusterControl.activate();
RecordTestUtils.replayAll(clusterControl, clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
100,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L)).
records());
RecordTestUtils.replayAll(clusterControl, clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(newIncarnationId ?
Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww") : Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
111,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L)).
records());
if (newIncarnationId) {
assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"),
clusterControl.brokerRegistrations().get(1).incarnationId());
assertEquals(111,
clusterControl.brokerRegistrations().get(1).epoch());
} else {
assertEquals(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ"),
clusterControl.brokerRegistrations().get(1).incarnationId());
assertEquals(100,
clusterControl.brokerRegistrations().get(1).epoch());
}
}
}