From 423330ebe7d76697f6cb108530fcd4cb8be1db48 Mon Sep 17 00:00:00 2001 From: Logan Zhu Date: Tue, 30 Sep 2025 21:17:41 +0800 Subject: [PATCH] KAFKA-19692 improve the docs of "clusterId" for AddRaftVoterOptions and RemoveRaftVoterOptions (#20555) Improves the documentation of the clusterId field in AddRaftVoterOptions and RemoveRaftVoterOptions. The changes include: 1. Adding Javadoc to both addRaftVoter and removeRaftVoter methods to explain the behavior of the optional clusterId. 2. Integration tests have been added to verify the correct behavior of add and remove voter operations with and without clusterId, including scenarios with inconsistent cluster ids. Reviewers: TengYao Chi , Chia-Ping Tsai --- .../ReconfigurableQuorumIntegrationTest.java | 132 ++++++++++++++++++ .../clients/admin/AddRaftVoterOptions.java | 9 ++ .../org/apache/kafka/clients/admin/Admin.java | 19 ++- .../clients/admin/RemoveRaftVoterOptions.java | 9 ++ 4 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java new file mode 100644 index 00000000000..f02db36c061 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentClusterIdException; +import org.apache.kafka.common.test.KafkaClusterTestKit; +import org.apache.kafka.common.test.TestKitNodes; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class ReconfigurableQuorumIntegrationTest { + + static Map descVoterDirs(Admin admin) throws ExecutionException, InterruptedException { + var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get(); + return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId)); + } + + @Test + public void testRemoveAndAddVoterWithValidClusterId() throws Exception { + final var nodes = new TestKitNodes.Builder() + .setClusterId("test-cluster") + .setNumBrokerNodes(1) + .setNumControllerNodes(3) + .build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) { + cluster.format(); + cluster.startup(); + try (Admin admin = Admin.create(cluster.clientProperties())) { + TestUtils.waitForCondition(() -> { + Map voters = descVoterDirs(admin); + assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); + return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID)); + }, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs"); + + Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId(); + admin.removeRaftVoter( + 3000, + dirId, + new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster")) + ).all().get(); + TestUtils.waitForCondition(() -> { + Map voters = descVoterDirs(admin); + assertEquals(Set.of(3001, 3002), voters.keySet()); + return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID)); + }, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs"); + + admin.addRaftVoter( + 3000, + dirId, + Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), + new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster")) + ).all().get(); + } + } + } + + @Test + public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception { + final var nodes = new TestKitNodes.Builder() + .setClusterId("test-cluster") + .setNumBrokerNodes(1) + .setNumControllerNodes(3) + .build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) { + cluster.format(); + cluster.startup(); + try (Admin admin = Admin.create(cluster.clientProperties())) { + Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId(); + var removeFuture = admin.removeRaftVoter( + 3000, + dirId, + new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent")) + ).all(); + TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture); + + var addFuture = admin.addRaftVoter( + 3000, + dirId, + Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), + new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent")) + ).all(); + TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture); + } + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java index e28a03d541c..81e889db30d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java @@ -17,11 +17,20 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.protocol.Errors; import java.util.Optional; /** * Options for {@link Admin#addRaftVoter}. + * + *

+ * The clusterId is optional. + *

+ * If provided, the request will only succeed if the cluster id matches the id of the current cluster. + * If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}. + *

+ * If not provided, the cluster id check is skipped. */ @InterfaceStability.Stable public class AddRaftVoterOptions extends AbstractOptions { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index e596754f62f..06ede9f620d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.FeatureUpdateFailedException; +import org.apache.kafka.common.errors.InconsistentClusterIdException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.quota.ClientQuotaAlteration; @@ -1866,10 +1867,17 @@ public interface Admin extends AutoCloseable { /** * Add a new voter node to the KRaft metadata quorum. * + *

+ * The clusterId in {@link AddRaftVoterOptions} is optional. + * If provided, the operation will only succeed if the cluster id matches the id + * of the current cluster. If the cluster id does not match, the operation + * will fail with {@link InconsistentClusterIdException}. + * If not provided, the cluster id check is skipped. + * * @param voterId The node ID of the voter. * @param voterDirectoryId The directory ID of the voter. * @param endpoints The endpoints that the new voter has. - * @param options The options to use when adding the new voter node. + * @param options Additional options for the operation, including optional cluster ID. */ AddRaftVoterResult addRaftVoter( int voterId, @@ -1894,9 +1902,16 @@ public interface Admin extends AutoCloseable { /** * Remove a voter node from the KRaft metadata quorum. * + *

+ * The clusterId in {@link AddRaftVoterOptions} is optional. + * If provided, the operation will only succeed if the cluster id matches the id + * of the current cluster. If the cluster id does not match, the operation + * will fail with {@link InconsistentClusterIdException}. + * If not provided, the cluster id check is skipped. + * * @param voterId The node ID of the voter. * @param voterDirectoryId The directory ID of the voter. - * @param options The options to use when removing the voter node. + * @param options Additional options for the operation, including optional cluster ID. */ RemoveRaftVoterResult removeRaftVoter( int voterId, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java index cb5fe563c19..da6e965ebe0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java @@ -17,11 +17,20 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.protocol.Errors; import java.util.Optional; /** * Options for {@link Admin#removeRaftVoter}. + * + *

+ * The clusterId is optional. + *

+ * If provided, the request will only succeed if the cluster id matches the id of the current cluster. + * If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}. + *

+ * If not provided, the cluster id check is skipped. */ @InterfaceStability.Stable public class RemoveRaftVoterOptions extends AbstractOptions {