mirror of https://github.com/apache/kafka.git
MINOR: implement BrokerRegistrationChangeRecord (#12195)
Implement BrokerRegistrationChangeRecord as specified in KIP-746. This is a more flexible record than the single-purpose Fence / Unfence records. Reviewers: José Armando García Sancio <jsancio@gmail.com>, dengziming <dengziming1993@gmail.com>
parent 0ca9cd4d2d
commit 65b4374203
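Before the diff, a brief orientation. The sketch below is not code from this commit; the helper class and method names (FencingRecordSketch, appendBrokerFencedRecord) are invented for illustration. What it exercises does come from the changes shown further down: the new BrokerRegistrationChangeRecord, the BrokerRegistrationFencingChange enum, and MetadataVersion#isBrokerRegistrationChangeRecordSupported, mirroring how ReplicationControlManager#handleBrokerFenced chooses which record to write.

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;

// Hypothetical helper mirroring ReplicationControlManager#handleBrokerFenced below.
public final class FencingRecordSketch {
    // Appends the metadata record that marks a broker as fenced, choosing the
    // record type that the cluster's metadata.version supports.
    public static void appendBrokerFencedRecord(MetadataVersion metadataVersion,
                                                int brokerId,
                                                long brokerEpoch,
                                                List<ApiMessageAndVersion> records) {
        if (metadataVersion.isBrokerRegistrationChangeRecordSupported()) {
            // IBP_3_3_IV2 and later: one generic record carries the fencing change.
            // As in handleBrokerFenced in this commit, the byte written into the
            // fenced field is BrokerRegistrationFencingChange.UNFENCE.value(), i.e. +1.
            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
                setBrokerId(brokerId).
                setBrokerEpoch(brokerEpoch).
                setFenced(BrokerRegistrationFencingChange.UNFENCE.value()),
                (short) 0));
        } else {
            // Older metadata versions keep writing the single-purpose record.
            records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
                setId(brokerId).
                setEpoch(brokerEpoch),
                (short) 0));
        }
    }

    public static void main(String[] args) {
        List<ApiMessageAndVersion> records = new ArrayList<>();
        appendBrokerFencedRecord(MetadataVersion.IBP_3_3_IV2, 1, 100L, records);
        System.out.println(records);
    }
}

On replay, ClusterControlManager#replayRegistrationChange (below) maps the byte back through BrokerRegistrationFencingChange.fromValue and, unless the change is NONE, clones the registration with the new fencing state.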
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;

@@ -32,10 +33,12 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeatureCollection;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
@@ -379,7 +382,7 @@ public class ClusterControlManager {
        if (registration == null) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                "registration found for that id", record.toString()));
        } else if (registration.epoch() != record.brokerEpoch()) {
        } else if (registration.epoch() != record.brokerEpoch()) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                "registration with that epoch found", record.toString()));
        } else {

@@ -391,41 +394,56 @@ public class ClusterControlManager {
    }

    public void replay(FenceBrokerRecord record) {
        int brokerId = record.id();
        BrokerRegistration registration = brokerRegistrations.get(brokerId);
        if (registration == null) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                "registration found for that id", record.toString()));
        } else if (registration.epoch() != record.epoch()) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                "registration with that epoch found", record.toString()));
        } else {
            if (heartbeatManager != null) heartbeatManager.register(brokerId, true);
            brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
            updateMetrics(registration, brokerRegistrations.get(brokerId));
            log.info("Fenced broker: {}", record);
        }
        replayRegistrationChange(record, record.id(), record.epoch(),
            BrokerRegistrationFencingChange.UNFENCE);
    }

    public void replay(UnfenceBrokerRecord record) {
        int brokerId = record.id();
        BrokerRegistration registration = brokerRegistrations.get(brokerId);
        if (registration == null) {
        replayRegistrationChange(record, record.id(), record.epoch(),
            BrokerRegistrationFencingChange.FENCE);
    }

    public void replay(BrokerRegistrationChangeRecord record) {
        Optional<BrokerRegistrationFencingChange> fencingChange =
            BrokerRegistrationFencingChange.fromValue(record.fenced());
        if (!fencingChange.isPresent()) {
            throw new RuntimeException(String.format("Unable to replay %s: unknown " +
                "value for fenced field: %d", record.toString(), record.fenced()));
        }
        replayRegistrationChange(record, record.brokerId(), record.brokerEpoch(),
            fencingChange.get());
    }

    private void replayRegistrationChange(
        ApiMessage record,
        int brokerId,
        long brokerEpoch,
        BrokerRegistrationFencingChange fencingChange
    ) {
        BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
        if (curRegistration == null) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                "registration found for that id", record.toString()));
        } else if (registration.epoch() != record.epoch()) {
        } else if (curRegistration.epoch() != brokerEpoch) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                "registration with that epoch found", record.toString()));
        } else {
            if (heartbeatManager != null) heartbeatManager.register(brokerId, false);
            brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
            updateMetrics(registration, brokerRegistrations.get(brokerId));
            log.info("Unfenced broker: {}", record);
        }
        if (readyBrokersFuture.isPresent()) {
            if (readyBrokersFuture.get().check()) {
                readyBrokersFuture.get().future.complete(null);
                readyBrokersFuture = Optional.empty();
            BrokerRegistration nextRegistration = curRegistration;
            if (fencingChange != BrokerRegistrationFencingChange.NONE) {
                nextRegistration = nextRegistration.cloneWithFencing(fencingChange.asBoolean().get());
            }
            if (!curRegistration.equals(nextRegistration)) {
                brokerRegistrations.put(brokerId, nextRegistration);
                updateMetrics(curRegistration, nextRegistration);
            } else {
                log.info("Ignoring no-op registration change for {}", curRegistration);
            }
            if (heartbeatManager != null) heartbeatManager.register(brokerId, nextRegistration.fenced());
            if (readyBrokersFuture.isPresent()) {
                if (readyBrokersFuture.get().check()) {
                    readyBrokersFuture.get().future.complete(null);
                    readyBrokersFuture = Optional.empty();
                }
            }
        }
    }
@@ -437,19 +455,24 @@ public class ClusterControlManager {
            } else {
                controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
            }
            log.info("Removed broker: {}", prevRegistration.id());
        } else if (prevRegistration == null) {
            if (registration.fenced()) {
                controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
                log.info("Added new fenced broker: {}", registration.id());
            } else {
                controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
                log.info("Added new unfenced broker: {}", registration.id());
            }
        } else {
            if (prevRegistration.fenced() && !registration.fenced()) {
                controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
                controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
                log.info("Unfenced broker: {}", registration.id());
            } else if (!prevRegistration.fenced() && registration.fenced()) {
                controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
                controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
                log.info("Fenced broker: {}", registration.id());
            }
        }
    }

@@ -48,6 +48,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;

@@ -1262,7 +1263,6 @@ public final class QuorumController implements Controller {
                break;
            case FEATURE_LEVEL_RECORD:
                featureControl.replay((FeatureLevelRecord) message);
                handleFeatureControlChange();
                break;
            case CLIENT_QUOTA_RECORD:

@@ -1271,6 +1271,9 @@ public final class QuorumController implements Controller {
            case PRODUCER_IDS_RECORD:
                producerIdControlManager.replay((ProducerIdsRecord) message);
                break;
            case BROKER_REGISTRATION_CHANGE_RECORD:
                clusterControl.replay((BrokerRegistrationChangeRecord) message);
                break;
            case ACCESS_CONTROL_ENTRY_RECORD:
                aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
                break;

@@ -1573,7 +1576,6 @@ public final class QuorumController implements Controller {
        this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
        this.maxIdleIntervalNs = maxIdleIntervalNs;
        this.replicationControl = new ReplicationControlManager.Builder().
            setMetadataVersion(() -> featureControl.metadataVersion()).
            setSnapshotRegistry(snapshotRegistry).
            setLogContext(logContext).
            setDefaultReplicationFactor(defaultReplicationFactor).

@@ -1583,6 +1585,7 @@ public final class QuorumController implements Controller {
            setClusterControl(clusterControl).
            setControllerMetrics(controllerMetrics).
            setCreateTopicPolicy(createTopicPolicy).
            setFeatureControl(featureControl).
            build();
        this.authorizer = authorizer;
        authorizer.ifPresent(a -> a.setAclMutator(this));

@@ -17,6 +17,7 @@

package org.apache.kafka.controller;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.ElectionType;

@@ -63,6 +64,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.Lis
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;

@@ -75,6 +77,7 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;

@@ -112,12 +115,9 @@ import java.util.stream.Collectors;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.FENCE_BROKER_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.UNFENCE_BROKER_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.UNREGISTER_BROKER_RECORD;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;

@@ -139,7 +139,6 @@ public class ReplicationControlManager {
    static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;

    static class Builder {
        private Supplier<MetadataVersion> metadataVersion = MetadataVersion::latest;
        private SnapshotRegistry snapshotRegistry = null;
        private LogContext logContext = null;
        private short defaultReplicationFactor = (short) 3;

@@ -149,11 +148,7 @@ public class ReplicationControlManager {
        private ClusterControlManager clusterControl = null;
        private ControllerMetrics controllerMetrics = null;
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();

        Builder setMetadataVersion(Supplier<MetadataVersion> metadataVersion) {
            this.metadataVersion = metadataVersion;
            return this;
        }
        private FeatureControlManager featureControl = null;

        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;

@@ -200,6 +195,11 @@ public class ReplicationControlManager {
            return this;
        }

        public Builder setFeatureControl(FeatureControlManager featureControl) {
            this.featureControl = featureControl;
            return this;
        }

        ReplicationControlManager build() {
            if (configurationControl == null) {
                throw new IllegalStateException("Configuration control must be set before building");

@@ -210,9 +210,17 @@ public class ReplicationControlManager {
            }
            if (logContext == null) logContext = new LogContext();
            if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
            return new ReplicationControlManager(
                metadataVersion,
                snapshotRegistry,
            if (featureControl == null) {
                featureControl = new FeatureControlManager.Builder().
                    setLogContext(logContext).
                    setSnapshotRegistry(snapshotRegistry).
                    setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
                        QuorumFeatures.defaultFeatureMap(),
                        Collections.singletonList(0))).
                    setMetadataVersion(MetadataVersion.latest()).
                    build();
            }
            return new ReplicationControlManager(snapshotRegistry,
                logContext,
                defaultReplicationFactor,
                defaultNumPartitions,

@@ -220,7 +228,8 @@ public class ReplicationControlManager {
                configurationControl,
                clusterControl,
                controllerMetrics,
                createTopicPolicy);
                createTopicPolicy,
                featureControl);
        }
    }

@@ -275,11 +284,6 @@ public class ReplicationControlManager {
     */
    private final int defaultNumPartitions;

    /**
     * Metadata version (or IBP) for the cluster
     */
    private final Supplier<MetadataVersion> metadataVersion;

    /**
     * Maximum number of leader elections to perform during one partition leader balancing operation.
     */

@@ -310,6 +314,11 @@ public class ReplicationControlManager {
     */
    private final Optional<CreateTopicPolicy> createTopicPolicy;

    /**
     * The feature control manager.
     */
    private final FeatureControlManager featureControl;

    /**
     * Maps topic names to topic UUIDs.
     */

@@ -358,7 +367,6 @@ public class ReplicationControlManager {
    final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();

    private ReplicationControlManager(
        Supplier<MetadataVersion> metadataVersion,
        SnapshotRegistry snapshotRegistry,
        LogContext logContext,
        short defaultReplicationFactor,

@@ -367,9 +375,9 @@ public class ReplicationControlManager {
        ConfigurationControlManager configurationControl,
        ClusterControlManager clusterControl,
        ControllerMetrics controllerMetrics,
        Optional<CreateTopicPolicy> createTopicPolicy
        Optional<CreateTopicPolicy> createTopicPolicy,
        FeatureControlManager featureControl
    ) {
        this.metadataVersion = metadataVersion;
        this.snapshotRegistry = snapshotRegistry;
        this.log = logContext.logger(ReplicationControlManager.class);
        this.defaultReplicationFactor = defaultReplicationFactor;

@@ -378,6 +386,7 @@ public class ReplicationControlManager {
        this.configurationControl = configurationControl;
        this.controllerMetrics = controllerMetrics;
        this.createTopicPolicy = createTopicPolicy;
        this.featureControl = featureControl;
        this.clusterControl = clusterControl;
        this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
        this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);

@@ -929,7 +938,7 @@ public class ReplicationControlManager {
            topic.id,
            partitionId,
            r -> clusterControl.unfenced(r),
            metadataVersion.get().isLeaderRecoverySupported());
            featureControl.metadataVersion().isLeaderRecoverySupported());
        if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())) {
            builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
        }
@@ -1095,9 +1104,16 @@ public class ReplicationControlManager {
        }
        generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
            brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
        records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
            setId(brokerId).setEpoch(brokerRegistration.epoch()),
            FENCE_BROKER_RECORD.highestSupportedVersion()));
        if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
                setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).
                setFenced(BrokerRegistrationFencingChange.UNFENCE.value()),
                (short) 0));
        } else {
            records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
                setId(brokerId).setEpoch(brokerRegistration.epoch()),
                (short) 0));
        }
    }

    /**

@@ -1116,7 +1132,7 @@ public class ReplicationControlManager {
            brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
        records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
            setBrokerId(brokerId).setBrokerEpoch(brokerEpoch),
            UNREGISTER_BROKER_RECORD.highestSupportedVersion()));
            (short) 0));
    }

    /**

@@ -1131,8 +1147,15 @@ public class ReplicationControlManager {
     * @param records The record list to append to.
     */
    void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).
            setEpoch(brokerEpoch), UNFENCE_BROKER_RECORD.highestSupportedVersion()));
        if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
                setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
                setFenced(BrokerRegistrationFencingChange.FENCE.value()),
                (short) 0));
        } else {
            records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).
                setEpoch(brokerEpoch), (short) 0));
        }
        generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, brokerId, records,
            brokersToIsrs.partitionsWithNoLeader());
    }

@@ -1223,7 +1246,7 @@ public class ReplicationControlManager {
            topicId,
            partitionId,
            r -> clusterControl.unfenced(r),
            metadataVersion.get().isLeaderRecoverySupported());
            featureControl.metadataVersion().isLeaderRecoverySupported());
        builder.setElection(election);
        Optional<ApiMessageAndVersion> record = builder.build();
        if (!record.isPresent()) {

@@ -1339,7 +1362,7 @@ public class ReplicationControlManager {
            topicPartition.topicId(),
            topicPartition.partitionId(),
            r -> clusterControl.unfenced(r),
            metadataVersion.get().isLeaderRecoverySupported()
            featureControl.metadataVersion().isLeaderRecoverySupported()
        );
        builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
        builder.build().ifPresent(records::add);

@@ -1542,7 +1565,7 @@ public class ReplicationControlManager {
            topicIdPart.topicId(),
            topicIdPart.partitionId(),
            isAcceptableLeader,
            metadataVersion.get().isLeaderRecoverySupported());
            featureControl.metadataVersion().isLeaderRecoverySupported());
        if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
            builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
        }

@@ -1652,7 +1675,7 @@ public class ReplicationControlManager {
            tp.topicId(),
            tp.partitionId(),
            r -> clusterControl.unfenced(r),
            metadataVersion.get().isLeaderRecoverySupported());
            featureControl.metadataVersion().isLeaderRecoverySupported());
        if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
            builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
        }

@@ -1704,7 +1727,7 @@ public class ReplicationControlManager {
            tp.topicId(),
            tp.partitionId(),
            r -> clusterControl.unfenced(r),
            metadataVersion.get().isLeaderRecoverySupported());
            featureControl.metadataVersion().isLeaderRecoverySupported());
        if (!reassignment.merged().equals(currentReplicas)) {
            builder.setTargetReplicas(reassignment.merged());
        }

@@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metadata;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;


public enum BrokerRegistrationFencingChange {
    FENCE(-1, Optional.of(false)),
    NONE(0, Optional.empty()),
    UNFENCE(1, Optional.of(true));

    private final byte value;

    private final Optional<Boolean> asBoolean;

    private final static Map<Byte, BrokerRegistrationFencingChange> VALUE_TO_ENUM =
        Arrays.stream(BrokerRegistrationFencingChange.values()).
            collect(Collectors.toMap(v -> Byte.valueOf(v.value()), Function.identity()));

    public static Optional<BrokerRegistrationFencingChange> fromValue(byte value) {
        return Optional.ofNullable(VALUE_TO_ENUM.get(value));
    }

    BrokerRegistrationFencingChange(int value, Optional<Boolean> asBoolean) {
        this.value = (byte) value;
        this.asBoolean = asBoolean;
    }

    public Optional<Boolean> asBoolean() {
        return asBoolean;
    }

    public byte value() {
        return value;
    }
}

@@ -23,14 +23,17 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;

@@ -43,12 +46,15 @@ import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;

@@ -57,8 +63,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(value = 40)
public class ClusterControlManagerTest {
    @Test
    public void testReplay() {
    @ParameterizedTest
    @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV0", "IBP_3_3_IV2"})
    public void testReplay(MetadataVersion metadataVersion) {
        MockTime time = new MockTime(0, 0, 0);

        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());

@@ -86,11 +93,29 @@ public class ClusterControlManagerTest {
        assertFalse(clusterControl.unfenced(0));
        assertFalse(clusterControl.unfenced(1));

        UnfenceBrokerRecord unfenceBrokerRecord =
            new UnfenceBrokerRecord().setId(1).setEpoch(100);
        clusterControl.replay(unfenceBrokerRecord);
        if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
            UnfenceBrokerRecord unfenceBrokerRecord =
                new UnfenceBrokerRecord().setId(1).setEpoch(100);
            clusterControl.replay(unfenceBrokerRecord);
        } else {
            BrokerRegistrationChangeRecord changeRecord =
                new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced((byte) -1);
            clusterControl.replay(changeRecord);
        }
        assertFalse(clusterControl.unfenced(0));
        assertTrue(clusterControl.unfenced(1));

        if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
            FenceBrokerRecord fenceBrokerRecord =
                new FenceBrokerRecord().setId(1).setEpoch(100);
            clusterControl.replay(fenceBrokerRecord);
        } else {
            BrokerRegistrationChangeRecord changeRecord =
                new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced((byte) 1);
            clusterControl.replay(changeRecord);
        }
        assertFalse(clusterControl.unfenced(0));
        assertFalse(clusterControl.unfenced(1));
    }

    @Test

@@ -251,11 +251,11 @@ public class FeatureControlManagerTest {

        result = manager.updateFeatures(
            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_1_IV0.featureLevel()),
            Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
            Collections.emptyMap(),
            true);
        assertEquals(Errors.NONE, result.response().get(MetadataVersion.FEATURE_NAME).error());
        assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());

        result = manager.updateFeatures(
            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV0.featureLevel()),

@@ -718,7 +718,7 @@ public class QuorumControllerTest {
                    setPartitionIndex(1).
                    setBrokerIds(Arrays.asList(1, 2, 0))).
                    iterator()))).iterator())),
                Collections.singleton(topicName)).get();
                Collections.singleton(topicName)).get(60, TimeUnit.SECONDS);
        }
        logEnv.waitForLatestSnapshot();
    }

@@ -76,7 +76,6 @@ import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;

@@ -165,7 +164,6 @@ public class ReplicationControlManagerTest {

        ReplicationControlTestContext(Optional<CreateTopicPolicy> createTopicPolicy) {
            this.replicationControl = new ReplicationControlManager.Builder().
                setMetadataVersion(() -> MetadataVersion.IBP_3_3_IV1).
                setSnapshotRegistry(snapshotRegistry).
                setLogContext(logContext).
                setMaxElectionsPerImbalance(Integer.MAX_VALUE).

@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metadata;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;


@Timeout(40)
public class BrokerRegistrationFencingChangeTest {
    @Test
    public void testValues() {
        assertEquals((byte) -1, BrokerRegistrationFencingChange.FENCE.value());
        assertEquals((byte) 0, BrokerRegistrationFencingChange.NONE.value());
        assertEquals((byte) 1, BrokerRegistrationFencingChange.UNFENCE.value());
    }

    @Test
    public void testAsBoolean() {
        assertEquals(Optional.of(false), BrokerRegistrationFencingChange.FENCE.asBoolean());
        assertEquals(Optional.empty(), BrokerRegistrationFencingChange.NONE.asBoolean());
        assertEquals(Optional.of(true), BrokerRegistrationFencingChange.UNFENCE.asBoolean());
    }

    @Test
    public void testValueRoundTrip() {
        for (BrokerRegistrationFencingChange change : BrokerRegistrationFencingChange.values()) {
            assertEquals(Optional.of(change), BrokerRegistrationFencingChange.fromValue(change.value()));
        }
    }
}

@@ -156,7 +156,10 @@ public enum MetadataVersion {
    IBP_3_3_IV0(5, "3.3", "IV0", false),

    // Support NoopRecord for the cluster metadata log (KIP-835)
    IBP_3_3_IV1(6, "3.3", "IV1", true);
    IBP_3_3_IV1(6, "3.3", "IV1", true),

    // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
    IBP_3_3_IV2(7, "3.3", "IV2", true);

    public static final String FEATURE_NAME = "metadata.version";

@@ -236,6 +239,10 @@ public enum MetadataVersion {
        }
    }

    public boolean isBrokerRegistrationChangeRecordSupported() {
        return this.isAtLeast(IBP_3_3_IV2);
    }

    private static final Map<String, MetadataVersion> IBP_VERSIONS;
    static {
        {

@@ -61,6 +61,7 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@@ -189,9 +190,10 @@ class MetadataVersionTest {
        assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
        assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));

        assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3"));
        assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3"));
        assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3-IV0"));
        assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3-IV1"));
        assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3-IV2"));
    }

    @Test

@@ -237,6 +239,7 @@ class MetadataVersionTest {
        assertEquals("3.2", IBP_3_2_IV0.shortVersion());
        assertEquals("3.3", IBP_3_3_IV0.shortVersion());
        assertEquals("3.3", IBP_3_3_IV1.shortVersion());
        assertEquals("3.3", IBP_3_3_IV2.shortVersion());
    }

    @Test

@@ -271,6 +274,7 @@ class MetadataVersionTest {
        assertEquals("3.2-IV0", IBP_3_2_IV0.version());
        assertEquals("3.3-IV0", IBP_3_3_IV0.version());
        assertEquals("3.3-IV1", IBP_3_3_IV1.version());
        assertEquals("3.3-IV2", IBP_3_3_IV2.version());
    }

    @Test