KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool (#15685)

As part of KIP-1022, I have created an interface for all the new features to be used when parsing the command line arguments, doing validations, getting default versions, etc.

I've also added the --feature flag to the storage tool to show how it will be used.

Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion which is unique) and added tests using this new test feature.

I will add the unstable config and tests in a followup.

Reviewers: David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@apache.org>
This commit is contained in:
Justine Olshan 2024-05-29 16:36:06 -07:00 committed by GitHub
parent 5c08ee0062
commit 5e3df22095
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 795 additions and 156 deletions

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils;
/**
* Represents an immutable basic version range using 2 attributes: min and max, each of type short.
* The min and max attributes need to satisfy 2 rules:
* - they are each expected to be >= 0, as we only consider positive version values to be valid.
* - they are each expected to be >= 0, as we only consider non-negative version values to be valid.
* - max should be >= min.
*
* The class also provides API to convert the version range to a map.

View File

@ -23,7 +23,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.common.Features
import org.apache.kafka.server.common.FinalizedFeatures
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@ -40,7 +40,7 @@ trait ApiVersionManager {
}
def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
def features: Features
def features: FinalizedFeatures
}
object ApiVersionManager {
@ -73,6 +73,7 @@ object ApiVersionManager {
* @param brokerFeatures the broker features
* @param enableUnstableLastVersion whether to enable unstable last version, see [[KafkaConfig.unstableApiVersionsEnabled]]
* @param zkMigrationEnabled whether to enable zk migration, see [[KafkaConfig.migrationEnabled]]
* @param featuresProvider a provider to the finalized features supported
*/
class SimpleApiVersionManager(
val listenerType: ListenerType,
@ -80,14 +81,14 @@ class SimpleApiVersionManager(
brokerFeatures: org.apache.kafka.common.feature.Features[SupportedVersionRange],
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean,
val featuresProvider: () => Features
val featuresProvider: () => FinalizedFeatures
) extends ApiVersionManager {
def this(
listenerType: ListenerType,
enableUnstableLastVersion: Boolean,
zkMigrationEnabled: Boolean,
featuresProvider: () => Features
featuresProvider: () => FinalizedFeatures
) = {
this(
listenerType,
@ -113,7 +114,7 @@ class SimpleApiVersionManager(
)
}
override def features: Features = featuresProvider.apply()
override def features: FinalizedFeatures = featuresProvider.apply()
}
/**
@ -164,5 +165,5 @@ class DefaultApiVersionManager(
)
}
override def features: Features = metadataCache.features()
override def features: FinalizedFeatures = metadataCache.features()
}

View File

@ -19,6 +19,7 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.Features.PRODUCTION_FEATURES
import org.apache.kafka.server.common.MetadataVersion
import java.util
@ -75,16 +76,19 @@ object BrokerFeatures extends Logging {
}
def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
Features.supportedFeatures(
java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
val features = new util.HashMap[String, SupportedVersionRange]()
features.put(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
if (unstableMetadataVersionsEnabled) {
MetadataVersion.latestTesting.featureLevel
} else {
MetadataVersion.latestProduction.featureLevel
}
)))
}))
PRODUCTION_FEATURES.forEach { feature =>
features.put(feature.featureName, new SupportedVersionRange(0, feature.latestProduction()))
}
Features.supportedFeatures(features)
}
def createEmpty(): BrokerFeatures = {

View File

@ -22,7 +22,7 @@ import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import java.util
import scala.collection._
@ -109,7 +109,7 @@ trait MetadataCache {
def getRandomAliveBrokerId: Option[Int]
def features(): Features
def features(): FinalizedFeatures
}
object MetadataCache {

View File

@ -33,7 +33,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, Replicas}
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import java.util
import java.util.concurrent.ThreadLocalRandom
@ -539,9 +539,9 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
override def metadataVersion(): MetadataVersion = _currentImage.features().metadataVersion()
override def features(): Features = {
override def features(): FinalizedFeatures = {
val image = _currentImage
new Features(image.features().metadataVersion(),
new FinalizedFeatures(image.features().metadataVersion(),
image.features().finalizedVersions(),
image.highestOffsetAndEpoch().offset,
true)

View File

@ -40,7 +40,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.concurrent.TimeoutException
@ -53,7 +53,7 @@ class FeatureCacheUpdateException(message: String) extends RuntimeException(mess
trait ZkFinalizedFeatureCache {
def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit
def getFeatureOption: Option[Features]
def getFeatureOption: Option[FinalizedFeatures]
}
case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
@ -177,7 +177,7 @@ class ZkMetadataCache(
private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
// Features are updated via ZK notification (see FinalizedFeatureChangeListener)
@volatile private var _features: Option[Features] = Option.empty
@volatile private var _features: Option[FinalizedFeatures] = Option.empty
private val featureLock = new ReentrantLock()
private val featureCond = featureLock.newCondition()
@ -617,9 +617,9 @@ class ZkMetadataCache(
override def metadataVersion(): MetadataVersion = metadataVersion
override def features(): Features = _features match {
override def features(): FinalizedFeatures = _features match {
case Some(features) => features
case None => new Features(metadataVersion,
case None => new FinalizedFeatures(metadataVersion,
Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
false)
@ -639,7 +639,7 @@ class ZkMetadataCache(
* not modified.
*/
def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
val latest = new Features(metadataVersion,
val latest = new FinalizedFeatures(metadataVersion,
latestFeatures.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
latestEpoch,
false)
@ -711,5 +711,5 @@ class ZkMetadataCache(
}
}
override def getFeatureOption: Option[Features] = _features
override def getFeatureOption: Option[FinalizedFeatures] = _features
}

View File

@ -28,7 +28,7 @@ import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion}
import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.common.security.scram.internals.ScramMechanism
@ -36,10 +36,10 @@ import org.apache.kafka.common.security.scram.internals.ScramFormatter
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.FeatureVersion
import java.util
import java.util.Base64
import java.util.Optional
import java.util.{Base64, Collections, Optional}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
@ -60,24 +60,30 @@ object StorageTool extends Logging {
case "format" =>
val directories = configToLogDirectories(config.get)
val clusterId = namespace.getString("cluster_id")
val metadataVersion = getMetadataVersion(namespace,
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
if (!metadataVersion.isKRaftSupported) {
throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
}
if (!metadataVersion.isProduction) {
if (config.get.unstableMetadataVersionsEnabled) {
System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
} else {
throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
}
}
val metaProperties = new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V1).
setClusterId(clusterId).
setNodeId(config.get.nodeId).
build()
val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
val specifiedFeatures: util.List[String] = namespace.getList("feature")
val releaseVersionFlagSpecified = namespace.getString("release_version") != null
if (releaseVersionFlagSpecified && specifiedFeatures != null) {
throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
}
val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
validateMetadataVersion(metadataVersion, config)
// Get all other features, validate, and create records for them
// Use latest default for features if --release-version is not specified
generateFeatureRecords(
metadataRecords,
metadataVersion,
featureNamesAndLevelsMap,
Features.PRODUCTION_FEATURES.asScala.toList,
releaseVersionFlagSpecified
)
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
if (!metadataVersion.isScramSupported) {
throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
@ -86,6 +92,7 @@ object StorageTool extends Logging {
metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
}
})
val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command")
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
if (!configToSelfManagedMode(config.get)) {
@ -109,6 +116,52 @@ object StorageTool extends Logging {
}
}
private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = {
if (!metadataVersion.isKRaftSupported) {
throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
}
if (!metadataVersion.isProduction) {
if (config.get.unstableMetadataVersionsEnabled) {
System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
} else {
throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
}
}
}
private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion],
metadataVersion: MetadataVersion,
specifiedFeatures: Map[String, java.lang.Short],
allFeatures: List[Features],
releaseVersionSpecified: Boolean): Unit = {
// If we are using --release-version, the default is based on the metadata version.
val metadataVersionForDefault = if (releaseVersionSpecified) metadataVersion else MetadataVersion.LATEST_PRODUCTION
val allNonZeroFeaturesAndLevels: ArrayBuffer[FeatureVersion] = mutable.ArrayBuffer[FeatureVersion]()
allFeatures.foreach { feature =>
val level: java.lang.Short = specifiedFeatures.getOrElse(feature.featureName, feature.defaultValue(metadataVersionForDefault))
// Only set feature records for levels greater than 0. 0 is assumed if there is no record. Throw an error if level < 0.
if (level != 0) {
allNonZeroFeaturesAndLevels.append(feature.fromFeatureLevel(level))
}
}
val featuresMap = Features.featureImplsToMap(allNonZeroFeaturesAndLevels.asJava)
featuresMap.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel)
try {
for (feature <- allNonZeroFeaturesAndLevels) {
// In order to validate, we need all feature versions set.
Features.validateVersion(feature, featuresMap)
metadataRecords.append(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(feature.featureName).
setFeatureLevel(feature.featureLevel), 0.toShort))
}
} catch {
case e: Throwable => throw new TerseFailure(e.getMessage)
}
}
def parseArguments(args: Array[String]): Namespace = {
val parser = ArgumentParsers.
newArgumentParser("kafka-storage", /* defaultHelp */ true, /* prefixChars */ "-", /* fromFilePrefix */ "@").
@ -141,6 +194,9 @@ object StorageTool extends Logging {
formatParser.addArgument("--release-version", "-r").
action(store()).
help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}")
formatParser.addArgument("--feature", "-f").
help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`.").
action(append());
parser.parseArgsOrFail(args)
}
@ -156,6 +212,7 @@ object StorageTool extends Logging {
def getMetadataVersion(
namespace: Namespace,
featureNamesAndLevelsMap: Map[String, java.lang.Short],
defaultVersionString: Option[String]
): MetadataVersion = {
val defaultValue = defaultVersionString match {
@ -163,9 +220,19 @@ object StorageTool extends Logging {
case None => MetadataVersion.LATEST_PRODUCTION
}
Option(namespace.getString("release_version"))
.map(ver => MetadataVersion.fromVersionString(ver))
.getOrElse(defaultValue)
val releaseVersionTag = Option(namespace.getString("release_version"))
val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME)
(releaseVersionTag, featureTag) match {
case (Some(_), Some(_)) => // We should throw an error before we hit this case, but include for completeness
throw new IllegalArgumentException("Both --release_version and --feature were set. Only one of the two flags can be set.")
case (Some(version), None) =>
MetadataVersion.fromVersionString(version)
case (None, Some(level)) =>
MetadataVersion.fromFeatureLevel(level)
case (None, None) =>
defaultValue
}
}
private def getUserScramCredentialRecord(
@ -469,4 +536,27 @@ object StorageTool extends Logging {
}
0
}
private def parseNameAndLevel(input: String): (String, java.lang.Short) = {
val equalsIndex = input.indexOf("=")
if (equalsIndex < 0)
throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.")
val name = input.substring(0, equalsIndex).trim
val levelString = input.substring(equalsIndex + 1).trim
try {
levelString.toShort
} catch {
case _: Throwable =>
throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.")
}
(name, levelString.toShort)
}
def featureNamesAndLevels(features: List[String]): Map[String, java.lang.Short] = {
features.map { (feature: String) =>
// Ensure the feature exists
val nameAndLevel = parseNameAndLevel(feature)
(nameAndLevel._1, nameAndLevel._2)
}.toMap
}
}

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, QuorumConfig}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler
@ -81,7 +81,7 @@ class TestRaftServer(
ListenerType.CONTROLLER,
true,
false,
() => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION))
() => FinalizedFeatures.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION))
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
raftManager = new KafkaRaftManager[Array[Byte]](

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils._
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.config.{ServerConfigs, QuotaConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
@ -80,7 +80,7 @@ class SocketServerTest {
TestUtils.clearYammerMetrics()
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, false,
() => new Features(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
var server: SocketServer = _
val sockets = new ArrayBuffer[Socket]

View File

@ -53,7 +53,7 @@ import org.apache.kafka.controller.{Controller, ControllerRequestContext, Result
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
import org.apache.kafka.server.util.FutureUtils
import org.apache.kafka.storage.internals.log.CleanerConfig
@ -169,7 +169,7 @@ class ControllerApisTest {
ListenerType.CONTROLLER,
true,
false,
() => Features.fromKRaftVersion(MetadataVersion.latestTesting())),
() => FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting())),
metadataCache
)
}

View File

@ -21,7 +21,7 @@ import kafka.server.metadata.ZkMetadataCache
import kafka.utils.TestUtils
import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.{Features => JFeatures}
import org.apache.kafka.server.common.{FinalizedFeatures => JFeatures}
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0

View File

@ -79,7 +79,7 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.config._
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.{FutureUtils, MockTime}
@ -199,7 +199,7 @@ class KafkaApisTest extends Logging {
BrokerFeatures.defaultSupportedFeatures(true),
true,
false,
() => new Features(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
val clientMetricsManagerOpt = if (raftSupport) Some(clientMetricsManager) else None

View File

@ -21,25 +21,27 @@ import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util
import java.util.Properties
import java.util.{Collections, Properties}
import org.apache.kafka.common.{DirectoryId, KafkaException}
import kafka.server.KafkaConfig
import kafka.utils.Exit
import kafka.utils.TestUtils
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.commons.io.output.NullOutputStream
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion, TestFeatureVersion}
import org.apache.kafka.common.metadata.{FeatureLevelRecord, UserScramCredentialRecord}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
@Timeout(value = 40)
class StorageToolTest {
@ -54,6 +56,8 @@ class StorageToolTest {
properties
}
val allFeatures = Features.FEATURES.toList
@Test
def testConfigToLogDirectories(): Unit = {
val config = new KafkaConfig(newSelfManagedProperties())
@ -262,7 +266,7 @@ Found problem:
@Test
def testDefaultMetadataVersion(): Unit = {
val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = None)
val mv = StorageTool.getMetadataVersion(namespace, Map.empty, defaultVersionString = None)
assertEquals(MetadataVersion.LATEST_PRODUCTION.featureLevel(), mv.featureLevel(),
"Expected the default metadata.version to be the latest production version")
}
@ -270,18 +274,58 @@ Found problem:
@Test
def testConfiguredMetadataVersion(): Unit = {
val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = Some(MetadataVersion.IBP_3_3_IV2.toString))
val mv = StorageTool.getMetadataVersion(namespace, Map.empty, defaultVersionString = Some(MetadataVersion.IBP_3_3_IV2.toString))
assertEquals(MetadataVersion.IBP_3_3_IV2.featureLevel(), mv.featureLevel(),
"Expected the default metadata.version to be 3.3-IV2")
}
@Test
def testSettingFeatureAndReleaseVersionFails(): Unit = {
val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
"--release-version", "3.0-IV1", "--feature", "metadata.version=4"))
assertThrows(classOf[IllegalArgumentException], () => StorageTool.getMetadataVersion(namespace, parseFeatures(namespace), defaultVersionString = None))
}
@Test
def testParseFeatures(): Unit = {
def parseAddFeatures(strings: String*): Map[String, java.lang.Short] = {
var args = mutable.Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ")
args ++= strings
val namespace = StorageTool.parseArguments(args.toArray)
parseFeatures(namespace)
}
assertThrows(classOf[RuntimeException], () => parseAddFeatures("--feature", "blah"))
assertThrows(classOf[RuntimeException], () => parseAddFeatures("--feature", "blah=blah"))
// Test with no features
assertEquals(Map(), parseAddFeatures())
// Test with one feature
val testFeatureLevel = 1
val testArgument = TestFeatureVersion.FEATURE_NAME + "=" + testFeatureLevel.toString
val expectedMap = Map(TestFeatureVersion.FEATURE_NAME -> testFeatureLevel.toShort)
assertEquals(expectedMap, parseAddFeatures("--feature", testArgument))
// Test with two features
val metadataFeatureLevel = 5
val metadataArgument = MetadataVersion.FEATURE_NAME + "=" + metadataFeatureLevel.toString
val expectedMap2 = expectedMap ++ Map (MetadataVersion.FEATURE_NAME -> metadataFeatureLevel.toShort)
assertEquals(expectedMap2, parseAddFeatures("--feature", testArgument, "--feature", metadataArgument))
}
private def parseFeatures(namespace: Namespace): Map[String, java.lang.Short] = {
val specifiedFeatures: util.List[String] = namespace.getList("feature")
StorageTool.featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
}
@Test
def testMetadataVersionFlags(): Unit = {
def parseMetadataVersion(strings: String*): MetadataVersion = {
var args = mutable.Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ")
args ++= strings
val namespace = StorageTool.parseArguments(args.toArray)
StorageTool.getMetadataVersion(namespace, defaultVersionString = None)
StorageTool.getMetadataVersion(namespace, Map.empty, defaultVersionString = None)
}
var mv = parseMetadataVersion("--release-version", "3.0")
@ -293,6 +337,101 @@ Found problem:
assertThrows(classOf[IllegalArgumentException], () => parseMetadataVersion("--release-version", "0.0"))
}
private def generateRecord(featureName: String, level: Short): ApiMessageAndVersion = {
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(featureName).
setFeatureLevel(level), 0.toShort)
}
@ParameterizedTest
@EnumSource(classOf[TestFeatureVersion])
def testFeatureFlag(testFeatureVersion: TestFeatureVersion): Unit = {
val featureLevel = testFeatureVersion.featureLevel
if (featureLevel <= Features.TEST_VERSION.defaultValue(MetadataVersion.LATEST_PRODUCTION)) {
val records = new ArrayBuffer[ApiMessageAndVersion]()
StorageTool.generateFeatureRecords(
records,
MetadataVersion.LATEST_PRODUCTION,
Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
allFeatures,
false
)
if (featureLevel > 0) {
assertEquals(List(generateRecord(TestFeatureVersion.FEATURE_NAME, featureLevel)), records)
}
}
}
@ParameterizedTest
@EnumSource(classOf[MetadataVersion])
def testVersionDefault(metadataVersion: MetadataVersion): Unit = {
val records = new ArrayBuffer[ApiMessageAndVersion]()
StorageTool.generateFeatureRecords(
records,
metadataVersion,
Map.empty,
allFeatures,
true
)
val featureLevel = Features.TEST_VERSION.defaultValue(metadataVersion)
if (featureLevel > 0) {
assertEquals(List(generateRecord(TestFeatureVersion.FEATURE_NAME, featureLevel)), records)
}
}
@Test
def testVersionDefaultNoArgs(): Unit = {
val records = new ArrayBuffer[ApiMessageAndVersion]()
StorageTool.generateFeatureRecords(
records,
MetadataVersion.LATEST_PRODUCTION,
Map.empty,
allFeatures,
false
)
assertEquals(List(generateRecord(TestFeatureVersion.FEATURE_NAME, Features.TEST_VERSION.defaultValue(MetadataVersion.LATEST_PRODUCTION))), records)
}
@Test
def testFeatureDependency(): Unit = {
val featureLevel = 1.toShort
assertThrows(classOf[TerseFailure], () => StorageTool.generateFeatureRecords(
new ArrayBuffer[ApiMessageAndVersion](),
MetadataVersion.IBP_2_8_IV1,
Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
allFeatures,
false
))
}
@Test
def testLatestFeaturesWithOldMetadataVersion(): Unit = {
val records = new ArrayBuffer[ApiMessageAndVersion]()
StorageTool.generateFeatureRecords(
records,
MetadataVersion.IBP_3_3_IV0,
Map.empty,
allFeatures,
false
)
assertEquals(List(generateRecord(TestFeatureVersion.FEATURE_NAME, Features.TEST_VERSION.defaultValue(MetadataVersion.LATEST_PRODUCTION))), records)
}
@Test
def testFeatureInvalidFlag(): Unit = {
val featureLevel = 99.toShort
assertThrows(classOf[IllegalArgumentException], () => StorageTool.generateFeatureRecords(
new ArrayBuffer[ApiMessageAndVersion](),
MetadataVersion.LATEST_PRODUCTION,
Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
allFeatures,
false
))
}
@Test
def testAddScram():Unit = {
def parseAddScram(strings: String*): Option[ArrayBuffer[UserScramCredentialRecord]] = {

View File

@ -59,7 +59,7 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.KRaftConfigs;
@ -204,7 +204,7 @@ public class KRaftMetadataRequestBenchmark {
ApiMessageType.ListenerType.BROKER,
false,
false,
() -> Features.fromKRaftVersion(MetadataVersion.latestTesting()))).
() -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))).
build();
}

View File

@ -59,7 +59,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ZkConfigs;
@ -204,7 +204,7 @@ public class MetadataRequestBenchmark {
ApiMessageType.ListenerType.ZK_BROKER,
false,
false,
() -> Features.fromKRaftVersion(MetadataVersion.latestTesting()))).
() -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))).
build();
}

View File

@ -50,6 +50,7 @@ import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@ -459,18 +460,19 @@ public class ClusterControlManager {
FinalizedControllerFeatures finalizedFeatures,
BrokerRegistrationRequestData.Feature feature
) {
Optional<Short> finalized = finalizedFeatures.get(feature.name());
if (finalized.isPresent()) {
if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized.get())) {
throw new UnsupportedVersionException("Unable to register because the broker " +
"does not support version " + finalized.get() + " of " + feature.name() +
". It wants a version between " + feature.minSupportedVersion() + " and " +
feature.maxSupportedVersion() + ", inclusive.");
}
} else {
int defaultVersion = feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default value for MetadataVersion is 1 not 0.
short finalized = finalizedFeatures.versionOrDefault(feature.name(), (short) defaultVersion);
if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized)) {
throw new UnsupportedVersionException("Unable to register because the broker " +
"does not support version " + finalized + " of " + feature.name() +
". It wants a version between " + feature.minSupportedVersion() + " and " +
feature.maxSupportedVersion() + ", inclusive.");
}
// A feature is not found in the finalizedFeature map if it is unknown to the controller or set to 0 (feature not enabled).
// Only log if the feature name is not known by the controller.
if (!Features.PRODUCTION_FEATURE_NAMES.contains(feature.name()))
log.warn("Broker {} registered with feature {} that is unknown to the controller",
brokerId, feature.name());
}
return new BrokerFeature().
setName(feature.name()).
setMinSupportedVersion(feature.minSupportedVersion()).

View File

@ -19,6 +19,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
@ -61,6 +62,12 @@ public final class QuorumFeatures {
enableUnstable ?
MetadataVersion.latestTesting().featureLevel() :
MetadataVersion.latestProduction().featureLevel()));
for (Features feature : Features.PRODUCTION_FEATURES) {
features.put(feature.featureName(), VersionRange.of(
0,
feature.latestProduction()
));
}
return features;
}

View File

@ -40,6 +40,10 @@ public class FinalizedControllerFeatures {
return Optional.ofNullable(featureMap.get(name));
}
public short versionOrDefault(String name, short defaultValue) {
return featureMap.getOrDefault(name, defaultValue);
}
public Set<String> featureNames() {
return featureMap.keySet();
}

View File

@ -22,7 +22,7 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.slf4j.Logger;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
@ -30,7 +30,7 @@ import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSI
public class FeaturesPublisher implements MetadataPublisher {
private final Logger log;
private volatile Features features = Features.fromKRaftVersion(MINIMUM_KRAFT_VERSION);
private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.fromKRaftVersion(MINIMUM_KRAFT_VERSION);
public FeaturesPublisher(
LogContext logContext
@ -38,8 +38,8 @@ public class FeaturesPublisher implements MetadataPublisher {
log = logContext.logger(FeaturesPublisher.class);
}
public Features features() {
return features;
public FinalizedFeatures features() {
return finalizedFeatures;
}
@Override
@ -54,13 +54,13 @@ public class FeaturesPublisher implements MetadataPublisher {
LoaderManifest manifest
) {
if (delta.featuresDelta() != null) {
Features newFeatures = new Features(newImage.features().metadataVersion(),
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersion(),
newImage.features().finalizedVersions(),
newImage.provenance().lastContainedOffset(),
true);
if (!newFeatures.equals(features)) {
log.info("Loaded new metadata {}.", newFeatures);
features = newFeatures;
if (!newFinalizedFeatures.equals(finalizedFeatures)) {
log.info("Loaded new metadata {}.", newFinalizedFeatures);
finalizedFeatures = newFinalizedFeatures;
}
}
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
@ -55,6 +56,12 @@ public class QuorumFeaturesTest {
expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
MetadataVersion.LATEST_PRODUCTION.featureLevel()));
for (Features feature : Features.PRODUCTION_FEATURES) {
expectedFeatures.put(feature.featureName(), VersionRange.of(
0,
feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
));
}
assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(false));
}
@ -64,6 +71,12 @@ public class QuorumFeaturesTest {
expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
MetadataVersion.latestTesting().featureLevel()));
for (Features feature : Features.PRODUCTION_FEATURES) {
expectedFeatures.put(feature.featureName(), VersionRange.of(
0,
feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
));
}
assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true));
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import java.util.Map;
public interface FeatureVersion {
/**
* The level of the feature. 0 means the feature is fully disabled, so this value should be positive.
*/
short featureLevel();
/**
* The name of the feature.
*/
String featureName();
/**
* The MetadataVersion that corresponds to this feature. Setting this MV is not required to set the feature version,
* but this MV is marked production ready if and only if this feature version is production ready.
*
* When bootstrapping, we can find the latest production feature version by finding the highest bootstrapMetadataVersion that is less than or equal
* to the given MetadataVersion ({@link MetadataVersion#LATEST_PRODUCTION} by default). Setting a feature explicitly
* will skip this mapping and allow setting the feature independently as long as it is a supported version.
*
* If feature level X is created when MetadataVersion Y is the latest production version, create a new MV Y + 1. When the feature version becomes
* production ready, set MetadataVersion Y + 1 as production ready.
* (Ie, if the current production MV is 17 when a feature version is created, create MV 18 and mark it as production ready when the feature version is production ready.)
*
* NOTE: The feature can be used without setting this metadata version. If we want to mark a dependency, do so in {@link FeatureVersion#dependencies}
*/
MetadataVersion bootstrapMetadataVersion();
/**
* A mapping from feature to level for all features that this feature depends on. If this feature doesn't
* depend on any others, return an empty map.
* For example, say feature X level x relies on feature Y level y:
* feature (X level x).dependencies() will return (Y -> y)
*/
Map<String, Short> dependencies();
}

View File

@ -16,72 +16,135 @@
*/
package org.apache.kafka.server.common;
import java.util.Collections;
import java.util.HashMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
/**
* This is enum for the various features implemented for Kafka clusters.
* KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion.
* KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features,
* they need to be specified via the StorageTool or FeatureCommand tools.
* <br>
* Having a unified enum for the features that will use a shared type in the API used to set and update them
* makes it easier to process these features.
*/
public enum Features {
public final class Features {
private final MetadataVersion version;
private final Map<String, Short> finalizedFeatures;
private final long finalizedFeaturesEpoch;
/**
* Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when
* formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here.
*
* See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature.
*/
TEST_VERSION("test.feature.version", TestFeatureVersion.values());
public static Features fromKRaftVersion(MetadataVersion version) {
return new Features(version, Collections.emptyMap(), -1, true);
public static final Features[] FEATURES;
public static final List<Features> PRODUCTION_FEATURES;
public static final List<String> PRODUCTION_FEATURE_NAMES;
private final String name;
private final FeatureVersion[] featureVersions;
Features(String name,
FeatureVersion[] featureVersions) {
this.name = name;
this.featureVersions = featureVersions;
}
public Features(
MetadataVersion version,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch,
boolean kraftMode
) {
this.version = version;
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
// In KRaft mode, we always include the metadata version in the features map.
// In ZK mode, we never include it.
if (kraftMode) {
this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
} else {
this.finalizedFeatures.remove(FEATURE_NAME);
static {
Features[] enumValues = Features.values();
FEATURES = Arrays.copyOf(enumValues, enumValues.length);
PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
!feature.name.equals(TEST_VERSION.featureName())).collect(Collectors.toList());
PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
feature.name).collect(Collectors.toList());
}
public String featureName() {
return name;
}
public FeatureVersion[] featureVersions() {
return featureVersions;
}
public short latestProduction() {
return defaultValue(MetadataVersion.LATEST_PRODUCTION);
}
/**
* Creates a FeatureVersion from a level.
*
* @param level the level of the feature
* @return the FeatureVersionUtils.FeatureVersion for the feature the enum is based on.
* @throws IllegalArgumentException if the feature is not known.
*/
public FeatureVersion fromFeatureLevel(short level) {
return Arrays.stream(featureVersions).filter(featureVersion ->
featureVersion.featureLevel() == level).findFirst().orElseThrow(
() -> new IllegalArgumentException("No feature:" + featureName() + " with feature level " + level));
}
/**
* A method to validate the feature can be set. If a given feature relies on another feature, the dependencies should be
* captured in {@link FeatureVersion#dependencies()}
* <p>
* For example, say feature X level x relies on feature Y level y:
* if feature X >= x then throw an error if feature Y < y.
*
* All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster.
*
* @param feature the feature we are validating
* @param features the feature versions we have (or want to set)
* @throws IllegalArgumentException if the feature is not valid
*/
public static void validateVersion(FeatureVersion feature, Map<String, Short> features) {
Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME);
if (feature.featureLevel() >= 1 && (metadataVersion == null || metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel()))
throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() +
" because it depends on metadata.version=4 (" + MetadataVersion.IBP_3_3_IV0 + ")");
for (Map.Entry<String, Short> dependency: feature.dependencies().entrySet()) {
Short featureLevel = features.get(dependency.getKey());
if (featureLevel == null || featureLevel < dependency.getValue()) {
throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() +
" because it depends on " + dependency.getKey() + " level " + dependency.getValue());
}
}
}
public MetadataVersion metadataVersion() {
return version;
/**
* A method to return the default (latest production) level of a feature based on the metadata version provided.
*
* Every time a new feature is added, it should create a mapping from metadata version to feature version
* with {@link FeatureVersion#bootstrapMetadataVersion()}. When the feature version is production ready, the metadata
* version should be made production ready as well.
*
* @param metadataVersion the metadata version we want to use to set the default.
* @return the default version level given the feature and provided metadata version
*/
public short defaultValue(MetadataVersion metadataVersion) {
short level = 0;
for (Iterator<FeatureVersion> it = Arrays.stream(featureVersions).iterator(); it.hasNext(); ) {
FeatureVersion feature = it.next();
if (feature.bootstrapMetadataVersion().isLessThan(metadataVersion) || feature.bootstrapMetadataVersion().equals(metadataVersion))
level = feature.featureLevel();
else
return level;
}
return level;
}
public Map<String, Short> finalizedFeatures() {
return finalizedFeatures;
}
public long finalizedFeaturesEpoch() {
return finalizedFeaturesEpoch;
}
@Override
public boolean equals(Object o) {
if (o == null || !(o.getClass().equals(Features.class))) return false;
Features other = (Features) o;
return version == other.version &&
finalizedFeatures.equals(other.finalizedFeatures) &&
finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
}
@Override
public int hashCode() {
return Objects.hash(version, finalizedFeatures, finalizedFeaturesEpoch);
}
@Override
public String toString() {
return "Features" +
"(version=" + version +
", finalizedFeatures=" + finalizedFeatures +
", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
")";
/**
* Utility method to map a list of FeatureVersion to a map of feature name to feature level
*/
public static Map<String, Short> featureImplsToMap(List<FeatureVersion> features) {
return features.stream().collect(Collectors.toMap(FeatureVersion::featureName, FeatureVersion::featureLevel));
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public final class FinalizedFeatures {
private final MetadataVersion metadataVersion;
private final Map<String, Short> finalizedFeatures;
private final long finalizedFeaturesEpoch;
public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
return new FinalizedFeatures(version, Collections.emptyMap(), -1, true);
}
public FinalizedFeatures(
MetadataVersion metadataVersion,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch,
boolean kraftMode
) {
this.metadataVersion = metadataVersion;
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
// In KRaft mode, we always include the metadata version in the features map.
// In ZK mode, we never include it.
if (kraftMode) {
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
} else {
this.finalizedFeatures.remove(MetadataVersion.FEATURE_NAME);
}
}
public MetadataVersion metadataVersion() {
return metadataVersion;
}
public Map<String, Short> finalizedFeatures() {
return finalizedFeatures;
}
public long finalizedFeaturesEpoch() {
return finalizedFeaturesEpoch;
}
@Override
public boolean equals(Object o) {
if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) return false;
FinalizedFeatures other = (FinalizedFeatures) o;
return metadataVersion == other.metadataVersion &&
finalizedFeatures.equals(other.finalizedFeatures) &&
finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
}
@Override
public int hashCode() {
return Objects.hash(metadataVersion, finalizedFeatures, finalizedFeaturesEpoch);
}
@Override
public String toString() {
return "Features" +
"(metadataVersion=" + metadataVersion +
", finalizedFeatures=" + finalizedFeatures +
", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
")";
}
}

View File

@ -258,6 +258,10 @@ public enum MetadataVersion {
this.didMetadataChange = didMetadataChange;
}
public String featureName() {
return FEATURE_NAME;
}
public short featureLevel() {
return featureLevel;
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import java.util.Collections;
import java.util.Map;
public enum TestFeatureVersion implements FeatureVersion {
// TEST_1 released right before MV 3.7-IVO was released, and it has no dependencies
TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
// TEST_2 released right before MV 3.8-IVO was released, and it depends on this metadata version
TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_8_IV0.featureLevel()));
private final short featureLevel;
private final MetadataVersion metadataVersionMapping;
private final Map<String, Short> dependencies;
public static final String FEATURE_NAME = "test.feature.version";
TestFeatureVersion(int featureLevel, MetadataVersion metadataVersionMapping, Map<String, Short> dependencies) {
this.featureLevel = (short) featureLevel;
this.metadataVersionMapping = metadataVersionMapping;
this.dependencies = dependencies;
}
public short featureLevel() {
return featureLevel;
}
public String featureName() {
return FEATURE_NAME;
}
public MetadataVersion bootstrapMetadataVersion() {
return metadataVersionMapping;
}
public Map<String, Short> dependencies() {
return dependencies;
}
}

View File

@ -14,37 +14,101 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
class FeaturesTest {
@Test
public void testKRaftModeFeatures() {
Features features = new Features(MINIMUM_KRAFT_VERSION,
Collections.singletonMap("foo", (short) 2), 123, true);
assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
features.finalizedFeatures().get(FEATURE_NAME));
assertEquals((short) 2,
features.finalizedFeatures().get("foo"));
assertEquals(2, features.finalizedFeatures().size());
public class FeaturesTest {
@ParameterizedTest
@EnumSource(Features.class)
public void testFromFeatureLevelAllFeatures(Features feature) {
FeatureVersion[] featureImplementations = feature.featureVersions();
int numFeatures = featureImplementations.length;
for (short i = 1; i < numFeatures; i++) {
assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i));
}
}
@ParameterizedTest
@EnumSource(Features.class)
public void testValidateVersionAllFeatures(Features feature) {
for (FeatureVersion featureImpl : feature.featureVersions()) {
// Ensure the minimum bootstrap metadata version is included if no metadata version dependency.
Map<String, Short> deps = new HashMap<>();
deps.putAll(featureImpl.dependencies());
if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) {
deps.put(MetadataVersion.FEATURE_NAME, MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.featureLevel());
}
// Ensure that the feature is valid given the typical metadataVersionMapping and the dependencies.
// Note: Other metadata versions are valid, but this one should always be valid.
Features.validateVersion(featureImpl, deps);
}
}
@Test
public void testZkModeFeatures() {
Features features = new Features(MINIMUM_KRAFT_VERSION,
Collections.singletonMap("foo", (short) 2), 123, false);
assertNull(features.finalizedFeatures().get(FEATURE_NAME));
assertEquals((short) 2,
features.finalizedFeatures().get("foo"));
assertEquals(1, features.finalizedFeatures().size());
public void testInvalidValidateVersion() {
// No MetadataVersion is invalid
assertThrows(IllegalArgumentException.class,
() -> Features.validateVersion(
TestFeatureVersion.TEST_1,
Collections.emptyMap()
)
);
// Using too low of a MetadataVersion is invalid
assertThrows(IllegalArgumentException.class,
() -> Features.validateVersion(
TestFeatureVersion.TEST_1,
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_2_8_IV0.featureLevel())
)
);
// Using a version that is lower than the dependency will fail.
assertThrows(IllegalArgumentException.class,
() -> Features.validateVersion(
TestFeatureVersion.TEST_2,
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel())
)
);
}
@ParameterizedTest
@EnumSource(Features.class)
public void testDefaultValueAllFeatures(Features feature) {
for (FeatureVersion featureImpl : feature.featureVersions()) {
assertEquals(feature.defaultValue(featureImpl.bootstrapMetadataVersion()), featureImpl.featureLevel(),
"Failed to get the correct default for " + featureImpl);
}
}
@ParameterizedTest
@EnumSource(Features.class)
public void testLatestProductionMapsToLatestMetadataVersion(Features features) {
assertEquals(features.latestProduction(), features.defaultValue(MetadataVersion.LATEST_PRODUCTION));
}
@ParameterizedTest
@EnumSource(MetadataVersion.class)
public void testDefaultTestVersion(MetadataVersion metadataVersion) {
short expectedVersion;
if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) {
expectedVersion = 2;
} else if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV0)) {
expectedVersion = 1;
} else {
expectedVersion = 0;
}
assertEquals(expectedVersion, Features.TEST_VERSION.defaultValue(metadataVersion));
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
class FinalizedFeaturesTest {
@Test
public void testKRaftModeFeatures() {
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION,
Collections.singletonMap("foo", (short) 2), 123, true);
assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
assertEquals((short) 2,
finalizedFeatures.finalizedFeatures().get("foo"));
assertEquals(2, finalizedFeatures.finalizedFeatures().size());
}
@Test
public void testZkModeFeatures() {
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION,
Collections.singletonMap("foo", (short) 2), 123, false);
assertNull(finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
assertEquals((short) 2,
finalizedFeatures.finalizedFeatures().get("foo"));
assertEquals(1, finalizedFeatures.finalizedFeatures().size());
}
}