mirror of https://github.com/apache/kafka.git
KAFKA-19719 --no-initial-controllers should not assume kraft.version=… (#20616)
CI / build (push) Has been cancelled
Details
CI / build (push) Has been cancelled
Details
backport KAFKA-19719 to 4.0 Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
7e27a78f49
commit
099e91f5fc
|
@ -135,21 +135,30 @@ object StorageTool extends Logging {
|
||||||
featureNamesAndLevels(_).foreachEntry {
|
featureNamesAndLevels(_).foreachEntry {
|
||||||
(k, v) => formatter.setFeatureLevel(k, v)
|
(k, v) => formatter.setFeatureLevel(k, v)
|
||||||
})
|
})
|
||||||
Option(namespace.getString("initial_controllers")).
|
val initialControllers = namespace.getString("initial_controllers")
|
||||||
|
val isStandalone = namespace.getBoolean("standalone")
|
||||||
|
val staticVotersEmpty = config.quorumConfig.voters().isEmpty
|
||||||
|
formatter.setHasDynamicQuorum(staticVotersEmpty)
|
||||||
|
if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) {
|
||||||
|
throw new TerseFailure("You cannot specify " +
|
||||||
|
QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
|
||||||
|
"with --initial-controllers or --standalone. " +
|
||||||
|
"If you want to use dynamic quorum, please remove " +
|
||||||
|
QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " +
|
||||||
|
QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.")
|
||||||
|
}
|
||||||
|
Option(initialControllers).
|
||||||
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
|
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
|
||||||
if (namespace.getBoolean("standalone")) {
|
if (isStandalone) {
|
||||||
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
|
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
|
||||||
}
|
}
|
||||||
if (namespace.getBoolean("no_initial_controllers")) {
|
if (!namespace.getBoolean("no_initial_controllers") &&
|
||||||
formatter.setNoInitialControllersFlag(true)
|
config.processRoles.contains(ProcessRole.ControllerRole) &&
|
||||||
} else {
|
staticVotersEmpty &&
|
||||||
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
|
formatter.initialVoters().isEmpty) {
|
||||||
if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) {
|
|
||||||
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
|
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
|
||||||
" is not set on this controller, you must specify one of the following: " +
|
" is not set on this controller, you must specify one of the following: " +
|
||||||
"--standalone, --initial-controllers, or --no-initial-controllers.");
|
"--standalone, --initial-controllers, or --no-initial-controllers.");
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Option(namespace.getList("add_scram")).
|
Option(namespace.getList("add_scram")).
|
||||||
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
|
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
|
||||||
|
@ -319,18 +328,21 @@ object StorageTool extends Logging {
|
||||||
|
|
||||||
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
|
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
|
||||||
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
|
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
|
||||||
.help("Used to initialize a controller as a single-node dynamic quorum.")
|
.help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " +
|
||||||
|
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
|
||||||
.action(storeTrue())
|
.action(storeTrue())
|
||||||
|
|
||||||
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
|
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
|
||||||
.help("Used to initialize a server without a dynamic quorum topology.")
|
.help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " +
|
||||||
|
"the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.")
|
||||||
.action(storeTrue())
|
.action(storeTrue())
|
||||||
|
|
||||||
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
|
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
|
||||||
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " +
|
.help("Used to initialize a server with the specified dynamic quorum. The argument " +
|
||||||
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
|
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
|
||||||
"format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" +
|
"format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" +
|
||||||
"MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n")
|
"MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " +
|
||||||
|
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
|
||||||
.action(store())
|
.action(store())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
@ -83,9 +84,8 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
new TestKitNodes.Builder().
|
new TestKitNodes.Builder().
|
||||||
setNumBrokerNodes(1).
|
setNumBrokerNodes(1).
|
||||||
setNumControllerNodes(1).
|
setNumControllerNodes(1).
|
||||||
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
|
|
||||||
build()
|
build()
|
||||||
).build()) {
|
).setStandalone(true).build()) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
@ -107,13 +107,23 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveController() throws Exception {
|
public void testRemoveController() throws Exception {
|
||||||
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
|
final var nodes = new TestKitNodes.Builder().
|
||||||
new TestKitNodes.Builder().
|
setNumBrokerNodes(1).
|
||||||
setNumBrokerNodes(1).
|
setNumControllerNodes(3).
|
||||||
setNumControllerNodes(3).
|
build();
|
||||||
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
|
|
||||||
build()
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
).build()) {
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
|
||||||
|
setInitialVoterSet(initialVoters).
|
||||||
|
build()
|
||||||
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
@ -132,12 +142,22 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveAndAddSameController() throws Exception {
|
public void testRemoveAndAddSameController() throws Exception {
|
||||||
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
|
final var nodes = new TestKitNodes.Builder().
|
||||||
new TestKitNodes.Builder().
|
setNumBrokerNodes(1).
|
||||||
setNumBrokerNodes(1).
|
setNumControllerNodes(4).
|
||||||
setNumControllerNodes(4).
|
build();
|
||||||
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
|
|
||||||
build()).build()
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
|
||||||
|
setInitialVoterSet(initialVoters).
|
||||||
|
build()
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
|
|
|
@ -1013,8 +1013,7 @@ class KRaftClusterTest {
|
||||||
val cluster = new KafkaClusterTestKit.Builder(
|
val cluster = new KafkaClusterTestKit.Builder(
|
||||||
new TestKitNodes.Builder().
|
new TestKitNodes.Builder().
|
||||||
setNumBrokerNodes(1).
|
setNumBrokerNodes(1).
|
||||||
setNumControllerNodes(1).
|
setNumControllerNodes(1).build()).setStandalone(true).build()
|
||||||
setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
|
|
||||||
try {
|
try {
|
||||||
cluster.format()
|
cluster.format()
|
||||||
cluster.startup()
|
cluster.startup()
|
||||||
|
|
|
@ -375,7 +375,10 @@ Found problem:
|
||||||
def testFormatWithStandaloneFlagOnBrokerFails(): Unit = {
|
def testFormatWithStandaloneFlagOnBrokerFails(): Unit = {
|
||||||
val availableDirs = Seq(TestUtils.tempDir())
|
val availableDirs = Seq(TestUtils.tempDir())
|
||||||
val properties = new Properties()
|
val properties = new Properties()
|
||||||
properties.putAll(defaultStaticQuorumProperties)
|
properties.setProperty("process.roles", "broker")
|
||||||
|
properties.setProperty("node.id", "0")
|
||||||
|
properties.setProperty("controller.listener.names", "CONTROLLER")
|
||||||
|
properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
|
||||||
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
||||||
val stream = new ByteArrayOutputStream()
|
val stream = new ByteArrayOutputStream()
|
||||||
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone")
|
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone")
|
||||||
|
@ -458,19 +461,14 @@ Found problem:
|
||||||
Seq("--release-version", "3.9-IV0"))).getMessage)
|
Seq("--release-version", "3.9-IV0"))).getMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@ValueSource(booleans = Array(false, true))
|
def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
|
||||||
def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = {
|
|
||||||
val availableDirs = Seq(TestUtils.tempDir())
|
val availableDirs = Seq(TestUtils.tempDir())
|
||||||
val properties = new Properties()
|
val properties = new Properties()
|
||||||
properties.putAll(defaultDynamicQuorumProperties)
|
properties.putAll(defaultDynamicQuorumProperties)
|
||||||
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
properties.setProperty("log.dirs", availableDirs.mkString(","))
|
||||||
val stream = new ByteArrayOutputStream()
|
val stream = new ByteArrayOutputStream()
|
||||||
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
|
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
|
||||||
if (setKraftVersionFeature) {
|
|
||||||
arguments += "--feature"
|
|
||||||
arguments += "kraft.version=1"
|
|
||||||
}
|
|
||||||
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
|
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
|
||||||
assertTrue(stream.toString().
|
assertTrue(stream.toString().
|
||||||
contains("Formatting metadata directory %s".format(availableDirs.head)),
|
contains("Formatting metadata directory %s".format(availableDirs.head)),
|
||||||
|
|
|
@ -3869,45 +3869,29 @@ In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the
|
||||||
If you are not sure whether you are using static or dynamic quorums, you can determine this by
|
If you are not sure whether you are using static or dynamic quorums, you can determine this by
|
||||||
running something like the following:<p>
|
running something like the following:<p>
|
||||||
|
|
||||||
<pre><code class="language-bash">
|
<pre><code class="language-bash">$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe</code></pre>
|
||||||
$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
|
<p>
|
||||||
</code></pre><p>
|
If the <code>kraft.version</code> field is level 0 or absent, you are using a static quorum. If
|
||||||
|
it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
|
||||||
|
quorum:<p>
|
||||||
|
<pre><code class="language-bash">Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
|
||||||
|
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5</code></pre>
|
||||||
|
<p>
|
||||||
|
Here is another example of a static quorum:<p>
|
||||||
|
<pre><code class="language-bash">Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5</code></pre>
|
||||||
|
<p>
|
||||||
|
Here is an example of a dynamic quorum:<p>
|
||||||
|
<pre><code class="language-bash">Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
|
||||||
|
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5</code></pre>
|
||||||
|
<p>
|
||||||
|
The static versus dynamic nature of the quorum is determined at the time of formatting.
|
||||||
|
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
|
||||||
|
<b>not</b> present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set.
|
||||||
|
If you have followed the instructions earlier in this document, you will get a dynamic quorum.
|
||||||
|
<p>
|
||||||
|
|
||||||
If the <code>kraft.version</code> field is level 0 or absent, you are using a static quorum. If
|
Note: Currently it is <b>not</b> possible to convert clusters using a static controller quorum to
|
||||||
it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
|
use a dynamic controller quorum. This function will be supported in the future release.
|
||||||
quorum:<p/>
|
|
||||||
<pre><code class="language-bash">
|
|
||||||
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
|
|
||||||
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
|
|
||||||
</code></pre><p/>
|
|
||||||
|
|
||||||
Here is another example of a static quorum:<p/>
|
|
||||||
<pre><code class="language-bash">
|
|
||||||
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5
|
|
||||||
</code></pre><p/>
|
|
||||||
|
|
||||||
Here is an example of a dynamic quorum:<p/>
|
|
||||||
<pre><code class="language-bash">
|
|
||||||
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
|
|
||||||
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
|
|
||||||
</code></pre><p/>
|
|
||||||
|
|
||||||
The static versus dynamic nature of the quorum is determined at the time of formatting.
|
|
||||||
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
|
|
||||||
<b>not</b> present, and if the software version is Apache Kafka 3.9 or newer. If you have
|
|
||||||
followed the instructions earlier in this document, you will get a dynamic quorum.<p>
|
|
||||||
|
|
||||||
If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your
|
|
||||||
controllers using the <code>--feature kraft.version=1</code>. (Note that you should not supply
|
|
||||||
this flag when formatting brokers -- only when formatting controllers.)<p>
|
|
||||||
|
|
||||||
<pre><code class="language-bash">
|
|
||||||
$ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller_static.properties
|
|
||||||
Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try removing the --feature flag for kraft.version.
|
|
||||||
</code></pre><p>
|
|
||||||
|
|
||||||
Note: Currently it is <b>not</b> possible to convert clusters using a static controller quorum to
|
|
||||||
use a dynamic controller quorum. This function will be supported in the future release.
|
|
||||||
|
|
||||||
<h5 class="anchor-heading"><a id="kraft_reconfig_add" class="anchor-link"></a><a href="#kraft_reconfig_add">Add New Controller</a></h5>
|
<h5 class="anchor-heading"><a id="kraft_reconfig_add" class="anchor-link"></a><a href="#kraft_reconfig_add">Add New Controller</a></h5>
|
||||||
If a dynamic controller cluster already exists, it can be expanded by first provisioning a new controller using the <a href="#kraft_storage_observers">kafka-storage.sh tool</a> and starting the controller.
|
If a dynamic controller cluster already exists, it can be expanded by first provisioning a new controller using the <a href="#kraft_storage_observers">kafka-storage.sh tool</a> and starting the controller.
|
||||||
|
|
|
@ -132,7 +132,7 @@ public class Formatter {
|
||||||
* The initial KIP-853 voters.
|
* The initial KIP-853 voters.
|
||||||
*/
|
*/
|
||||||
private Optional<DynamicVoters> initialControllers = Optional.empty();
|
private Optional<DynamicVoters> initialControllers = Optional.empty();
|
||||||
private boolean noInitialControllersFlag = false;
|
private boolean hasDynamicQuorum = false;
|
||||||
|
|
||||||
public Formatter setPrintStream(PrintStream printStream) {
|
public Formatter setPrintStream(PrintStream printStream) {
|
||||||
this.printStream = printStream;
|
this.printStream = printStream;
|
||||||
|
@ -218,8 +218,8 @@ public class Formatter {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) {
|
public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
|
||||||
this.noInitialControllersFlag = noInitialControllersFlag;
|
this.hasDynamicQuorum = hasDynamicQuorum;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,7 +228,7 @@ public class Formatter {
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean hasDynamicQuorum() {
|
boolean hasDynamicQuorum() {
|
||||||
return initialControllers.isPresent() || noInitialControllersFlag;
|
return hasDynamicQuorum;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BootstrapMetadata bootstrapMetadata() {
|
public BootstrapMetadata bootstrapMetadata() {
|
||||||
|
@ -338,8 +338,8 @@ public class Formatter {
|
||||||
/**
|
/**
|
||||||
* Calculate the effective feature level for kraft.version. In order to keep existing
|
* Calculate the effective feature level for kraft.version. In order to keep existing
|
||||||
* command-line invocations of StorageTool working, we default this to 0 if no dynamic
|
* command-line invocations of StorageTool working, we default this to 0 if no dynamic
|
||||||
* voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments
|
* voter quorum arguments were provided. As a convenience, if the static voters config is
|
||||||
* were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version).
|
* empty, we set the latest kraft.version. (Currently there is only 1 non-zero version).
|
||||||
*
|
*
|
||||||
* @param configuredKRaftVersionLevel The configured level for kraft.version
|
* @param configuredKRaftVersionLevel The configured level for kraft.version
|
||||||
* @return The effective feature level.
|
* @return The effective feature level.
|
||||||
|
@ -348,15 +348,21 @@ public class Formatter {
|
||||||
if (configuredKRaftVersionLevel.isPresent()) {
|
if (configuredKRaftVersionLevel.isPresent()) {
|
||||||
if (configuredKRaftVersionLevel.get() == 0) {
|
if (configuredKRaftVersionLevel.get() == 0) {
|
||||||
if (hasDynamicQuorum()) {
|
if (hasDynamicQuorum()) {
|
||||||
throw new FormatterException("Cannot set kraft.version to " +
|
throw new FormatterException(
|
||||||
configuredKRaftVersionLevel.get() + " if KIP-853 configuration is present. " +
|
"Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " +
|
||||||
"Try removing the --feature flag for kraft.version.");
|
"--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " +
|
||||||
|
"controllers support, try removing the --feature flag for kraft.version."
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!hasDynamicQuorum()) {
|
if (!hasDynamicQuorum()) {
|
||||||
throw new FormatterException("Cannot set kraft.version to " +
|
throw new FormatterException(
|
||||||
configuredKRaftVersionLevel.get() + " unless KIP-853 configuration is present. " +
|
"Cannot set kraft.version to " + configuredKRaftVersionLevel.get() +
|
||||||
"Try removing the --feature flag for kraft.version.");
|
" unless controller.quorum.voters is empty and one of the flags --standalone, " +
|
||||||
|
"--initial-controllers, or --no-initial-controllers is used. " +
|
||||||
|
"For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
|
||||||
|
"or --no-initial-controllers and removing controller.quorum.voters."
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return configuredKRaftVersionLevel.get();
|
return configuredKRaftVersionLevel.get();
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||||
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
|
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
|
||||||
import org.apache.kafka.server.common.Feature;
|
import org.apache.kafka.server.common.Feature;
|
||||||
import org.apache.kafka.server.common.GroupVersion;
|
import org.apache.kafka.server.common.GroupVersion;
|
||||||
|
import org.apache.kafka.server.common.KRaftVersion;
|
||||||
import org.apache.kafka.server.common.MetadataVersion;
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
import org.apache.kafka.server.common.TestFeatureVersion;
|
import org.apache.kafka.server.common.TestFeatureVersion;
|
||||||
import org.apache.kafka.server.common.TransactionVersion;
|
import org.apache.kafka.server.common.TransactionVersion;
|
||||||
|
@ -194,6 +195,40 @@ public class FormatterTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStandaloneWithIgnoreFormatted() throws Exception {
|
||||||
|
try (TestEnv testEnv = new TestEnv(1)) {
|
||||||
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
|
String originalDirectoryId = Uuid.randomUuid().toString();
|
||||||
|
String newDirectoryId = Uuid.randomUuid().toString();
|
||||||
|
formatter1.formatter
|
||||||
|
.setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + originalDirectoryId))
|
||||||
|
.setHasDynamicQuorum(true)
|
||||||
|
.run();
|
||||||
|
assertEquals("Formatting dynamic metadata voter directory " + testEnv.directory(0) +
|
||||||
|
" with metadata.version " + MetadataVersion.latestProduction() + ".",
|
||||||
|
formatter1.output().trim());
|
||||||
|
assertMetadataDirectoryId(testEnv, Uuid.fromString(originalDirectoryId));
|
||||||
|
|
||||||
|
FormatterContext formatter2 = testEnv.newFormatter();
|
||||||
|
formatter2.formatter
|
||||||
|
.setIgnoreFormatted(true)
|
||||||
|
.setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + newDirectoryId))
|
||||||
|
.run();
|
||||||
|
assertEquals("All of the log directories are already formatted.",
|
||||||
|
formatter2.output().trim());
|
||||||
|
assertMetadataDirectoryId(testEnv, Uuid.fromString(originalDirectoryId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertMetadataDirectoryId(TestEnv testEnv, Uuid expectedDirectoryId) throws Exception {
|
||||||
|
MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
|
||||||
|
addLogDirs(testEnv.directories).
|
||||||
|
load();
|
||||||
|
MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0));
|
||||||
|
assertEquals(expectedDirectoryId, logDirProps0.directoryId().get());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOneDirectoryFormattedAndOthersNotFormatted() throws Exception {
|
public void testOneDirectoryFormattedAndOthersNotFormatted() throws Exception {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
|
@ -383,14 +418,15 @@ public class FormatterTest {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
if (specifyKRaftVersion) {
|
if (specifyKRaftVersion) {
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
|
||||||
}
|
}
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
formatter1.formatter.setInitialControllers(DynamicVoters.
|
formatter1.formatter.setInitialControllers(DynamicVoters.
|
||||||
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
||||||
|
formatter1.formatter.setHasDynamicQuorum(true);
|
||||||
formatter1.formatter.run();
|
formatter1.formatter.run();
|
||||||
assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
|
assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
assertEquals(Arrays.asList(
|
assertEquals(List.of(
|
||||||
String.format("Formatting data directory %s with %s %s.",
|
String.format("Formatting data directory %s with %s %s.",
|
||||||
testEnv.directory(1),
|
testEnv.directory(1),
|
||||||
MetadataVersion.FEATURE_NAME,
|
MetadataVersion.FEATURE_NAME,
|
||||||
|
@ -416,45 +452,66 @@ public class FormatterTest {
|
||||||
public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception {
|
public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
|
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 0);
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
formatter1.formatter.setInitialControllers(DynamicVoters.
|
formatter1.formatter.setInitialControllers(DynamicVoters.
|
||||||
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
||||||
|
formatter1.formatter.setHasDynamicQuorum(true);
|
||||||
assertTrue(formatter1.formatter.hasDynamicQuorum());
|
assertTrue(formatter1.formatter.hasDynamicQuorum());
|
||||||
assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " +
|
assertEquals(
|
||||||
"Try removing the --feature flag for kraft.version.",
|
"Cannot set kraft.version to 0 if controller.quorum.voters is empty " +
|
||||||
assertThrows(FormatterException.class,
|
"and one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
|
||||||
() -> formatter1.formatter.run()).getMessage());
|
"For dynamic controllers support, try removing the --feature flag for kraft.version.",
|
||||||
|
assertThrows(FormatterException.class, formatter1.formatter::run).getMessage()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() throws Exception {
|
public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws Exception {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
assertFalse(formatter1.formatter.hasDynamicQuorum());
|
assertFalse(formatter1.formatter.hasDynamicQuorum());
|
||||||
assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " +
|
assertEquals(
|
||||||
"Try removing the --feature flag for kraft.version.",
|
"Cannot set kraft.version to 1 unless controller.quorum.voters is empty and " +
|
||||||
assertThrows(FormatterException.class,
|
"one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
|
||||||
() -> formatter1.formatter.run()).getMessage());
|
"For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
|
||||||
|
"or --no-initial-controllers and removing controller.quorum.voters.",
|
||||||
|
assertThrows(FormatterException.class, formatter1.formatter::run).getMessage()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Exception {
|
public void testFormatWithInitialVotersWithOlderMetadataVersion() throws Exception {
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
|
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
|
||||||
formatter1.formatter.setInitialControllers(DynamicVoters.
|
formatter1.formatter.setInitialControllers(DynamicVoters.
|
||||||
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
assertEquals("kraft.version could not be set to 1 because it depends on " +
|
formatter1.formatter.setHasDynamicQuorum(true);
|
||||||
"metadata.version level 21",
|
formatter1.formatter.run();
|
||||||
assertThrows(IllegalArgumentException.class,
|
assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
() -> formatter1.formatter.run()).getMessage());
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(booleans = {false, true})
|
||||||
|
public void testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean hasDynamicQuorum) throws Exception {
|
||||||
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
|
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
|
||||||
|
formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum);
|
||||||
|
formatter1.formatter.run();
|
||||||
|
if (hasDynamicQuorum) {
|
||||||
|
assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
|
} else {
|
||||||
|
assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -475,6 +532,7 @@ public class FormatterTest {
|
||||||
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1);
|
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1);
|
||||||
formatter1.formatter.setInitialControllers(DynamicVoters.
|
formatter1.formatter.setInitialControllers(DynamicVoters.
|
||||||
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
|
||||||
|
formatter1.formatter.setHasDynamicQuorum(true);
|
||||||
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
|
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
|
||||||
assertDoesNotThrow(() -> formatter1.formatter.run());
|
assertDoesNotThrow(() -> formatter1.formatter.run());
|
||||||
} else {
|
} else {
|
||||||
|
@ -486,21 +544,15 @@ public class FormatterTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@ValueSource(booleans = {false, true})
|
public void testFormatWithNoInitialControllers() throws Exception {
|
||||||
public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception {
|
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
try (TestEnv testEnv = new TestEnv(2)) {
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
FormatterContext formatter1 = testEnv.newFormatter();
|
||||||
if (specifyKRaftVersion) {
|
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
|
||||||
}
|
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
formatter1.formatter.setNoInitialControllersFlag(true);
|
assertFalse(formatter1.formatter.hasDynamicQuorum());
|
||||||
assertTrue(formatter1.formatter.hasDynamicQuorum());
|
|
||||||
|
|
||||||
formatter1.formatter.run();
|
formatter1.formatter.run();
|
||||||
assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
|
assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
|
||||||
assertEquals(Arrays.asList(
|
assertEquals(List.of(
|
||||||
String.format("Formatting data directory %s with %s %s.",
|
String.format("Formatting data directory %s with %s %s.",
|
||||||
testEnv.directory(1),
|
testEnv.directory(1),
|
||||||
MetadataVersion.FEATURE_NAME,
|
MetadataVersion.FEATURE_NAME,
|
||||||
|
@ -519,34 +571,4 @@ public class FormatterTest {
|
||||||
assertNotNull(logDirProps1);
|
assertNotNull(logDirProps1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception {
|
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
|
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
|
||||||
formatter1.formatter.setNoInitialControllersFlag(false);
|
|
||||||
assertFalse(formatter1.formatter.hasDynamicQuorum());
|
|
||||||
assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " +
|
|
||||||
"Try removing the --feature flag for kraft.version.",
|
|
||||||
assertThrows(FormatterException.class,
|
|
||||||
formatter1.formatter::run).getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception {
|
|
||||||
try (TestEnv testEnv = new TestEnv(2)) {
|
|
||||||
FormatterContext formatter1 = testEnv.newFormatter();
|
|
||||||
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
|
|
||||||
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
|
|
||||||
formatter1.formatter.setNoInitialControllersFlag(true);
|
|
||||||
assertTrue(formatter1.formatter.hasDynamicQuorum());
|
|
||||||
assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " +
|
|
||||||
"Try removing the --feature flag for kraft.version.",
|
|
||||||
assertThrows(FormatterException.class,
|
|
||||||
formatter1.formatter::run).getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.server.common;
|
package org.apache.kafka.server.common;
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public enum KRaftVersion implements FeatureVersion {
|
public enum KRaftVersion implements FeatureVersion {
|
||||||
|
@ -73,12 +72,7 @@ public enum KRaftVersion implements FeatureVersion {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Short> dependencies() {
|
public Map<String, Short> dependencies() {
|
||||||
if (this.featureLevel == 0) {
|
return Map.of();
|
||||||
return Collections.emptyMap();
|
|
||||||
} else {
|
|
||||||
return Collections.singletonMap(
|
|
||||||
MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_9_IV0.featureLevel());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public short quorumStateVersion() {
|
public short quorumStateVersion() {
|
||||||
|
|
|
@ -27,10 +27,12 @@ import kafka.server.SharedServer;
|
||||||
|
|
||||||
import org.apache.kafka.clients.CommonClientConfigs;
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
|
import org.apache.kafka.common.Uuid;
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.network.ListenerName;
|
import org.apache.kafka.common.network.ListenerName;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
|
import org.apache.kafka.common.test.api.TestKitDefaults;
|
||||||
import org.apache.kafka.common.utils.ThreadUtils;
|
import org.apache.kafka.common.utils.ThreadUtils;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
@ -42,7 +44,6 @@ import org.apache.kafka.network.SocketServerConfigs;
|
||||||
import org.apache.kafka.raft.DynamicVoters;
|
import org.apache.kafka.raft.DynamicVoters;
|
||||||
import org.apache.kafka.raft.QuorumConfig;
|
import org.apache.kafka.raft.QuorumConfig;
|
||||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||||
import org.apache.kafka.server.common.KRaftVersion;
|
|
||||||
import org.apache.kafka.server.config.KRaftConfigs;
|
import org.apache.kafka.server.config.KRaftConfigs;
|
||||||
import org.apache.kafka.server.config.ServerConfigs;
|
import org.apache.kafka.server.config.ServerConfigs;
|
||||||
import org.apache.kafka.server.fault.FaultHandler;
|
import org.apache.kafka.server.fault.FaultHandler;
|
||||||
|
@ -114,6 +115,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
private final String controllerListenerName;
|
private final String controllerListenerName;
|
||||||
private final String brokerSecurityProtocol;
|
private final String brokerSecurityProtocol;
|
||||||
private final String controllerSecurityProtocol;
|
private final String controllerSecurityProtocol;
|
||||||
|
private boolean standalone;
|
||||||
|
private Optional<Map<Integer, Uuid>> initialVoterSet = Optional.empty();
|
||||||
private boolean deleteOnClose;
|
private boolean deleteOnClose;
|
||||||
|
|
||||||
public Builder(TestKitNodes nodes) {
|
public Builder(TestKitNodes nodes) {
|
||||||
|
@ -130,6 +133,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setStandalone(boolean standalone) {
|
||||||
|
this.standalone = standalone;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
|
||||||
|
this.initialVoterSet = Optional.of(initialVoterSet);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
|
private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
|
||||||
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
|
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
|
||||||
TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
|
TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
|
||||||
|
@ -168,18 +181,31 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName);
|
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName);
|
||||||
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName);
|
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName);
|
||||||
|
|
||||||
StringBuilder quorumVoterStringBuilder = new StringBuilder();
|
if (!standalone && initialVoterSet.isEmpty()) {
|
||||||
String prefix = "";
|
StringBuilder quorumVoterStringBuilder = new StringBuilder();
|
||||||
for (int nodeId : nodes.controllerNodes().keySet()) {
|
String prefix = "";
|
||||||
quorumVoterStringBuilder.append(prefix).
|
for (int nodeId : nodes.controllerNodes().keySet()) {
|
||||||
append(nodeId).
|
quorumVoterStringBuilder.append(prefix).
|
||||||
append("@").
|
append(nodeId).
|
||||||
append("localhost").
|
append("@").
|
||||||
append(":").
|
append("localhost").
|
||||||
append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName));
|
append(":").
|
||||||
prefix = ",";
|
append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName));
|
||||||
|
prefix = ",";
|
||||||
|
}
|
||||||
|
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
|
||||||
|
} else {
|
||||||
|
StringBuilder bootstrapServersStringBuilder = new StringBuilder();
|
||||||
|
String prefix = "";
|
||||||
|
for (int nodeId : nodes.controllerNodes().keySet()) {
|
||||||
|
bootstrapServersStringBuilder.append(prefix).
|
||||||
|
append("localhost").
|
||||||
|
append(":").
|
||||||
|
append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName));
|
||||||
|
prefix = ",";
|
||||||
|
}
|
||||||
|
props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, bootstrapServersStringBuilder.toString());
|
||||||
}
|
}
|
||||||
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
|
|
||||||
|
|
||||||
// reduce log cleaner offset map memory usage
|
// reduce log cleaner offset map memory usage
|
||||||
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
|
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
|
||||||
|
@ -251,7 +277,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
Time.SYSTEM,
|
Time.SYSTEM,
|
||||||
new Metrics(),
|
new Metrics(),
|
||||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
|
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
|
||||||
Collections.emptyList(),
|
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
|
||||||
faultHandlerFactory,
|
faultHandlerFactory,
|
||||||
socketFactoryManager.getOrCreateSocketFactory(node.id())
|
socketFactoryManager.getOrCreateSocketFactory(node.id())
|
||||||
);
|
);
|
||||||
|
@ -279,7 +305,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
Time.SYSTEM,
|
Time.SYSTEM,
|
||||||
new Metrics(),
|
new Metrics(),
|
||||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
|
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
|
||||||
Collections.emptyList(),
|
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
|
||||||
faultHandlerFactory,
|
faultHandlerFactory,
|
||||||
socketFactoryManager.getOrCreateSocketFactory(node.id())
|
socketFactoryManager.getOrCreateSocketFactory(node.id())
|
||||||
);
|
);
|
||||||
|
@ -316,6 +342,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
faultHandlerFactory,
|
faultHandlerFactory,
|
||||||
socketFactoryManager,
|
socketFactoryManager,
|
||||||
Optional.ofNullable(jaasFile),
|
Optional.ofNullable(jaasFile),
|
||||||
|
standalone,
|
||||||
|
initialVoterSet,
|
||||||
deleteOnClose);
|
deleteOnClose);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,6 +389,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
private final PreboundSocketFactoryManager socketFactoryManager;
|
private final PreboundSocketFactoryManager socketFactoryManager;
|
||||||
private final String controllerListenerName;
|
private final String controllerListenerName;
|
||||||
private final Optional<File> jaasFile;
|
private final Optional<File> jaasFile;
|
||||||
|
private final boolean standalone;
|
||||||
|
private final Optional<Map<Integer, Uuid>> initialVoterSet;
|
||||||
private final boolean deleteOnClose;
|
private final boolean deleteOnClose;
|
||||||
|
|
||||||
private KafkaClusterTestKit(
|
private KafkaClusterTestKit(
|
||||||
|
@ -371,6 +401,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
SimpleFaultHandlerFactory faultHandlerFactory,
|
SimpleFaultHandlerFactory faultHandlerFactory,
|
||||||
PreboundSocketFactoryManager socketFactoryManager,
|
PreboundSocketFactoryManager socketFactoryManager,
|
||||||
Optional<File> jaasFile,
|
Optional<File> jaasFile,
|
||||||
|
boolean standalone,
|
||||||
|
Optional<Map<Integer, Uuid>> initialVoterSet,
|
||||||
boolean deleteOnClose
|
boolean deleteOnClose
|
||||||
) {
|
) {
|
||||||
/*
|
/*
|
||||||
|
@ -388,6 +420,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
this.socketFactoryManager = socketFactoryManager;
|
this.socketFactoryManager = socketFactoryManager;
|
||||||
this.controllerListenerName = nodes.controllerListenerName().value();
|
this.controllerListenerName = nodes.controllerListenerName().value();
|
||||||
this.jaasFile = jaasFile;
|
this.jaasFile = jaasFile;
|
||||||
|
this.standalone = standalone;
|
||||||
|
this.initialVoterSet = initialVoterSet;
|
||||||
this.deleteOnClose = deleteOnClose;
|
this.deleteOnClose = deleteOnClose;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -422,8 +456,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
boolean writeMetadataDirectory
|
boolean writeMetadataDirectory
|
||||||
) {
|
) {
|
||||||
try {
|
try {
|
||||||
|
final var nodeId = ensemble.nodeId().getAsInt();
|
||||||
Formatter formatter = new Formatter();
|
Formatter formatter = new Formatter();
|
||||||
formatter.setNodeId(ensemble.nodeId().getAsInt());
|
formatter.setNodeId(nodeId);
|
||||||
formatter.setClusterId(ensemble.clusterId().get());
|
formatter.setClusterId(ensemble.clusterId().get());
|
||||||
if (writeMetadataDirectory) {
|
if (writeMetadataDirectory) {
|
||||||
formatter.setDirectories(ensemble.logDirProps().keySet());
|
formatter.setDirectories(ensemble.logDirProps().keySet());
|
||||||
|
@ -436,8 +471,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
|
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
|
||||||
formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
|
|
||||||
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
|
|
||||||
formatter.setUnstableFeatureVersionsEnabled(true);
|
formatter.setUnstableFeatureVersionsEnabled(true);
|
||||||
formatter.setIgnoreFormatted(false);
|
formatter.setIgnoreFormatted(false);
|
||||||
formatter.setControllerListenerName(controllerListenerName);
|
formatter.setControllerListenerName(controllerListenerName);
|
||||||
|
@ -446,18 +479,43 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
} else {
|
} else {
|
||||||
formatter.setMetadataLogDirectory(Optional.empty());
|
formatter.setMetadataLogDirectory(Optional.empty());
|
||||||
}
|
}
|
||||||
if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
|
StringBuilder dynamicVotersBuilder = new StringBuilder();
|
||||||
StringBuilder dynamicVotersBuilder = new StringBuilder();
|
String prefix = "";
|
||||||
String prefix = "";
|
if (standalone) {
|
||||||
for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
|
if (nodeId == TestKitDefaults.BROKER_ID_OFFSET + TestKitDefaults.CONTROLLER_ID_OFFSET) {
|
||||||
int port = socketFactoryManager.
|
final var controllerNode = nodes.controllerNodes().get(nodeId);
|
||||||
getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
|
dynamicVotersBuilder.append(
|
||||||
|
String.format(
|
||||||
|
"%d@localhost:%d:%s",
|
||||||
|
controllerNode.id(),
|
||||||
|
socketFactoryManager.
|
||||||
|
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
|
||||||
|
}
|
||||||
|
// when the nodeId != TestKitDefaults.CONTROLLER_ID_OFFSET, the node is formatting with
|
||||||
|
// the --no-initial-controllers flag
|
||||||
|
formatter.setHasDynamicQuorum(true);
|
||||||
|
} else if (initialVoterSet.isPresent()) {
|
||||||
|
for (final var controllerNode : initialVoterSet.get().entrySet()) {
|
||||||
|
final var voterId = controllerNode.getKey();
|
||||||
|
final var voterDirectoryId = controllerNode.getValue();
|
||||||
dynamicVotersBuilder.append(prefix);
|
dynamicVotersBuilder.append(prefix);
|
||||||
prefix = ",";
|
prefix = ",";
|
||||||
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
|
dynamicVotersBuilder.append(
|
||||||
controllerNode.id(), port, controllerNode.metadataDirectoryId()));
|
String.format(
|
||||||
|
"%d@localhost:%d:%s",
|
||||||
|
voterId,
|
||||||
|
socketFactoryManager.
|
||||||
|
getOrCreatePortForListener(voterId, controllerListenerName),
|
||||||
|
voterDirectoryId
|
||||||
|
)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
|
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
|
||||||
|
formatter.setHasDynamicQuorum(true);
|
||||||
}
|
}
|
||||||
formatter.run();
|
formatter.run();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -94,11 +94,6 @@ public class TestKitNodes {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setFeature(String featureName, short level) {
|
|
||||||
this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(featureName, level);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder setCombined(boolean combined) {
|
public Builder setCombined(boolean combined) {
|
||||||
this.combined = combined;
|
this.combined = combined;
|
||||||
return this;
|
return this;
|
||||||
|
|
Loading…
Reference in New Issue