Compare commits

...

5 Commits

Author SHA1 Message Date
Dejan Stojadinović 40dc22a9ca
Merge c3f32941d7 into 4a5aa37169 2025-10-07 21:32:03 +00:00
dejan2609 c3f32941d7
KAFKA-19664 KAFKA-19769 signing algorithm is changed (from `SHA1withRSA` to `SHA256withRSA`) 2025-10-07 23:15:21 +02:00
dejan2609 a21a1b0a84
KAFKA-19664 comment squeezed into one line 2025-10-07 20:57:27 +02:00
Chang-Chi Hsu 4a5aa37169
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server module (#20636)
It moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`RemoveAndAddVoterWithValidClusterId` and
`RemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-08 01:10:58 +08:00
dejan2609 a37616a1eb
KAFKA-19664 Support building with Java 25 (LTS release)
notes:
 - spotbugs-related Gradle tasks are temporarily disabled for Java 25 (see https://issues.apache.org/jira/browse/BCEL-377)
 - zinc upgrade: 1.10.8 -> 1.11.0 (Java 25 compatibility): https://github.com/sbt/zinc/releases/tag/v1.11.0
 - Scala patch version update: 2.13.16 -> 2.13.17 (https://contributors.scala-lang.org/t/scala-2-13-17-release-planning/6994/17)
 - GitHub Actions now supports Java 25
 - mockito version upgrade: 5.14.2 -> 5.20.0 (in order to resolve test issues)
   - https://issues.apache.org/jira/browse/SOLR-17718
   - https://github.com/mockito/mockito/releases/tag/v5.20.0
2025-10-07 14:19:43 +02:00
12 changed files with 131 additions and 159 deletions

View File

@@ -127,7 +127,7 @@ jobs:
       - name: Setup Gradle
         uses: ./.github/actions/setup-gradle
         with:
-          java-version: 24
+          java-version: 17
           gradle-cache-read-only: ${{ !inputs.is-trunk }}
           gradle-cache-write-only: ${{ inputs.is-trunk }}
           develocity-access-key: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
@@ -181,7 +181,7 @@ jobs:
       fail-fast: false
       matrix:
         # If we change these, make sure to adjust ci-complete.yml
-        java: [ 24, 17 ]
+        java: [ 25, 17 ]
         run-flaky: [ true, false ]
         run-new: [ true, false ]
         exclude:
@@ -270,7 +270,7 @@ jobs:
           python .github/scripts/junit.py \
             --path build/junit-xml >> $GITHUB_STEP_SUMMARY

-  # This job downloads all the JUnit XML files and thread dumps from the JDK 24 test runs.
+  # This job downloads all the JUnit XML files and thread dumps from the JDK 25 test runs.
   # If any test job fails, we will not run this job. Also, if any thread dump artifacts
   # are present, this means there was a timeout in the tests and so we will not proceed
   # with catalog creation.
@@ -288,7 +288,7 @@ jobs:
       - name: Download Thread Dumps
         uses: actions/download-artifact@v5
         with:
-          pattern: junit-thread-dumps-24-*
+          pattern: junit-thread-dumps-25-*
           path: thread-dumps
           merge-multiple: true
       - name: Check For Thread Dump
@@ -302,7 +302,7 @@ jobs:
      - name: Download JUnit XMLs
        uses: actions/download-artifact@v5
        with:
-          pattern: junit-xml-24-* # Only look at JDK 24 tests for the test catalog
+          pattern: junit-xml-25-* # Only look at JDK 25 tests for the test catalog
          path: junit-xml
          merge-multiple: true
      - name: Collate Test Catalog

View File

@@ -43,8 +43,8 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        # Make sure these match build.yml
-        java: [ 24, 17 ]
+        # Make sure these match build.yml and also keep in mind that GitHub Actions build will always use this file from the trunk branch.
+        java: [ 25, 17 ]
         run-flaky: [ true, false ]
         run-new: [ true, false ]
         exclude:

View File

@@ -248,9 +248,9 @@ License Version 2.0:
 - opentelemetry-proto-1.3.2-alpha
 - plexus-utils-3.5.1
 - rocksdbjni-10.1.3
-- scala-library-2.13.16
+- scala-library-2.13.17
 - scala-logging_2.13-3.9.5
-- scala-reflect-2.13.16
+- scala-reflect-2.13.17
 - snappy-java-1.1.10.7
 - snakeyaml-2.4
 - swagger-annotations-2.2.25

View File

@@ -13,7 +13,7 @@
 You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed.

-We build and test Apache Kafka with 17 and 24. The `release` parameter in javac is set to `11` for the clients
+We build and test Apache Kafka with 17 and 25. The `release` parameter in javac is set to `11` for the clients
 and streams modules, and `17` for the rest, ensuring compatibility with their respective
 minimum Java versions. Similarly, the `release` parameter in scalac is set to `11` for the streams modules and `17`
 for the rest.
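The `release` javac flag enforces API level, not just bytecode level: code that references an API added after the stated release fails to compile even when building on a newer JDK. A minimal, hypothetical illustration (class name invented; the APIs are standard JDK):

```java
import java.util.List;
import java.util.stream.Stream;

public class ReleaseFlagDemo {
    public static void main(String[] args) {
        // Fine under `javac --release 11`: String.repeat was added in Java 11.
        System.out.println("ab".repeat(2));

        // Compiles under `javac --release 17` but fails under `--release 11`:
        // Stream.toList() only appeared in Java 16.
        List<Integer> xs = Stream.of(1, 2, 3).toList();
        System.out.println(xs);
    }
}
```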

View File

@@ -49,7 +49,7 @@ should_include_file() {
 base_dir=$(dirname $0)/..

 if [ -z "$SCALA_VERSION" ]; then
-  SCALA_VERSION=2.13.16
+  SCALA_VERSION=2.13.17
   if [[ -f "$base_dir/gradle.properties" ]]; then
     SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
   fi
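The script's fallback-then-override logic is the same lookup the build itself performs. A hypothetical Java analogue (file name and property key are taken from the diffs on this page; the class is invented for illustration):

```java
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;

public class ScalaVersionLookup {
    public static void main(String[] args) throws Exception {
        // Hard-coded fallback, mirroring SCALA_VERSION=2.13.17 in the script.
        String scalaVersion = "2.13.17";

        // Prefer the scalaVersion key from gradle.properties when present,
        // mirroring the grep/cut pipeline above.
        Properties props = new Properties();
        try (FileReader reader = new FileReader("gradle.properties")) {
            props.load(reader);
            scalaVersion = props.getProperty("scalaVersion", scalaVersion);
        } catch (FileNotFoundException ignored) {
            // No gradle.properties found; keep the fallback.
        }
        System.out.println(scalaVersion);
    }
}
```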

View File

@@ -27,7 +27,7 @@ set BASE_DIR=%CD%
 popd

 IF ["%SCALA_VERSION%"] EQU [""] (
-  set SCALA_VERSION=2.13.16
+  set SCALA_VERSION=2.13.17
 )

 IF ["%SCALA_BINARY_VERSION%"] EQU [""] (

View File

@@ -71,6 +71,13 @@ ext {
     "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
   )

+  if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_25)) {
+    // Spotbugs is not compatible with Java 25+ so Gradle related tasks are disabled
+    // until version can be upgraded: https://github.com/spotbugs/spotbugs/issues/3564
+    project.gradle.startParameter.excludedTaskNames.add("spotbugsMain")
+    project.gradle.startParameter.excludedTaskNames.add("spotbugsTest")
+  }
+
   maxTestForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Runtime.runtime.availableProcessors()
   maxScalacThreads = project.hasProperty('maxScalacThreads') ? maxScalacThreads.toInteger() :
     Math.min(Runtime.runtime.availableProcessors(), 8)
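The guard above runs inside Gradle, but the version probe it relies on is plain JDK. A minimal sketch of the same gate in standalone Java (the threshold of 25 and the excluded task names come from the diff; the class name is invented):

```java
public class JavaVersionGate {
    public static void main(String[] args) {
        // Runtime.version().feature() yields the major Java version, e.g. 25;
        // it is the runtime-level analogue of Gradle's JavaVersion.current().
        int feature = Runtime.version().feature();
        if (feature >= 25) {
            // This is the case where the build excludes spotbugsMain and
            // spotbugsTest until Spotbugs gains Java 25 support.
            System.out.println("Java " + feature + ": spotbugs tasks skipped");
        } else {
            System.out.println("Java " + feature + ": spotbugs tasks run");
        }
    }
}
```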

View File

@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.InconsistentClusterIdException;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("integration")
-public class ReconfigurableQuorumIntegrationTest {
-    static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
-        var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
-        return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
-
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
-
-                admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-            }
-        }
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                var removeFuture = admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
-
-                var addFuture = admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
-            }
-        }
-    }
-}

View File

@@ -110,7 +110,7 @@ public class TestSslUtils {
      * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
      * @param pair the KeyPair
      * @param days how many days from now the Certificate is valid for, or - for negative values - how many days before now
-     * @param algorithm the signing algorithm, eg "SHA1withRSA"
+     * @param algorithm the signing algorithm, eg "SHA256withRSA"
      * @return the self-signed certificate
      * @throws CertificateException thrown if a security error or an IO error occurred.
      */
@@ -131,7 +131,7 @@ public class TestSslUtils {
      *                      CA.
      * @param parentKeyPair The key pair of the issuer. Leave null if you want to generate a root
      *                      CA.
-     * @param algorithm the signing algorithm, eg "SHA1withRSA"
+     * @param algorithm the signing algorithm, eg "SHA256withRSA"
      * @return the signed certificate
      * @throws CertificateException
      */
@@ -399,7 +399,7 @@ public class TestSslUtils {
         private byte[] subjectAltName;

         public CertificateBuilder() {
-            this(30, "SHA1withRSA");
+            this(30, "SHA256withRSA");
         }

         public CertificateBuilder(int days, String algorithm) {
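The motivation for retiring `SHA1withRSA` is that recent JDKs progressively disable SHA-1 in signature and certificate-path contexts, so test certificates signed with it can fail validation on new runtimes. A self-contained sketch of signing and verifying with the new default, using only standard `java.security` APIs (the class name is invented):

```java
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.Signature;

public class Sha256SigningDemo {
    public static void main(String[] args) throws Exception {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
        generator.initialize(2048);
        KeyPair pair = generator.generateKeyPair();

        // SHA256withRSA is the algorithm the test certificates now default to.
        Signature signer = Signature.getInstance("SHA256withRSA");
        signer.initSign(pair.getPrivate());
        signer.update("payload".getBytes(StandardCharsets.UTF_8));
        byte[] signature = signer.sign();

        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(pair.getPublic());
        verifier.update("payload".getBytes(StandardCharsets.UTF_8));
        System.out.println("verified: " + verifier.verify(signature));
    }
}
```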

View File

@@ -24,7 +24,7 @@ group=org.apache.kafka
 #  - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
 #  - streams/quickstart/java/pom.xml
 version=4.2.0-SNAPSHOT
-scalaVersion=2.13.16
+scalaVersion=2.13.17
 # Adding swaggerVersion in gradle.properties to have a single version in place for swagger
 swaggerVersion=2.2.25
 task=build

View File

@@ -23,7 +23,7 @@ ext {
 }

 // Add Scala version
-def defaultScala213Version = '2.13.16'
+def defaultScala213Version = '2.13.17'
 if (hasProperty('scalaVersion')) {
   if (scalaVersion == '2.13') {
     versions["scala"] = defaultScala213Version
@@ -110,7 +110,7 @@ versions += [
   lz4: "1.8.0",
   mavenArtifact: "3.9.6",
   metrics: "2.2.0",
-  mockito: "5.14.2",
+  mockito: "5.20.0",
   opentelemetryProto: "1.3.2-alpha",
   protobuf: "3.25.5", // a dependency of opentelemetryProto
   pcollections: "4.0.2",
@@ -125,7 +125,7 @@ versions += [
   snappy: "1.1.10.7",
   spotbugs: "4.9.4",
   mockOAuth2Server: "2.2.1",
-  zinc: "1.10.8",
+  zinc: "1.11.0",
   // When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
   // Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
   zstd: "1.5.6-10",

View File

@@ -15,13 +15,16 @@
  * limitations under the License.
  */
-package kafka.server;
+package org.apache.kafka.server;

+import org.apache.kafka.clients.admin.AddRaftVoterOptions;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.FeatureMetadata;
 import org.apache.kafka.clients.admin.QuorumInfo;
 import org.apache.kafka.clients.admin.RaftVoterEndpoint;
+import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.common.test.api.TestKitDefaults;
@@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.test.TestUtils;

+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;

 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;

+@Tag("integration")
 public class ReconfigurableQuorumIntegrationTest {
     static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
         FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).build()) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
                 });
@@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).setStandalone(true).build()) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
                 });
@@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
             }
         }
     }
+
+    @Test
+    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
+        final var nodes = new TestKitNodes.Builder()
+            .setClusterId("test-cluster")
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .build();
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                admin.removeRaftVoter(
+                    3000,
+                    dirId,
+                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+                admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
+        final var nodes = new TestKitNodes.Builder()
+            .setClusterId("test-cluster")
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .build();
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                var removeFuture = admin.removeRaftVoter(
+                    3000,
+                    dirId,
+                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
+                var addFuture = admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+            }
+        }
+    }
 }