From 1bcaa19c46ac9be2166f52a3381ade58997122e7 Mon Sep 17 00:00:00 2001 From: Kevin Wu Date: Wed, 30 Jul 2025 09:58:08 -0500 Subject: [PATCH] KAFKA-19489; Extra validation when formatting a node (#20136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds a check to the storage tool's format command which throws a TerseFailure when the controller.quorum.voters config is defined and the node is formatted with the --standalone flag or the --initial-controllers flag. Without this check, it is possible to have two voter sets. For example, in a three node setup, the two nodes that formatted with --no-initial-controllers could form quorum with each other since they have the static voter set, and the --standalone node would ignore the config and read the voter set of itself from its log, forming its own quorum of 1. Reviewers: José Armando García Sancio , TaiJuWu , Alyssa Huang --- .../main/scala/kafka/tools/StorageTool.scala | 15 ++++- .../unit/kafka/tools/StorageToolTest.scala | 57 ++++++++++++++++++- docs/ops.html | 14 ++--- .../org/apache/kafka/raft/QuorumConfig.java | 3 + 4 files changed, 79 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 66aabb8f4d1..63993ed5ea9 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -147,9 +147,20 @@ object StorageTool extends Logging { featureNamesAndLevels(_).foreachEntry { (k, v) => formatter.setFeatureLevel(k, v) }) - Option(namespace.getString("initial_controllers")). + val initialControllers = namespace.getString("initial_controllers") + val isStandalone = namespace.getBoolean("standalone") + if (!config.quorumConfig.voters().isEmpty && + (Option(initialControllers).isDefined || isStandalone)) { + throw new TerseFailure("You cannot specify " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " + + "with --initial-controllers or --standalone. " + + "If you want to use dynamic quorum, please remove " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " + + QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.") + } + Option(initialControllers). foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) - if (namespace.getBoolean("standalone")) { + if (isStandalone) { formatter.setInitialControllers(createStandaloneDynamicVoters(config)) } if (namespace.getBoolean("no_initial_controllers")) { diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 4d271d594cf..35966996ce6 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -389,7 +389,10 @@ Found problem: def testFormatWithStandaloneFlagOnBrokerFails(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("process.roles", "broker") + properties.setProperty("node.id", "0") + properties.setProperty("controller.listener.names", "CONTROLLER") + properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") @@ -398,6 +401,58 @@ Found problem: () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage) } + @Test + def testFormatWithStandaloneFailsWithStaticVotersConfig(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:8020") + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") + assertEquals("You cannot specify controller.quorum.voters and " + + "format the node with --initial-controllers or --standalone. If you " + + "want to use dynamic quorum, please remove controller.quorum.voters and " + + "specify controller.quorum.bootstrap.servers instead.", + assertThrows(classOf[TerseFailure], + () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage + ) + } + + @Test + def testFormatWithInitialControllersFailsWithStaticVotersConfig(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:8020") + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + val arguments = ListBuffer[String]( + "--release-version", "3.9-IV0", + "--initial-controllers", + "0@localhost:8020:K90IZ-0DRNazJ49kCZ1EMQ," + ) + assertEquals("You cannot specify controller.quorum.voters and " + + "format the node with --initial-controllers or --standalone. If you " + + "want to use dynamic quorum, please remove controller.quorum.voters and " + + "specify controller.quorum.bootstrap.servers instead.", + assertThrows(classOf[TerseFailure], + () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage + ) + } + + @Test + def testFormatWithNoInitialControllersPassesWithVotersConfig(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:8020") + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers") + assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq)) + } + @ParameterizedTest @ValueSource(booleans = Array(false, true)) def testFormatWithStandaloneFlag(setKraftVersionFeature: Boolean): Unit = { diff --git a/docs/ops.html b/docs/ops.html index 0b1e8fa6880..f520262c588 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -4057,18 +4057,18 @@ In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the

Controller membership changes

Static versus Dynamic KRaft Quorums
- There are two ways to run KRaft: the old way using static controller quorums, and the new way - using KIP-853 dynamic controller quorums.

+ There are two ways to run KRaft: using KIP-853 dynamic controller quorums, or the old way + using static controller quorums.

- When using a static quorum, the configuration file for each broker and controller must specify - the IDs, hostnames, and ports of all controllers in controller.quorum.voters.

- - In contrast, when using a dynamic quorum, you should set - controller.quorum.bootstrap.servers instead. This configuration key need not + When using a dynamic quorum, controller.quorum.voters must not be set + and controller.quorum.bootstrap.servers is set instead. This configuration key need not contain all the controllers, but it should contain as many as possible so that all the servers can locate the quorum. In other words, its function is much like the bootstrap.servers configuration used by Kafka clients.

+ When using a static quorum, the configuration file for each broker and controller must specify + the IDs, hostnames, and ports of all controllers in controller.quorum.voters.

+ If you are not sure whether you are using static or dynamic quorums, you can determine this by running something like the following:

diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 8adef90f49b..3ff2f7c86de 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -57,6 +57,9 @@ public class QuorumConfig { public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters"; public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " + "the set of voters in a comma-separated list of {id}@{host}:{port} entries. " + + "This is the old way of defining membership for controller quorums and should NOT be " + + "set if using dynamic quorums. Instead, controller.quorum.bootstrap.servers should be set," + + "and the voter set is determined by the --standalone or --initial-controllers flags when formatting." + "For example: 1@localhost:9092,2@localhost:9093,3@localhost:9094"; public static final List DEFAULT_QUORUM_VOTERS = List.of();