KAFKA-14187: kafka-features.sh: add support for --metadata (#12571)

This PR adds support to kafka-features.sh for the --metadata flag, as specified in KIP-778.  This
flag makes it possible to upgrade to a new metadata version without consulting a table mapping
version names to short integers. Change --feature to use a key=value format.

FeatureCommandTest.scala: make most tests here true unit tests (that don't start brokers) in order
to improve test run time, and allow us to test more cases. For the integration test part, test both
KRaft and ZK-based clusters. Add support for mocking feature operations in MockAdminClient.java.

upgrade.html: add a section describing how the metadata.version should be upgraded in KRaft
clusters.

Add kraft_upgrade_test.py to test upgrades between KRaft versions.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2022-08-30 16:56:03 -07:00 committed by GitHub
parent 140faf9f2b
commit 28d5a05943
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 736 additions and 281 deletions

View File

@@ -36,10 +36,10 @@ public class FinalizedVersionRange {
* @throws IllegalArgumentException Raised when the condition described above is not met.
*/
FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {
if (minVersionLevel < 1 || maxVersionLevel < 1 || maxVersionLevel < minVersionLevel) {
if (minVersionLevel < 0 || maxVersionLevel < 0 || maxVersionLevel < minVersionLevel) {
throw new IllegalArgumentException(
String.format(
"Expected minVersionLevel >= 1, maxVersionLevel >= 1 and" +
"Expected minVersionLevel >= 0, maxVersionLevel >= 0 and" +
" maxVersionLevel >= minVersionLevel, but received" +
" minVersionLevel: %d, maxVersionLevel: %d", minVersionLevel, maxVersionLevel));
}

View File

@@ -36,10 +36,10 @@ public class SupportedVersionRange {
* @throws IllegalArgumentException Raised when the condition described above is not met.
*/
SupportedVersionRange(final short minVersion, final short maxVersion) {
if (minVersion < 1 || maxVersion < 1 || maxVersion < minVersion) {
if (minVersion < 0 || maxVersion < 0 || maxVersion < minVersion) {
throw new IllegalArgumentException(
String.format(
"Expected 1 <= minVersion <= maxVersion but received minVersion:%d, maxVersion:%d.",
"Expected 0 <= minVersion <= maxVersion but received minVersion:%d, maxVersion:%d.",
minVersion,
maxVersion));
}

View File

@@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -77,6 +78,9 @@ public class MockAdminClient extends AdminClient {
private final Map<TopicPartition, Long> endOffsets;
private final Map<TopicPartition, Long> committedOffsets;
private final boolean usingRaftController;
private final Map<String, Short> featureLevels;
private final Map<String, Short> minSupportedFeatureLevels;
private final Map<String, Short> maxSupportedFeatureLevels;
private final String clusterId;
private final List<List<String>> brokerLogDirs;
private final List<Map<String, String>> brokerConfigs;
@@ -102,6 +106,9 @@ public class MockAdminClient extends AdminClient {
private Short defaultPartitions;
private boolean usingRaftController = false;
private Integer defaultReplicationFactor;
private Map<String, Short> featureLevels = Collections.emptyMap();
private Map<String, Short> minSupportedFeatureLevels = Collections.emptyMap();
private Map<String, Short> maxSupportedFeatureLevels = Collections.emptyMap();
public Builder() {
numBrokers(1);
@@ -156,6 +163,21 @@ public class MockAdminClient extends AdminClient {
return this;
}
/**
 * Sets the finalized feature levels that the mock cluster will report.
 *
 * @param featureLevels map from feature name to its finalized level.
 * @return this builder, for chaining.
 */
public Builder featureLevels(Map<String, Short> featureLevels) {
this.featureLevels = featureLevels;
return this;
}
/**
 * Sets the minimum supported level for each feature the mock cluster will report.
 *
 * @param minSupportedFeatureLevels map from feature name to its minimum supported level.
 * @return this builder, for chaining.
 */
public Builder minSupportedFeatureLevels(Map<String, Short> minSupportedFeatureLevels) {
this.minSupportedFeatureLevels = minSupportedFeatureLevels;
return this;
}
/**
 * Sets the maximum supported level for each feature the mock cluster will report.
 *
 * @param maxSupportedFeatureLevels map from feature name to its maximum supported level.
 * @return this builder, for chaining.
 */
public Builder maxSupportedFeatureLevels(Map<String, Short> maxSupportedFeatureLevels) {
this.maxSupportedFeatureLevels = maxSupportedFeatureLevels;
return this;
}
public MockAdminClient build() {
return new MockAdminClient(brokers,
controller == null ? brokers.get(0) : controller,
@@ -163,7 +185,10 @@ public class MockAdminClient extends AdminClient {
defaultPartitions != null ? defaultPartitions.shortValue() : 1,
defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3),
brokerLogDirs,
usingRaftController);
usingRaftController,
featureLevels,
minSupportedFeatureLevels,
maxSupportedFeatureLevels);
}
}
@@ -172,17 +197,30 @@ public class MockAdminClient extends AdminClient {
}
public MockAdminClient(List<Node> brokers, Node controller) {
this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS), false);
this(brokers,
controller,
DEFAULT_CLUSTER_ID,
1,
brokers.size(),
Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS),
false,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());
}
private MockAdminClient(List<Node> brokers,
Node controller,
String clusterId,
int defaultPartitions,
int defaultReplicationFactor,
List<List<String>> brokerLogDirs,
boolean usingRaftController) {
private MockAdminClient(
List<Node> brokers,
Node controller,
String clusterId,
int defaultPartitions,
int defaultReplicationFactor,
List<List<String>> brokerLogDirs,
boolean usingRaftController,
Map<String, Short> featureLevels,
Map<String, Short> minSupportedFeatureLevels,
Map<String, Short> maxSupportedFeatureLevels
) {
this.brokers = brokers;
controller(controller);
this.clusterId = clusterId;
@@ -199,6 +237,9 @@ public class MockAdminClient extends AdminClient {
this.endOffsets = new HashMap<>();
this.committedOffsets = new HashMap<>();
this.usingRaftController = usingRaftController;
this.featureLevels = new HashMap<>(featureLevels);
this.minSupportedFeatureLevels = new HashMap<>(minSupportedFeatureLevels);
this.maxSupportedFeatureLevels = new HashMap<>(maxSupportedFeatureLevels);
}
synchronized public void controller(Node controller) {
@@ -995,12 +1036,79 @@ public class MockAdminClient extends AdminClient {
@Override
public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();
Map<String, SupportedVersionRange> supportedFeatures = new HashMap<>();
for (Map.Entry<String, Short> entry : featureLevels.entrySet()) {
finalizedFeatures.put(entry.getKey(), new FinalizedVersionRange(
entry.getValue(), entry.getValue()));
supportedFeatures.put(entry.getKey(), new SupportedVersionRange(
minSupportedFeatureLevels.get(entry.getKey()),
maxSupportedFeatureLevels.get(entry.getKey())));
}
return new DescribeFeaturesResult(KafkaFuture.completedFuture(
new FeatureMetadata(finalizedFeatures,
Optional.of(123L),
supportedFeatures)));
}
@Override
public UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
public UpdateFeaturesResult updateFeatures(
Map<String, FeatureUpdate> featureUpdates,
UpdateFeaturesOptions options
) {
Map<String, KafkaFuture<Void>> results = new HashMap<>();
for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<Void>();
String feature = entry.getKey();
try {
short cur = featureLevels.getOrDefault(feature, (short) 0);
short next = entry.getValue().maxVersionLevel();
short min = minSupportedFeatureLevels.getOrDefault(feature, (short) 0);
short max = maxSupportedFeatureLevels.getOrDefault(feature, (short) 0);
switch (entry.getValue().upgradeType()) {
case UNKNOWN:
throw new InvalidRequestException("Invalid upgrade type.");
case UPGRADE:
if (cur > next) {
throw new InvalidUpdateVersionException("Can't upgrade to lower version.");
}
break;
case SAFE_DOWNGRADE:
if (cur < next) {
throw new InvalidUpdateVersionException("Can't downgrade to newer version.");
}
break;
case UNSAFE_DOWNGRADE:
if (cur < next) {
throw new InvalidUpdateVersionException("Can't downgrade to newer version.");
}
while (next != cur) {
// Simulate a scenario where all the even feature levels unsafe to downgrade from.
if (cur % 2 == 0) {
if (entry.getValue().upgradeType() == FeatureUpdate.UpgradeType.SAFE_DOWNGRADE) {
throw new InvalidUpdateVersionException("Unable to perform a safe downgrade.");
}
}
cur--;
}
break;
}
if (next < min) {
throw new InvalidUpdateVersionException("Can't downgrade below " + min);
}
if (next > max) {
throw new InvalidUpdateVersionException("Can't upgrade above " + max);
}
if (!options.validateOnly()) {
featureLevels.put(feature, next);
}
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
}
results.put(feature, future);
}
return new UpdateFeaturesResult(results);
}
@Override

View File

@@ -20,28 +20,33 @@ package kafka.admin
import kafka.tools.TerseFailure
import kafka.utils.Exit
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, storeTrue}
import net.sourceforge.argparse4j.inf.{Namespace, Subparsers}
import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, store, storeTrue}
import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subparsers}
import net.sourceforge.argparse4j.internal.HelpScreenException
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import java.io.File
import java.io.{File, PrintStream}
import java.util.Properties
import scala.collection.Seq
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
object FeatureCommand {
def main(args: Array[String]): Unit = {
val res = mainNoExit(args)
val res = mainNoExit(args, System.out)
Exit.exit(res)
}
// This is used for integration tests in order to avoid killing the test with Exit.exit
def mainNoExit(args: Array[String]): Int = {
// This is used for integration tests in order to avoid killing the test with Exit.exit,
// and in order to capture the command output.
def mainNoExit(
args: Array[String],
out: PrintStream
): Int = {
val parser = ArgumentParsers.newArgumentParser("kafka-features")
.defaultHelp(true)
.description("This tool manages feature flags in Kafka.")
@@ -59,7 +64,7 @@ object FeatureCommand {
addDisableParser(subparsers)
try {
val namespace = parser.parseArgsOrFail(args)
val namespace = parser.parseArgs(args)
val command = namespace.getString("command")
val commandConfig = namespace.get[File]("command_config")
@@ -75,14 +80,19 @@ object FeatureCommand {
val admin = Admin.create(props)
command match {
case "describe" => handleDescribe(namespace, admin)
case "upgrade" => handleUpgrade(namespace, admin)
case "downgrade" => handleDowngrade(namespace, admin)
case "disable" => handleDisable(namespace, admin)
case "describe" => handleDescribe(out, admin)
case "upgrade" => handleUpgrade(out, namespace, admin)
case "downgrade" => handleDowngrade(out, namespace, admin)
case "disable" => handleDisable(out, namespace, admin)
}
admin.close()
0
} catch {
case _: HelpScreenException =>
0
case e: ArgumentParserException =>
System.err.println(e.getMessage)
1
case e: TerseFailure =>
System.err.println(e.getMessage)
1
@@ -90,175 +100,186 @@ object FeatureCommand {
}
def addDescribeParser(subparsers: Subparsers): Unit = {
val describeParser = subparsers.addParser("describe")
.help("Describe one or more feature flags.")
val featureArgs = describeParser.addArgumentGroup("Specific Features")
featureArgs.addArgument("--feature")
.action(append())
.help("A specific feature to describe. This option may be repeated for describing multiple feature flags.")
val releaseArgs = describeParser.addArgumentGroup("All Features for release")
releaseArgs.addArgument("--release")
subparsers.addParser("describe")
.help("Describes the current active feature flags.")
}
def addUpgradeParser(subparsers: Subparsers): Unit = {
val upgradeParser = subparsers.addParser("upgrade")
.help("Upgrade one or more feature flags.")
val featureArgs = upgradeParser.addArgumentGroup("Upgrade specific features")
featureArgs.addArgument("--feature")
upgradeParser.addArgument("--metadata")
.help("The level to which we should upgrade the metadata. For example, 3.3-IV3.")
.action(store())
upgradeParser.addArgument("--feature")
.help("A feature upgrade we should perform, in key=value format. For example metadata.version=3.3-IV3.")
.action(append())
.help("A feature flag to upgrade. This option may be repeated for upgrading multiple feature flags.")
featureArgs.addArgument("--version")
.`type`(classOf[Short])
.help("The version to upgrade to.")
.action(append())
val releaseArgs = upgradeParser.addArgumentGroup("Upgrade to feature level defined for a given release")
releaseArgs.addArgument("--release")
upgradeParser.addArgument("--dry-run")
.help("Perform a dry-run of this upgrade operation.")
.help("Validate this upgrade, but do not perform it.")
.action(storeTrue())
}
def addDowngradeParser(subparsers: Subparsers): Unit = {
val downgradeParser = subparsers.addParser("downgrade")
.help("Upgrade one or more feature flags.")
.help("Downgrade one or more feature flags.")
downgradeParser.addArgument("--metadata")
.help("The level to which we should downgrade the metadata. For example, 3.3-IV0.")
.action(store())
downgradeParser.addArgument("--feature")
.help("A feature flag to downgrade. This option may be repeated for downgrade multiple feature flags.")
.required(true)
.action(append())
downgradeParser.addArgument("--version")
.`type`(classOf[Short])
.help("The version to downgrade to.")
.required(true)
.help("A feature downgrade we should perform, in key=value format. " +
"For example metadata.version=3.3-IV0.")
.action(append())
downgradeParser.addArgument("--unsafe")
.help("Perform this downgrade even if it considered unsafe. Refer to specific feature flag documentation for details.")
.help("Perform this downgrade even if it may irreversibly destroy metadata.")
.action(storeTrue())
downgradeParser.addArgument("--dry-run")
.help("Perform a dry-run of this downgrade operation.")
.help("Validate this downgrade, but do not perform it.")
.action(storeTrue())
}
def addDisableParser(subparsers: Subparsers): Unit = {
val disableParser = subparsers.addParser("disable")
.help("Disable one or more feature flags. This is the same as downgrading the version to zero.")
disableParser.addArgument("--feature")
.help("A feature flag to disable. This option may be repeated for disable multiple feature flags.")
.required(true)
.help("A feature flag to disable.")
.action(append())
disableParser.addArgument("--unsafe")
.help("Disable the feature flag(s) even if it considered unsafe. Refer to specific feature flag documentation for details.")
.help("Disable this feature flag even if it may irreversibly destroy metadata.")
.action(storeTrue())
disableParser.addArgument("--dry-run")
.help("Perform a dry-run of this disable operation.")
.action(storeTrue())
}
def handleDescribe(namespace: Namespace, admin: Admin): Unit = {
val featureFilter = parseFeaturesOrRelease(namespace) match {
case Neither() => (_: String) => true
case Features(featureNames) => (feature: String) => featureNames.contains(feature)
case Release(release) =>
// Special case, print the versions associated with the given release
printReleaseFeatures(release)
return
case Both() => throw new TerseFailure("Only one of --release or --feature may be specified with describe sub-command.")
}
val featureMetadata = admin.describeFeatures().featureMetadata().get()
val featureEpoch = featureMetadata.finalizedFeaturesEpoch()
val epochString = if (featureEpoch.isPresent) {
s"Epoch: ${featureEpoch.get}"
def levelToString(
feature: String,
level: Short
): String = {
if (feature.equals(MetadataVersion.FEATURE_NAME)) {
try {
MetadataVersion.fromFeatureLevel(level).version()
} catch {
case e: Throwable => s"UNKNOWN [${level}]"
}
} else {
"Epoch: -"
level.toString
}
val finalized = featureMetadata.finalizedFeatures().asScala
featureMetadata.supportedFeatures().asScala.foreach {
case (feature, range) =>
if (featureFilter.apply(feature)) {
if (finalized.contains(feature)) {
println(s"Feature: $feature\tSupportedMinVersion: ${range.minVersion()}\t" +
s"SupportedMaxVersion: ${range.maxVersion()}\tFinalizedVersionLevel: ${finalized(feature).maxVersionLevel()}\t$epochString")
} else {
println(s"Feature: $feature\tSupportedMinVersion: ${range.minVersion()}\t" +
s"SupportedMaxVersion: ${range.maxVersion()}\tFinalizedVersionLevel: -\t$epochString")
}
}
/**
 * Implements the "describe" subcommand: prints one tab-separated line per
 * supported feature, sorted by feature name, showing the supported min/max
 * versions, the finalized level (0 when the feature is not finalized), and
 * the finalized-features epoch ("-" when absent).
 *
 * NOTE(review): relies on `levelToString` (defined elsewhere in this file) to
 * render metadata.version levels as version names — confirm it is in scope.
 */
def handleDescribe(
out: PrintStream,
admin: Admin
): Unit = {
val featureMetadata = admin.describeFeatures().featureMetadata().get()
// TreeSet gives a deterministic, alphabetically sorted listing.
val featureList = new java.util.TreeSet[String](featureMetadata.supportedFeatures().keySet())
featureList.forEach {
case feature =>
// A feature absent from the finalized map is reported at level 0.
val finalizedLevel = featureMetadata.finalizedFeatures().asScala.get(feature) match {
case None => 0.toShort
case Some(v) => v.maxVersionLevel()
}
val range = featureMetadata.supportedFeatures().get(feature)
out.printf("Feature: %s\tSupportedMinVersion: %s\tSupportedMaxVersion: %s\tFinalizedVersionLevel: %s\tEpoch: %s%n",
feature,
levelToString(feature, range.minVersion()),
levelToString(feature, range.maxVersion()),
levelToString(feature, finalizedLevel),
featureMetadata.finalizedFeaturesEpoch().asScala.flatMap(e => Some(e.toString)).getOrElse("-"))
}
}
def printReleaseFeatures(release: String): Unit = {
println(s"Default feature versions for release $release:")
/**
 * Renders the inclusive range of metadata versions [first, last] as a
 * comma-separated string, in declaration order of MetadataVersion.VERSIONS.
 */
def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = {
  val ordered = MetadataVersion.VERSIONS.toList
  ordered.slice(first.ordinal(), last.ordinal() + 1).mkString(", ")
}
def handleUpgrade(namespace: Namespace, admin: Admin): Unit = {
val featuresToUpgrade = parseFeaturesOrRelease(namespace) match {
case Features(featureNames) => parseVersions(featureNames, namespace)
case Release(release) => featuresForRelease(release)
case Neither() => throw new TerseFailure("Must specify either --release or at least one --feature and --version with upgrade sub-command.")
case Both() => throw new TerseFailure("Cannot specify both --release and --feature with upgrade sub-command.")
}
val dryRun = namespace.getBoolean("dry_run")
val updateResult = admin.updateFeatures(featuresToUpgrade.map { case (feature, version) =>
feature -> new FeatureUpdate(version, UpgradeType.UPGRADE)
}.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
handleUpdateFeaturesResponse(updateResult, featuresToUpgrade, dryRun, "upgrade")
// Implements the "upgrade" subcommand by delegating to the shared
// upgrade/downgrade path with the UPGRADE type.
def handleUpgrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
handleUpgradeOrDowngrade("upgrade", out, namespace, admin, UpgradeType.UPGRADE)
}
def handleDowngrade(namespace: Namespace, admin: Admin): Unit = {
val featuresToDowngrade = parseFeaturesOrRelease(namespace) match {
case Features(featureNames) => parseVersions(featureNames, namespace)
case Neither() => throw new TerseFailure("Must specify at least one --feature and --version with downgrade sub-command.")
case _ => throw new IllegalStateException()
}
val dryRun = namespace.getBoolean("dry_run")
def downgradeType(namespace: Namespace): UpgradeType = {
val unsafe = namespace.getBoolean("unsafe")
val updateResult = admin.updateFeatures(featuresToDowngrade.map { case (feature, version) =>
if (unsafe) {
feature -> new FeatureUpdate(version, UpgradeType.UNSAFE_DOWNGRADE)
} else {
feature -> new FeatureUpdate(version, UpgradeType.SAFE_DOWNGRADE)
}
}.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
handleUpdateFeaturesResponse(updateResult, featuresToDowngrade, dryRun, "downgrade")
}
def handleDisable(namespace: Namespace, admin: Admin): Unit = {
val featuresToDisable = parseFeaturesOrRelease(namespace) match {
case Features(featureNames) => featureNames
case Neither() => throw new TerseFailure("Must specify at least one --feature and --version with downgrade sub-command.")
case _ => throw new IllegalStateException()
if (unsafe == null || !unsafe) {
UpgradeType.SAFE_DOWNGRADE
} else {
UpgradeType.UNSAFE_DOWNGRADE
}
val dryRun = namespace.getBoolean("dry_run")
val unsafe = namespace.getBoolean("unsafe")
val updateResult = admin.updateFeatures(featuresToDisable.map { feature =>
if (unsafe) {
feature -> new FeatureUpdate(0.toShort, UpgradeType.UNSAFE_DOWNGRADE)
} else {
feature -> new FeatureUpdate(0.toShort, UpgradeType.SAFE_DOWNGRADE)
}
}.toMap.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
handleUpdateFeaturesResponse(updateResult, featuresToDisable.map {
feature => feature -> 0.toShort
}.toMap, dryRun, "disable")
}
def handleUpdateFeaturesResponse(updateResult: UpdateFeaturesResult,
updatedFeatures: Map[String, Short],
dryRun: Boolean,
op: String): Unit = {
val errors = updateResult.values().asScala.map { case (feature, future) =>
// Implements the "downgrade" subcommand by delegating to the shared
// upgrade/downgrade path; the downgrade type (safe vs. unsafe) is taken
// from the --unsafe flag via downgradeType.
def handleDowngrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
handleUpgradeOrDowngrade("downgrade", out, namespace, admin, downgradeType(namespace))
}
/**
 * Parses a "name=level" command-line argument into a (feature name, level) pair.
 * Whitespace around both the name and the level is trimmed.
 *
 * @param input the raw argument, e.g. "metadata.version=7".
 * @return the feature name and its numeric level.
 * @throws TerseFailure if there is no '=' or the level is not a valid short.
 */
def parseNameAndLevel(input: String): (String, Short) = {
  val equalsIndex = input.indexOf("=")
  if (equalsIndex < 0) {
    throw new TerseFailure(s"Can't parse feature=level string ${input}: equals sign not found.")
  }
  val name = input.substring(0, equalsIndex).trim
  val levelString = input.substring(equalsIndex + 1).trim
  val level = try {
    levelString.toShort
  } catch {
    // Only the parse failure is expected here; catching Throwable (as before)
    // would also swallow fatal errors such as OutOfMemoryError.
    case _: NumberFormatException => throw new TerseFailure(s"Can't parse feature=level string ${input}: " +
      s"unable to parse ${levelString} as a short.")
  }
  (name, level)
}
/**
 * Shared implementation of the "upgrade" and "downgrade" subcommands.
 * Builds a map of FeatureUpdate requests from the --metadata flag (a named
 * metadata version, e.g. "3.3-IV3") and/or repeated --feature key=value
 * flags, then submits them via `update`.
 *
 * @param op          the operation name used in messages ("upgrade" or "downgrade").
 * @param out         destination for command output.
 * @param namespace   parsed argparse4j arguments.
 * @param admin       the Admin client to submit updates through.
 * @param upgradeType UPGRADE, SAFE_DOWNGRADE, or UNSAFE_DOWNGRADE.
 */
def handleUpgradeOrDowngrade(
op: String,
out: PrintStream,
namespace: Namespace,
admin: Admin,
upgradeType: UpgradeType
): Unit = {
val updates = new java.util.HashMap[String, FeatureUpdate]()
// --metadata: translate the human-readable version name into a feature level.
Option(namespace.getString("metadata")).foreach(metadata => {
val version = try {
MetadataVersion.fromVersionString(metadata)
} catch {
case e: Throwable => throw new TerseFailure("Unsupported metadata version " + metadata +
". Supported metadata versions are " + metadataVersionsToString(
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest()))
}
updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType))
})
// --feature (repeatable): parse each key=value pair; a duplicate feature name
// (including metadata.version given via both flags) is rejected.
Option(namespace.getList[String]("feature")).foreach(features => {
features.forEach(feature => {
val (name, level) = parseNameAndLevel(feature)
if (updates.put(name, new FeatureUpdate(level, upgradeType)) != null) {
throw new TerseFailure(s"Feature ${name} was specified more than once.")
}
})
})
update(op, out, admin, updates, namespace.getBoolean("dry-run"))
}
def handleDisable(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
val upgradeType = downgradeType(namespace)
val updates = new java.util.HashMap[String, FeatureUpdate]()
Option(namespace.getList[String]("feature")).foreach(features => {
features.forEach(name =>
if (updates.put(name, new FeatureUpdate(0.toShort, upgradeType)) != null) {
throw new TerseFailure(s"Feature ${name} was specified more than once.")
})
}
)
update("disable", out, admin, updates, namespace.getBoolean("dry-run"))
}
def update(
op: String,
out: PrintStream,
admin: Admin,
updates: java.util.HashMap[String, FeatureUpdate],
dryRun: Boolean
): Unit = {
if (updates.isEmpty) {
throw new TerseFailure(s"You must specify at least one feature to ${op}")
}
val result = admin.updateFeatures(updates, new UpdateFeaturesOptions().validateOnly(dryRun))
val errors = result.values().asScala.map { case (feature, future) =>
try {
future.get()
feature -> None
@@ -267,67 +288,34 @@ object FeatureCommand {
case t: Throwable => feature -> Some(t)
}
}
errors.foreach { case (feature, maybeThrowable) =>
errors.keySet.toList.sorted.foreach { feature =>
val maybeThrowable = errors(feature)
val level = updates.get(feature).maxVersionLevel()
if (maybeThrowable.isDefined) {
if (dryRun) {
System.out.println(s"Can not $op feature '$feature' to ${updatedFeatures(feature)}. ${maybeThrowable.get.getMessage}")
val helper = if (dryRun) {
"Can not"
} else {
System.out.println(s"Could not $op feature '$feature' to ${updatedFeatures(feature)}. ${maybeThrowable.get.getMessage}")
"Could not"
}
val suffix = if (op.equals("disable")) {
s"disable ${feature}"
} else {
s"${op} ${feature} to ${level}"
}
out.println(s"${helper} ${suffix}. ${maybeThrowable.get.getMessage}")
} else {
if (dryRun) {
System.out.println(s"Feature '$feature' can be ${op}d to ${updatedFeatures(feature)}.")
val verb = if (dryRun) {
"can be"
} else {
System.out.println(s"Feature '$feature' was ${op}d to ${updatedFeatures(feature)}.")
"was"
}
val obj = if (op.equals("disable")) {
"disabled."
} else {
s"${op}d to ${level}."
}
out.println(s"${feature} ${verb} ${obj}")
}
}
}
sealed trait ReleaseOrFeatures { }
case class Neither() extends ReleaseOrFeatures
case class Release(release: String) extends ReleaseOrFeatures
case class Features(featureNames: Seq[String]) extends ReleaseOrFeatures
case class Both() extends ReleaseOrFeatures
def parseFeaturesOrRelease(namespace: Namespace): ReleaseOrFeatures = {
val release = namespace.getString("release")
val features = namespace.getList[String]("feature").asScala
if (release != null && features != null) {
Both()
} else if (release == null && features == null) {
Neither()
} else if (release != null) {
Release(release)
} else {
Features(features)
}
}
def parseVersions(features: Seq[String], namespace: Namespace): Map[String, Short] = {
val versions = namespace.getList[Short]("version").asScala
if (versions == null) {
throw new TerseFailure("Must specify --version when using --feature argument(s).")
}
if (versions.size != features.size) {
if (versions.size > features.size) {
throw new TerseFailure("Too many --version arguments given. For each --feature argument there should be one --version argument.")
} else {
throw new TerseFailure("Too many --feature arguments given. For each --feature argument there should be one --version argument.")
}
}
features.zip(versions).map { case (feature, version) =>
feature -> version
}.toMap
}
def defaultFeatures(): Map[String, Short] = {
Map.empty
}
def featuresForRelease(release: String): Map[String, Short] = {
Map.empty
}
}

View File

@@ -17,94 +17,296 @@
package kafka.admin
import kafka.server.{BaseRequestTest, KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import kafka.api.IntegrationTestHarness
import kafka.server.KafkaConfig
import kafka.tools.TerseFailure
import kafka.utils.{TestInfoUtils, TestUtils}
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.{SAFE_DOWNGRADE, UNSAFE_DOWNGRADE}
import org.apache.kafka.clients.admin.MockAdminClient
import org.apache.kafka.common.utils.Utils
import java.util.Properties
import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV0
import org.junit.jupiter.api.Assertions.assertTrue
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class FeatureCommandTest extends BaseRequestTest {
override def brokerCount: Int = 3
import java.io.{ByteArrayOutputStream, PrintStream}
import java.{lang, util}
import java.util.Collections.{emptyMap, singletonMap}
import scala.jdk.CollectionConverters._
override def brokerPropertyOverrides(props: Properties): Unit = {
props.put(KafkaConfig.InterBrokerProtocolVersionProp, IBP_2_7_IV0.toString)
case class FeatureCommandTestEnv(admin: MockAdminClient = null) extends AutoCloseable {
val stream = new ByteArrayOutputStream()
val out = new PrintStream(stream)
override def close(): Unit = {
Utils.closeAll(stream, out)
Utils.closeQuietly(admin, "admin")
}
private val defaultSupportedFeatures: Features[SupportedVersionRange] =
Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5))))
private def updateSupportedFeatures(features: Features[SupportedVersionRange],
targetServers: Set[KafkaServer]): Unit = {
targetServers.foreach(s => {
s.brokerFeatures.setSupportedFeatures(features)
s.zkClient.updateBrokerInfo(s.createBrokerInfo)
})
// Wait until updates to all BrokerZNode supported features propagate to the controller.
val brokerIds = targetServers.map(s => s.config.brokerId)
waitUntilTrue(
() => servers.exists(s => {
if (s.kafkaController.isActive) {
s.kafkaController.controllerContext.liveOrShuttingDownBrokers
.filter(b => brokerIds.contains(b.id))
.forall(b => {
b.features.equals(features)
})
} else {
false
}
}),
"Controller did not get broker updates")
def outputWithoutEpoch(): String = {
val lines = stream.toString.split(String.format("%n"))
lines.map { line =>
val pos = line.indexOf("Epoch: ")
if (pos > 0) {
line.substring(0, pos)
} else {
line
}
}.mkString(String.format("%n"))
}
}
private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = {
updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
}
// NOTE(review): this span renders as a corrupted unified diff of
// FeatureCommandTest.scala — lines removed by KAFKA-14187 (the old
// broker-backed feature tests) are interleaved with the added
// quorum-parameterized tests, so the class as shown is not syntactically
// valid. The stranded fragments are annotated below; reconcile against the
// upstream commit before compiling.
class FeatureCommandTest extends IntegrationTestHarness {
  override def brokerCount: Int = 1

  /**
   * Tests if the FeatureApis#describeFeatures API works as expected when describing features before and
   * after upgrading features.
   */
  @Test
  def testDescribeFeaturesSuccess(): Unit = {
    updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
    // NOTE(review): added-side line from the new class preamble, stranded here by the diff.
    override protected def metadataVersion: MetadataVersion = IBP_3_3_IV1
    val initialDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
    val expectedInitialDescribeOutputs = Seq(
      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: -",
      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: -"
    )
    // NOTE(review): added-side line (server config setup), stranded here by the diff.
    serverConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, metadataVersion.toString)
    expectedInitialDescribeOutputs.foreach { expectedOutput =>
      assertTrue(initialDescribeOutput.contains(expectedOutput))

  // Describe against a ZK-backed cluster: nothing is finalized, so after the
  // epoch line is stripped the command prints nothing.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("zk"))
  def testDescribeWithZk(quorum: String): Unit = {
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe"), env.out)
      assertEquals("", env.outputWithoutEpoch())
    }
  }
    // NOTE(review): removed-side fragment of the old upgrade-then-describe test.
    FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "upgrade",
      "--feature", "feature_1", "--version", "3", "--feature", "feature_2", "--version", "5"))
    val upgradeDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
    val expectedUpgradeDescribeOutput = Seq(
      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: 3",
      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: 5"
    )
    expectedUpgradeDescribeOutput.foreach { expectedOutput =>
      assertTrue(upgradeDescribeOutput.contains(expectedOutput))

  // Describe against a KRaft cluster: metadata.version is always finalized.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("kraft"))
  def testDescribeWithKRaft(quorum: String): Unit = {
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe"), env.out)
      assertEquals(String.format(
        "Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
        "SupportedMaxVersion: 3.3-IV3\tFinalizedVersionLevel: 3.3-IV1\t"),
        env.outputWithoutEpoch())
    }
  }
    // NOTE(review): removed-side fragment of the old downgrade-then-describe test.
    FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "downgrade",
      "--feature", "feature_1", "--version", "2", "--feature", "feature_2", "--version", "2"))
    val downgradeDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
    val expectedFinalDescribeOutput = Seq(
      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: 2",
      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: 2"
    )
    expectedFinalDescribeOutput.foreach { expectedOutput =>
      assertTrue(downgradeDescribeOutput.contains(expectedOutput))

  // Upgrading metadata.version on a ZK cluster must fail: ZK brokers do not
  // support that feature.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("zk"))
  def testUpgradeMetadataVersionWithZk(quorum: String): Unit = {
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "upgrade", "--metadata", "3.3-IV2"), env.out)
      assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " +
        "update because the provided feature is not supported.", env.outputWithoutEpoch())
    }
  }

  // Upgrading metadata.version on KRaft works through both the key=value
  // --feature form and the human-readable --metadata form.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("kraft"))
  def testUpgradeMetadataVersionWithKraft(quorum: String): Unit = {
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "upgrade", "--feature", "metadata.version=5"), env.out)
      assertEquals("metadata.version was upgraded to 5.", env.outputWithoutEpoch())
    }
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "upgrade", "--metadata", "3.3-IV2"), env.out)
      assertEquals("metadata.version was upgraded to 6.", env.outputWithoutEpoch())
    }
  }

  // On ZK, disable and downgrade of metadata.version both fail (the feature is
  // never finalized / not supported), with or without --unsafe.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("zk"))
  def testDowngradeMetadataVersionWithZk(quorum: String): Unit = {
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "disable", "--feature", "metadata.version"), env.out)
      assertEquals("Could not disable metadata.version. Can not delete non-existing finalized feature.",
        env.outputWithoutEpoch())
    }
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "downgrade", "--metadata", "3.3-IV0"), env.out)
      assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
        "update because the provided feature is not supported.", env.outputWithoutEpoch())
    }
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "downgrade", "--unsafe", "--metadata", "3.3-IV0"), env.out)
      assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
        "update because the provided feature is not supported.", env.outputWithoutEpoch())
    }
  }

  // On KRaft: disable is rejected (below the controller's minimum supported
  // level), a plain downgrade is refused as potentially lossy, and only the
  // --unsafe downgrade succeeds.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("kraft"))
  def testDowngradeMetadataVersionWithKRaft(quorum: String): Unit = {
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "disable", "--feature", "metadata.version"), env.out)
      assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
        "metadata.version. Local controller 1000 only supports versions 1-7", env.outputWithoutEpoch())
    }
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "downgrade", "--metadata", "3.3-IV0"), env.out)
      assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
        "Refusing to perform the requested downgrade because it might delete metadata information. " +
        "Retry using UNSAFE_DOWNGRADE if you want to force the downgrade to proceed.", env.outputWithoutEpoch())
    }
    TestUtils.resource(FeatureCommandTestEnv()) { env =>
      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
        "downgrade", "--unsafe", "--metadata", "3.3-IV0"), env.out)
      assertEquals("metadata.version was downgraded to 4.", env.outputWithoutEpoch())
    }
  }
}
/**
 * True unit tests for the FeatureCommand helpers and subcommand handlers.
 * These run against a [[MockAdminClient]] and do not start any brokers, which
 * keeps them fast compared to the integration tests above.
 */
class FeatureCommandUnitTest {
  /** metadata.version levels render as release names; other features as plain shorts. */
  @Test
  def testLevelToString(): Unit = {
    assertEquals("5", FeatureCommand.levelToString("foo.bar", 5.toShort))
    assertEquals("3.3-IV0",
      FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, IBP_3_3_IV0.featureLevel()))
  }

  @Test
  def testMetadataVersionsToString(): Unit = {
    assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3",
      FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_3_IV3))
  }

  /**
   * The --unsafe flag selects UNSAFE_DOWNGRADE; absent or false selects SAFE_DOWNGRADE.
   * Renamed from testdowngradeType to match the camelCase convention of the
   * sibling test methods.
   */
  @Test
  def testDowngradeType(): Unit = {
    assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(
      new Namespace(singletonMap("unsafe", java.lang.Boolean.valueOf(false)))))
    assertEquals(UNSAFE_DOWNGRADE, FeatureCommand.downgradeType(
      new Namespace(singletonMap("unsafe", java.lang.Boolean.valueOf(true)))))
    assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(new Namespace(emptyMap())))
  }

  /** "name=level" strings parse into (name, short) pairs; malformed input raises TerseFailure. */
  @Test
  def testParseNameAndLevel(): Unit = {
    assertEquals(("foo.bar", 5.toShort), FeatureCommand.parseNameAndLevel("foo.bar=5"))
    // Leading whitespace around the feature name is tolerated.
    assertEquals(("quux", 0.toShort), FeatureCommand.parseNameAndLevel(" quux=0"))
    assertEquals("Can't parse feature=level string baaz: equals sign not found.",
      assertThrows(classOf[TerseFailure],
        () => FeatureCommand.parseNameAndLevel("baaz")).getMessage)
    assertEquals("Can't parse feature=level string w=tf: unable to parse tf as a short.",
      assertThrows(classOf[TerseFailure],
        () => FeatureCommand.parseNameAndLevel("w=tf")).getMessage)
  }

  /**
   * Builds a mock admin client advertising metadata.version in [3.3-IV0, 3.3-IV3]
   * (finalized at 3.3-IV2) and feature foo.bar in [0, 10] (finalized at 5).
   */
  def buildAdminClient1(): MockAdminClient = {
    new MockAdminClient.Builder().
      minSupportedFeatureLevels(Map(
        MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV0.featureLevel()),
        "foo.bar" -> lang.Short.valueOf(0.toShort)
      ).asJava).
      featureLevels(Map(
        MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV2.featureLevel()),
        "foo.bar" -> lang.Short.valueOf(5.toShort)
      ).asJava).
      maxSupportedFeatureLevels(Map(
        MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV3.featureLevel()),
        "foo.bar" -> lang.Short.valueOf(10.toShort)
      ).asJava).
      build()
  }

  @Test
  def testHandleDescribe(): Unit = {
    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
      FeatureCommand.handleDescribe(env.out, env.admin)
      assertEquals(String.format(
        "Feature: foo.bar\tSupportedMinVersion: 0\tSupportedMaxVersion: 10\tFinalizedVersionLevel: 5\tEpoch: 123%n" +
        "Feature: metadata.version\tSupportedMinVersion: 3.3-IV0\tSupportedMaxVersion: 3.3-IV3\tFinalizedVersionLevel: 3.3-IV2\tEpoch: 123%n"),
        env.stream.toString)
    }
  }

  // upgrade succeeds for foo.bar but refuses to move metadata.version from IV2 down to IV1.
  @Test
  def testHandleUpgrade(): Unit = {
    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
      FeatureCommand.handleUpgrade(env.out, new Namespace(Map(
        "metadata" -> "3.3-IV1",
        "feature" -> util.Arrays.asList("foo.bar=6")
      ).asJava), env.admin)
      assertEquals(String.format(
        "foo.bar was upgraded to 6.%n" +
        "Could not upgrade metadata.version to 5. Can't upgrade to lower version.%n"),
        env.stream.toString)
    }
  }

  // --dry-run reports what would happen ("can be" / "Can not") without applying anything.
  @Test
  def testHandleUpgradeDryRun(): Unit = {
    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
      FeatureCommand.handleUpgrade(env.out, new Namespace(Map(
        "metadata" -> "3.3-IV1",
        "feature" -> util.Arrays.asList("foo.bar=6"),
        "dry-run" -> java.lang.Boolean.valueOf(true)
      ).asJava), env.admin)
      assertEquals(String.format(
        "foo.bar can be upgraded to 6.%n" +
        "Can not upgrade metadata.version to 5. Can't upgrade to lower version.%n"),
        env.stream.toString)
    }
  }

  // downgrade succeeds for foo.bar but refuses to move metadata.version from IV2 up to IV3.
  @Test
  def testHandleDowngrade(): Unit = {
    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
      FeatureCommand.handleDowngrade(env.out, new Namespace(Map(
        "metadata" -> "3.3-IV3",
        "feature" -> util.Arrays.asList("foo.bar=1")
      ).asJava), env.admin)
      assertEquals(String.format(
        "foo.bar was downgraded to 1.%n" +
        "Could not downgrade metadata.version to 7. Can't downgrade to newer version.%n"),
        env.stream.toString)
    }
  }

  @Test
  def testHandleDowngradeDryRun(): Unit = {
    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
      FeatureCommand.handleDowngrade(env.out, new Namespace(Map(
        "metadata" -> "3.3-IV3",
        "feature" -> util.Arrays.asList("foo.bar=1"),
        "dry-run" -> java.lang.Boolean.valueOf(true)
      ).asJava), env.admin)
      assertEquals(String.format(
        "foo.bar can be downgraded to 1.%n" +
        "Can not downgrade metadata.version to 7. Can't downgrade to newer version.%n"),
        env.stream.toString)
    }
  }

  // disable works for ordinary features but metadata.version cannot go below its floor.
  @Test
  def testHandleDisable(): Unit = {
    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
      FeatureCommand.handleDisable(env.out, new Namespace(Map[String, AnyRef](
        "feature" -> util.Arrays.asList("foo.bar", "metadata.version", "quux")
      ).asJava), env.admin)
      assertEquals(String.format(
        "foo.bar was disabled.%n" +
        "Could not disable metadata.version. Can't downgrade below 4%n" +
        "quux was disabled.%n"),
        env.stream.toString)
    }
  }

  @Test
  def testHandleDisableDryRun(): Unit = {
    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
      FeatureCommand.handleDisable(env.out, new Namespace(Map[String, AnyRef](
        "feature" -> util.Arrays.asList("foo.bar", "metadata.version", "quux"),
        "dry-run" -> java.lang.Boolean.valueOf(true)
      ).asJava), env.admin)
      assertEquals(String.format(
        "foo.bar can be disabled.%n" +
        "Can not disable metadata.version. Can't downgrade below 4%n" +
        "quux can be disabled.%n"),
        env.stream.toString)
    }
  }
}

View File

@ -61,8 +61,27 @@
</li>
</ol>
<h4><a id="upgrade_3_3_0" href="#upgrade_3_3_0">Upgrading a KRaft-based cluster to 3.3.0 from any version 3.0.x through 3.2.x</a></h4>
<p><b>If you are upgrading from a version prior to 3.3.0, please see the note below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
</li>
<li>Once the cluster's behavior and performance have been verified, bump the metadata.version by running
<code>
./bin/kafka-features.sh upgrade --metadata 3.3
</code>
</li>
<li>Note that the cluster metadata version cannot be downgraded to a pre-production 3.0.x, 3.1.x, or 3.2.x version once it has been upgraded. However, it is possible to downgrade to production versions such as 3.3-IV0, 3.3-IV1, etc.</li>
</ol>
<h5><a id="upgrade_330_notable" href="#upgrade_330_notable">Notable changes in 3.3.0</a></h5>
<ul>
<li>There is now a slightly different upgrade process for KRaft clusters than for ZK-based clusters, as described above.</li>
<li>Introduced a new API <code>addMetricIfAbsent</code> to <code>Metrics</code> which would create a new Metric if not existing or return the same metric
if already registered. Note that this behaviour is different from <code>addMetric</code> API which throws an <code>IllegalArgumentException</code> when
trying to create an already existing metric. (See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-843%3A+Adding+addMetricIfAbsent+method+to+Metrics">KIP-843</a>
@ -2043,3 +2062,4 @@ Release 0.7 is incompatible with newer releases. Major changes were made to the
</script>
<div class="p-upgrade"></div>
</html>

View File

@ -850,6 +850,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if len(self.pids(node)) == 0:
raise Exception("No process ids recorded on node %s" % node.account.hostname)
    def upgrade_metadata_version(self, new_version):
        # Finalize a newer metadata.version on the cluster via
        # `kafka-features.sh upgrade --metadata <new_version>`.
        self.run_features_command("upgrade", new_version)
    def downgrade_metadata_version(self, new_version):
        # Finalize an older metadata.version on the cluster via
        # `kafka-features.sh downgrade --metadata <new_version>`.
        self.run_features_command("downgrade", new_version)
def run_features_command(self, op, new_version):
cmd = self.path.script("kafka-features.sh ")
cmd += "--bootstrap-server %s " % self.bootstrap_servers()
cmd += "%s --metadata %s" % (op, new_version)
self.logger.info("Running %s command...\n%s" % (op, cmd))
self.nodes[0].account.ssh(cmd)
def pids(self, node):
"""Return process ids associated with running processes on the given node."""
try:

View File

@ -0,0 +1,122 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.quorum import remote_kraft, colocated_kraft
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_3_0, LATEST_3_1, LATEST_3_2, DEV_BRANCH, \
KafkaVersion, LATEST_METADATA_VERSION
#
# Test upgrading between different KRaft versions.
#
# Note that the earliest supported KRaft version is 3.0, not 0.8 as it is for
# ZK mode. The upgrade process is also somewhat different for KRaft because we
# use metadata.version instead of inter.broker.protocol.version.
#
class TestKRaftUpgrade(ProduceConsumeValidateTest):
    def __init__(self, test_context):
        super(TestKRaftUpgrade, self).__init__(test_context=test_context)
        # A rolling KRaft upgrade must never lose records the producer saw acked.
        self.may_truncate_acked_records = False

    def setUp(self):
        self.topic = "test_topic"
        self.partitions = 3
        self.replication_factor = 3
        # Producer and consumer
        self.producer_throughput = 1000
        self.num_producers = 1
        self.num_consumers = 1

    def wait_until_rejoin(self):
        # Block until every partition's ISR is back at full replication, so the
        # next node can be bounced without risking under-replicated partitions.
        for partition in range(0, self.partitions):
            wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
                       backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")

    def perform_version_change(self, from_kafka_version):
        # Rolling restart: controllers first, then brokers, one node at a time,
        # waiting for ISR recovery between bounces. Only after every node runs
        # DEV_BRANCH is metadata.version bumped — the order matters, since the
        # finalized version must not exceed what any node supports.
        self.logger.info("Performing rolling upgrade.")
        for node in self.kafka.controller_quorum.nodes:
            self.logger.info("Stopping controller node %s" % node.account.hostname)
            self.kafka.controller_quorum.stop_node(node)
            node.version = DEV_BRANCH
            self.logger.info("Restarting controller node %s" % node.account.hostname)
            self.kafka.controller_quorum.start_node(node)
            self.wait_until_rejoin()
            self.logger.info("Successfully restarted controller node %s" % node.account.hostname)
        for node in self.kafka.nodes:
            self.logger.info("Stopping broker node %s" % node.account.hostname)
            self.kafka.stop_node(node)
            node.version = DEV_BRANCH
            self.logger.info("Restarting broker node %s" % node.account.hostname)
            self.kafka.start_node(node)
            self.wait_until_rejoin()
            self.logger.info("Successfully restarted broker node %s" % node.account.hostname)
        self.logger.info("Changing metadata.version to %s" % LATEST_METADATA_VERSION)
        self.kafka.upgrade_metadata_version(LATEST_METADATA_VERSION)

    def run_upgrade(self, from_kafka_version):
        """Test upgrade of Kafka broker cluster from various versions to the current version

        from_kafka_version is a Kafka version to upgrade from.

        - Start 3 node broker cluster on version 'from_kafka_version'.
        - Start producer and consumer in the background.
        - Perform rolling upgrade.
        - Upgrade cluster to the latest metadata.version.
        - Finally, validate that every message acked by the producer was consumed by the consumer.
        """
        fromKafkaVersion = KafkaVersion(from_kafka_version)
        # zk=None: KRaft clusters run without ZooKeeper.
        self.kafka = KafkaService(self.test_context,
                                  num_nodes=3,
                                  zk=None,
                                  version=fromKafkaVersion,
                                  topics={self.topic: {"partitions": self.partitions,
                                                       "replication-factor": self.replication_factor,
                                                       'configs': {"min.insync.replicas": 2}}})
        self.kafka.start()
        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
                                           self.topic, throughput=self.producer_throughput,
                                           message_validator=is_int,
                                           compression_types=["none"],
                                           version=KafkaVersion(from_kafka_version))
        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
                                        self.topic, new_consumer=True, consumer_timeout_ms=30000,
                                        message_validator=is_int, version=KafkaVersion(from_kafka_version))
        # Produce/consume runs concurrently with the rolling upgrade and
        # validates no acked message was lost.
        self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version))
        cluster_id = self.kafka.cluster_id()
        assert cluster_id is not None
        assert len(cluster_id) == 22  # base64-like cluster id length sanity check
        assert self.kafka.check_protocol_errors(self)

    @cluster(num_nodes=5)
    @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=colocated_kraft)
    @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=colocated_kraft)
    def test_colocated_upgrade(self, from_kafka_version, metadata_quorum):
        self.run_upgrade(from_kafka_version)

    @cluster(num_nodes=8)
    @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=remote_kraft)
    @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=remote_kraft)
    def test_non_colocated_upgrade(self, from_kafka_version, metadata_quorum):
        self.run_upgrade(from_kafka_version)

View File

@ -121,6 +121,8 @@ def get_version(node=None):
DEV_BRANCH = KafkaVersion("dev")
DEV_VERSION = KafkaVersion("3.4.0-SNAPSHOT")
# Latest metadata.version release string passed to `kafka-features.sh --metadata`
# by the KRaft upgrade system tests.
LATEST_METADATA_VERSION = "3.3"

# 0.8.2.x versions
V_0_8_2_1 = KafkaVersion("0.8.2.1")
V_0_8_2_2 = KafkaVersion("0.8.2.2")