KAFKA-17277: [1/2] Add version mapping command to the storage tool and feature command tool (#16973)

As a part of KIP-1022 the following has been implemented in this patch:

A version-mapping command to to look up the corresponding features for a given metadata version. Using the command with no --release-version argument will return the mapping for the latest stable metadata version.
This command has been added to the FeatureCommand Tool and the Storage Tool.
The storage tools parsing method has been made more modular similar to the feature command tool

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Ritika Reddy 2024-09-03 15:48:36 -07:00 committed by GitHub
parent 4d23fe92f1
commit edac19ba50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 320 additions and 43 deletions

View File

@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import kafka.server.KafkaConfig
@ -24,11 +23,11 @@ import java.nio.file.{Files, Paths}
import kafka.utils.Logging
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace}
import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subparser, Subparsers}
import net.sourceforge.argparse4j.internal.HelpScreenException
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
import org.apache.kafka.raft.DynamicVoters
@ -87,9 +86,14 @@ object StorageTool extends Logging {
runFormatCommand(namespace, config.get, printStream)
0
case "version-mapping" =>
runVersionMappingCommand(namespace, printStream)
0
case "random-uuid" =>
printStream.println(Uuid.randomUuid)
0
case _ =>
throw new RuntimeException(s"Unknown command $command")
}
@ -134,6 +138,35 @@ object StorageTool extends Logging {
formatter.run()
}
/**
* Maps the given release version to the corresponding metadata version
* and prints the corresponding features.
*
* @param namespace Arguments containing the release version.
* @param printStream The print stream to output the version mapping.
*/
def runVersionMappingCommand(
namespace: Namespace,
printStream: PrintStream
): Unit = {
val releaseVersion = Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString)
try {
val metadataVersion = MetadataVersion.fromVersionString(releaseVersion)
val metadataVersionLevel = metadataVersion.featureLevel()
printStream.print(f"metadata.version=$metadataVersionLevel%d ($releaseVersion%s)%n")
for (feature <- Features.values()) {
val featureLevel = feature.defaultValue(metadataVersion)
printStream.print(f"${feature.featureName}%s=$featureLevel%d%n")
}
} catch {
case e: IllegalArgumentException =>
throw new TerseFailure(s"Unsupported release version '$releaseVersion'. Supported versions are: " +
s"${MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version} to ${MetadataVersion.LATEST_PRODUCTION.version}")
}
}
def createStandaloneDynamicVoters(
config: KafkaConfig
): DynamicVoters = {
@ -153,50 +186,90 @@ object StorageTool extends Logging {
}
def parseArguments(args: Array[String]): Namespace = {
val parser = ArgumentParsers.
newArgumentParser("kafka-storage", /* defaultHelp */ true, /* prefixChars */ "-", /* fromFilePrefix */ "@").
description("The Kafka storage tool.")
val parser = ArgumentParsers
.newArgumentParser("kafka-storage", /* defaultHelp */true, /* prefixChars */"-", /* fromFilePrefix */ "@")
.description("The Kafka storage tool.")
val subparsers = parser.addSubparsers().dest("command")
val infoParser = subparsers.addParser("info").
help("Get information about the Kafka log directories on this node.")
val formatParser = subparsers.addParser("format").
help("Format the Kafka log directories on this node.")
subparsers.addParser("random-uuid").help("Print a random UUID.")
List(infoParser, formatParser).foreach(parser => {
parser.addArgument("--config", "-c").
action(store()).
required(true).
help("The Kafka configuration file to use.")
})
formatParser.addArgument("--cluster-id", "-t").
action(store()).
required(true).
help("The cluster ID to use.")
formatParser.addArgument("--add-scram", "-S").
action(append()).
help("""A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
addInfoParser(subparsers)
addFormatParser(subparsers)
addVersionMappingParser(subparsers)
addRandomUuidParser(subparsers)
parser.parseArgs(args)
}
private def addInfoParser(subparsers: Subparsers): Unit = {
val infoParser = subparsers.addParser("info")
.help("Get information about the Kafka log directories on this node.")
addConfigArguments(infoParser)
}
private def addFormatParser(subparsers: Subparsers): Unit = {
val formatParser = subparsers.addParser("format")
.help("Format the Kafka log directories on this node.")
addConfigArguments(formatParser)
formatParser.addArgument("--cluster-id", "-t")
.action(store())
.required(true)
.help("The cluster ID to use.")
formatParser.addArgument("--add-scram", "-S")
.action(append())
.help("""A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
|'SCRAM-SHA-256=[name=alice,password=alice-secret]'
|'SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'""".stripMargin)
formatParser.addArgument("--ignore-formatted", "-g").
action(storeTrue())
formatParser.addArgument("--release-version", "-r").
action(store()).
help(s"The release version to use for the initial feature settings. The minimum is " +
formatParser.addArgument("--ignore-formatted", "-g")
.action(storeTrue())
formatParser.addArgument("--release-version", "-r")
.action(store())
.help(s"The release version to use for the initial feature settings. The minimum is " +
s"${MetadataVersion.IBP_3_0_IV0}; the default is ${MetadataVersion.LATEST_PRODUCTION}")
formatParser.addArgument("--feature", "-f").
help("The setting to use for a specific feature, in feature=level format. For example: `kraft.version=1`.").
action(append())
formatParser.addArgument("--feature", "-f")
.help("The setting to use for a specific feature, in feature=level format. For example: `kraft.version=1`.")
.action(append())
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
reconfigurableQuorumOptions.addArgument("--standalone", "-s").
help("Used to initialize a single-node quorum controller quorum.").
action(storeTrue())
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I").
help("The initial controllers, as a comma-separated list of id@hostname:port:directory. The same values must be used to format all nodes. For example:\n" +
"0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n").
action(store())
parser.parseArgs(args)
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
.help("Used to initialize a single-node quorum controller quorum.")
.action(storeTrue())
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
.help("The initial controllers, as a comma-separated list of id@hostname:port:directory. The same values must be used to format all nodes. For example:\n" +
"0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n")
.action(store())
}
private def addVersionMappingParser(subparsers: Subparsers): Unit = {
val versionMappingParser = subparsers.addParser("version-mapping")
.help("Look up the corresponding features for a given metadata version. " +
"Using the command with no --release-version argument will return the mapping for " +
"the latest stable metadata version"
)
versionMappingParser.addArgument("--release-version", "-r")
.action(store())
.help(s"The release version to use for the corresponding feature mapping. The minimum is " +
s"${MetadataVersion.IBP_3_0_IV0}; the default is ${MetadataVersion.LATEST_PRODUCTION}")
}
private def addRandomUuidParser(subparsers: Subparsers): Unit = {
subparsers.addParser("random-uuid")
.help("Print a random UUID.")
}
private def addConfigArguments(parser: Subparser): Unit = {
parser.addArgument("--config", "-c")
.action(store())
.required(true)
.help("The Kafka configuration file to use.")
}
def configToLogDirectories(config: KafkaConfig): Seq[String] = {

View File

@ -26,7 +26,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import net.sourceforge.argparse4j.inf.ArgumentParserException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.Features
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
import org.apache.kafka.raft.QuorumConfig
@ -433,5 +433,97 @@ Found problem:
contains("Formatting dynamic metadata voter directory %s".format(availableDirs.head)),
"Failed to find content in output: " + stream.toString())
}
}
private def runVersionMappingCommand(
stream: ByteArrayOutputStream,
releaseVersion: String
): Int = {
val tempDir = TestUtils.tempDir()
try {
// Prepare the arguments list
val arguments = ListBuffer[String]("version-mapping")
// Add the release version argument
if (releaseVersion != null) {
arguments += "--release-version"
arguments += releaseVersion
}
// Execute the StorageTool with the arguments
StorageTool.execute(arguments.toArray, new PrintStream(stream))
} finally {
Utils.delete(tempDir)
}
}
@Test
def testVersionMappingWithValidReleaseVersion(): Unit = {
val stream = new ByteArrayOutputStream()
// Test with a valid release version
assertEquals(0, runVersionMappingCommand(stream, "3.3-IV3"))
val output = stream.toString()
val metadataVersion = MetadataVersion.IBP_3_3_IV3
// Check that the metadata version is correctly included in the output
assertTrue(output.contains(s"metadata.version=${metadataVersion.featureLevel()} (${metadataVersion.version()})"),
s"Output did not contain expected Metadata Version: $output"
)
for (feature <- Features.values()) {
val featureLevel = feature.defaultValue(metadataVersion)
assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
s"Output did not contain expected feature mapping: $output"
)
}
}
@Test
def testVersionMappingWithNoReleaseVersion(): Unit = {
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
val stream = new ByteArrayOutputStream()
assertEquals(0, runVersionMappingCommand(stream, null))
val output = stream.toString
val metadataVersion = MetadataVersion.latestProduction()
// Check that the metadata version is correctly included in the output
assertTrue(output.contains(s"metadata.version=${metadataVersion.featureLevel()} (${metadataVersion.version()})"),
s"Output did not contain expected Metadata Version: $output"
)
for (feature <- Features.values()) {
val featureLevel = feature.defaultValue(metadataVersion)
assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
s"Output did not contain expected feature mapping: $output"
)
}
}
@Test
def testVersionMappingWithInvalidReleaseVersion(): Unit = {
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
val stream = new ByteArrayOutputStream()
// Test with an invalid release version
val exception = assertThrows(classOf[TerseFailure], () => {
runVersionMappingCommand(stream, "2.9-IV2")
})
assertEquals("Unsupported release version '2.9-IV2'." +
" Supported versions are: " + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version +
" to " + MetadataVersion.LATEST_PRODUCTION.version, exception.getMessage
)
val exception2 = assertThrows(classOf[TerseFailure], () => {
runVersionMappingCommand(stream, "invalid")
})
assertEquals("Unsupported release version 'invalid'." +
" Supported versions are: " + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version +
" to " + MetadataVersion.LATEST_PRODUCTION.version, exception2.getMessage
)
}
}

View File

@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
@ -25,6 +24,7 @@ import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.CommandLineUtils;
@ -90,6 +90,7 @@ public class FeatureCommand {
addUpgradeParser(subparsers);
addDowngradeParser(subparsers);
addDisableParser(subparsers);
addVersionMappingParser(subparsers);
Namespace namespace = parser.parseArgsOrFail(args);
String command = namespace.getString("command");
@ -114,6 +115,9 @@ public class FeatureCommand {
case "disable":
handleDisable(namespace, adminClient);
break;
case "version-mapping":
handleVersionMapping(namespace);
break;
default:
throw new TerseException("Unknown command " + command);
}
@ -171,6 +175,18 @@ public class FeatureCommand {
.action(storeTrue());
}
private static void addVersionMappingParser(Subparsers subparsers) {
Subparser versionMappingParser = subparsers.addParser("version-mapping")
.help("Look up the corresponding features for a given metadata version. " +
"Using the command with no --release-version argument will return the mapping for " +
"the latest stable metadata version"
);
versionMappingParser.addArgument("--release-version")
.help("The release version to use for the corresponding feature mapping. The minimum is " +
MetadataVersion.IBP_3_0_IV0 + "; the default is " + MetadataVersion.LATEST_PRODUCTION)
.action(store());
}
static String levelToString(String feature, short level) {
if (feature.equals(MetadataVersion.FEATURE_NAME)) {
try {
@ -282,6 +298,28 @@ public class FeatureCommand {
update("disable", adminClient, updates, namespace.getBoolean("dry_run"));
}
static void handleVersionMapping(Namespace namespace) throws TerseException {
// Get the release version from the command-line arguments or default to the latest stable version
String releaseVersion = Optional.ofNullable(namespace.getString("release_version"))
.orElseGet(() -> MetadataVersion.latestProduction().version());
try {
MetadataVersion version = MetadataVersion.fromVersionString(releaseVersion);
short metadataVersionLevel = version.featureLevel();
System.out.printf("metadata.version=%d (%s)%n", metadataVersionLevel, releaseVersion);
for (Features feature : Features.values()) {
short featureLevel = feature.defaultValue(version);
System.out.printf("%s=%d%n", feature.featureName(), featureLevel);
}
} catch (IllegalArgumentException e) {
throw new TerseException("Unsupported release version '" + releaseVersion + "'." +
" Supported versions are: " + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
" to " + MetadataVersion.LATEST_PRODUCTION);
}
}
private static void update(String op, Admin admin, Map<String, FeatureUpdate> updates, Boolean dryRun) throws TerseException {
if (updates.isEmpty()) {
throw new TerseException("You must specify at least one feature to " + op);

View File

@ -22,6 +22,7 @@ import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import net.sourceforge.argparse4j.inf.Namespace;
@ -326,4 +327,77 @@ public class FeatureCommandTest {
"Can not disable metadata.version. Can't downgrade below 4%n" +
"quux can be disabled."), disableOutput);
}
@Test
public void testHandleVersionMappingWithValidReleaseVersion() {
Map<String, Object> namespace = new HashMap<>();
namespace.put("release_version", "3.3-IV3");
String versionMappingOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
FeatureCommand.handleVersionMapping(new Namespace(namespace));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
MetadataVersion metadataVersion = MetadataVersion.IBP_3_3_IV3;
// Check that the metadata version is correctly included in the output
assertTrue(versionMappingOutput.contains("metadata.version=" + metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"),
"Output did not contain expected Metadata Version: " + versionMappingOutput);
for (Features feature : Features.values()) {
int featureLevel = feature.defaultValue(metadataVersion);
assertTrue(versionMappingOutput.contains(feature.featureName() + "=" + featureLevel),
"Output did not contain expected feature mapping: " + versionMappingOutput);
}
}
@Test
public void testHandleVersionMappingWithNoReleaseVersion() {
Map<String, Object> namespace = new HashMap<>();
String versionMappingOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
FeatureCommand.handleVersionMapping(new Namespace(namespace));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
MetadataVersion metadataVersion = MetadataVersion.latestProduction();
// Check that the metadata version is correctly included in the output
assertTrue(versionMappingOutput.contains("metadata.version=" + metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"),
"Output did not contain expected Metadata Version: " + versionMappingOutput);
for (Features feature : Features.values()) {
int featureLevel = feature.defaultValue(metadataVersion);
assertTrue(versionMappingOutput.contains(feature.featureName() + "=" + featureLevel),
"Output did not contain expected feature mapping: " + versionMappingOutput);
}
}
@Test
public void testHandleVersionMappingWithInvalidReleaseVersion() {
Map<String, Object> namespace = new HashMap<>();
namespace.put("release_version", "2.9-IV2");
TerseException exception1 = assertThrows(TerseException.class, () ->
FeatureCommand.handleVersionMapping(new Namespace(namespace))
);
assertEquals("Unsupported release version '2.9-IV2'." +
" Supported versions are: " + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
" to " + MetadataVersion.LATEST_PRODUCTION, exception1.getMessage());
namespace.put("release_version", "invalid");
TerseException exception2 = assertThrows(TerseException.class, () ->
FeatureCommand.handleVersionMapping(new Namespace(namespace))
);
assertEquals("Unsupported release version 'invalid'." +
" Supported versions are: " + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
" to " + MetadataVersion.LATEST_PRODUCTION, exception2.getMessage());
}
}