KAFKA-17305; Check broker registrations for missing features (#16848)

When a broker tries to register with the controller quorum, its registration should be rejected if it doesn't support a feature that is currently enabled. (A feature is enabled if it is set to a non-zero feature level.) This is important for the newly added kraft.version feature flag.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
Author: Alyssa Huang
Date: 2024-08-21 11:14:56 -07:00 (committed via GitHub)
Commit: 0bb2aee838 (parent: 7c7751d8da)
12 changed files with 371 additions and 41 deletions
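
The enforcement itself is a simple rule: for every finalized feature whose level is non-zero, the registering broker's advertised supported range must contain that level, and a feature the broker does not advertise at all is treated as if it were supported only at level 0. A minimal standalone sketch of that rule (hypothetical names; the real check lives in ClusterControlManager below):

    import java.util.Map;

    // Sketch only, with hypothetical names. brokerSupported maps a feature
    // name to its advertised [min, max] supported range.
    final class RegistrationFeatureCheckSketch {
        static void validate(Map<String, Short> finalizedFeatures, Map<String, short[]> brokerSupported) {
            finalizedFeatures.forEach((name, level) -> {
                if (level == 0) {
                    return; // level 0 means the feature is not enabled; nothing to verify
                }
                // A feature the broker never advertised defaults to the range [0, 0].
                short[] range = brokerSupported.getOrDefault(name, new short[]{0, 0});
                if (level < range[0] || level > range[1]) {
                    throw new IllegalArgumentException("Unable to register: the broker does not " +
                        "support finalized version " + level + " of " + name);
                }
            });
        }
    }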

BrokerFeatures.scala

@@ -19,6 +19,7 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.metadata.VersionRange
import org.apache.kafka.server.common.Features.PRODUCTION_FEATURES
import org.apache.kafka.server.common.MetadataVersion
@@ -80,6 +81,13 @@ object BrokerFeatures extends Logging {
new BrokerFeatures(defaultSupportedFeatures(unstableFeatureVersionsEnabled))
}
def createDefaultFeatureMap(features: BrokerFeatures): Map[String, VersionRange] = {
features.supportedFeatures.features.asScala.map {
case (name, versionRange) =>
(name, VersionRange.of(versionRange.min, versionRange.max))
}.toMap
}
def defaultSupportedFeatures(unstableFeatureVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
val features = new util.HashMap[String, SupportedVersionRange]()
features.put(MetadataVersion.FEATURE_NAME,

BrokerServer.scala

@@ -28,7 +28,6 @@ import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuota
import kafka.server.share.SharePartitionManager
import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@@ -40,7 +39,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{GroupCoordinatorRecordSerde, GroupConfigManager, GroupCoordinator, GroupCoordinatorService}
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
-import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
@@ -366,10 +365,7 @@ class BrokerServer(
ConfigType.CLIENT_METRICS -> new ClientMetricsConfigHandler(clientMetricsManager),
ConfigType.GROUP -> new GroupConfigHandler(groupCoordinator))
-val featuresRemapped = brokerFeatures.supportedFeatures.features().asScala.map {
-case (k: String, v: SupportedVersionRange) =>
-k -> VersionRange.of(v.min, v.max)
-}.asJava
val featuresRemapped = BrokerFeatures.createDefaultFeatureMap(brokerFeatures).asJava
val brokerLifecycleChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,

KafkaServer.scala

@@ -474,18 +474,19 @@ class KafkaServer(
setSecurityProtocol(ep.securityProtocol.id))
}
// Even though ZK brokers don't use "metadata.version" feature, we send our IBP here as part of the broker registration
val features = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled))
// Even though ZK brokers don't use "metadata.version" feature, we need to overwrite it with our IBP as part of registration
// so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.
-val ibpAsFeature =
-java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
-VersionRange.of(config.interBrokerProtocolVersion.featureLevel(), config.interBrokerProtocolVersion.featureLevel()))
val featuresRemapped = features + (MetadataVersion.FEATURE_NAME ->
VersionRange.of(config.interBrokerProtocolVersion.featureLevel(), config.interBrokerProtocolVersion.featureLevel()))
lifecycleManager.start(
() => listener.highestOffset,
brokerToQuorumChannelManager,
clusterId,
networkListeners,
-ibpAsFeature,
featuresRemapped.asJava,
OptionalLong.empty()
)
logger.debug("Start RaftManager")
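
As the new comment above notes, a migrating ZK broker pins metadata.version to the single level of its IBP. A toy illustration with assumed values (not Kafka code) of why a pinned [ibp, ibp] range forces an exact match under the controller's range check:

    public class PinnedIbpSketch {
        public static void main(String[] args) {
            // A pinned range [ibp, ibp] contains the controller's finalized
            // metadata.version only when the two levels are equal, which is how
            // the controller checks all brokers share one IBP before migrating.
            short ibp = 21;       // hypothetical feature level of the broker's IBP
            short finalized = 21; // controller's finalized metadata.version
            boolean ok = ibp <= finalized && finalized <= ibp;
            System.out.println(ok); // true only on an exact match
        }
    }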

BrokerLifecycleManagerTest.scala

@@ -24,11 +24,15 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, BrokerRegistrationResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
-import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.common.{Features, KRaftVersion, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_3_8_IV0, IBP_3_9_IV0}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs, ZkConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util.concurrent.{CompletableFuture, Future}
import scala.jdk.CollectionConverters._
@@ -56,6 +60,15 @@ class BrokerLifecycleManagerTest {
properties
}
def migrationConfigProperties(ibp: MetadataVersion) = {
val migrationConfigProperties = configProperties
migrationConfigProperties.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
migrationConfigProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
migrationConfigProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "")
migrationConfigProperties.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp.toString)
migrationConfigProperties
}
@Test
def testCreateAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
@@ -99,6 +112,42 @@
}
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testSuccessfulRegistrationDuringMigration(nonInitialKraftVersion: Boolean): Unit = {
val ibp = if (nonInitialKraftVersion) IBP_3_9_IV0 else IBP_3_8_IV0
val context = new RegistrationTestContext(migrationConfigProperties(ibp))
manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
val features = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true))
// Even though ZK brokers don't use "metadata.version" feature, we need to overwrite it with our IBP as part of registration
// so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.
val featuresRemapped = features + (MetadataVersion.FEATURE_NAME -> VersionRange.of(ibp.featureLevel(), ibp.featureLevel()))
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
featuresRemapped.asJava, OptionalLong.of(10L))
TestUtils.retry(60000) {
assertEquals(1, context.mockChannelManager.unsentQueue.size)
val sentBrokerRegistrationData = context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data()
assertEquals(10L, sentBrokerRegistrationData.previousBrokerEpoch())
assertEquals(ibp.featureLevel(), sentBrokerRegistrationData.features().find(MetadataVersion.FEATURE_NAME).maxSupportedVersion())
if (nonInitialKraftVersion) {
val sentKraftVersion = sentBrokerRegistrationData.features().find(KRaftVersion.FEATURE_NAME)
assertEquals(Features.KRAFT_VERSION.minimumProduction(), sentKraftVersion.minSupportedVersion())
assertEquals(Features.KRAFT_VERSION.latestTesting(), sentKraftVersion.maxSupportedVersion())
}
}
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
TestUtils.retry(10000) {
context.poll()
assertEquals(1000L, manager.brokerEpoch)
}
}
@Test
def testRegistrationTimeout(): Unit = {
val context = new RegistrationTestContext(configProperties)

BrokerRegistrationRequestTest.scala

@@ -32,11 +32,12 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
-import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Collections
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
/**
@@ -98,7 +99,6 @@ class BrokerRegistrationRequestTest {
ibpToSend: Option[(MetadataVersion, MetadataVersion)]
): Errors = {
val features = new BrokerRegistrationRequestData.FeatureCollection()
ibpToSend foreach {
case (min, max) =>
features.add(new BrokerRegistrationRequestData.Feature()
@@ -107,9 +107,17 @@
.setMaxSupportedVersion(max.featureLevel())
)
}
Features.PRODUCTION_FEATURES.stream().filter(_.featureName != MetadataVersion.FEATURE_NAME).forEach {
feature =>
features.add(new BrokerRegistrationRequestData.Feature()
.setName(feature.featureName)
.setMinSupportedVersion(feature.minimumProduction())
.setMaxSupportedVersion(feature.latestTesting()))
}
val req = new BrokerRegistrationRequestData()
.setBrokerId(brokerId)
.setLogDirs(Collections.singletonList(Uuid.randomUuid()))
.setClusterId(clusterId)
.setIncarnationId(Uuid.randomUuid())
.setIsMigratingZkBroker(zkEpoch.isDefined)
@@ -229,6 +237,34 @@
}
}
@ClusterTest(
types = Array(Type.KRAFT),
brokers = 1,
controllers = 1,
metadataVersion = MetadataVersion.IBP_3_9_IV0,
autoStart = AutoStart.NO,
serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
def testRegisterZkWithKRaftMigrationEnabledKRaftV1(clusterInstance: ClusterInstance): Unit = {
clusterInstance.asInstanceOf[RaftClusterInstance].controllers().values().forEach(_.startup())
val clusterId = clusterInstance.clusterId()
val channelManager = brokerToControllerChannelManager(clusterInstance)
try {
channelManager.start()
assertEquals(
Errors.NONE,
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_9_IV0, MetadataVersion.IBP_3_9_IV0))))
// Cannot register KRaft broker when in pre-migration
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_9_IV0, MetadataVersion.IBP_3_9_IV0))))
} finally {
channelManager.shutdown()
}
}
/**
* Start a KRaft cluster with migrations enabled, verify that the controller does not accept metadata changes
* through the RPCs. The migration never proceeds past pre-migration since no ZK brokers are registered.

ReplicationQuotasTest.scala

@@ -35,7 +35,7 @@ import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener,
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT
import org.apache.kafka.controller.ControllerRequestContextUtil
-import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{Features, KRaftVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.QuotaConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.AfterEach
@@ -291,10 +291,18 @@ class ReplicationQuotasTest extends QuorumTestHarness {
val listeners = new ListenerCollection()
listeners.add(new Listener().setName(PLAINTEXT.name).setHost("localhost").setPort(9092 + id))
val features = new BrokerRegistrationRequestData.FeatureCollection()
features.add(new BrokerRegistrationRequestData.Feature()
.setName(KRaftVersion.FEATURE_NAME)
.setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel())
.setMaxSupportedVersion(Features.KRAFT_VERSION.latestTesting()))
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
-.setMinSupportedVersion(MetadataVersion.IBP_3_0_IV1.featureLevel())
.setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel())
.setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel()))
features.add(new BrokerRegistrationRequestData.Feature()
.setName(TransactionVersion.FEATURE_NAME)
.setMinSupportedVersion(TransactionVersion.TV_0.featureLevel())
.setMaxSupportedVersion(Features.TRANSACTION_VERSION.latestTesting()))
controllerServer.controller.registerBroker(
ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData()

ClusterControlManager.java

@@ -50,7 +50,6 @@ import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@@ -406,18 +405,34 @@ public class ClusterControlManager {
setRack(request.rack()).
setEndPoints(listenerInfo.toBrokerRegistrationRecord());
// Track which finalized features we have not yet verified are supported by the broker.
Map<String, Short> unverifiedFeatures = new HashMap<>(finalizedFeatures.featureMap());
// Check every broker feature version range includes the finalized version.
for (BrokerRegistrationRequestData.Feature feature : request.features()) {
record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
unverifiedFeatures.remove(feature.name());
}
if (request.features().find(MetadataVersion.FEATURE_NAME) == null) {
// Brokers that don't send a supported metadata.version range are assumed to only
// support the original metadata.version.
record.features().add(processRegistrationFeature(brokerId, finalizedFeatures,
new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()).
setMaxSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel())));
unverifiedFeatures.remove(MetadataVersion.FEATURE_NAME);
}
// We also need to check every controller feature is supported by the broker.
unverifiedFeatures.forEach((featureName, finalizedVersion) -> {
if (finalizedVersion != 0 && request.features().findAll(featureName).isEmpty()) {
processRegistrationFeature(brokerId, finalizedFeatures,
new BrokerRegistrationRequestData.Feature().
setName(featureName).
setMinSupportedVersion((short) 0).
setMaxSupportedVersion((short) 0));
}
});
if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
record.setLogDirs(request.logDirs());
}
@@ -490,15 +505,15 @@
short finalized = finalizedFeatures.versionOrDefault(feature.name(), (short) defaultVersion);
if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized)) {
throw new UnsupportedVersionException("Unable to register because the broker " +
"does not support version " + finalized + " of " + feature.name() +
". It wants a version between " + feature.minSupportedVersion() + " and " +
"does not support finalized version " + finalized + " of " + feature.name() +
". The broker wants a version between " + feature.minSupportedVersion() + " and " +
feature.maxSupportedVersion() + ", inclusive.");
}
// A feature is not found in the finalizedFeature map if it is unknown to the controller or set to 0 (feature not enabled).
// Only log if the feature name is not known by the controller.
-if (!Features.PRODUCTION_FEATURE_NAMES.contains(feature.name()))
-log.warn("Broker {} registered with feature {} that is unknown to the controller",
-brokerId, feature.name());
if (!finalizedFeatures.featureNames().contains(feature.name()))
log.debug("Broker {} registered with version range ({}, {}] of feature {} which controller does not know " +
"or has finalized version of 0.",
brokerId, feature.minSupportedVersion(), feature.maxSupportedVersion(), feature.name());
return new BrokerFeature().
setName(feature.name()).
setMinSupportedVersion(feature.minSupportedVersion()).
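
The missing-feature path above reuses the ordinary range check: a finalized feature the broker never advertised is validated as if the broker supported only the degenerate range [0, 0], which cannot contain a non-zero finalized level. A short sketch using the same VersionRange helper the hunk relies on:

    import org.apache.kafka.metadata.VersionRange;

    public class MissingFeatureSketch {
        public static void main(String[] args) {
            // A synthesized [0, 0] supported range never contains a non-zero
            // finalized level, so "feature not advertised" is rejected by the
            // same check as "feature advertised but too old".
            VersionRange missing = VersionRange.of((short) 0, (short) 0);
            System.out.println(missing.contains((short) 1)); // false
        }
    }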

QuorumController.java

@@ -119,6 +119,7 @@ import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
@@ -2171,11 +2172,14 @@ public final class QuorumController implements Controller {
ControllerRequestContext context,
BrokerRegistrationRequestData request
) {
// populate finalized features map with latest known kraft version for validation
Map<String, Short> controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());
return appendWriteEvent("registerBroker", context.deadlineNs(),
() -> {
ControllerResult<BrokerRegistrationReply> result = clusterControl.
-registerBroker(request, offsetControl.nextWriteOffset(), featureControl.
-finalizedFeatures(Long.MAX_VALUE));
registerBroker(request, offsetControl.nextWriteOffset(),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE));
rescheduleMaybeFenceStaleBrokers();
return result;
},
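
Because kraft.version is finalized by the raft layer rather than recorded through FeatureControlManager, registerBroker first copies the finalized map and overlays the raft client's current level, as the hunk above shows. A sketch of just that overlay step, with illustrative values:

    import java.util.HashMap;
    import java.util.Map;

    public class KraftVersionOverlaySketch {
        public static void main(String[] args) {
            // Copy the finalized features known to the feature control layer...
            Map<String, Short> controllerFeatures = new HashMap<>(Map.of("metadata.version", (short) 21));
            // ...then overlay kraft.version (illustrative level) so it is
            // validated against broker registrations like any other feature.
            controllerFeatures.put("kraft.version", (short) 1);
            System.out.println(controllerFeatures);
        }
    }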

ClusterControlManagerTest.java

@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
@@ -49,7 +50,9 @@ import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TestFeatureVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
@@ -62,9 +65,11 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
@@ -531,6 +536,135 @@
clusterControl.brokerRegistrations().get(2).toRecord(options));
}
@Test
public void testRegistrationWithUnsupportedFeature() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
Map<String, VersionRange> supportedFeatures = new HashMap<>();
supportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.IBP_3_1_IV0.featureLevel(),
MetadataVersion.IBP_3_7_IV0.featureLevel()));
supportedFeatures.put(TestFeatureVersion.FEATURE_NAME, VersionRange.of(
TestFeatureVersion.TEST_0.featureLevel(),
TestFeatureVersion.TEST_1.featureLevel()));
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, supportedFeatures, Collections.singletonList(0))).
setMetadataVersion(MetadataVersion.IBP_3_7_IV0).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
build();
clusterControl.activate();
FeatureLevelRecord testFeatureRecord = new FeatureLevelRecord().
setName(TestFeatureVersion.FEATURE_NAME).setFeatureLevel((short) 1);
featureControl.replay(testFeatureRecord);
List<Uuid> logDirs = asList(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"), Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"));
BrokerRegistrationRequestData baseRequest = new BrokerRegistrationRequestData().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setBrokerId(0).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
setLogDirs(logDirs);
assertEquals("Unable to register because the broker does not support finalized version 1 of " +
"test.feature.version. The broker wants a version between 0 and 0, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Collections.singleton(new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_1_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.featureLevel())).iterator())),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
}
@Test
public void testRegistrationWithUnsupportedKraftVersion() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
Map<String, VersionRange> supportedFeatures = new HashMap<>();
supportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.IBP_3_1_IV0.featureLevel(),
MetadataVersion.IBP_3_9_IV0.featureLevel()));
supportedFeatures.put(KRaftVersion.FEATURE_NAME, VersionRange.of(
KRaftVersion.KRAFT_VERSION_1.featureLevel(),
KRaftVersion.KRAFT_VERSION_1.featureLevel()));
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, supportedFeatures, Collections.singletonList(0))).
setMetadataVersion(MetadataVersion.IBP_3_9_IV0).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
build();
clusterControl.activate();
List<Uuid> logDirs = asList(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"), Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"));
BrokerRegistrationRequestData baseRequest = new BrokerRegistrationRequestData().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setBrokerId(0).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
setLogDirs(logDirs);
// quorum controller passes in the latest kraft version to populate finalized features
Map<String, Short> updatedFeaturesMap = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
updatedFeaturesMap.put(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel());
FinalizedControllerFeatures updatedFinalizedFeatures = new FinalizedControllerFeatures(updatedFeaturesMap, Long.MAX_VALUE);
assertEquals("Unable to register because the broker does not support finalized version 1 of " +
"kraft.version. The broker wants a version between 0 and 0, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Collections.singleton(new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures)).getMessage());
assertEquals("Unable to register because the broker does not support finalized version 1 of " +
"kraft.version. The broker wants a version between 0 and 0, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Arrays.asList(
new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()),
new BrokerRegistrationRequestData.Feature().
setName(KRaftVersion.FEATURE_NAME).
setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel()).
setMaxSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures)).getMessage());
clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Arrays.asList(
new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()),
new BrokerRegistrationRequestData.Feature().
setName(KRaftVersion.FEATURE_NAME).
setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_1.featureLevel()).
setMaxSupportedVersion(KRaftVersion.KRAFT_VERSION_1.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures);
}
@Test
public void testRegistrationWithUnsupportedMetadataVersion() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
@@ -552,8 +686,8 @@
build();
clusterControl.activate();
assertEquals("Unable to register because the broker does not support version 4 of " +
"metadata.version. It wants a version between 1 and 1, inclusive.",
assertEquals("Unable to register because the broker does not support finalized version 4 of " +
"metadata.version. The broker wants a version between 1 and 1, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
new BrokerRegistrationRequestData().
@@ -564,8 +698,8 @@
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
assertEquals("Unable to register because the broker does not support version 4 of " +
"metadata.version. It wants a version between 7 and 7, inclusive.",
assertEquals("Unable to register because the broker does not support finalized version 4 of " +
"metadata.version. The broker wants a version between 7 and 7, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
new BrokerRegistrationRequestData().

QuorumControllerTest.java

@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
@@ -106,6 +107,8 @@ import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.fault.FaultHandlerException;
@@ -117,6 +120,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
@@ -685,6 +689,61 @@
}
}
@ParameterizedTest
@CsvSource(value = {"0, 0", "0, 1", "1, 0", "1, 1"})
public void testRegisterBrokerKRaftVersions(short finalizedKraftVersion, short brokerMaxSupportedKraftVersion) throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
setLastKRaftVersion(KRaftVersion.fromFeatureLevel(finalizedKraftVersion)).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder ->
controllerBuilder.setConfigSchema(SCHEMA)).
setBootstrapMetadata(SIMPLE_BOOTSTRAP).
build()
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").
setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
BrokerRegistrationRequestData.FeatureCollection brokerFeatures = new BrokerRegistrationRequestData.FeatureCollection();
brokerFeatures.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(MetadataVersion.IBP_3_0_IV1.featureLevel())
.setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel()));
// broker registration requests do not include initial versions of features
if (brokerMaxSupportedKraftVersion != 0) {
brokerFeatures.add(new BrokerRegistrationRequestData.Feature()
.setName(KRaftVersion.FEATURE_NAME)
.setMinSupportedVersion(Features.KRAFT_VERSION.minimumProduction())
.setMaxSupportedVersion(brokerMaxSupportedKraftVersion));
}
BrokerRegistrationRequestData request = new BrokerRegistrationRequestData().
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners);
if (brokerMaxSupportedKraftVersion < finalizedKraftVersion) {
Throwable exception = assertThrows(ExecutionException.class, () -> active.registerBroker(
ANONYMOUS_CONTEXT,
request).get());
assertEquals(UnsupportedVersionException.class, exception.getCause().getClass());
assertEquals("Unable to register because the broker does not support finalized version " +
finalizedKraftVersion + " of kraft.version. The broker wants a version between 0 and " +
brokerMaxSupportedKraftVersion + ", inclusive.",
exception.getCause().getMessage());
} else {
BrokerRegistrationReply reply = active.registerBroker(
ANONYMOUS_CONTEXT,
request).get();
assertTrue(reply.epoch() >= 5, "Unexpected broker epoch " + reply.epoch());
}
}
}
@Test
public void testUnregisterBroker() throws Throwable {
try (

LocalLogManager.java

@@ -470,6 +470,11 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
*/
private final EventQueue eventQueue;
/**
* The latest kraft version used by this local log manager.
*/
private final KRaftVersion lastKRaftVersion;
/**
* Whether this LocalLogManager has been shut down.
*/
@@ -499,13 +504,15 @@
public LocalLogManager(LogContext logContext,
int nodeId,
SharedLogData shared,
-String threadNamePrefix) {
String threadNamePrefix,
KRaftVersion lastKRaftVersion) {
this.log = logContext.logger(LocalLogManager.class);
this.nodeId = nodeId;
this.shared = shared;
this.maxReadOffset = shared.initialMaxReadOffset();
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
threadNamePrefix, new ShutdownEvent());
this.lastKRaftVersion = lastKRaftVersion;
shared.registerLogManager(this);
}
@@ -840,6 +847,6 @@
@Override
public KRaftVersion kraftVersion() {
-return KRaftVersion.KRAFT_VERSION_0;
return lastKRaftVersion;
}
}

LocalLogManagerTestEnv.java

@@ -25,6 +25,7 @@ import org.apache.kafka.metalog.LocalLogManager.LocalRecordBatch;
import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.test.TestUtils;
@@ -71,6 +72,7 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
private final int numManagers;
private Optional<RawSnapshotReader> snapshotReader = Optional.empty();
private Consumer<SharedLogData> sharedLogDataInitializer = __ -> { };
private KRaftVersion lastKRaftVersion = KRaftVersion.KRAFT_VERSION_0;
public Builder(int numManagers) {
this.numManagers = numManagers;
@@ -86,11 +88,20 @@
return this;
}
/**
* Used to mock the latest KRaft version that would be returned from RaftClient.kraftVersion()
*/
public Builder setLastKRaftVersion(KRaftVersion kraftVersion) {
this.lastKRaftVersion = kraftVersion;
return this;
}
public LocalLogManagerTestEnv build() {
return new LocalLogManagerTestEnv(
numManagers,
snapshotReader,
-sharedLogDataInitializer);
sharedLogDataInitializer,
lastKRaftVersion);
}
public LocalLogManagerTestEnv buildWithMockListeners() {
@@ -114,7 +125,8 @@
private LocalLogManagerTestEnv(
int numManagers,
Optional<RawSnapshotReader> snapshotReader,
-Consumer<SharedLogData> sharedLogDataInitializer
Consumer<SharedLogData> sharedLogDataInitializer,
KRaftVersion lastKRaftVersion
) {
clusterId = Uuid.randomUuid().toString();
dir = TestUtils.tempDirectory();
@@ -127,7 +139,8 @@
new LogContext(String.format("[LocalLogManager %d] ", nodeId)),
nodeId,
shared,
String.format("LocalLogManager-%d_", nodeId)));
String.format("LocalLogManager-%d_", nodeId),
lastKRaftVersion));
}
} catch (Throwable t) {
for (LocalLogManager logManager : newLogManagers) {