KAFKA-19719 --no-initial-controllers should not assume kraft.version=1 (#20624)

backport KAFKA-19719 to 4.0

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kevin Wu 2025-10-07 09:11:41 -05:00 committed by GitHub
parent d13c1f652d
commit 6423c2c202
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 149 additions and 133 deletions

View File

@ -128,22 +128,31 @@ object StorageTool extends Logging {
featureNamesAndLevels(_).foreach { featureNamesAndLevels(_).foreach {
kv => formatter.setFeatureLevel(kv._1, kv._2) kv => formatter.setFeatureLevel(kv._1, kv._2)
}) })
Option(namespace.getString("initial_controllers")). val initialControllers = namespace.getString("initial_controllers")
val isStandalone = namespace.getBoolean("standalone")
val staticVotersEmpty = config.quorumVoters.isEmpty
formatter.setHasDynamicQuorum(staticVotersEmpty)
if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) {
throw new TerseFailure("You cannot specify " +
QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
"with --initial-controllers or --standalone. " +
"If you want to use dynamic quorum, please remove " +
QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " +
QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.")
}
Option(initialControllers).
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
if (namespace.getBoolean("standalone")) { if (isStandalone) {
formatter.setInitialControllers(createStandaloneDynamicVoters(config)) formatter.setInitialControllers(createStandaloneDynamicVoters(config))
} }
if (namespace.getBoolean("no_initial_controllers")) { if (!namespace.getBoolean("no_initial_controllers") &&
formatter.setNoInitialControllersFlag(true) config.processRoles.contains(ProcessRole.ControllerRole) &&
} else { staticVotersEmpty &&
if (config.processRoles.contains(ProcessRole.ControllerRole)) { !formatter.initialVoters().isPresent) {
if (config.quorumVoters.isEmpty() && !formatter.initialVoters().isPresent()) {
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
" is not set on this controller, you must specify one of the following: " + " is not set on this controller, you must specify one of the following: " +
"--standalone, --initial-controllers, or --no-initial-controllers."); "--standalone, --initial-controllers, or --no-initial-controllers.");
} }
}
}
Option(namespace.getList("add_scram")). Option(namespace.getList("add_scram")).
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]])) foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
configToLogDirectories(config).foreach(formatter.addDirectory(_)) configToLogDirectories(config).foreach(formatter.addDirectory(_))
@ -206,18 +215,21 @@ object StorageTool extends Logging {
action(append()) action(append())
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup() val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
reconfigurableQuorumOptions.addArgument("--standalone", "-s") reconfigurableQuorumOptions.addArgument("--standalone", "-s")
.help("Used to initialize a controller as a single-node dynamic quorum.") .help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " +
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue()) .action(storeTrue())
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N") reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
.help("Used to initialize a server without a dynamic quorum topology.") .help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " +
"the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue()) .action(storeTrue())
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I") reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " + .help("Used to initialize a server with the specified dynamic quorum. The argument " +
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " + "is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
"format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" + "format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" +
"MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n") "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " +
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
.action(store()) .action(store())
parser.parseArgs(args) parser.parseArgs(args)
} }

View File

@ -400,7 +400,10 @@ Found problem:
def testFormatWithStandaloneFlagOnBrokerFails(): Unit = { def testFormatWithStandaloneFlagOnBrokerFails(): Unit = {
val availableDirs = Seq(TestUtils.tempDir()) val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties() val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties) properties.setProperty("process.roles", "broker")
properties.setProperty("node.id", "0")
properties.setProperty("controller.listener.names", "CONTROLLER")
properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
properties.setProperty("log.dirs", availableDirs.mkString(",")) properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream() val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone")
@ -483,19 +486,14 @@ Found problem:
Seq("--release-version", "3.9-IV0"))).getMessage) Seq("--release-version", "3.9-IV0"))).getMessage)
} }
@ParameterizedTest @Test
@ValueSource(booleans = Array(false, true)) def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = {
val availableDirs = Seq(TestUtils.tempDir()) val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties() val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties) properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(",")) properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream() val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers") val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
if (setKraftVersionFeature) {
arguments += "--feature"
arguments += "kraft.version=1"
}
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq)) assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
assertTrue(stream.toString(). assertTrue(stream.toString().
contains("Formatting metadata directory %s".format(availableDirs.head)), contains("Formatting metadata directory %s".format(availableDirs.head)),

View File

@ -3843,42 +3843,26 @@ In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the
If you are not sure whether you are using static or dynamic quorums, you can determine this by If you are not sure whether you are using static or dynamic quorums, you can determine this by
running something like the following:<p> running something like the following:<p>
<pre><code class="language-bash"> <pre><code class="language-bash">$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe</code></pre>
$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe <p>
</code></pre><p>
If the <code>kraft.version</code> field is level 0 or absent, you are using a static quorum. If If the <code>kraft.version</code> field is level 0 or absent, you are using a static quorum. If
it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
quorum:<p/> quorum:<p>
<pre><code class="language-bash"> <pre><code class="language-bash">Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5 Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5</code></pre>
Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5 <p>
</code></pre><p/> Here is another example of a static quorum:<p>
<pre><code class="language-bash">Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5</code></pre>
Here is another example of a static quorum:<p/> <p>
<pre><code class="language-bash"> Here is an example of a dynamic quorum:<p>
Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5 <pre><code class="language-bash">Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
</code></pre><p/> Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5</code></pre>
<p>
Here is an example of a dynamic quorum:<p/>
<pre><code class="language-bash">
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
</code></pre><p/>
The static versus dynamic nature of the quorum is determined at the time of formatting. The static versus dynamic nature of the quorum is determined at the time of formatting.
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
<b>not</b> present, and if the software version is Apache Kafka 3.9 or newer. If you have <b>not</b> present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set.
followed the instructions earlier in this document, you will get a dynamic quorum.<p> If you have followed the instructions earlier in this document, you will get a dynamic quorum.
<p>
If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your
controllers using the <code>--feature kraft.version=1</code>. (Note that you should not supply
this flag when formatting brokers -- only when formatting controllers.)<p>
<pre><code class="language-bash">
$ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller_static.properties
Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try removing the --feature flag for kraft.version.
</code></pre><p>
Note: Currently it is <b>not</b> possible to convert clusters using a static controller quorum to Note: Currently it is <b>not</b> possible to convert clusters using a static controller quorum to
use a dynamic controller quorum. This function will be supported in the future release. use a dynamic controller quorum. This function will be supported in the future release.

View File

@ -133,7 +133,7 @@ public class Formatter {
* The initial KIP-853 voters. * The initial KIP-853 voters.
*/ */
private Optional<DynamicVoters> initialControllers = Optional.empty(); private Optional<DynamicVoters> initialControllers = Optional.empty();
private boolean noInitialControllersFlag = false; private boolean hasDynamicQuorum = false;
public Formatter setPrintStream(PrintStream printStream) { public Formatter setPrintStream(PrintStream printStream) {
this.printStream = printStream; this.printStream = printStream;
@ -210,8 +210,8 @@ public class Formatter {
return this; return this;
} }
public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) { public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
this.noInitialControllersFlag = noInitialControllersFlag; this.hasDynamicQuorum = hasDynamicQuorum;
return this; return this;
} }
@ -220,7 +220,7 @@ public class Formatter {
} }
boolean hasDynamicQuorum() { boolean hasDynamicQuorum() {
return initialControllers.isPresent() || noInitialControllersFlag; return hasDynamicQuorum;
} }
public BootstrapMetadata bootstrapMetadata() { public BootstrapMetadata bootstrapMetadata() {
@ -336,8 +336,8 @@ public class Formatter {
/** /**
* Calculate the effective feature level for kraft.version. In order to keep existing * Calculate the effective feature level for kraft.version. In order to keep existing
* command-line invocations of StorageTool working, we default this to 0 if no dynamic * command-line invocations of StorageTool working, we default this to 0 if no dynamic
* voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments * voter quorum arguments were provided. As a convenience, if the static voters config is
* were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version). * empty, we set the latest kraft.version. (Currently there is only 1 non-zero version).
* *
* @param configuredKRaftVersionLevel The configured level for kraft.version * @param configuredKRaftVersionLevel The configured level for kraft.version
* @return The effective feature level. * @return The effective feature level.
@ -346,15 +346,21 @@ public class Formatter {
if (configuredKRaftVersionLevel.isPresent()) { if (configuredKRaftVersionLevel.isPresent()) {
if (configuredKRaftVersionLevel.get() == 0) { if (configuredKRaftVersionLevel.get() == 0) {
if (hasDynamicQuorum()) { if (hasDynamicQuorum()) {
throw new FormatterException("Cannot set kraft.version to " + throw new FormatterException(
configuredKRaftVersionLevel.get() + " if KIP-853 configuration is present. " + "Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " +
"Try removing the --feature flag for kraft.version."); "--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " +
"controllers support, try removing the --feature flag for kraft.version."
);
} }
} else { } else {
if (!hasDynamicQuorum()) { if (!hasDynamicQuorum()) {
throw new FormatterException("Cannot set kraft.version to " + throw new FormatterException(
configuredKRaftVersionLevel.get() + " unless KIP-853 configuration is present. " + "Cannot set kraft.version to " + configuredKRaftVersionLevel.get() +
"Try removing the --feature flag for kraft.version."); " unless controller.quorum.voters is empty and one of the flags --standalone, " +
"--initial-controllers, or --no-initial-controllers is used. " +
"For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
"or --no-initial-controllers and removing controller.quorum.voters."
);
} }
} }
return configuredKRaftVersionLevel.get(); return configuredKRaftVersionLevel.get();

View File

@ -30,6 +30,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.raft.DynamicVoters; import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TestFeatureVersion; import org.apache.kafka.server.common.TestFeatureVersion;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
@ -187,6 +188,40 @@ public class FormatterTest {
} }
} }
@Test
public void testStandaloneWithIgnoreFormatted() throws Exception {
try (TestEnv testEnv = new TestEnv(1)) {
FormatterContext formatter1 = testEnv.newFormatter();
String originalDirectoryId = Uuid.randomUuid().toString();
String newDirectoryId = Uuid.randomUuid().toString();
formatter1.formatter
.setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + originalDirectoryId))
.setHasDynamicQuorum(true)
.run();
assertEquals("Formatting dynamic metadata voter directory " + testEnv.directory(0) +
" with metadata.version " + MetadataVersion.latestProduction() + ".",
formatter1.output().trim());
assertMetadataDirectoryId(testEnv, Uuid.fromString(originalDirectoryId));
FormatterContext formatter2 = testEnv.newFormatter();
formatter2.formatter
.setIgnoreFormatted(true)
.setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + newDirectoryId))
.run();
assertEquals("All of the log directories are already formatted.",
formatter2.output().trim());
assertMetadataDirectoryId(testEnv, Uuid.fromString(originalDirectoryId));
}
}
private void assertMetadataDirectoryId(TestEnv testEnv, Uuid expectedDirectoryId) throws Exception {
MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
addLogDirs(testEnv.directories).
load();
MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0));
assertEquals(expectedDirectoryId, logDirProps0.directoryId().get());
}
@Test @Test
public void testOneDirectoryFormattedAndOthersNotFormatted() throws Exception { public void testOneDirectoryFormattedAndOthersNotFormatted() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) { try (TestEnv testEnv = new TestEnv(2)) {
@ -369,13 +404,14 @@ public class FormatterTest {
try (TestEnv testEnv = new TestEnv(2)) { try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter(); FormatterContext formatter1 = testEnv.newFormatter();
if (specifyKRaftVersion) { if (specifyKRaftVersion) {
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
} }
formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setInitialControllers(DynamicVoters. formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.setHasDynamicQuorum(true);
formatter1.formatter.run(); formatter1.formatter.run();
assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
assertEquals(Arrays.asList( assertEquals(Arrays.asList(
String.format("Formatting data directory %s with %s %s.", String.format("Formatting data directory %s with %s %s.",
testEnv.directory(1), testEnv.directory(1),
@ -402,62 +438,77 @@ public class FormatterTest {
public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception { public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) { try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter(); FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 0);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setInitialControllers(DynamicVoters. formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.setHasDynamicQuorum(true);
assertTrue(formatter1.formatter.hasDynamicQuorum()); assertTrue(formatter1.formatter.hasDynamicQuorum());
assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + assertEquals(
"Try removing the --feature flag for kraft.version.", "Cannot set kraft.version to 0 if controller.quorum.voters is empty " +
assertThrows(FormatterException.class, "and one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
() -> formatter1.formatter.run()).getMessage()); "For dynamic controllers support, try removing the --feature flag for kraft.version.",
assertThrows(FormatterException.class, formatter1.formatter::run).getMessage()
);
} }
} }
@Test @Test
public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() throws Exception { public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) { try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter(); FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
assertFalse(formatter1.formatter.hasDynamicQuorum()); assertFalse(formatter1.formatter.hasDynamicQuorum());
assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " + assertEquals(
"Try removing the --feature flag for kraft.version.", "Cannot set kraft.version to 1 unless controller.quorum.voters is empty and " +
assertThrows(FormatterException.class, "one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
() -> formatter1.formatter.run()).getMessage()); "For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
"or --no-initial-controllers and removing controller.quorum.voters.",
assertThrows(FormatterException.class, formatter1.formatter::run).getMessage()
);
} }
} }
@Test @Test
public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Exception { public void testFormatWithInitialVotersWithOlderMetadataVersion() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) { try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter(); FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
formatter1.formatter.setInitialControllers(DynamicVoters. formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
assertEquals("kraft.version could not be set to 1 because it depends on " + formatter1.formatter.setHasDynamicQuorum(true);
"metadata.version level 21", formatter1.formatter.run();
assertThrows(IllegalArgumentException.class, assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
() -> formatter1.formatter.run()).getMessage());
} }
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {false, true}) @ValueSource(booleans = {false, true})
public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception { public void testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean hasDynamicQuorum) throws Exception {
try (TestEnv testEnv = new TestEnv(2)) { try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter(); FormatterContext formatter1 = testEnv.newFormatter();
if (specifyKRaftVersion) { formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum);
}
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setNoInitialControllersFlag(true);
assertTrue(formatter1.formatter.hasDynamicQuorum());
formatter1.formatter.run(); formatter1.formatter.run();
assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); if (hasDynamicQuorum) {
assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
} else {
assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
}
}
}
@Test
public void testFormatWithNoInitialControllers() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
assertFalse(formatter1.formatter.hasDynamicQuorum());
formatter1.formatter.run();
assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
assertEquals(Arrays.asList( assertEquals(Arrays.asList(
String.format("Formatting data directory %s with %s %s.", String.format("Formatting data directory %s with %s %s.",
testEnv.directory(1), testEnv.directory(1),
@ -477,34 +528,4 @@ public class FormatterTest {
assertNotNull(logDirProps1); assertNotNull(logDirProps1);
} }
} }
@Test
public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setNoInitialControllersFlag(false);
assertFalse(formatter1.formatter.hasDynamicQuorum());
assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " +
"Try removing the --feature flag for kraft.version.",
assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage());
}
}
@Test
public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setNoInitialControllersFlag(true);
assertTrue(formatter1.formatter.hasDynamicQuorum());
assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " +
"Try removing the --feature flag for kraft.version.",
assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage());
}
}
} }

View File

@ -71,12 +71,7 @@ public enum KRaftVersion implements FeatureVersion {
@Override @Override
public Map<String, Short> dependencies() { public Map<String, Short> dependencies() {
if (this.featureLevel == 0) {
return Collections.emptyMap(); return Collections.emptyMap();
} else {
return Collections.singletonMap(
MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_9_IV0.featureLevel());
}
} }
public short quorumStateVersion() { public short quorumStateVersion() {