diff --git a/bin/kafka-features.sh b/bin/kafka-features.sh index 9dd9f16fd1b..8d90a0665ae 100755 --- a/bin/kafka-features.sh +++ b/bin/kafka-features.sh @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.FeatureCommand "$@" diff --git a/bin/windows/kafka-features.bat b/bin/windows/kafka-features.bat new file mode 100644 index 00000000000..a5933fab342 --- /dev/null +++ b/bin/windows/kafka-features.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.FeatureCommand %* diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c3318807f20..255537dd8d0 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -283,6 +283,7 @@ + diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala index 26228c652c8..a7082bcca00 100644 --- a/core/src/main/scala/kafka/admin/FeatureCommand.scala +++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala @@ -17,306 +17,12 @@ package kafka.admin -import kafka.tools.TerseFailure -import kafka.utils.Exit -import net.sourceforge.argparse4j.ArgumentParsers -import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, store, storeTrue} -import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subparsers} -import net.sourceforge.argparse4j.internal.HelpScreenException -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType -import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions} -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.MetadataVersion - -import java.io.{File, PrintStream} -import java.util.Properties -import scala.concurrent.ExecutionException -import scala.jdk.CollectionConverters._ -import scala.compat.java8.OptionConverters._ - +@deprecated(since = "3.5") object FeatureCommand { def main(args: Array[String]): Unit = { - val res = mainNoExit(args, System.out) - Exit.exit(res) - } - - // This is used for integration tests in order to avoid killing the test with Exit.exit, - // and in order to capture the command output. - def mainNoExit( - args: Array[String], - out: PrintStream - ): Int = { - val parser = ArgumentParsers.newArgumentParser("kafka-features") - .defaultHelp(true) - .description("This tool manages feature flags in Kafka.") - parser.addArgument("--bootstrap-server") - .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.") - .required(true) - - parser.addArgument("--command-config") - .`type`(fileType()) - .help("Property file containing configs to be passed to Admin Client.") - val subparsers = parser.addSubparsers().dest("command") - addDescribeParser(subparsers) - addUpgradeParser(subparsers) - addDowngradeParser(subparsers) - addDisableParser(subparsers) - - try { - val namespace = parser.parseArgs(args) - val command = namespace.getString("command") - - val commandConfig = namespace.get[File]("command_config") - val props = if (commandConfig != null) { - if (!commandConfig.exists()) { - throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!") - } - Utils.loadProps(commandConfig.getPath) - } else { - new Properties() - } - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server")) - val admin = Admin.create(props) - try { - command match { - case "describe" => handleDescribe(out, admin) - case "upgrade" => handleUpgrade(out, namespace, admin) - case "downgrade" => handleDowngrade(out, namespace, admin) - case "disable" => handleDisable(out, namespace, admin) - } - } finally { - admin.close() - } - 0 - } catch { - case _: HelpScreenException => - 0 - case e: ArgumentParserException => - System.err.println(s"Command line error: ${e.getMessage}. Type --help for help.") - 1 - case e: TerseFailure => - System.err.println(e.getMessage) - 1 - } - } - - def addDescribeParser(subparsers: Subparsers): Unit = { - subparsers.addParser("describe") - .help("Describes the current active feature flags.") - } - - def addUpgradeParser(subparsers: Subparsers): Unit = { - val upgradeParser = subparsers.addParser("upgrade") - .help("Upgrade one or more feature flags.") - upgradeParser.addArgument("--metadata") - .help("The level to which we should upgrade the metadata. For example, 3.3-IV3.") - .action(store()) - upgradeParser.addArgument("--feature") - .help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`.") - .action(append()) - upgradeParser.addArgument("--dry-run") - .help("Validate this upgrade, but do not perform it.") - .action(storeTrue()) - } - - def addDowngradeParser(subparsers: Subparsers): Unit = { - val downgradeParser = subparsers.addParser("downgrade") - .help("Downgrade one or more feature flags.") - downgradeParser.addArgument("--metadata") - .help("The level to which we should downgrade the metadata. For example, 3.3-IV0.") - .action(store()) - downgradeParser.addArgument("--feature") - .help("A feature downgrade we should perform, in feature=level format. For example: `metadata.version=5`.") - .action(append()) - downgradeParser.addArgument("--unsafe") - .help("Perform this downgrade even if it may irreversibly destroy metadata.") - .action(storeTrue()) - downgradeParser.addArgument("--dry-run") - .help("Validate this downgrade, but do not perform it.") - .action(storeTrue()) - } - - def addDisableParser(subparsers: Subparsers): Unit = { - val disableParser = subparsers.addParser("disable") - .help("Disable one or more feature flags. This is the same as downgrading the version to zero.") - disableParser.addArgument("--feature") - .help("A feature flag to disable.") - .action(append()) - disableParser.addArgument("--unsafe") - .help("Disable this feature flag even if it may irreversibly destroy metadata.") - .action(storeTrue()) - disableParser.addArgument("--dry-run") - .help("Perform a dry-run of this disable operation.") - .action(storeTrue()) - } - - def levelToString( - feature: String, - level: Short - ): String = { - if (feature.equals(MetadataVersion.FEATURE_NAME)) - try MetadataVersion.fromFeatureLevel(level).version - catch { case _: Throwable => s"UNKNOWN [$level]"} - else - level.toString - } - - def handleDescribe( - out: PrintStream, - admin: Admin - ): Unit = { - val featureMetadata = admin.describeFeatures().featureMetadata().get() - val featureList = new java.util.TreeSet[String](featureMetadata.supportedFeatures().keySet()) - featureList.forEach { feature => - val finalizedLevel = featureMetadata.finalizedFeatures().asScala.get(feature) match { - case None => 0.toShort - case Some(v) => v.maxVersionLevel() - } - val range = featureMetadata.supportedFeatures().get(feature) - out.printf("Feature: %s\tSupportedMinVersion: %s\tSupportedMaxVersion: %s\tFinalizedVersionLevel: %s\tEpoch: %s%n", - feature, - levelToString(feature, range.minVersion()), - levelToString(feature, range.maxVersion()), - levelToString(feature, finalizedLevel), - featureMetadata.finalizedFeaturesEpoch().asScala.flatMap(e => Some(e.toString)).getOrElse("-")) - } - } - - def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = { - MetadataVersion.VERSIONS.toList.asJava. - subList(first.ordinal(), last.ordinal() + 1). - asScala.mkString(", ") - } - - def handleUpgrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = { - handleUpgradeOrDowngrade("upgrade", out, namespace, admin, UpgradeType.UPGRADE) - } - - def downgradeType(namespace: Namespace): UpgradeType = { - val unsafe = namespace.getBoolean("unsafe") - if (unsafe == null || !unsafe) { - UpgradeType.SAFE_DOWNGRADE - } else { - UpgradeType.UNSAFE_DOWNGRADE - } - } - - def handleDowngrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = { - handleUpgradeOrDowngrade("downgrade", out, namespace, admin, downgradeType(namespace)) - } - - def parseNameAndLevel(input: String): (String, Short) = { - val equalsIndex = input.indexOf("=") - if (equalsIndex < 0) { - throw new TerseFailure(s"Can't parse feature=level string ${input}: equals sign not found.") - } - val name = input.substring(0, equalsIndex).trim - val levelString = input.substring(equalsIndex + 1).trim - val level = - try levelString.toShort - catch { - case _: Throwable => throw new TerseFailure(s"Can't parse feature=level string ${input}: " + - s"unable to parse ${levelString} as a short.") - } - (name, level) - } - - def handleUpgradeOrDowngrade( - op: String, - out: PrintStream, - namespace: Namespace, - admin: Admin, - upgradeType: UpgradeType - ): Unit = { - val updates = new java.util.HashMap[String, FeatureUpdate]() - Option(namespace.getString("metadata")).foreach(metadata => { - val version = try { - MetadataVersion.fromVersionString(metadata) - } catch { - case _: Throwable => throw new TerseFailure("Unsupported metadata version " + metadata + - ". Supported metadata versions are " + metadataVersionsToString( - MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest())) - } - updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType)) - }) - Option(namespace.getList[String]("feature")).foreach(features => { - features.forEach(feature => { - val (name, level) = parseNameAndLevel(feature) - if (updates.put(name, new FeatureUpdate(level, upgradeType)) != null) { - throw new TerseFailure(s"Feature ${name} was specified more than once.") - } - }) - }) - update(op, out, admin, updates, namespace.getBoolean("dry-run")) - } - - def handleDisable(out: PrintStream, namespace: Namespace, admin: Admin): Unit = { - val upgradeType = downgradeType(namespace) - val updates = new java.util.HashMap[String, FeatureUpdate]() - Option(namespace.getList[String]("feature")).foreach(features => { - features.forEach(name => - if (updates.put(name, new FeatureUpdate(0.toShort, upgradeType)) != null) { - throw new TerseFailure(s"Feature ${name} was specified more than once.") - }) - } - ) - update("disable", out, admin, updates, namespace.getBoolean("dry-run")) - } - - def update( - op: String, - out: PrintStream, - admin: Admin, - updates: java.util.HashMap[String, FeatureUpdate], - dryRun: Boolean - ): Unit = { - if (updates.isEmpty) { - throw new TerseFailure(s"You must specify at least one feature to ${op}") - } - val result = admin.updateFeatures(updates, new UpdateFeaturesOptions().validateOnly(dryRun)) - val errors = result.values().asScala.map { case (feature, future) => - try { - future.get() - feature -> None - } catch { - case e: ExecutionException => feature -> Some(e.getCause) - case t: Throwable => feature -> Some(t) - } - } - var numFailures = 0 - errors.keySet.toList.sorted.foreach { feature => - val maybeThrowable = errors(feature) - val level = updates.get(feature).maxVersionLevel() - if (maybeThrowable.isDefined) { - val helper = if (dryRun) { - "Can not" - } else { - "Could not" - } - val suffix = if (op.equals("disable")) { - s"disable ${feature}" - } else { - s"${op} ${feature} to ${level}" - } - out.println(s"${helper} ${suffix}. ${maybeThrowable.get.getMessage}") - numFailures = numFailures + 1 - } else { - val verb = if (dryRun) { - "can be" - } else { - "was" - } - val obj = if (op.equals("disable")) { - "disabled." - } else { - s"${op}d to ${level}." - } - out.println(s"${feature} ${verb} ${obj}") - } - } - if (numFailures > 0) { - throw new TerseFailure(s"${numFailures} out of ${updates.size} operation(s) failed.") - } + println("WARNING: The 'kafka.tools' package is deprecated and will change to 'org.apache.kafka.tools' in the next major release.") + val toolClass = Class.forName("org.apache.kafka.tools.FeatureCommand") + val toolMethod = toolClass.getDeclaredMethod("main", classOf[Array[String]]) + toolMethod.invoke(null, args) } } diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala deleted file mode 100644 index 41cc3bd69f2..00000000000 --- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala +++ /dev/null @@ -1,327 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - -import kafka.api.IntegrationTestHarness -import kafka.server.KafkaConfig -import kafka.tools.TerseFailure -import kafka.utils.{TestInfoUtils, TestUtils} -import net.sourceforge.argparse4j.inf.Namespace -import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.{SAFE_DOWNGRADE, UNSAFE_DOWNGRADE} -import org.apache.kafka.clients.admin.MockAdminClient -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3, IBP_3_5_IV1} -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} -import org.junit.jupiter.api.Test -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource - -import java.io.{ByteArrayOutputStream, PrintStream} -import java.{lang, util} -import java.util.Collections.{emptyMap, singletonMap} -import scala.jdk.CollectionConverters._ - -case class FeatureCommandTestEnv(admin: MockAdminClient = null) extends AutoCloseable { - val stream = new ByteArrayOutputStream() - val out = new PrintStream(stream) - - override def close(): Unit = { - Utils.closeAll(stream, out) - Utils.closeQuietly(admin, "admin") - } - - def outputWithoutEpoch(): String = { - val lines = stream.toString.split(String.format("%n")) - lines.map { line => - val pos = line.indexOf("Epoch: ") - if (pos > 0) { - line.substring(0, pos) - } else { - line - } - }.mkString(String.format("%n")) - } -} - -class FeatureCommandTest extends IntegrationTestHarness { - override def brokerCount: Int = 1 - - override protected def metadataVersion: MetadataVersion = IBP_3_3_IV1 - - serverConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, metadataVersion.toString) - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk")) - def testDescribeWithZk(quorum: String): Unit = { - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(0, FeatureCommand.mainNoExit( - Array("--bootstrap-server", bootstrapServers(), "describe"), env.out)) - assertEquals("", env.outputWithoutEpoch()) - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("kraft")) - def testDescribeWithKRaft(quorum: String): Unit = { - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(0, FeatureCommand.mainNoExit( - Array("--bootstrap-server", bootstrapServers(), "describe"), env.out)) - assertEquals(String.format( - "Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t"), - env.outputWithoutEpoch()) - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk")) - def testUpgradeMetadataVersionWithZk(quorum: String): Unit = { - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "upgrade", "--metadata", "3.3-IV2"), env.out)) - assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " + - "update because the provided feature is not supported.", env.outputWithoutEpoch()) - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("kraft")) - def testUpgradeMetadataVersionWithKraft(quorum: String): Unit = { - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(0, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "upgrade", "--feature", "metadata.version=5"), env.out)) - assertEquals("metadata.version was upgraded to 5.", env.outputWithoutEpoch()) - } - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(0, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "upgrade", "--metadata", "3.3-IV2"), env.out)) - assertEquals("metadata.version was upgraded to 6.", env.outputWithoutEpoch()) - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk")) - def testDowngradeMetadataVersionWithZk(quorum: String): Unit = { - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "disable", "--feature", "metadata.version"), env.out)) - assertEquals("Could not disable metadata.version. Can not delete non-existing finalized feature.", - env.outputWithoutEpoch()) - } - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "downgrade", "--metadata", "3.3-IV0"), env.out)) - assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " + - "update because the provided feature is not supported.", env.outputWithoutEpoch()) - } - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "downgrade", "--unsafe", "--metadata", "3.3-IV0"), env.out)) - assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " + - "update because the provided feature is not supported.", env.outputWithoutEpoch()) - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("kraft")) - def testDowngradeMetadataVersionWithKRaft(quorum: String): Unit = { - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "disable", "--feature", "metadata.version"), env.out)) - assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " + - "metadata.version. Local controller 1000 only supports versions 1-11", env.outputWithoutEpoch()) - } - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "downgrade", "--metadata", "3.3-IV0"), env.out)) - assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " + - "Refusing to perform the requested downgrade because it might delete metadata information. " + - "Retry using UNSAFE_DOWNGRADE if you want to force the downgrade to proceed.", env.outputWithoutEpoch()) - } - TestUtils.resource(FeatureCommandTestEnv()) { env => - assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), - "downgrade", "--unsafe", "--metadata", "3.3-IV0"), env.out)) - assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " + - "Unsafe metadata downgrade is not supported in this version.", env.outputWithoutEpoch()) - } - } -} - -class FeatureCommandUnitTest { - @Test - def testLevelToString(): Unit = { - assertEquals("5", FeatureCommand.levelToString("foo.bar", 5.toShort)) - assertEquals("3.3-IV0", - FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, IBP_3_3_IV0.featureLevel())) - } - - @Test - def testMetadataVersionsToString(): Unit = { - assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1", - FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_5_IV1)) - } - - @Test - def testdowngradeType(): Unit = { - assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType( - new Namespace(singletonMap("unsafe", java.lang.Boolean.valueOf(false))))) - assertEquals(UNSAFE_DOWNGRADE, FeatureCommand.downgradeType( - new Namespace(singletonMap("unsafe", java.lang.Boolean.valueOf(true))))) - assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(new Namespace(emptyMap()))) - } - - @Test - def testParseNameAndLevel(): Unit = { - assertEquals(("foo.bar", 5.toShort), FeatureCommand.parseNameAndLevel("foo.bar=5")) - assertEquals(("quux", 0.toShort), FeatureCommand.parseNameAndLevel(" quux=0")) - assertEquals("Can't parse feature=level string baaz: equals sign not found.", - assertThrows(classOf[TerseFailure], - () => FeatureCommand.parseNameAndLevel("baaz")).getMessage) - assertEquals("Can't parse feature=level string w=tf: unable to parse tf as a short.", - assertThrows(classOf[TerseFailure], - () => FeatureCommand.parseNameAndLevel("w=tf")).getMessage) - } - - def buildAdminClient1(): MockAdminClient = { - new MockAdminClient.Builder(). - minSupportedFeatureLevels(Map( - MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV0.featureLevel()), - "foo.bar" -> lang.Short.valueOf(0.toShort) - ).asJava). - featureLevels(Map( - MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV2.featureLevel()), - "foo.bar" -> lang.Short.valueOf(5.toShort) - ).asJava). - maxSupportedFeatureLevels(Map( - MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV3.featureLevel()), - "foo.bar" -> lang.Short.valueOf(10.toShort) - ).asJava). - build() - } - - @Test - def testHandleDescribe(): Unit = { - TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env => - FeatureCommand.handleDescribe(env.out, env.admin) - assertEquals(String.format( - "Feature: foo.bar\tSupportedMinVersion: 0\tSupportedMaxVersion: 10\tFinalizedVersionLevel: 5\tEpoch: 123%n" + - "Feature: metadata.version\tSupportedMinVersion: 3.3-IV0\tSupportedMaxVersion: 3.3-IV3\tFinalizedVersionLevel: 3.3-IV2\tEpoch: 123%n"), - env.stream.toString) - } - } - - @Test - def testHandleUpgrade(): Unit = { - TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env => - assertEquals("1 out of 2 operation(s) failed.", - assertThrows(classOf[TerseFailure], () => - FeatureCommand.handleUpgrade(env.out, new Namespace(Map( - "metadata" -> "3.3-IV1", - "feature" -> util.Arrays.asList("foo.bar=6") - ).asJava), env.admin)).getMessage) - assertEquals(String.format( - "foo.bar was upgraded to 6.%n" + - "Could not upgrade metadata.version to 5. Can't upgrade to lower version.%n"), - env.stream.toString) - } - } - - @Test - def testHandleUpgradeDryRun(): Unit = { - TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env => - assertEquals("1 out of 2 operation(s) failed.", - assertThrows(classOf[TerseFailure], () => - FeatureCommand.handleUpgrade(env.out, new Namespace(Map( - "metadata" -> "3.3-IV1", - "feature" -> util.Arrays.asList("foo.bar=6"), - "dry-run" -> java.lang.Boolean.valueOf(true) - ).asJava), env.admin)).getMessage) - assertEquals(String.format( - "foo.bar can be upgraded to 6.%n" + - "Can not upgrade metadata.version to 5. Can't upgrade to lower version.%n"), - env.stream.toString) - } - } - - @Test - def testHandleDowngrade(): Unit = { - TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env => - assertEquals("1 out of 2 operation(s) failed.", - assertThrows(classOf[TerseFailure], () => - FeatureCommand.handleDowngrade(env.out, new Namespace(Map( - "metadata" -> "3.3-IV3", - "feature" -> util.Arrays.asList("foo.bar=1") - ).asJava), env.admin)).getMessage) - assertEquals(String.format( - "foo.bar was downgraded to 1.%n" + - "Could not downgrade metadata.version to 7. Can't downgrade to newer version.%n"), - env.stream.toString) - } - } - - @Test - def testHandleDowngradeDryRun(): Unit = { - TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env => - assertEquals("1 out of 2 operation(s) failed.", - assertThrows(classOf[TerseFailure], () => - FeatureCommand.handleDowngrade(env.out, new Namespace(Map( - "metadata" -> "3.3-IV3", - "feature" -> util.Arrays.asList("foo.bar=1"), - "dry-run" -> java.lang.Boolean.valueOf(true) - ).asJava), env.admin)).getMessage) - assertEquals(String.format( - "foo.bar can be downgraded to 1.%n" + - "Can not downgrade metadata.version to 7. Can't downgrade to newer version.%n"), - env.stream.toString) - } - } - - @Test - def testHandleDisable(): Unit = { - TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env => - assertEquals("1 out of 3 operation(s) failed.", - assertThrows(classOf[TerseFailure], () => - FeatureCommand.handleDisable(env.out, new Namespace(Map[String, AnyRef]( - "feature" -> util.Arrays.asList("foo.bar", "metadata.version", "quux") - ).asJava), env.admin)).getMessage) - assertEquals(String.format( - "foo.bar was disabled.%n" + - "Could not disable metadata.version. Can't downgrade below 4%n" + - "quux was disabled.%n"), - env.stream.toString) - } - } - - @Test - def testHandleDisableDryRun(): Unit = { - TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env => - assertEquals("1 out of 3 operation(s) failed.", - assertThrows(classOf[TerseFailure], () => - FeatureCommand.handleDisable(env.out, new Namespace(Map[String, AnyRef]( - "feature" -> util.Arrays.asList("foo.bar", "metadata.version", "quux"), - "dry-run" -> java.lang.Boolean.valueOf(true) - ).asJava), env.admin)).getMessage) - assertEquals(String.format( - "foo.bar can be disabled.%n" + - "Can not disable metadata.version. Can't downgrade below 4%n" + - "quux can be disabled.%n"), - env.stream.toString) - } - } -} diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java new file mode 100644 index 00000000000..ab6e8139b30 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import net.sourceforge.argparse4j.internal.HelpScreenException; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.FeatureMetadata; +import org.apache.kafka.clients.admin.FeatureUpdate; +import org.apache.kafka.clients.admin.SupportedVersionRange; +import org.apache.kafka.clients.admin.UpdateFeaturesOptions; +import org.apache.kafka.clients.admin.UpdateFeaturesResult; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.MetadataVersion; + +import static net.sourceforge.argparse4j.impl.Arguments.append; +import static net.sourceforge.argparse4j.impl.Arguments.store; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; + +public class FeatureCommand { + public static void main(String... args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (HelpScreenException e) { + return 0; + } catch (ArgumentParserException e) { + System.err.printf("Command line error: " + e.getMessage() + ". Type --help for help."); + return 1; + } catch (Throwable e) { + System.err.println(e.getMessage()); + return 1; + } + } + + static void execute(String... args) throws Exception { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("kafka-features") + .defaultHelp(true) + .description("This tool manages feature flags in Kafka."); + parser + .addArgument("--bootstrap-server") + .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.") + .required(true); + + parser + .addArgument("--command-config") + .type(Arguments.fileType()) + .help("Property file containing configs to be passed to Admin Client."); + Subparsers subparsers = parser.addSubparsers().dest("command"); + addDescribeParser(subparsers); + addUpgradeParser(subparsers); + addDowngradeParser(subparsers); + addDisableParser(subparsers); + + Namespace namespace = parser.parseArgsOrFail(args); + String command = namespace.getString("command"); + String configPath = namespace.getString("command_config"); + Properties properties = (configPath == null) ? new Properties() : Utils.loadProps(configPath); + + String bootstrapServer = namespace.getString("bootstrap_server"); + if (bootstrapServer != null) { + properties.setProperty("bootstrap.servers", bootstrapServer); + } + if (properties.getProperty("bootstrap.servers") == null) { + throw new TerseException("Please specify --bootstrap-server."); + } + + try (Admin adminClient = Admin.create(properties)) { + switch (command) { + case "describe": + handleDescribe(adminClient); + break; + case "upgrade": + handleUpgrade(namespace, adminClient); + break; + case "downgrade": + handleDowngrade(namespace, adminClient); + break; + case "disable": + handleDisable(namespace, adminClient); + break; + default: + throw new TerseException("Unknown command " + command); + } + } + } + + private static void addDescribeParser(Subparsers subparsers) { + subparsers.addParser("describe") + .help("Describes the current active feature flags."); + } + + private static void addUpgradeParser(Subparsers subparsers) { + Subparser upgradeParser = subparsers.addParser("upgrade") + .help("Upgrade one or more feature flags."); + upgradeParser.addArgument("--metadata") + .help("The level to which we should upgrade the metadata. For example, 3.3-IV3.") + .action(store()); + upgradeParser.addArgument("--feature") + .help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`.") + .action(append()); + upgradeParser.addArgument("--dry-run") + .help("Validate this upgrade, but do not perform it.") + .action(storeTrue()); + + } + + private static void addDowngradeParser(Subparsers subparsers) { + Subparser downgradeParser = subparsers.addParser("downgrade") + .help("Upgrade one or more feature flags."); + downgradeParser.addArgument("--metadata") + .help("The level to which we should downgrade the metadata. For example, 3.3-IV0.") + .action(store()); + downgradeParser.addArgument("--feature") + .help("A feature downgrade we should perform, in feature=level format. For example: `metadata.version=5`.") + .action(append()); + downgradeParser.addArgument("--unsafe") + .help("Perform this downgrade even if it may irreversibly destroy metadata.") + .action(storeTrue()); + downgradeParser.addArgument("--dry-run") + .help("Validate this downgrade, but do not perform it.") + .action(storeTrue()); + } + + private static void addDisableParser(Subparsers subparsers) { + Subparser downgradeParser = subparsers.addParser("disable") + .help("Disable one or more feature flags. This is the same as downgrading the version to zero."); + downgradeParser.addArgument("--feature") + .help("A feature flag to disable.") + .action(append()); + downgradeParser.addArgument("--unsafe") + .help("Disable this feature flag even if it may irreversibly destroy metadata.") + .action(storeTrue()); + downgradeParser.addArgument("--dry-run") + .help("Perform a dry-run of this disable operation.") + .action(storeTrue()); + } + + static String levelToString(String feature, short level) { + if (feature.equals(MetadataVersion.FEATURE_NAME)) { + try { + return MetadataVersion.fromFeatureLevel(level).version(); + } catch (Throwable e) { + return "UNKNOWN " + level; + } + } + return String.valueOf(level); + } + + static void handleDescribe(Admin adminClient) throws ExecutionException, InterruptedException { + FeatureMetadata featureMetadata = adminClient.describeFeatures().featureMetadata().get(); + featureMetadata.supportedFeatures().keySet().stream().sorted().forEach(feature -> { + short finalizedLevel = (featureMetadata.finalizedFeatures().get(feature) == null) ? 0 : featureMetadata.finalizedFeatures().get(feature).maxVersionLevel(); + SupportedVersionRange range = featureMetadata.supportedFeatures().get(feature); + System.out.printf("Feature: %s\tSupportedMinVersion: %s\tSupportedMaxVersion: %s\tFinalizedVersionLevel: %s\tEpoch: %s%n", + feature, + levelToString(feature, range.minVersion()), + levelToString(feature, range.maxVersion()), + levelToString(feature, finalizedLevel), + (featureMetadata.finalizedFeaturesEpoch().isPresent()) ? featureMetadata.finalizedFeaturesEpoch().get().toString() : "-"); + }); + } + + static String metadataVersionsToString(MetadataVersion first, MetadataVersion last) { + List versions = Arrays.asList(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1); + return versions.stream() + .map(String::valueOf) + .collect(Collectors.joining(", ")); + } + + static void handleUpgrade(Namespace namespace, Admin adminClient) throws TerseException { + handleUpgradeOrDowngrade("upgrade", namespace, adminClient, FeatureUpdate.UpgradeType.UPGRADE); + } + + static FeatureUpdate.UpgradeType downgradeType(Namespace namespace) { + Boolean unsafe = namespace.getBoolean("unsafe"); + if (unsafe == null || !unsafe) { + return FeatureUpdate.UpgradeType.SAFE_DOWNGRADE; + } else { + return FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE; + } + } + + static void handleDowngrade(Namespace namespace, Admin adminClient) throws TerseException { + handleUpgradeOrDowngrade("downgrade", namespace, adminClient, downgradeType(namespace)); + } + + static String[] parseNameAndLevel(String input) { + int equalsIndex = input.indexOf("="); + if (equalsIndex < 0) { + throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found."); + } + String name = input.substring(0, equalsIndex).trim(); + String levelString = input.substring(equalsIndex + 1).trim(); + try { + Short.parseShort(levelString); + } catch (Throwable t) { + throw new RuntimeException("Can't parse feature=level string " + input + ": " + + "unable to parse " + levelString + " as a short."); + } + return new String[]{name, levelString}; + } + + private static void handleUpgradeOrDowngrade(String op, Namespace namespace, Admin admin, FeatureUpdate.UpgradeType upgradeType) throws TerseException { + Map updates = new HashMap<>(); + MetadataVersion version; + String metadata = namespace.getString("metadata"); + if (metadata != null) { + try { + version = MetadataVersion.fromVersionString(metadata); + } catch (Throwable e) { + throw new TerseException("Unsupported metadata version " + metadata + + ". Supported metadata versions are " + metadataVersionsToString( + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest())); + } + updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType)); + } + + List features = namespace.getList("feature"); + if (features != null) { + features.forEach(feature -> { + String[] nameAndLevel; + nameAndLevel = parseNameAndLevel(feature); + + if (updates.put(nameAndLevel[0], new FeatureUpdate(Short.parseShort(nameAndLevel[1]), upgradeType)) != null) { + throw new RuntimeException("Feature " + nameAndLevel[0] + " was specified more than once."); + } + }); + } + + update(op, admin, updates, namespace.getBoolean("dry_run")); + } + + static void handleDisable(Namespace namespace, Admin adminClient) throws TerseException { + FeatureUpdate.UpgradeType upgradeType = downgradeType(namespace); + Map updates = new HashMap<>(); + + List features = namespace.getList("feature"); + if (features != null) { + features.forEach(feature -> { + if (updates.put(feature, new FeatureUpdate((short) 0, upgradeType)) != null) { + throw new RuntimeException("Feature " + feature + " was specified more than once."); + } + }); + } + + update("disable", adminClient, updates, namespace.getBoolean("dry_run")); + } + + private static void update(String op, Admin admin, Map updates, Boolean dryRun) throws TerseException { + if (updates.isEmpty()) { + throw new TerseException("You must specify at least one feature to " + op); + } + + UpdateFeaturesResult result = admin.updateFeatures(updates, new UpdateFeaturesOptions().validateOnly(dryRun)); + Map> errors = new TreeMap<>(); + result.values().forEach((feature, future) -> { + try { + future.get(); + errors.put(feature, null); + } catch (ExecutionException e) { + errors.put(feature, Optional.ofNullable(e.getCause())); + } catch (Throwable t) { + errors.put(feature, Optional.of(t)); + } + }); + + int numFailures = 0; + for (Map.Entry> feature: errors.entrySet()) { + short level = updates.get(feature.getKey()).maxVersionLevel(); + Optional maybeThrowable = feature.getValue(); + if (maybeThrowable != null && maybeThrowable.isPresent()) { + String helper = dryRun ? "Can not " : "Could not "; + String suffix = (op.equals("disable")) ? "disable " + feature.getKey() : op + " " + feature.getKey() + " to " + level; + System.out.println(helper + suffix + ". " + maybeThrowable.get().getMessage()); + numFailures++; + } else { + String verb = dryRun ? " can be " : " was "; + String obj = (op.equals("disable")) ? "disabled." : op + "d to " + level + "."; + System.out.println(feature.getKey() + verb + obj); + } + } + + if (numFailures > 0) { + throw new TerseException(numFailures + " out of " + updates.size() + " operation(s) failed."); + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java new file mode 100644 index 00000000000..2e476dd523a --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.server.common.MetadataVersion; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static java.lang.String.format; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; + +import static org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.SAFE_DOWNGRADE; +import static org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.KRAFT) +@Tag("integration") +public class FeatureCommandTest { + + private final ClusterInstance cluster; + public FeatureCommandTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) + public void testDescribeWithZK() { + String commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe")) + ); + assertEquals("", commandOutput); + } + + @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1) + public void testDescribeWithKRaft() { + String commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe")) + ); + assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + + "SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); + } + + @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) + public void testUpgradeMetadataVersionWithZk() { + String commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "upgrade", "--metadata", "3.3-IV2")) + ); + assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " + + "update because the provided feature is not supported.", commandOutput); + } + + @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1) + public void testUpgradeMetadataVersionWithKraft() { + String commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "upgrade", "--feature", "metadata.version=5")) + ); + assertEquals("metadata.version was upgraded to 5.", commandOutput); + + commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "upgrade", "--metadata", "3.3-IV2")) + ); + assertEquals("metadata.version was upgraded to 6.", commandOutput); + } + + @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) + public void testDowngradeMetadataVersionWithZk() { + String commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "disable", "--feature", "metadata.version")) + ); + assertEquals("Could not disable metadata.version. Can not delete non-existing finalized feature.", commandOutput); + + commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "downgrade", "--metadata", "3.3-IV0")) + ); + assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " + + "update because the provided feature is not supported.", commandOutput); + + commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "downgrade", "--unsafe", "--metadata", "3.3-IV0")) + ); + assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " + + "update because the provided feature is not supported.", commandOutput); + } + + @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1) + public void testDowngradeMetadataVersionWithKRaft() { + String commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "disable", "--feature", "metadata.version")) + ); + assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " + + "metadata.version. Local controller 3000 only supports versions 1-11", commandOutput); + + commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "downgrade", "--metadata", "3.3-IV0")) + + ); + assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " + + "Refusing to perform the requested downgrade because it might delete metadata information. " + + "Retry using UNSAFE_DOWNGRADE if you want to force the downgrade to proceed.", commandOutput); + + commandOutput = ToolsTestUtils.captureStandardOut(() -> + assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), + "downgrade", "--unsafe", "--metadata", "3.3-IV0")) + + ); + assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " + + "Unsafe metadata downgrade is not supported in this version.", commandOutput); + } + + private String outputWithoutEpoch(String output) { + int pos = output.indexOf("Epoch: "); + return (pos > 0) ? output.substring(0, pos) : output; + } +} + +class FeatureCommandUnitTest { + @Test + public void testLevelToString() { + assertEquals("5", FeatureCommand.levelToString("foo.bar", (short) 5)); + assertEquals("3.3-IV0", + FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel())); + } + + @Test + public void testMetadataVersionsToString() { + assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3", + FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV3)); + } + + @Test + public void testdowngradeType() { + assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType( + new Namespace(singletonMap("unsafe", Boolean.FALSE)))); + assertEquals(UNSAFE_DOWNGRADE, FeatureCommand.downgradeType( + new Namespace(singletonMap("unsafe", Boolean.TRUE)))); + assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(new Namespace(emptyMap()))); + } + + @Test + public void testParseNameAndLevel() { + assertArrayEquals(new String[]{"foo.bar", "5"}, FeatureCommand.parseNameAndLevel("foo.bar=5")); + assertArrayEquals(new String[]{"quux", "0"}, FeatureCommand.parseNameAndLevel("quux=0")); + assertTrue(assertThrows(RuntimeException.class, () -> FeatureCommand.parseNameAndLevel("baaz")) + .getMessage().contains("Can't parse feature=level string baaz: equals sign not found.")); + assertTrue(assertThrows(RuntimeException.class, () -> FeatureCommand.parseNameAndLevel("w=tf")) + .getMessage().contains("Can't parse feature=level string w=tf: unable to parse tf as a short.")); + } + + private static MockAdminClient buildAdminClient() { + Map minSupportedFeatureLevels = new HashMap<>(); + minSupportedFeatureLevels.put(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()); + minSupportedFeatureLevels.put("foo.bar", (short) 0); + + Map featureLevels = new HashMap<>(); + featureLevels.put(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV2.featureLevel()); + featureLevels.put("foo.bar", (short) 5); + + Map maxSupportedFeatureLevels = new HashMap<>(); + maxSupportedFeatureLevels.put(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()); + maxSupportedFeatureLevels.put("foo.bar", (short) 10); + + return new MockAdminClient.Builder(). + minSupportedFeatureLevels(minSupportedFeatureLevels). + featureLevels(featureLevels). + maxSupportedFeatureLevels(maxSupportedFeatureLevels).build(); + } + + @Test + public void testHandleDescribe() { + String describeResult = ToolsTestUtils.captureStandardOut(() -> { + try { + FeatureCommand.handleDescribe(buildAdminClient()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + assertEquals(format("Feature: foo.bar\tSupportedMinVersion: 0\tSupportedMaxVersion: 10\tFinalizedVersionLevel: 5\tEpoch: 123%n" + + "Feature: metadata.version\tSupportedMinVersion: 3.3-IV0\tSupportedMaxVersion: 3.3-IV3\tFinalizedVersionLevel: 3.3-IV2\tEpoch: 123"), describeResult); + } + + @Test + public void testHandleUpgrade() { + Map namespace = new HashMap<>(); + namespace.put("metadata", "3.3-IV1"); + namespace.put("feature", Collections.singletonList("foo.bar=6")); + namespace.put("dry_run", false); + String upgradeOutput = ToolsTestUtils.captureStandardOut(() -> { + Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient())); + assertTrue(t.getMessage().contains("1 out of 2 operation(s) failed.")); + }); + assertEquals(format("foo.bar was upgraded to 6.%n" + + "Could not upgrade metadata.version to 5. Can't upgrade to lower version."), upgradeOutput); + } + + @Test + public void testHandleUpgradeDryRun() { + Map namespace = new HashMap<>(); + namespace.put("metadata", "3.3-IV1"); + namespace.put("feature", Collections.singletonList("foo.bar=6")); + namespace.put("dry_run", true); + String upgradeOutput = ToolsTestUtils.captureStandardOut(() -> { + Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient())); + assertTrue(t.getMessage().contains("1 out of 2 operation(s) failed.")); + }); + assertEquals(format("foo.bar can be upgraded to 6.%n" + + "Can not upgrade metadata.version to 5. Can't upgrade to lower version."), upgradeOutput); + } + + @Test + public void testHandleDowngrade() { + Map namespace = new HashMap<>(); + namespace.put("metadata", "3.3-IV3"); + namespace.put("feature", Collections.singletonList("foo.bar=1")); + namespace.put("dry_run", false); + String downgradeOutput = ToolsTestUtils.captureStandardOut(() -> { + Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleDowngrade(new Namespace(namespace), buildAdminClient())); + assertTrue(t.getMessage().contains("1 out of 2 operation(s) failed.")); + }); + assertEquals(format("foo.bar was downgraded to 1.%n" + + "Could not downgrade metadata.version to 7. Can't downgrade to newer version."), downgradeOutput); + } + + @Test + public void testHandleDowngradeDryRun() { + Map namespace = new HashMap<>(); + namespace.put("metadata", "3.3-IV3"); + namespace.put("feature", Collections.singletonList("foo.bar=1")); + namespace.put("dry_run", true); + String downgradeOutput = ToolsTestUtils.captureStandardOut(() -> { + Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleDowngrade(new Namespace(namespace), buildAdminClient())); + assertTrue(t.getMessage().contains("1 out of 2 operation(s) failed.")); + }); + assertEquals(format("foo.bar can be downgraded to 1.%n" + + "Can not downgrade metadata.version to 7. Can't downgrade to newer version."), downgradeOutput); + } + + @Test + public void testHandleDisable() { + Map namespace = new HashMap<>(); + namespace.put("feature", Arrays.asList("foo.bar", "metadata.version", "quux")); + namespace.put("dry_run", false); + String disableOutput = ToolsTestUtils.captureStandardOut(() -> { + Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleDisable(new Namespace(namespace), buildAdminClient())); + assertTrue(t.getMessage().contains("1 out of 3 operation(s) failed.")); + }); + assertEquals(format("foo.bar was disabled.%n" + + "Could not disable metadata.version. Can't downgrade below 4%n" + + "quux was disabled."), disableOutput); + } + + @Test + public void testHandleDisableDryRun() { + Map namespace = new HashMap<>(); + namespace.put("feature", Arrays.asList("foo.bar", "metadata.version", "quux")); + namespace.put("dry_run", true); + String disableOutput = ToolsTestUtils.captureStandardOut(() -> { + Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleDisable(new Namespace(namespace), buildAdminClient())); + assertTrue(t.getMessage().contains("1 out of 3 operation(s) failed.")); + }); + assertEquals(format("foo.bar can be disabled.%n" + + "Can not disable metadata.version. Can't downgrade below 4%n" + + "quux can be disabled."), disableOutput); + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java index 709629eef6a..277d8829a88 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java @@ -46,6 +46,9 @@ public class ToolsTestUtils { System.setErr(currentStream); else System.setOut(currentStream); + + currentStream.close(); + tempStream.close(); } }