mirror of https://github.com/apache/kafka.git
Compare commits
3 Commits
4484afb30c
...
4433951142
Author | SHA1 | Date |
---|---|---|
|
4433951142 | |
|
4a5aa37169 | |
|
355a2519f6 |
|
@ -42,7 +42,7 @@ runs:
|
||||||
distribution: temurin
|
distribution: temurin
|
||||||
java-version: ${{ inputs.java-version }}
|
java-version: ${{ inputs.java-version }}
|
||||||
- name: Setup Gradle
|
- name: Setup Gradle
|
||||||
uses: gradle/actions/setup-gradle@748248ddd2a24f49513d8f472f81c3a07d4d50e1 # v4.4.4
|
uses: gradle/actions/setup-gradle@4d9f0ba0025fe599b4ebab900eb7f3a1d93ef4c2 # v5.0.0
|
||||||
env:
|
env:
|
||||||
GRADLE_BUILD_ACTION_CACHE_DEBUG_ENABLED: true
|
GRADLE_BUILD_ACTION_CACHE_DEBUG_ENABLED: true
|
||||||
with:
|
with:
|
||||||
|
|
|
@ -54,7 +54,7 @@ jobs:
|
||||||
run: |
|
run: |
|
||||||
python docker_build_test.py kafka/test -tag=test -type=$IMAGE_TYPE -u=$KAFKA_URL
|
python docker_build_test.py kafka/test -tag=test -type=$IMAGE_TYPE -u=$KAFKA_URL
|
||||||
- name: Run CVE scan
|
- name: Run CVE scan
|
||||||
uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0
|
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1
|
||||||
with:
|
with:
|
||||||
image-ref: 'kafka/test:test'
|
image-ref: 'kafka/test:test'
|
||||||
format: 'table'
|
format: 'table'
|
||||||
|
|
|
@ -53,7 +53,7 @@ jobs:
|
||||||
run: |
|
run: |
|
||||||
python docker_official_image_build_test.py kafka/test -tag=test -type=$IMAGE_TYPE -v=$KAFKA_VERSION
|
python docker_official_image_build_test.py kafka/test -tag=test -type=$IMAGE_TYPE -v=$KAFKA_VERSION
|
||||||
- name: Run CVE scan
|
- name: Run CVE scan
|
||||||
uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0
|
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1
|
||||||
with:
|
with:
|
||||||
image-ref: 'kafka/test:test'
|
image-ref: 'kafka/test:test'
|
||||||
format: 'table'
|
format: 'table'
|
||||||
|
|
|
@ -31,11 +31,11 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: Set up QEMU
|
- name: Set up QEMU
|
||||||
uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0
|
uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 # v3.6.0
|
||||||
- name: Set up Docker Buildx
|
- name: Set up Docker Buildx
|
||||||
uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
|
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
|
||||||
- name: Login to Docker Hub
|
- name: Login to Docker Hub
|
||||||
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
|
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0
|
||||||
with:
|
with:
|
||||||
username: ${{ secrets.DOCKERHUB_USER }}
|
username: ${{ secrets.DOCKERHUB_USER }}
|
||||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
|
@ -47,11 +47,11 @@ jobs:
|
||||||
python -m pip install --upgrade pip
|
python -m pip install --upgrade pip
|
||||||
pip install -r docker/requirements.txt
|
pip install -r docker/requirements.txt
|
||||||
- name: Set up QEMU
|
- name: Set up QEMU
|
||||||
uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0
|
uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 # v3.6.0
|
||||||
- name: Set up Docker Buildx
|
- name: Set up Docker Buildx
|
||||||
uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
|
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
|
||||||
- name: Login to Docker Hub
|
- name: Login to Docker Hub
|
||||||
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
|
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0
|
||||||
with:
|
with:
|
||||||
username: ${{ secrets.DOCKERHUB_USER }}
|
username: ${{ secrets.DOCKERHUB_USER }}
|
||||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
|
@ -29,7 +29,7 @@ jobs:
|
||||||
supported_image_tag: ['latest', '3.9.1', '4.0.0', '4.1.0']
|
supported_image_tag: ['latest', '3.9.1', '4.0.0', '4.1.0']
|
||||||
steps:
|
steps:
|
||||||
- name: Run CVE scan
|
- name: Run CVE scan
|
||||||
uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0
|
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1
|
||||||
if: always()
|
if: always()
|
||||||
with:
|
with:
|
||||||
image-ref: apache/kafka:${{ matrix.supported_image_tag }}
|
image-ref: apache/kafka:${{ matrix.supported_image_tag }}
|
||||||
|
|
|
@ -35,7 +35,7 @@ jobs:
|
||||||
env:
|
env:
|
||||||
GITHUB_CONTEXT: ${{ toJson(github) }}
|
GITHUB_CONTEXT: ${{ toJson(github) }}
|
||||||
- name: Remove label
|
- name: Remove label
|
||||||
uses: actions/github-script@v7
|
uses: actions/github-script@v8
|
||||||
continue-on-error: true
|
continue-on-error: true
|
||||||
with:
|
with:
|
||||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||||
|
@ -77,7 +77,7 @@ jobs:
|
||||||
issues: write
|
issues: write
|
||||||
pull-requests: write
|
pull-requests: write
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/stale@v9
|
- uses: actions/stale@v10
|
||||||
with:
|
with:
|
||||||
debug-only: ${{ inputs.dryRun || false }}
|
debug-only: ${{ inputs.dryRun || false }}
|
||||||
operations-per-run: ${{ inputs.operationsPerRun || 500 }}
|
operations-per-run: ${{ inputs.operationsPerRun || 500 }}
|
||||||
|
|
|
@ -1,132 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.kafka.clients.admin;
|
|
||||||
|
|
||||||
import org.apache.kafka.common.Uuid;
|
|
||||||
import org.apache.kafka.common.errors.InconsistentClusterIdException;
|
|
||||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
|
||||||
import org.apache.kafka.common.test.TestKitNodes;
|
|
||||||
import org.apache.kafka.test.TestUtils;
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Tag;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
|
|
||||||
@Tag("integration")
|
|
||||||
public class ReconfigurableQuorumIntegrationTest {
|
|
||||||
|
|
||||||
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
|
|
||||||
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
|
|
||||||
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
|
|
||||||
final var nodes = new TestKitNodes.Builder()
|
|
||||||
.setClusterId("test-cluster")
|
|
||||||
.setNumBrokerNodes(1)
|
|
||||||
.setNumControllerNodes(3)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
|
||||||
for (final var controllerNode : nodes.controllerNodes().values()) {
|
|
||||||
initialVoters.put(
|
|
||||||
controllerNode.id(),
|
|
||||||
controllerNode.metadataDirectoryId()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
|
||||||
cluster.format();
|
|
||||||
cluster.startup();
|
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
|
||||||
TestUtils.waitForCondition(() -> {
|
|
||||||
Map<Integer, Uuid> voters = descVoterDirs(admin);
|
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
|
||||||
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
|
|
||||||
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
|
|
||||||
|
|
||||||
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
|
||||||
admin.removeRaftVoter(
|
|
||||||
3000,
|
|
||||||
dirId,
|
|
||||||
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
|
||||||
).all().get();
|
|
||||||
TestUtils.waitForCondition(() -> {
|
|
||||||
Map<Integer, Uuid> voters = descVoterDirs(admin);
|
|
||||||
assertEquals(Set.of(3001, 3002), voters.keySet());
|
|
||||||
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
|
|
||||||
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
|
|
||||||
|
|
||||||
admin.addRaftVoter(
|
|
||||||
3000,
|
|
||||||
dirId,
|
|
||||||
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
|
||||||
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
|
||||||
).all().get();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
|
|
||||||
final var nodes = new TestKitNodes.Builder()
|
|
||||||
.setClusterId("test-cluster")
|
|
||||||
.setNumBrokerNodes(1)
|
|
||||||
.setNumControllerNodes(3)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
|
||||||
for (final var controllerNode : nodes.controllerNodes().values()) {
|
|
||||||
initialVoters.put(
|
|
||||||
controllerNode.id(),
|
|
||||||
controllerNode.metadataDirectoryId()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
|
||||||
cluster.format();
|
|
||||||
cluster.startup();
|
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
|
||||||
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
|
||||||
var removeFuture = admin.removeRaftVoter(
|
|
||||||
3000,
|
|
||||||
dirId,
|
|
||||||
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
|
||||||
).all();
|
|
||||||
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
|
|
||||||
|
|
||||||
var addFuture = admin.addRaftVoter(
|
|
||||||
3000,
|
|
||||||
dirId,
|
|
||||||
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
|
||||||
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
|
||||||
).all();
|
|
||||||
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -15,13 +15,16 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package kafka.server;
|
package org.apache.kafka.server;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
|
||||||
import org.apache.kafka.clients.admin.Admin;
|
import org.apache.kafka.clients.admin.Admin;
|
||||||
import org.apache.kafka.clients.admin.FeatureMetadata;
|
import org.apache.kafka.clients.admin.FeatureMetadata;
|
||||||
import org.apache.kafka.clients.admin.QuorumInfo;
|
import org.apache.kafka.clients.admin.QuorumInfo;
|
||||||
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
|
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
|
||||||
|
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
|
import org.apache.kafka.common.errors.InconsistentClusterIdException;
|
||||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
||||||
import org.apache.kafka.common.test.TestKitNodes;
|
import org.apache.kafka.common.test.TestKitNodes;
|
||||||
import org.apache.kafka.common.test.api.TestKitDefaults;
|
import org.apache.kafka.common.test.api.TestKitDefaults;
|
||||||
|
@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
|
||||||
import org.apache.kafka.server.common.KRaftVersion;
|
import org.apache.kafka.server.common.KRaftVersion;
|
||||||
import org.apache.kafka.test.TestUtils;
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Tag;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
@Tag("integration")
|
||||||
public class ReconfigurableQuorumIntegrationTest {
|
public class ReconfigurableQuorumIntegrationTest {
|
||||||
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
|
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
|
||||||
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
|
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
|
||||||
|
@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
).build()) {
|
).build()) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
||||||
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
|
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
|
||||||
});
|
});
|
||||||
|
@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
).setStandalone(true).build()) {
|
).setStandalone(true).build()) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
||||||
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
|
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
|
||||||
});
|
});
|
||||||
|
@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
|
||||||
|
@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
|
||||||
|
final var nodes = new TestKitNodes.Builder()
|
||||||
|
.setClusterId("test-cluster")
|
||||||
|
.setNumBrokerNodes(1)
|
||||||
|
.setNumControllerNodes(3)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||||
|
cluster.format();
|
||||||
|
cluster.startup();
|
||||||
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
for (int replicaId : new int[] {3000, 3001, 3002}) {
|
||||||
|
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||||
|
admin.removeRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||||
|
).all().get();
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
|
assertEquals(Set.of(3001, 3002), voters.keySet());
|
||||||
|
for (int replicaId : new int[] {3001, 3002}) {
|
||||||
|
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
admin.addRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||||
|
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||||
|
).all().get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
|
||||||
|
final var nodes = new TestKitNodes.Builder()
|
||||||
|
.setClusterId("test-cluster")
|
||||||
|
.setNumBrokerNodes(1)
|
||||||
|
.setNumControllerNodes(3)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||||
|
cluster.format();
|
||||||
|
cluster.startup();
|
||||||
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||||
|
var removeFuture = admin.removeRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||||
|
).all();
|
||||||
|
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
|
||||||
|
|
||||||
|
var addFuture = admin.addRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||||
|
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||||
|
).all();
|
||||||
|
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue