KAFKA-19692 improve the docs of "clusterId" for AddRaftVoterOptions and RemoveRaftVoterOptions (#20555)
CI / build (push) Has been cancelled Details

Improves the documentation of the clusterId field in AddRaftVoterOptions
and RemoveRaftVoterOptions.

The changes include:
1. Adding Javadoc to both addRaftVoter and removeRaftVoter methods to
explain the behavior of the optional clusterId.
2. Integration tests have been added to verify the correct behavior of
add and remove voter operations with and without clusterId, including
scenarios with inconsistent cluster ids.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Logan Zhu 2025-09-30 21:17:41 +08:00 committed by GitHub
parent d1a821226c
commit 423330ebe7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 167 additions and 2 deletions

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@ -17,11 +17,20 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.protocol.Errors;
import java.util.Optional;
/**
* Options for {@link Admin#addRaftVoter}.
*
* <p>
* The clusterId is optional.
* <p>
* If provided, the request will only succeed if the cluster id matches the id of the current cluster.
* If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}.
* <p>
* If not provided, the cluster id check is skipped.
*/
@InterfaceStability.Stable
public class AddRaftVoterOptions extends AbstractOptions<AddRaftVoterOptions> {

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@ -1866,10 +1867,17 @@ public interface Admin extends AutoCloseable {
/**
* Add a new voter node to the KRaft metadata quorum.
*
* <p>
* The clusterId in {@link AddRaftVoterOptions} is optional.
* If provided, the operation will only succeed if the cluster id matches the id
* of the current cluster. If the cluster id does not match, the operation
* will fail with {@link InconsistentClusterIdException}.
* If not provided, the cluster id check is skipped.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param endpoints The endpoints that the new voter has.
* @param options The options to use when adding the new voter node.
* @param options Additional options for the operation, including optional cluster ID.
*/
AddRaftVoterResult addRaftVoter(
int voterId,
@ -1894,9 +1902,16 @@ public interface Admin extends AutoCloseable {
/**
* Remove a voter node from the KRaft metadata quorum.
*
* <p>
* The clusterId in {@link AddRaftVoterOptions} is optional.
* If provided, the operation will only succeed if the cluster id matches the id
* of the current cluster. If the cluster id does not match, the operation
* will fail with {@link InconsistentClusterIdException}.
* If not provided, the cluster id check is skipped.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param options The options to use when removing the voter node.
* @param options Additional options for the operation, including optional cluster ID.
*/
RemoveRaftVoterResult removeRaftVoter(
int voterId,

View File

@ -17,11 +17,20 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.protocol.Errors;
import java.util.Optional;
/**
* Options for {@link Admin#removeRaftVoter}.
*
* <p>
* The clusterId is optional.
* <p>
* If provided, the request will only succeed if the cluster id matches the id of the current cluster.
* If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}.
* <p>
* If not provided, the cluster id check is skipped.
*/
@InterfaceStability.Stable
public class RemoveRaftVoterOptions extends AbstractOptions<RemoveRaftVoterOptions> {