Compare commits

..

1 Commits

Author SHA1 Message Date
Shashank 9115416c00
Merge ae54bc2d69 into 2938c4242e 2025-10-07 06:47:36 -07:00
2 changed files with 140 additions and 105 deletions

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@ -15,16 +15,13 @@
* limitations under the License.
*/
package org.apache.kafka.server;
package kafka.server;
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults;
@ -32,12 +29,10 @@ import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@ -46,7 +41,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@ -76,7 +70,7 @@ public class ReconfigurableQuorumIntegrationTest {
).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
});
@ -94,7 +88,7 @@ public class ReconfigurableQuorumIntegrationTest {
).setStandalone(true).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
});
@ -132,7 +126,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -167,7 +161,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@ -206,7 +200,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -244,7 +238,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -255,95 +249,4 @@ public class ReconfigurableQuorumIntegrationTest {
}
}
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
for (int replicaId : new int[] {3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}
}