diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 40892bca38c..594109e15d2 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -135,21 +135,30 @@ object StorageTool extends Logging { featureNamesAndLevels(_).foreachEntry { (k, v) => formatter.setFeatureLevel(k, v) }) - Option(namespace.getString("initial_controllers")). + val initialControllers = namespace.getString("initial_controllers") + val isStandalone = namespace.getBoolean("standalone") + val staticVotersEmpty = config.quorumConfig.voters().isEmpty + formatter.setHasDynamicQuorum(staticVotersEmpty) + if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) { + throw new TerseFailure("You cannot specify " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " + + "with --initial-controllers or --standalone. " + + "If you want to use dynamic quorum, please remove " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " + + QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.") + } + Option(initialControllers). foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) - if (namespace.getBoolean("standalone")) { + if (isStandalone) { formatter.setInitialControllers(createStandaloneDynamicVoters(config)) } - if (namespace.getBoolean("no_initial_controllers")) { - formatter.setNoInitialControllersFlag(true) - } else { - if (config.processRoles.contains(ProcessRole.ControllerRole)) { - if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) { + if (!namespace.getBoolean("no_initial_controllers") && + config.processRoles.contains(ProcessRole.ControllerRole) && + staticVotersEmpty && + formatter.initialVoters().isEmpty) { throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + " is not set on this controller, you must specify one of the following: " + "--standalone, --initial-controllers, or --no-initial-controllers."); - } - } } Option(namespace.getList("add_scram")). foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]])) @@ -319,18 +328,21 @@ object StorageTool extends Logging { val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup() reconfigurableQuorumOptions.addArgument("--standalone", "-s") - .help("Used to initialize a controller as a single-node dynamic quorum.") + .help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " + + "the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.") .action(storeTrue()) reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N") - .help("Used to initialize a server without a dynamic quorum topology.") + .help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " + + "the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.") .action(storeTrue()) reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I") - .help("Used to initialize a server with a specific dynamic quorum topology. The argument " + + .help("Used to initialize a server with the specified dynamic quorum. The argument " + "is a comma-separated list of id@hostname:port:directory. The same values must be used to " + "format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" + - "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n") + "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " + + "the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.") .action(store()) } diff --git a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java index 981217ce287..fe1b6b5da9f 100644 --- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java +++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.TreeMap; @@ -83,9 +84,8 @@ public class ReconfigurableQuorumIntegrationTest { new TestKitNodes.Builder(). setNumBrokerNodes(1). setNumControllerNodes(1). - setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()). build() - ).build()) { + ).setStandalone(true).build()) { cluster.format(); cluster.startup(); try (Admin admin = Admin.create(cluster.clientProperties())) { @@ -107,13 +107,23 @@ public class ReconfigurableQuorumIntegrationTest { @Test public void testRemoveController() throws Exception { - try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder( - new TestKitNodes.Builder(). - setNumBrokerNodes(1). - setNumControllerNodes(3). - setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()). - build() - ).build()) { + final var nodes = new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(3). + build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes). + setInitialVoterSet(initialVoters). + build() + ) { cluster.format(); cluster.startup(); try (Admin admin = Admin.create(cluster.clientProperties())) { @@ -132,12 +142,22 @@ public class ReconfigurableQuorumIntegrationTest { @Test public void testRemoveAndAddSameController() throws Exception { - try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder( - new TestKitNodes.Builder(). - setNumBrokerNodes(1). - setNumControllerNodes(4). - setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()). - build()).build() + final var nodes = new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(4). + build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes). + setInitialVoterSet(initialVoters). + build() ) { cluster.format(); cluster.startup(); diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 4fe4fb48cd8..2b99be9321f 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1013,8 +1013,7 @@ class KRaftClusterTest { val cluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setNumBrokerNodes(1). - setNumControllerNodes(1). - setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build() + setNumControllerNodes(1).build()).setStandalone(true).build() try { cluster.format() cluster.startup() diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 9fde243ec19..84404507093 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -375,7 +375,10 @@ Found problem: def testFormatWithStandaloneFlagOnBrokerFails(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("process.roles", "broker") + properties.setProperty("node.id", "0") + properties.setProperty("controller.listener.names", "CONTROLLER") + properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") @@ -458,19 +461,14 @@ Found problem: Seq("--release-version", "3.9-IV0"))).getMessage) } - @ParameterizedTest - @ValueSource(booleans = Array(false, true)) - def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = { + @Test + def testFormatWithNoInitialControllersSucceedsOnController(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() properties.putAll(defaultDynamicQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers") - if (setKraftVersionFeature) { - arguments += "--feature" - arguments += "kraft.version=1" - } assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq)) assertTrue(stream.toString(). contains("Formatting metadata directory %s".format(availableDirs.head)), diff --git a/docs/ops.html b/docs/ops.html index 5a60a4cde89..3d1c99e8522 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3869,45 +3869,29 @@ In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the If you are not sure whether you are using static or dynamic quorums, you can determine this by running something like the following:

-


-  $ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
-

+

$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
+

+ If the kraft.version field is level 0 or absent, you are using a static quorum. If + it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static + quorum:

+

Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 0 Epoch: 5
+Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
+

+ Here is another example of a static quorum:

+

Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0  Epoch: 5
+

+ Here is an example of a dynamic quorum:

+

Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 1 Epoch: 5
+Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
+

+ The static versus dynamic nature of the quorum is determined at the time of formatting. + Specifically, the quorum will be formatted as dynamic if controller.quorum.voters is + not present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set. + If you have followed the instructions earlier in this document, you will get a dynamic quorum. +

- If the kraft.version field is level 0 or absent, you are using a static quorum. If - it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static - quorum:

-


-Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 0 Epoch: 5
-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
-

- - Here is another example of a static quorum:

-


-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0  Epoch: 5
-

- - Here is an example of a dynamic quorum:

-


-Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 1 Epoch: 5
-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
-

- - The static versus dynamic nature of the quorum is determined at the time of formatting. - Specifically, the quorum will be formatted as dynamic if controller.quorum.voters is - not present, and if the software version is Apache Kafka 3.9 or newer. If you have - followed the instructions earlier in this document, you will get a dynamic quorum.

- - If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your - controllers using the --feature kraft.version=1. (Note that you should not supply - this flag when formatting brokers -- only when formatting controllers.)

- -


-  $ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller_static.properties
-  Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try removing the --feature flag for kraft.version.
-

- - Note: Currently it is not possible to convert clusters using a static controller quorum to - use a dynamic controller quorum. This function will be supported in the future release. + Note: Currently it is not possible to convert clusters using a static controller quorum to + use a dynamic controller quorum. This function will be supported in the future release.

Add New Controller
If a dynamic controller cluster already exists, it can be expanded by first provisioning a new controller using the kafka-storage.sh tool and starting the controller. diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java index acba0f7a04b..e79ca41ff80 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java @@ -132,7 +132,7 @@ public class Formatter { * The initial KIP-853 voters. */ private Optional initialControllers = Optional.empty(); - private boolean noInitialControllersFlag = false; + private boolean hasDynamicQuorum = false; public Formatter setPrintStream(PrintStream printStream) { this.printStream = printStream; @@ -218,8 +218,8 @@ public class Formatter { return this; } - public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) { - this.noInitialControllersFlag = noInitialControllersFlag; + public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) { + this.hasDynamicQuorum = hasDynamicQuorum; return this; } @@ -228,7 +228,7 @@ public class Formatter { } boolean hasDynamicQuorum() { - return initialControllers.isPresent() || noInitialControllersFlag; + return hasDynamicQuorum; } public BootstrapMetadata bootstrapMetadata() { @@ -338,8 +338,8 @@ public class Formatter { /** * Calculate the effective feature level for kraft.version. In order to keep existing * command-line invocations of StorageTool working, we default this to 0 if no dynamic - * voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments - * were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version). + * voter quorum arguments were provided. As a convenience, if the static voters config is + * empty, we set the latest kraft.version. (Currently there is only 1 non-zero version). * * @param configuredKRaftVersionLevel The configured level for kraft.version * @return The effective feature level. @@ -348,15 +348,21 @@ public class Formatter { if (configuredKRaftVersionLevel.isPresent()) { if (configuredKRaftVersionLevel.get() == 0) { if (hasDynamicQuorum()) { - throw new FormatterException("Cannot set kraft.version to " + - configuredKRaftVersionLevel.get() + " if KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version."); + throw new FormatterException( + "Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " + + "--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " + + "controllers support, try removing the --feature flag for kraft.version." + ); } } else { if (!hasDynamicQuorum()) { - throw new FormatterException("Cannot set kraft.version to " + - configuredKRaftVersionLevel.get() + " unless KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version."); + throw new FormatterException( + "Cannot set kraft.version to " + configuredKRaftVersionLevel.get() + + " unless controller.quorum.voters is empty and one of the flags --standalone, " + + "--initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try using one of --standalone, --initial-controllers, " + + "or --no-initial-controllers and removing controller.quorum.voters." + ); } } return configuredKRaftVersionLevel.get(); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 0706e4e738f..83946c426fc 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.GroupVersion; +import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.TestFeatureVersion; import org.apache.kafka.server.common.TransactionVersion; @@ -194,6 +195,40 @@ public class FormatterTest { } } + @Test + public void testStandaloneWithIgnoreFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + String originalDirectoryId = Uuid.randomUuid().toString(); + String newDirectoryId = Uuid.randomUuid().toString(); + formatter1.formatter + .setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + originalDirectoryId)) + .setHasDynamicQuorum(true) + .run(); + assertEquals("Formatting dynamic metadata voter directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.latestProduction() + ".", + formatter1.output().trim()); + assertMetadataDirectoryId(testEnv, Uuid.fromString(originalDirectoryId)); + + FormatterContext formatter2 = testEnv.newFormatter(); + formatter2.formatter + .setIgnoreFormatted(true) + .setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + newDirectoryId)) + .run(); + assertEquals("All of the log directories are already formatted.", + formatter2.output().trim()); + assertMetadataDirectoryId(testEnv, Uuid.fromString(originalDirectoryId)); + } + } + + private void assertMetadataDirectoryId(TestEnv testEnv, Uuid expectedDirectoryId) throws Exception { + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). + load(); + MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0)); + assertEquals(expectedDirectoryId, logDirProps0.directoryId().get()); + } + @Test public void testOneDirectoryFormattedAndOthersNotFormatted() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { @@ -383,14 +418,15 @@ public class FormatterTest { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); if (specifyKRaftVersion) { - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); } formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); formatter1.formatter.run(); - assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); - assertEquals(Arrays.asList( + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + assertEquals(List.of( String.format("Formatting data directory %s with %s %s.", testEnv.directory(1), MetadataVersion.FEATURE_NAME, @@ -416,45 +452,66 @@ public class FormatterTest { public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 0); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); assertTrue(formatter1.formatter.hasDynamicQuorum()); - assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version.", - assertThrows(FormatterException.class, - () -> formatter1.formatter.run()).getMessage()); + assertEquals( + "Cannot set kraft.version to 0 if controller.quorum.voters is empty " + + "and one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, formatter1.formatter::run).getMessage() + ); } } @Test - public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() throws Exception { + public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); assertFalse(formatter1.formatter.hasDynamicQuorum()); - assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version.", - assertThrows(FormatterException.class, - () -> formatter1.formatter.run()).getMessage()); + assertEquals( + "Cannot set kraft.version to 1 unless controller.quorum.voters is empty and " + + "one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try using one of --standalone, --initial-controllers, " + + "or --no-initial-controllers and removing controller.quorum.voters.", + assertThrows(FormatterException.class, formatter1.formatter::run).getMessage() + ); } } @Test - public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Exception { + public void testFormatWithInitialVotersWithOlderMetadataVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - assertEquals("kraft.version could not be set to 1 because it depends on " + - "metadata.version level 21", - assertThrows(IllegalArgumentException.class, - () -> formatter1.formatter.run()).getMessage()); + formatter1.formatter.setHasDynamicQuorum(true); + formatter1.formatter.run(); + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean hasDynamicQuorum) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); + formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum); + formatter1.formatter.run(); + if (hasDynamicQuorum) { + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } else { + assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } } } @@ -475,6 +532,7 @@ public class FormatterTest { formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) { assertDoesNotThrow(() -> formatter1.formatter.run()); } else { @@ -486,21 +544,15 @@ public class FormatterTest { } } - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception { + @Test + public void testFormatWithNoInitialControllers() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - if (specifyKRaftVersion) { - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); - } formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(true); - assertTrue(formatter1.formatter.hasDynamicQuorum()); - + assertFalse(formatter1.formatter.hasDynamicQuorum()); formatter1.formatter.run(); - assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); - assertEquals(Arrays.asList( + assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + assertEquals(List.of( String.format("Formatting data directory %s with %s %s.", testEnv.directory(1), MetadataVersion.FEATURE_NAME, @@ -519,34 +571,4 @@ public class FormatterTest { assertNotNull(logDirProps1); } } - - @Test - public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception { - try (TestEnv testEnv = new TestEnv(2)) { - FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); - formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(false); - assertFalse(formatter1.formatter.hasDynamicQuorum()); - assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version.", - assertThrows(FormatterException.class, - formatter1.formatter::run).getMessage()); - } - } - - @Test - public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception { - try (TestEnv testEnv = new TestEnv(2)) { - FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); - formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(true); - assertTrue(formatter1.formatter.hasDynamicQuorum()); - assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version.", - assertThrows(FormatterException.class, - formatter1.formatter::run).getMessage()); - } - } } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java index 211f6dcac44..988d58c3e0e 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; import java.util.Map; public enum KRaftVersion implements FeatureVersion { @@ -73,12 +72,7 @@ public enum KRaftVersion implements FeatureVersion { @Override public Map dependencies() { - if (this.featureLevel == 0) { - return Collections.emptyMap(); - } else { - return Collections.singletonMap( - MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_9_IV0.featureLevel()); - } + return Map.of(); } public short quorumStateVersion() { diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index 20963665440..1a16a7a0a34 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -27,10 +27,12 @@ import kafka.server.SharedServer; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -42,7 +44,6 @@ import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.raft.DynamicVoters; import org.apache.kafka.raft.QuorumConfig; import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.config.KRaftConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.fault.FaultHandler; @@ -114,6 +115,8 @@ public class KafkaClusterTestKit implements AutoCloseable { private final String controllerListenerName; private final String brokerSecurityProtocol; private final String controllerSecurityProtocol; + private boolean standalone; + private Optional> initialVoterSet = Optional.empty(); private boolean deleteOnClose; public Builder(TestKitNodes nodes) { @@ -130,6 +133,16 @@ public class KafkaClusterTestKit implements AutoCloseable { return this; } + public Builder setStandalone(boolean standalone) { + this.standalone = standalone; + return this; + } + + public Builder setInitialVoterSet(Map initialVoterSet) { + this.initialVoterSet = Optional.of(initialVoterSet); + return this; + } + private KafkaConfig createNodeConfig(TestKitNode node) throws IOException { TestKitNode brokerNode = nodes.brokerNodes().get(node.id()); TestKitNode controllerNode = nodes.controllerNodes().get(node.id()); @@ -168,18 +181,31 @@ public class KafkaClusterTestKit implements AutoCloseable { props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName); props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName); - StringBuilder quorumVoterStringBuilder = new StringBuilder(); - String prefix = ""; - for (int nodeId : nodes.controllerNodes().keySet()) { - quorumVoterStringBuilder.append(prefix). - append(nodeId). - append("@"). - append("localhost"). - append(":"). - append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName)); - prefix = ","; + if (!standalone && initialVoterSet.isEmpty()) { + StringBuilder quorumVoterStringBuilder = new StringBuilder(); + String prefix = ""; + for (int nodeId : nodes.controllerNodes().keySet()) { + quorumVoterStringBuilder.append(prefix). + append(nodeId). + append("@"). + append("localhost"). + append(":"). + append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName)); + prefix = ","; + } + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString()); + } else { + StringBuilder bootstrapServersStringBuilder = new StringBuilder(); + String prefix = ""; + for (int nodeId : nodes.controllerNodes().keySet()) { + bootstrapServersStringBuilder.append(prefix). + append("localhost"). + append(":"). + append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName)); + prefix = ","; + } + props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, bootstrapServersStringBuilder.toString()); } - props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString()); // reduce log cleaner offset map memory usage props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152"); @@ -251,7 +277,7 @@ public class KafkaClusterTestKit implements AutoCloseable { Time.SYSTEM, new Metrics(), CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())), - Collections.emptyList(), + QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()), faultHandlerFactory, socketFactoryManager.getOrCreateSocketFactory(node.id()) ); @@ -279,7 +305,7 @@ public class KafkaClusterTestKit implements AutoCloseable { Time.SYSTEM, new Metrics(), CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())), - Collections.emptyList(), + QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()), faultHandlerFactory, socketFactoryManager.getOrCreateSocketFactory(node.id()) ); @@ -316,6 +342,8 @@ public class KafkaClusterTestKit implements AutoCloseable { faultHandlerFactory, socketFactoryManager, Optional.ofNullable(jaasFile), + standalone, + initialVoterSet, deleteOnClose); } @@ -361,6 +389,8 @@ public class KafkaClusterTestKit implements AutoCloseable { private final PreboundSocketFactoryManager socketFactoryManager; private final String controllerListenerName; private final Optional jaasFile; + private final boolean standalone; + private final Optional> initialVoterSet; private final boolean deleteOnClose; private KafkaClusterTestKit( @@ -371,6 +401,8 @@ public class KafkaClusterTestKit implements AutoCloseable { SimpleFaultHandlerFactory faultHandlerFactory, PreboundSocketFactoryManager socketFactoryManager, Optional jaasFile, + boolean standalone, + Optional> initialVoterSet, boolean deleteOnClose ) { /* @@ -388,6 +420,8 @@ public class KafkaClusterTestKit implements AutoCloseable { this.socketFactoryManager = socketFactoryManager; this.controllerListenerName = nodes.controllerListenerName().value(); this.jaasFile = jaasFile; + this.standalone = standalone; + this.initialVoterSet = initialVoterSet; this.deleteOnClose = deleteOnClose; } @@ -422,8 +456,9 @@ public class KafkaClusterTestKit implements AutoCloseable { boolean writeMetadataDirectory ) { try { + final var nodeId = ensemble.nodeId().getAsInt(); Formatter formatter = new Formatter(); - formatter.setNodeId(ensemble.nodeId().getAsInt()); + formatter.setNodeId(nodeId); formatter.setClusterId(ensemble.clusterId().get()); if (writeMetadataDirectory) { formatter.setDirectories(ensemble.logDirProps().keySet()); @@ -436,8 +471,6 @@ public class KafkaClusterTestKit implements AutoCloseable { return; } formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion()); - formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, - nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME)); formatter.setUnstableFeatureVersionsEnabled(true); formatter.setIgnoreFormatted(false); formatter.setControllerListenerName(controllerListenerName); @@ -446,18 +479,43 @@ public class KafkaClusterTestKit implements AutoCloseable { } else { formatter.setMetadataLogDirectory(Optional.empty()); } - if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) { - StringBuilder dynamicVotersBuilder = new StringBuilder(); - String prefix = ""; - for (TestKitNode controllerNode : nodes.controllerNodes().values()) { - int port = socketFactoryManager. - getOrCreatePortForListener(controllerNode.id(), controllerListenerName); + StringBuilder dynamicVotersBuilder = new StringBuilder(); + String prefix = ""; + if (standalone) { + if (nodeId == TestKitDefaults.BROKER_ID_OFFSET + TestKitDefaults.CONTROLLER_ID_OFFSET) { + final var controllerNode = nodes.controllerNodes().get(nodeId); + dynamicVotersBuilder.append( + String.format( + "%d@localhost:%d:%s", + controllerNode.id(), + socketFactoryManager. + getOrCreatePortForListener(controllerNode.id(), controllerListenerName), + controllerNode.metadataDirectoryId() + ) + ); + formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString())); + } + // when the nodeId != TestKitDefaults.CONTROLLER_ID_OFFSET, the node is formatting with + // the --no-initial-controllers flag + formatter.setHasDynamicQuorum(true); + } else if (initialVoterSet.isPresent()) { + for (final var controllerNode : initialVoterSet.get().entrySet()) { + final var voterId = controllerNode.getKey(); + final var voterDirectoryId = controllerNode.getValue(); dynamicVotersBuilder.append(prefix); prefix = ","; - dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s", - controllerNode.id(), port, controllerNode.metadataDirectoryId())); + dynamicVotersBuilder.append( + String.format( + "%d@localhost:%d:%s", + voterId, + socketFactoryManager. + getOrCreatePortForListener(voterId, controllerListenerName), + voterDirectoryId + ) + ); } formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString())); + formatter.setHasDynamicQuorum(true); } formatter.run(); } catch (Exception e) { diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java index 759e86c200b..cd8879a84db 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java @@ -94,11 +94,6 @@ public class TestKitNodes { return this; } - public Builder setFeature(String featureName, short level) { - this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(featureName, level); - return this; - } - public Builder setCombined(boolean combined) { this.combined = combined; return this;