KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config (#16130)

As per KIP-1022, we will rename the unstable metadata versions enabled config to support all feature versions.

Features is also updated to return latest production and latest testing versions of each feature.

A feature is production ready when the corresponding metadata version (bootstrapMetadataVersion) is production ready.

Adds tests for the feature usage of the unstableFeatureVersionsEnabled config

Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Justine Olshan 2024-05-30 14:52:50 -07:00 committed by GitHub
parent a8e99eb969
commit 7c1bb1585f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 80 additions and 33 deletions

View File

@ -71,22 +71,27 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
object BrokerFeatures extends Logging {
def createDefault(unstableMetadataVersionsEnabled: Boolean): BrokerFeatures = {
new BrokerFeatures(defaultSupportedFeatures(unstableMetadataVersionsEnabled))
def createDefault(unstableFeatureVersionsEnabled: Boolean): BrokerFeatures = {
new BrokerFeatures(defaultSupportedFeatures(unstableFeatureVersionsEnabled))
}
def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
def defaultSupportedFeatures(unstableFeatureVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
val features = new util.HashMap[String, SupportedVersionRange]()
features.put(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
if (unstableMetadataVersionsEnabled) {
if (unstableFeatureVersionsEnabled) {
MetadataVersion.latestTesting.featureLevel
} else {
MetadataVersion.latestProduction.featureLevel
}))
PRODUCTION_FEATURES.forEach { feature =>
features.put(feature.featureName, new SupportedVersionRange(0, feature.latestProduction()))
PRODUCTION_FEATURES.forEach { feature => features.put(feature.featureName,
new SupportedVersionRange(0,
if (unstableFeatureVersionsEnabled) {
feature.latestTesting
} else {
feature.latestProduction
}))
}
Features.supportedFeatures(features)
}

View File

@ -139,7 +139,7 @@ class BrokerServer(
var brokerMetadataPublisher: BrokerMetadataPublisher = _
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault(config.unstableMetadataVersionsEnabled)
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE

View File

@ -217,7 +217,7 @@ class ControllerServer(
startupDeadline, time)
val controllerNodes = QuorumConfig.voterConnectionsToNodes(voterConnections)
val quorumFeatures = new QuorumFeatures(config.nodeId,
QuorumFeatures.defaultFeatureMap(config.unstableMetadataVersionsEnabled),
QuorumFeatures.defaultFeatureMap(config.unstableFeatureVersionsEnabled),
controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava)
val delegationTokenKeyString = {
@ -349,7 +349,7 @@ class ControllerServer(
clusterId,
time,
s"controller-${config.nodeId}-",
QuorumFeatures.defaultFeatureMap(config.unstableMetadataVersionsEnabled),
QuorumFeatures.defaultFeatureMap(config.unstableFeatureVersionsEnabled),
config.migrationEnabled,
incarnationId,
listenerInfo)

View File

@ -449,8 +449,8 @@ object KafkaConfig {
/** Internal Configurations **/
// This indicates whether unreleased APIs should be advertised by this node.
.defineInternal(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
// This indicates whether unreleased MetadataVersions should be enabled on this node.
.defineInternal(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
// This indicates whether unreleased MetadataVersions or other feature versions should be enabled on this node.
.defineInternal(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
}
/** ********* Remote Log Management Configuration *********/
@ -1064,7 +1064,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
/** Internal Configurations **/
val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
val unstableMetadataVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG)
val unstableFeatureVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG)
def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)

View File

@ -82,6 +82,7 @@ object StorageTool extends Logging {
metadataVersion,
featureNamesAndLevelsMap,
Features.PRODUCTION_FEATURES.asScala.toList,
config.get.unstableFeatureVersionsEnabled,
releaseVersionFlagSpecified
)
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
@ -121,7 +122,7 @@ object StorageTool extends Logging {
throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
}
if (!metadataVersion.isProduction) {
if (config.get.unstableMetadataVersionsEnabled) {
if (config.get.unstableFeatureVersionsEnabled) {
System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
} else {
throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
@ -133,6 +134,7 @@ object StorageTool extends Logging {
metadataVersion: MetadataVersion,
specifiedFeatures: Map[String, java.lang.Short],
allFeatures: List[Features],
unstableFeatureVersionsEnabled: Boolean,
releaseVersionSpecified: Boolean): Unit = {
// If we are using --release-version, the default is based on the metadata version.
val metadataVersionForDefault = if (releaseVersionSpecified) metadataVersion else MetadataVersion.LATEST_PRODUCTION
@ -143,7 +145,7 @@ object StorageTool extends Logging {
val level: java.lang.Short = specifiedFeatures.getOrElse(feature.featureName, feature.defaultValue(metadataVersionForDefault))
// Only set feature records for levels greater than 0. 0 is assumed if there is no record. Throw an error if level < 0.
if (level != 0) {
allNonZeroFeaturesAndLevels.append(feature.fromFeatureLevel(level))
allNonZeroFeaturesAndLevels.append(feature.fromFeatureLevel(level, unstableFeatureVersionsEnabled))
}
}
val featuresMap = Features.featureImplsToMap(allNonZeroFeaturesAndLevels.asJava)

View File

@ -213,7 +213,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
if (controllerNode != null) {
props.putAll(controllerNode.propertyOverrides());
}
props.putIfAbsent(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, "true");
props.putIfAbsent(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true");
props.putIfAbsent(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
return new KafkaConfig(props, false, Option.empty());
}

View File

@ -324,7 +324,7 @@ abstract class QuorumTestHarness extends Logging {
props.putAll(overridingProps)
props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, TimeUnit.MINUTES.toMillis(10).toString)
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, "true")
props.setProperty(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true")
if (props.getProperty(KRaftConfigs.NODE_ID_CONFIG) == null) {
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1000")
}

View File

@ -43,7 +43,7 @@ object ApiVersionsRequestTest {
def testApiVersionsRequestTemplate(): java.util.List[ClusterConfig] = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "false")
serverProperties.put("unstable.metadata.versions.enable", "true")
serverProperties.put("unstable.feature.versions.enable", "true")
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
@ -54,7 +54,7 @@ object ApiVersionsRequestTest {
def testApiVersionsRequestIncludesUnreleasedApisTemplate(): java.util.List[ClusterConfig] = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "true")
serverProperties.put("unstable.metadata.versions.enable", "true")
serverProperties.put("unstable.feature.versions.enable", "true")
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
@ -64,7 +64,7 @@ object ApiVersionsRequestTest {
def testApiVersionsRequestValidationV0Template(): java.util.List[ClusterConfig] = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "false")
serverProperties.put("unstable.metadata.versions.enable", "false")
serverProperties.put("unstable.feature.versions.enable", "false")
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setMetadataVersion(MetadataVersion.IBP_3_7_IV4)
@ -85,7 +85,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
@ClusterTemplate("testApiVersionsRequestTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true")
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true")
))
def testApiVersionsRequest(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
@ -96,7 +96,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
@ClusterTemplate("testApiVersionsRequestIncludesUnreleasedApisTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true"),
))
def testApiVersionsRequestIncludesUnreleasedApis(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
@ -134,7 +134,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
@ClusterTemplate("testApiVersionsRequestValidationV0Template")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "false"),
))
def testApiVersionsRequestValidationV0(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])

View File

@ -354,6 +354,7 @@ Found problem:
MetadataVersion.LATEST_PRODUCTION,
Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
allFeatures,
false,
false
)
if (featureLevel > 0) {
@ -371,6 +372,7 @@ Found problem:
metadataVersion,
Map.empty,
allFeatures,
true,
true
)
@ -387,6 +389,7 @@ Found problem:
MetadataVersion.LATEST_PRODUCTION,
Map.empty,
allFeatures,
false,
false
)
@ -402,6 +405,7 @@ Found problem:
MetadataVersion.IBP_2_8_IV1,
Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
allFeatures,
false,
false
))
}
@ -414,6 +418,7 @@ Found problem:
MetadataVersion.IBP_3_3_IV0,
Map.empty,
allFeatures,
false,
false
)
@ -428,6 +433,19 @@ Found problem:
MetadataVersion.LATEST_PRODUCTION,
Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
allFeatures,
false,
false
))
}
@Test
def testUnstableFeatureThrowsError(): Unit = {
assertThrows(classOf[IllegalArgumentException], () => StorageTool.generateFeatureRecords(
new ArrayBuffer[ApiMessageAndVersion](),
MetadataVersion.LATEST_PRODUCTION,
Map(TestFeatureVersion.FEATURE_NAME -> Features.TEST_VERSION.latestTesting),
allFeatures,
false,
false
))
}
@ -606,7 +624,7 @@ Found problem:
val propsStream = Files.newOutputStream(propsFile.toPath)
try {
properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, TestUtils.tempDir().toString)
properties.setProperty(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, enableUnstable.toString)
properties.setProperty(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, enableUnstable.toString)
properties.store(propsStream, "config.props")
} finally {
propsStream.close()

View File

@ -304,7 +304,7 @@ object TestUtils extends Logging {
}.mkString(",")
val props = new Properties
props.put(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, "true")
props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true")
if (zkConnect == null) {
props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, TimeUnit.MINUTES.toMillis(10).toString)
props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)

View File

@ -65,8 +65,9 @@ public final class QuorumFeatures {
for (Features feature : Features.PRODUCTION_FEATURES) {
features.put(feature.featureName(), VersionRange.of(
0,
feature.latestProduction()
));
enableUnstable ?
feature.latestTesting() :
feature.latestProduction()));
}
return features;
}

View File

@ -76,16 +76,22 @@ public enum Features {
return defaultValue(MetadataVersion.LATEST_PRODUCTION);
}
public short latestTesting() {
return featureVersions[featureVersions.length - 1].featureLevel();
}
/**
* Creates a FeatureVersion from a level.
*
* @param level the level of the feature
* @param allowUnstableFeatureVersions whether unstable versions can be used
* @return the FeatureVersionUtils.FeatureVersion for the feature the enum is based on.
* @throws IllegalArgumentException if the feature is not known.
*/
public FeatureVersion fromFeatureLevel(short level) {
public FeatureVersion fromFeatureLevel(short level,
boolean allowUnstableFeatureVersions) {
return Arrays.stream(featureVersions).filter(featureVersion ->
featureVersion.featureLevel() == level).findFirst().orElseThrow(
featureVersion.featureLevel() == level && (allowUnstableFeatureVersions || level <= latestProduction())).findFirst().orElseThrow(
() -> new IllegalArgumentException("No feature:" + featureName() + " with feature level " + level));
}

View File

@ -34,8 +34,16 @@ public class FeaturesTest {
public void testFromFeatureLevelAllFeatures(Features feature) {
FeatureVersion[] featureImplementations = feature.featureVersions();
int numFeatures = featureImplementations.length;
short latestProductionLevel = feature.latestProduction();
for (short i = 1; i < numFeatures; i++) {
assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i));
short level = i;
if (latestProductionLevel < i) {
assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(level, true));
assertThrows(IllegalArgumentException.class, () -> feature.fromFeatureLevel(level, false));
} else {
assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(level, false));
}
}
}
@ -111,4 +119,11 @@ public class FeaturesTest {
}
assertEquals(expectedVersion, Features.TEST_VERSION.defaultValue(metadataVersion));
}
@Test
public void testUnstableTestVersion() {
assertThrows(IllegalArgumentException.class, () ->
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), false));
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), true);
}
}

View File

@ -124,7 +124,7 @@ public class ServerConfigs {
/** Internal Configurations **/
public static final String UNSTABLE_API_VERSIONS_ENABLE_CONFIG = "unstable.api.versions.enable";
public static final String UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG = "unstable.metadata.versions.enable";
public static final String UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG = "unstable.feature.versions.enable";
/************* Authorizer Configuration ***********/
public static final String AUTHORIZER_CLASS_NAME_CONFIG = "authorizer.class.name";