diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 66aabb8f4d1..63993ed5ea9 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -147,9 +147,20 @@ object StorageTool extends Logging { featureNamesAndLevels(_).foreachEntry { (k, v) => formatter.setFeatureLevel(k, v) }) - Option(namespace.getString("initial_controllers")). + val initialControllers = namespace.getString("initial_controllers") + val isStandalone = namespace.getBoolean("standalone") + if (!config.quorumConfig.voters().isEmpty && + (Option(initialControllers).isDefined || isStandalone)) { + throw new TerseFailure("You cannot specify " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " + + "with --initial-controllers or --standalone. " + + "If you want to use dynamic quorum, please remove " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " + + QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.") + } + Option(initialControllers). foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) - if (namespace.getBoolean("standalone")) { + if (isStandalone) { formatter.setInitialControllers(createStandaloneDynamicVoters(config)) } if (namespace.getBoolean("no_initial_controllers")) { diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 4d271d594cf..35966996ce6 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -389,7 +389,10 @@ Found problem: def testFormatWithStandaloneFlagOnBrokerFails(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("process.roles", "broker") + properties.setProperty("node.id", "0") + properties.setProperty("controller.listener.names", "CONTROLLER") + properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") @@ -398,6 +401,58 @@ Found problem: () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage) } + @Test + def testFormatWithStandaloneFailsWithStaticVotersConfig(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:8020") + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") + assertEquals("You cannot specify controller.quorum.voters and " + + "format the node with --initial-controllers or --standalone. If you " + + "want to use dynamic quorum, please remove controller.quorum.voters and " + + "specify controller.quorum.bootstrap.servers instead.", + assertThrows(classOf[TerseFailure], + () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage + ) + } + + @Test + def testFormatWithInitialControllersFailsWithStaticVotersConfig(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:8020") + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + val arguments = ListBuffer[String]( + "--release-version", "3.9-IV0", + "--initial-controllers", + "0@localhost:8020:K90IZ-0DRNazJ49kCZ1EMQ," + ) + assertEquals("You cannot specify controller.quorum.voters and " + + "format the node with --initial-controllers or --standalone. If you " + + "want to use dynamic quorum, please remove controller.quorum.voters and " + + "specify controller.quorum.bootstrap.servers instead.", + assertThrows(classOf[TerseFailure], + () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage + ) + } + + @Test + def testFormatWithNoInitialControllersPassesWithVotersConfig(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:8020") + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers") + assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq)) + } + @ParameterizedTest @ValueSource(booleans = Array(false, true)) def testFormatWithStandaloneFlag(setKraftVersionFeature: Boolean): Unit = { diff --git a/docs/ops.html b/docs/ops.html index 0b1e8fa6880..f520262c588 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -4057,18 +4057,18 @@ In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the

Controller membership changes

Static versus Dynamic KRaft Quorums
- There are two ways to run KRaft: the old way using static controller quorums, and the new way - using KIP-853 dynamic controller quorums.

+ There are two ways to run KRaft: using KIP-853 dynamic controller quorums, or the old way + using static controller quorums.

- When using a static quorum, the configuration file for each broker and controller must specify - the IDs, hostnames, and ports of all controllers in controller.quorum.voters.

- - In contrast, when using a dynamic quorum, you should set - controller.quorum.bootstrap.servers instead. This configuration key need not + When using a dynamic quorum, controller.quorum.voters must not be set + and controller.quorum.bootstrap.servers is set instead. This configuration key need not contain all the controllers, but it should contain as many as possible so that all the servers can locate the quorum. In other words, its function is much like the bootstrap.servers configuration used by Kafka clients.

+ When using a static quorum, the configuration file for each broker and controller must specify + the IDs, hostnames, and ports of all controllers in controller.quorum.voters.

+ If you are not sure whether you are using static or dynamic quorums, you can determine this by running something like the following:

diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 8adef90f49b..3ff2f7c86de 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -57,6 +57,9 @@ public class QuorumConfig { public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters"; public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " + "the set of voters in a comma-separated list of {id}@{host}:{port} entries. " + + "This is the old way of defining membership for controller quorums and should NOT be " + + "set if using dynamic quorums. Instead, controller.quorum.bootstrap.servers should be set," + + "and the voter set is determined by the --standalone or --initial-controllers flags when formatting." + "For example: 1@localhost:9092,2@localhost:9093,3@localhost:9094"; public static final List DEFAULT_QUORUM_VOTERS = List.of();