mirror of https://github.com/apache/kafka.git
KAFKA-19719: --no-initial-controllers should not assume kraft.version=1 (#20551)
Just because a controller node sets --no-initial-controllers flag does not mean it is necessarily running kraft.version=1. The more precise meaning is that the controller node being formatted does not know what kraft version the cluster should be in, and therefore it is only safe to assume kraft.version=0. Only by setting --standalone,--initial-controllers, or --no-initial-controllers AND not specifying the controller.quorum.voters static config, is it known kraft.version > 0. For example, it is a valid configuration (although confusing) to run a static quorum defined by controller.quorum.voters but have all the controllers format with --no-initial-controllers. In this case, specifying --no-initial-controllers alongside a metadata version that does not support kraft.version=1 causes formatting to fail, which is a regression. Additionally, the formatter should not check the kraft.version against the release version, since kraft.version does not actually depend on any release version. It should only check the kraft.version against the static voters config/format arguments. This PR also cleans up the integration test framework to match the semantics of formatting an actual cluster. Reviewers: TengYao Chi <kitingiao@gmail.com>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, José Armando García Sancio <jsancio@apache.org> Conflicts: core/src/main/scala/kafka/tools/StorageTool.scala Minor conflicts. Keep changes from cherry-pick. core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java Remove auto-join tests, since 4.1 does not support it. docs/ops.html Keep docs section from cherry-pick. metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java Minor conflicts. Keep cherry-picked changes. test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java Conflicts due to integration test framework changes. Keep new changes.
This commit is contained in:
parent
02d58b176c
commit
ec37eb538b
|
@ -135,22 +135,31 @@ object StorageTool extends Logging {
|
||||||
featureNamesAndLevels(_).foreachEntry {
|
featureNamesAndLevels(_).foreachEntry {
|
||||||
(k, v) => formatter.setFeatureLevel(k, v)
|
(k, v) => formatter.setFeatureLevel(k, v)
|
||||||
})
|
})
|
||||||
Option(namespace.getString("initial_controllers")).
|
val initialControllers = namespace.getString("initial_controllers")
|
||||||
|
val isStandalone = namespace.getBoolean("standalone")
|
||||||
|
val staticVotersEmpty = config.quorumConfig.voters().isEmpty
|
||||||
|
formatter.setHasDynamicQuorum(staticVotersEmpty)
|
||||||
|
if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) {
|
||||||
|
throw new TerseFailure("You cannot specify " +
|
||||||
|
QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
|
||||||
|
"with --initial-controllers or --standalone. " +
|
||||||
|
"If you want to use dynamic quorum, please remove " +
|
||||||
|
QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " +
|
||||||
|
QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.")
|
||||||
|
}
|
||||||
|
Option(initialControllers).
|
||||||
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
|
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
|
||||||
if (namespace.getBoolean("standalone")) {
|
if (isStandalone) {
|
||||||
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
|
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
|
||||||
}
|
}
|
||||||
if (namespace.getBoolean("no_initial_controllers")) {
|
if (!namespace.getBoolean("no_initial_controllers") &&
|
||||||
formatter.setNoInitialControllersFlag(true)
|
config.processRoles.contains(ProcessRole.ControllerRole) &&
|
||||||
} else {
|
staticVotersEmpty &&
|
||||||
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
|
formatter.initialVoters().isEmpty) {
|
||||||
if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) {
|
|
||||||
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
|
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
|
||||||
" is not set on this controller, you must specify one of the following: " +
|
" is not set on this controller, you must specify one of the following: " +
|
||||||
"--standalone, --initial-controllers, or --no-initial-controllers.");
|
"--standalone, --initial-controllers, or --no-initial-controllers.");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
Option(namespace.getList("add_scram")).
|
Option(namespace.getList("add_scram")).
|
||||||
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
|
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
|
||||||
configToLogDirectories(config).foreach(formatter.addDirectory(_))
|
configToLogDirectories(config).foreach(formatter.addDirectory(_))
|
||||||
|
@ -319,18 +328,21 @@ object StorageTool extends Logging {
|
||||||
|
|
||||||
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
|
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
|
||||||
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
|
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
|
||||||
.help("Used to initialize a controller as a single-node dynamic quorum.")
|
.help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " +
|
||||||
|
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
|
||||||
.action(storeTrue())
|
.action(storeTrue())
|
||||||
|
|
||||||
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
|
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
|
||||||
.help("Used to initialize a server without a dynamic quorum topology.")
|
.help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " +
|
||||||
|
"the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.")
|
||||||
.action(storeTrue())
|
.action(storeTrue())
|
||||||
|
|
||||||
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
|
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
|
||||||
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " +
|
.help("Used to initialize a server with the specified dynamic quorum. The argument " +
|
||||||
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
|
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
|
||||||
"format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" +
|
"format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" +
|
||||||
"MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n")
|
"MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " +
|
||||||
|
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
|
||||||
.action(store())
|
.action(store())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.kafka.test.TestUtils;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -83,9 +84,8 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
new TestKitNodes.Builder().
|
new TestKitNodes.Builder().
|
||||||
setNumBrokerNodes(1).
|
setNumBrokerNodes(1).
|
||||||
setNumControllerNodes(1).
|
setNumControllerNodes(1).
|
||||||
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
|
|
||||||
build()
|
build()
|
||||||
).build()) {
|
).setStandalone(true).build()) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
@ -107,13 +107,23 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveController() throws Exception {
|
public void testRemoveController() throws Exception {
|
||||||
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
|
final var nodes = new TestKitNodes.Builder().
|
||||||
new TestKitNodes.Builder().
|
|
||||||
setNumBrokerNodes(1).
|
setNumBrokerNodes(1).
|
||||||
setNumControllerNodes(3).
|
setNumControllerNodes(3).
|
||||||
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
|
build();
|
||||||
|
|
||||||
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
|
||||||
|
setInitialVoterSet(initialVoters).
|
||||||
build()
|
build()
|
||||||
).build()) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
@ -132,12 +142,22 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveAndAddSameController() throws Exception {
|
public void testRemoveAndAddSameController() throws Exception {
|
||||||
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
|
final var nodes = new TestKitNodes.Builder().
|
||||||
new TestKitNodes.Builder().
|
|
||||||
setNumBrokerNodes(1).
|
setNumBrokerNodes(1).
|
||||||
setNumControllerNodes(4).
|
setNumControllerNodes(4).
|
||||||
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
|
build();
|
||||||
build()).build()
|
|
||||||
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
|
||||||
|
setInitialVoterSet(initialVoters).
|
||||||
|
build()
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
|
|
|
@ -1011,8 +1011,7 @@ class KRaftClusterTest {
|
||||||
val cluster = new KafkaClusterTestKit.Builder(
|
val cluster = new KafkaClusterTestKit.Builder(
|
||||||
new TestKitNodes.Builder().
|
new TestKitNodes.Builder().
|
||||||
setNumBrokerNodes(1).
|
setNumBrokerNodes(1).
|
||||||
setNumControllerNodes(1).
|
setNumControllerNodes(1).build()).setStandalone(true).build()
|
||||||
setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
|
|
||||||
try {
|
try {
|
||||||
cluster.format()
|
cluster.format()
|
||||||
cluster.startup()
|
cluster.startup()
|
||||||
|
|
|
@ -458,19 +458,14 @@ Found problem:
|
||||||
Seq("--release-version", "3.9-IV0"))).getMessage)
|
Seq("--release-version", "3.9-IV0"))).getMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@ValueSource(booleans = Array(false, true))
|
def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
|
||||||
def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = {
|
|
||||||
val availableDirs = Seq(TestUtils.tempDir())
|
val availableDirs = Seq(TestUtils.tempDir())
|
||||||
val properties = new Properties()
|
val properties = new Properties()
|
||||||
properties.putAll(defaultDynamicQuorumProperties)
|
properties.putAll(defaultDynamicQuorumProperties)
|
||||||
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
||||||
val stream = new ByteArrayOutputStream()
|
val stream = new ByteArrayOutputStream()
|
||||||
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
|
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
|
||||||
if (setKraftVersionFeature) {
|
|
||||||
arguments += "--feature"
|
|
||||||
arguments += "kraft.version=1"
|
|
||||||
}
|
|
||||||
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
|
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
|
||||||
assertTrue(stream.toString().
|
assertTrue(stream.toString().
|
||||||
contains("Formatting metadata directory %s".format(availableDirs.head)),
|
contains("Formatting metadata directory %s".format(availableDirs.head)),
|
||||||
|
|
|
@ -4072,45 +4072,27 @@ In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the
|
||||||
If you are not sure whether you are using static or dynamic quorums, you can determine this by
|
If you are not sure whether you are using static or dynamic quorums, you can determine this by
|
||||||
running something like the following:<p>
|
running something like the following:<p>
|
||||||
|
|
||||||
<pre><code class="language-bash">
|
<pre><code class="language-bash">$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe</code></pre>
|
||||||
$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
|
<p>
|
||||||
</code></pre><p>
|
|
||||||
|
|
||||||
If the <code>kraft.version</code> field is level 0 or absent, you are using a static quorum. If
|
If the <code>kraft.version</code> field is level 0 or absent, you are using a static quorum. If
|
||||||
it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
|
it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
|
||||||
quorum:<p/>
|
quorum:<p>
|
||||||
<pre><code class="language-bash">
|
<pre><code class="language-bash">Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
|
||||||
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
|
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5</code></pre>
|
||||||
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
|
<p>
|
||||||
</code></pre><p/>
|
Here is another example of a static quorum:<p>
|
||||||
|
<pre><code class="language-bash">Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5</code></pre>
|
||||||
Here is another example of a static quorum:<p/>
|
<p>
|
||||||
<pre><code class="language-bash">
|
Here is an example of a dynamic quorum:<p>
|
||||||
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5
|
<pre><code class="language-bash">Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
|
||||||
</code></pre><p/>
|
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5</code></pre>
|
||||||
|
<p>
|
||||||
Here is an example of a dynamic quorum:<p/>
|
|
||||||
<pre><code class="language-bash">
|
|
||||||
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
|
|
||||||
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
|
|
||||||
</code></pre><p/>
|
|
||||||
|
|
||||||
The static versus dynamic nature of the quorum is determined at the time of formatting.
|
The static versus dynamic nature of the quorum is determined at the time of formatting.
|
||||||
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
|
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
|
||||||
<b>not</b> present, and if the software version is Apache Kafka 3.9 or newer. If you have
|
<b>not</b> present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set.
|
||||||
followed the instructions earlier in this document, you will get a dynamic quorum.<p>
|
If you have followed the instructions earlier in this document, you will get a dynamic quorum.
|
||||||
|
<p>
|
||||||
If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your
|
Note: To migrate from static voter set to dynamic voter set, please refer to the <a href="#kraft_upgrade">Upgrade</a> section.
|
||||||
controllers using the <code>--feature kraft.version=1</code>. (Note that you should not supply
|
|
||||||
this flag when formatting brokers -- only when formatting controllers.)<p>
|
|
||||||
|
|
||||||
<pre><code class="language-bash">
|
|
||||||
$ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller.properties
|
|
||||||
Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try removing the --feature flag for kraft.version.
|
|
||||||
</code></pre><p>
|
|
||||||
|
|
||||||
Note: Currently it is <b>not</b> possible to convert clusters using a static controller quorum to
|
|
||||||
use a dynamic controller quorum. This function will be supported in the future release.
|
|
||||||
|
|
||||||
<h5 class="anchor-heading"><a id="kraft_reconfig_add" class="anchor-link"></a><a href="#kraft_reconfig_add">Add New Controller</a></h5>
|
<h5 class="anchor-heading"><a id="kraft_reconfig_add" class="anchor-link"></a><a href="#kraft_reconfig_add">Add New Controller</a></h5>
|
||||||
If a dynamic controller cluster already exists, it can be expanded by first provisioning a new controller using the <a href="#kraft_nodes_observers">kafka-storage.sh tool</a> and starting the controller.
|
If a dynamic controller cluster already exists, it can be expanded by first provisioning a new controller using the <a href="#kraft_nodes_observers">kafka-storage.sh tool</a> and starting the controller.
|
||||||
|
|
|
@ -131,7 +131,7 @@ public class Formatter {
|
||||||
* The initial KIP-853 voters.
|
* The initial KIP-853 voters.
|
||||||
*/
|
*/
|
||||||
private Optional<DynamicVoters> initialControllers = Optional.empty();
|
private Optional<DynamicVoters> initialControllers = Optional.empty();
|
||||||
private boolean noInitialControllersFlag = false;
|
private boolean hasDynamicQuorum = false;
|
||||||
|
|
||||||
public Formatter setPrintStream(PrintStream printStream) {
|
public Formatter setPrintStream(PrintStream printStream) {
|
||||||
this.printStream = printStream;
|
this.printStream = printStream;
|
||||||
|
@ -217,8 +217,8 @@ public class Formatter {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) {
|
public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
|
||||||
this.noInitialControllersFlag = noInitialControllersFlag;
|
this.hasDynamicQuorum = hasDynamicQuorum;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,7 +227,7 @@ public class Formatter {
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean hasDynamicQuorum() {
|
boolean hasDynamicQuorum() {
|
||||||
return initialControllers.isPresent() || noInitialControllersFlag;
|
return hasDynamicQuorum;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BootstrapMetadata bootstrapMetadata() {
|
public BootstrapMetadata bootstrapMetadata() {
|
||||||
|
@ -337,8 +337,8 @@ public class Formatter {
|
||||||
/**
|
/**
|
||||||
* Calculate the effective feature level for kraft.version. In order to keep existing
|
* Calculate the effective feature level for kraft.version. In order to keep existing
|
||||||
* command-line invocations of StorageTool working, we default this to 0 if no dynamic
|
* command-line invocations of StorageTool working, we default this to 0 if no dynamic
|
||||||
* voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments
|
* voter quorum arguments were provided. As a convenience, if the static voters config is
|
||||||
* were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version).
|
* empty, we set the latest kraft.version. (Currently there is only 1 non-zero version).
|
||||||
*
|
*
|
||||||
* @param configuredKRaftVersionLevel The configured level for kraft.version
|
* @param configuredKRaftVersionLevel The configured level for kraft.version
|
||||||
* @return The effective feature level.
|
* @return The effective feature level.
|
||||||
|
@ -348,20 +348,19 @@ public class Formatter {
|
||||||
if (configuredKRaftVersionLevel.get() == 0) {
|
if (configuredKRaftVersionLevel.get() == 0) {
|
||||||
if (hasDynamicQuorum()) {
|
if (hasDynamicQuorum()) {
|
||||||
throw new FormatterException(
|
throw new FormatterException(
|
||||||
"Cannot set kraft.version to " +
|
"Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " +
|
||||||
configuredKRaftVersionLevel.get() +
|
"--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " +
|
||||||
" if one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
|
"controllers support, try removing the --feature flag for kraft.version."
|
||||||
"For dynamic controllers support, try removing the --feature flag for kraft.version."
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!hasDynamicQuorum()) {
|
if (!hasDynamicQuorum()) {
|
||||||
throw new FormatterException(
|
throw new FormatterException(
|
||||||
"Cannot set kraft.version to " +
|
"Cannot set kraft.version to " + configuredKRaftVersionLevel.get() +
|
||||||
configuredKRaftVersionLevel.get() +
|
" unless controller.quorum.voters is empty and one of the flags --standalone, " +
|
||||||
" unless one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
|
"--initial-controllers, or --no-initial-controllers is used. " +
|
||||||
"For dynamic controllers support, try using one of --standalone, --initial-controllers, or " +
|
"For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
|
||||||
"--no-initial-controllers."
|
"or --no-initial-controllers and removing controller.quorum.voters."
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||||
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
|
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
|
||||||
import org.apache.kafka.server.common.Feature;
|
import org.apache.kafka.server.common.Feature;
|
||||||
import org.apache.kafka.server.common.GroupVersion;
|
import org.apache.kafka.server.common.GroupVersion;
|
||||||
|
import org.apache.kafka.server.common.KRaftVersion;
|
||||||
import org.apache.kafka.server.common.MetadataVersion;
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
import org.apache.kafka.server.common.TestFeatureVersion;
|
import org.apache.kafka.server.common.TestFeatureVersion;
|
||||||
import org.apache.kafka.server.common.TransactionVersion;
|
import org.apache.kafka.server.common.TransactionVersion;
|
||||||
|
@ -200,6 +201,7 @@ public class FormatterTest {
|
||||||
String newDirectoryId = Uuid.randomUuid().toString();
|
String newDirectoryId = Uuid.randomUuid().toString();
|
||||||
formatter1.formatter
|
formatter1.formatter
|
||||||
.setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + originalDirectoryId))
|
.setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + originalDirectoryId))
|
||||||
|
.setHasDynamicQuorum(true)
|
||||||
.run();
|
.run();
|
||||||
assertEquals("Formatting dynamic metadata voter directory " + testEnv.directory(0) +
|
assertEquals("Formatting dynamic metadata voter directory " + testEnv.directory(0) +
|
||||||
" with metadata.version " + MetadataVersion.latestProduction() + ".",
|
" with metadata.version " + MetadataVersion.latestProduction() + ".",
|
||||||
|
@ -417,13 +419,14 @@ public class FormatterTest {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
if (specifyKRaftVersion) {
|
if (specifyKRaftVersion) {
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
|
||||||
}
|
}
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
formatter1.formatter.setInitialControllers(DynamicVoters.
|
formatter1.formatter.setInitialControllers(DynamicVoters.
|
||||||
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
||||||
|
formatter1.formatter.setHasDynamicQuorum(true);
|
||||||
formatter1.formatter.run();
|
formatter1.formatter.run();
|
||||||
assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
|
assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
assertEquals(List.of(
|
assertEquals(List.of(
|
||||||
String.format("Formatting data directory %s with %s %s.",
|
String.format("Formatting data directory %s with %s %s.",
|
||||||
testEnv.directory(1),
|
testEnv.directory(1),
|
||||||
|
@ -450,49 +453,66 @@ public class FormatterTest {
|
||||||
public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception {
|
public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
|
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 0);
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
formatter1.formatter.setInitialControllers(DynamicVoters.
|
formatter1.formatter.setInitialControllers(DynamicVoters.
|
||||||
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
||||||
|
formatter1.formatter.setHasDynamicQuorum(true);
|
||||||
assertTrue(formatter1.formatter.hasDynamicQuorum());
|
assertTrue(formatter1.formatter.hasDynamicQuorum());
|
||||||
assertEquals(
|
assertEquals(
|
||||||
"Cannot set kraft.version to 0 if one of the flags --standalone, --initial-controllers, or " +
|
"Cannot set kraft.version to 0 if controller.quorum.voters is empty " +
|
||||||
"--no-initial-controllers is used. For dynamic controllers support, try removing the " +
|
"and one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
|
||||||
"--feature flag for kraft.version.",
|
"For dynamic controllers support, try removing the --feature flag for kraft.version.",
|
||||||
assertThrows(FormatterException.class, () -> formatter1.formatter.run()).getMessage()
|
assertThrows(FormatterException.class, formatter1.formatter::run).getMessage()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() throws Exception {
|
public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws Exception {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
assertFalse(formatter1.formatter.hasDynamicQuorum());
|
assertFalse(formatter1.formatter.hasDynamicQuorum());
|
||||||
assertEquals(
|
assertEquals(
|
||||||
"Cannot set kraft.version to 1 unless one of the flags --standalone, --initial-controllers, or " +
|
"Cannot set kraft.version to 1 unless controller.quorum.voters is empty and " +
|
||||||
"--no-initial-controllers is used. For dynamic controllers support, try using one of " +
|
"one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
|
||||||
"--standalone, --initial-controllers, or --no-initial-controllers.",
|
"For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
|
||||||
assertThrows(FormatterException.class, () -> formatter1.formatter.run()).getMessage()
|
"or --no-initial-controllers and removing controller.quorum.voters.",
|
||||||
|
assertThrows(FormatterException.class, formatter1.formatter::run).getMessage()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Exception {
|
public void testFormatWithInitialVotersWithOlderMetadataVersion() throws Exception {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
|
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
|
||||||
formatter1.formatter.setInitialControllers(DynamicVoters.
|
formatter1.formatter.setInitialControllers(DynamicVoters.
|
||||||
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
assertEquals("kraft.version could not be set to 1 because it depends on " +
|
formatter1.formatter.setHasDynamicQuorum(true);
|
||||||
"metadata.version level 21",
|
formatter1.formatter.run();
|
||||||
assertThrows(IllegalArgumentException.class,
|
assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
() -> formatter1.formatter.run()).getMessage());
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(booleans = {false, true})
|
||||||
|
public void testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean hasDynamicQuorum) throws Exception {
|
||||||
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
|
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
|
||||||
|
formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum);
|
||||||
|
formatter1.formatter.run();
|
||||||
|
if (hasDynamicQuorum) {
|
||||||
|
assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
|
} else {
|
||||||
|
assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -513,6 +533,7 @@ public class FormatterTest {
|
||||||
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1);
|
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1);
|
||||||
formatter1.formatter.setInitialControllers(DynamicVoters.
|
formatter1.formatter.setInitialControllers(DynamicVoters.
|
||||||
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
||||||
|
formatter1.formatter.setHasDynamicQuorum(true);
|
||||||
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
|
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
|
||||||
assertDoesNotThrow(() -> formatter1.formatter.run());
|
assertDoesNotThrow(() -> formatter1.formatter.run());
|
||||||
} else {
|
} else {
|
||||||
|
@ -524,20 +545,14 @@ public class FormatterTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@ValueSource(booleans = {false, true})
|
public void testFormatWithNoInitialControllers() throws Exception {
|
||||||
public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception {
|
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
if (specifyKRaftVersion) {
|
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
|
||||||
}
|
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
formatter1.formatter.setNoInitialControllersFlag(true);
|
assertFalse(formatter1.formatter.hasDynamicQuorum());
|
||||||
assertTrue(formatter1.formatter.hasDynamicQuorum());
|
|
||||||
|
|
||||||
formatter1.formatter.run();
|
formatter1.formatter.run();
|
||||||
assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
|
assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
assertEquals(List.of(
|
assertEquals(List.of(
|
||||||
String.format("Formatting data directory %s with %s %s.",
|
String.format("Formatting data directory %s with %s %s.",
|
||||||
testEnv.directory(1),
|
testEnv.directory(1),
|
||||||
|
@ -557,38 +572,4 @@ public class FormatterTest {
|
||||||
assertNotNull(logDirProps1);
|
assertNotNull(logDirProps1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception {
|
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
|
||||||
formatter1.formatter.setNoInitialControllersFlag(false);
|
|
||||||
assertFalse(formatter1.formatter.hasDynamicQuorum());
|
|
||||||
assertEquals(
|
|
||||||
"Cannot set kraft.version to 1 unless one of the flags --standalone, --initial-controllers, or " +
|
|
||||||
"--no-initial-controllers is used. For dynamic controllers support, try using one of " +
|
|
||||||
"--standalone, --initial-controllers, or --no-initial-controllers.",
|
|
||||||
assertThrows(FormatterException.class, formatter1.formatter::run).getMessage()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception {
|
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
|
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
|
||||||
formatter1.formatter.setNoInitialControllersFlag(true);
|
|
||||||
assertTrue(formatter1.formatter.hasDynamicQuorum());
|
|
||||||
assertEquals(
|
|
||||||
"Cannot set kraft.version to 0 if one of the flags --standalone, --initial-controllers, or " +
|
|
||||||
"--no-initial-controllers is used. For dynamic controllers support, try removing the " +
|
|
||||||
"--feature flag for kraft.version.",
|
|
||||||
assertThrows(FormatterException.class, formatter1.formatter::run).getMessage()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,12 +72,7 @@ public enum KRaftVersion implements FeatureVersion {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Short> dependencies() {
|
public Map<String, Short> dependencies() {
|
||||||
if (this.featureLevel == 0) {
|
|
||||||
return Map.of();
|
return Map.of();
|
||||||
} else {
|
|
||||||
return Map.of(
|
|
||||||
MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_9_IV0.featureLevel());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isAtLeast(KRaftVersion otherVersion) {
|
public boolean isAtLeast(KRaftVersion otherVersion) {
|
||||||
|
|
|
@ -27,10 +27,12 @@ import kafka.server.SharedServer;
|
||||||
|
|
||||||
import org.apache.kafka.clients.CommonClientConfigs;
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
|
import org.apache.kafka.common.Uuid;
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.network.ListenerName;
|
import org.apache.kafka.common.network.ListenerName;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
|
import org.apache.kafka.common.test.api.TestKitDefaults;
|
||||||
import org.apache.kafka.common.utils.ThreadUtils;
|
import org.apache.kafka.common.utils.ThreadUtils;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
@ -43,7 +45,6 @@ import org.apache.kafka.raft.DynamicVoters;
|
||||||
import org.apache.kafka.raft.MetadataLogConfig;
|
import org.apache.kafka.raft.MetadataLogConfig;
|
||||||
import org.apache.kafka.raft.QuorumConfig;
|
import org.apache.kafka.raft.QuorumConfig;
|
||||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||||
import org.apache.kafka.server.common.KRaftVersion;
|
|
||||||
import org.apache.kafka.server.config.KRaftConfigs;
|
import org.apache.kafka.server.config.KRaftConfigs;
|
||||||
import org.apache.kafka.server.config.ServerConfigs;
|
import org.apache.kafka.server.config.ServerConfigs;
|
||||||
import org.apache.kafka.server.fault.FaultHandler;
|
import org.apache.kafka.server.fault.FaultHandler;
|
||||||
|
@ -114,6 +115,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
private final String controllerListenerName;
|
private final String controllerListenerName;
|
||||||
private final String brokerSecurityProtocol;
|
private final String brokerSecurityProtocol;
|
||||||
private final String controllerSecurityProtocol;
|
private final String controllerSecurityProtocol;
|
||||||
|
private boolean standalone;
|
||||||
|
private Optional<Map<Integer, Uuid>> initialVoterSet = Optional.empty();
|
||||||
private boolean deleteOnClose;
|
private boolean deleteOnClose;
|
||||||
|
|
||||||
public Builder(TestKitNodes nodes) {
|
public Builder(TestKitNodes nodes) {
|
||||||
|
@ -130,6 +133,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setStandalone(boolean standalone) {
|
||||||
|
this.standalone = standalone;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
|
||||||
|
this.initialVoterSet = Optional.of(initialVoterSet);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
|
private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
|
||||||
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
|
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
|
||||||
TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
|
TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
|
||||||
|
@ -168,6 +181,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName);
|
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName);
|
||||||
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName);
|
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName);
|
||||||
|
|
||||||
|
if (!standalone && initialVoterSet.isEmpty()) {
|
||||||
StringBuilder quorumVoterStringBuilder = new StringBuilder();
|
StringBuilder quorumVoterStringBuilder = new StringBuilder();
|
||||||
String prefix = "";
|
String prefix = "";
|
||||||
for (int nodeId : nodes.controllerNodes().keySet()) {
|
for (int nodeId : nodes.controllerNodes().keySet()) {
|
||||||
|
@ -180,6 +194,18 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
prefix = ",";
|
prefix = ",";
|
||||||
}
|
}
|
||||||
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
|
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
|
||||||
|
} else {
|
||||||
|
StringBuilder bootstrapServersStringBuilder = new StringBuilder();
|
||||||
|
String prefix = "";
|
||||||
|
for (int nodeId : nodes.controllerNodes().keySet()) {
|
||||||
|
bootstrapServersStringBuilder.append(prefix).
|
||||||
|
append("localhost").
|
||||||
|
append(":").
|
||||||
|
append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName));
|
||||||
|
prefix = ",";
|
||||||
|
}
|
||||||
|
props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, bootstrapServersStringBuilder.toString());
|
||||||
|
}
|
||||||
|
|
||||||
// reduce log cleaner offset map memory usage
|
// reduce log cleaner offset map memory usage
|
||||||
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
|
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
|
||||||
|
@ -258,7 +284,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
Time.SYSTEM,
|
Time.SYSTEM,
|
||||||
new Metrics(),
|
new Metrics(),
|
||||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
|
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
|
||||||
List.of(),
|
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
|
||||||
faultHandlerFactory,
|
faultHandlerFactory,
|
||||||
socketFactoryManager.getOrCreateSocketFactory(node.id())
|
socketFactoryManager.getOrCreateSocketFactory(node.id())
|
||||||
);
|
);
|
||||||
|
@ -286,7 +312,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
Time.SYSTEM,
|
Time.SYSTEM,
|
||||||
new Metrics(),
|
new Metrics(),
|
||||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
|
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
|
||||||
List.of(),
|
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
|
||||||
faultHandlerFactory,
|
faultHandlerFactory,
|
||||||
socketFactoryManager.getOrCreateSocketFactory(node.id())
|
socketFactoryManager.getOrCreateSocketFactory(node.id())
|
||||||
);
|
);
|
||||||
|
@ -323,6 +349,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
faultHandlerFactory,
|
faultHandlerFactory,
|
||||||
socketFactoryManager,
|
socketFactoryManager,
|
||||||
jaasFile,
|
jaasFile,
|
||||||
|
standalone,
|
||||||
|
initialVoterSet,
|
||||||
deleteOnClose);
|
deleteOnClose);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -368,6 +396,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
private final PreboundSocketFactoryManager socketFactoryManager;
|
private final PreboundSocketFactoryManager socketFactoryManager;
|
||||||
private final String controllerListenerName;
|
private final String controllerListenerName;
|
||||||
private final Optional<File> jaasFile;
|
private final Optional<File> jaasFile;
|
||||||
|
private final boolean standalone;
|
||||||
|
private final Optional<Map<Integer, Uuid>> initialVoterSet;
|
||||||
private final boolean deleteOnClose;
|
private final boolean deleteOnClose;
|
||||||
|
|
||||||
private KafkaClusterTestKit(
|
private KafkaClusterTestKit(
|
||||||
|
@ -378,6 +408,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
SimpleFaultHandlerFactory faultHandlerFactory,
|
SimpleFaultHandlerFactory faultHandlerFactory,
|
||||||
PreboundSocketFactoryManager socketFactoryManager,
|
PreboundSocketFactoryManager socketFactoryManager,
|
||||||
Optional<File> jaasFile,
|
Optional<File> jaasFile,
|
||||||
|
boolean standalone,
|
||||||
|
Optional<Map<Integer, Uuid>> initialVoterSet,
|
||||||
boolean deleteOnClose
|
boolean deleteOnClose
|
||||||
) {
|
) {
|
||||||
/*
|
/*
|
||||||
|
@ -395,6 +427,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
this.socketFactoryManager = socketFactoryManager;
|
this.socketFactoryManager = socketFactoryManager;
|
||||||
this.controllerListenerName = nodes.controllerListenerName().value();
|
this.controllerListenerName = nodes.controllerListenerName().value();
|
||||||
this.jaasFile = jaasFile;
|
this.jaasFile = jaasFile;
|
||||||
|
this.standalone = standalone;
|
||||||
|
this.initialVoterSet = initialVoterSet;
|
||||||
this.deleteOnClose = deleteOnClose;
|
this.deleteOnClose = deleteOnClose;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -425,8 +459,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
boolean writeMetadataDirectory
|
boolean writeMetadataDirectory
|
||||||
) {
|
) {
|
||||||
try {
|
try {
|
||||||
|
final var nodeId = ensemble.nodeId().getAsInt();
|
||||||
Formatter formatter = new Formatter();
|
Formatter formatter = new Formatter();
|
||||||
formatter.setNodeId(ensemble.nodeId().getAsInt());
|
formatter.setNodeId(nodeId);
|
||||||
formatter.setClusterId(ensemble.clusterId().get());
|
formatter.setClusterId(ensemble.clusterId().get());
|
||||||
if (writeMetadataDirectory) {
|
if (writeMetadataDirectory) {
|
||||||
formatter.setDirectories(ensemble.logDirProps().keySet());
|
formatter.setDirectories(ensemble.logDirProps().keySet());
|
||||||
|
@ -439,8 +474,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
|
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
|
||||||
formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
|
|
||||||
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
|
|
||||||
formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
formatter.setIgnoreFormatted(false);
|
formatter.setIgnoreFormatted(false);
|
||||||
formatter.setControllerListenerName(controllerListenerName);
|
formatter.setControllerListenerName(controllerListenerName);
|
||||||
|
@ -449,18 +482,43 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
} else {
|
} else {
|
||||||
formatter.setMetadataLogDirectory(Optional.empty());
|
formatter.setMetadataLogDirectory(Optional.empty());
|
||||||
}
|
}
|
||||||
if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
|
|
||||||
StringBuilder dynamicVotersBuilder = new StringBuilder();
|
StringBuilder dynamicVotersBuilder = new StringBuilder();
|
||||||
String prefix = "";
|
String prefix = "";
|
||||||
for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
|
if (standalone) {
|
||||||
int port = socketFactoryManager.
|
if (nodeId == TestKitDefaults.BROKER_ID_OFFSET + TestKitDefaults.CONTROLLER_ID_OFFSET) {
|
||||||
getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
|
final var controllerNode = nodes.controllerNodes().get(nodeId);
|
||||||
|
dynamicVotersBuilder.append(
|
||||||
|
String.format(
|
||||||
|
"%d@localhost:%d:%s",
|
||||||
|
controllerNode.id(),
|
||||||
|
socketFactoryManager.
|
||||||
|
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
|
||||||
|
}
|
||||||
|
// when the nodeId != TestKitDefaults.CONTROLLER_ID_OFFSET, the node is formatting with
|
||||||
|
// the --no-initial-controllers flag
|
||||||
|
formatter.setHasDynamicQuorum(true);
|
||||||
|
} else if (initialVoterSet.isPresent()) {
|
||||||
|
for (final var controllerNode : initialVoterSet.get().entrySet()) {
|
||||||
|
final var voterId = controllerNode.getKey();
|
||||||
|
final var voterDirectoryId = controllerNode.getValue();
|
||||||
dynamicVotersBuilder.append(prefix);
|
dynamicVotersBuilder.append(prefix);
|
||||||
prefix = ",";
|
prefix = ",";
|
||||||
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
|
dynamicVotersBuilder.append(
|
||||||
controllerNode.id(), port, controllerNode.metadataDirectoryId()));
|
String.format(
|
||||||
|
"%d@localhost:%d:%s",
|
||||||
|
voterId,
|
||||||
|
socketFactoryManager.
|
||||||
|
getOrCreatePortForListener(voterId, controllerListenerName),
|
||||||
|
voterDirectoryId
|
||||||
|
)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
|
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
|
||||||
|
formatter.setHasDynamicQuorum(true);
|
||||||
}
|
}
|
||||||
formatter.run();
|
formatter.run();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -93,11 +93,6 @@ public class TestKitNodes {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setFeature(String featureName, short level) {
|
|
||||||
this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(featureName, level);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder setCombined(boolean combined) {
|
public Builder setCombined(boolean combined) {
|
||||||
this.combined = combined;
|
this.combined = combined;
|
||||||
return this;
|
return this;
|
||||||
|
|
Loading…
Reference in New Issue