diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java new file mode 100644 index 00000000000..f02db36c061 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentClusterIdException; +import org.apache.kafka.common.test.KafkaClusterTestKit; +import org.apache.kafka.common.test.TestKitNodes; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class ReconfigurableQuorumIntegrationTest { + + static Map descVoterDirs(Admin admin) throws ExecutionException, InterruptedException { + var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get(); + return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId)); + } + + @Test + public void testRemoveAndAddVoterWithValidClusterId() throws Exception { + final var nodes = new TestKitNodes.Builder() + .setClusterId("test-cluster") + .setNumBrokerNodes(1) + .setNumControllerNodes(3) + .build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) { + cluster.format(); + cluster.startup(); + try (Admin admin = Admin.create(cluster.clientProperties())) { + TestUtils.waitForCondition(() -> { + Map voters = descVoterDirs(admin); + assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); + return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID)); + }, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs"); + + Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId(); + admin.removeRaftVoter( + 3000, + dirId, + new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster")) + ).all().get(); + TestUtils.waitForCondition(() -> { + Map voters = descVoterDirs(admin); + assertEquals(Set.of(3001, 3002), voters.keySet()); + return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID)); + }, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs"); + + admin.addRaftVoter( + 3000, + dirId, + Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), + new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster")) + ).all().get(); + } + } + } + + @Test + public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception { + final var nodes = new TestKitNodes.Builder() + .setClusterId("test-cluster") + .setNumBrokerNodes(1) + .setNumControllerNodes(3) + .build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) { + cluster.format(); + cluster.startup(); + try (Admin admin = Admin.create(cluster.clientProperties())) { + Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId(); + var removeFuture = admin.removeRaftVoter( + 3000, + dirId, + new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent")) + ).all(); + TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture); + + var addFuture = admin.addRaftVoter( + 3000, + dirId, + Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), + new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent")) + ).all(); + TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture); + } + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java index e28a03d541c..81e889db30d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java @@ -17,11 +17,20 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.protocol.Errors; import java.util.Optional; /** * Options for {@link Admin#addRaftVoter}. + * + *

+ * The clusterId is optional. + *

+ * If provided, the request will only succeed if the cluster id matches the id of the current cluster. + * If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}. + *

+ * If not provided, the cluster id check is skipped. */ @InterfaceStability.Stable public class AddRaftVoterOptions extends AbstractOptions { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index e596754f62f..06ede9f620d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.FeatureUpdateFailedException; +import org.apache.kafka.common.errors.InconsistentClusterIdException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.quota.ClientQuotaAlteration; @@ -1866,10 +1867,17 @@ public interface Admin extends AutoCloseable { /** * Add a new voter node to the KRaft metadata quorum. * + *

+ * The clusterId in {@link AddRaftVoterOptions} is optional. + * If provided, the operation will only succeed if the cluster id matches the id + * of the current cluster. If the cluster id does not match, the operation + * will fail with {@link InconsistentClusterIdException}. + * If not provided, the cluster id check is skipped. + * * @param voterId The node ID of the voter. * @param voterDirectoryId The directory ID of the voter. * @param endpoints The endpoints that the new voter has. - * @param options The options to use when adding the new voter node. + * @param options Additional options for the operation, including optional cluster ID. */ AddRaftVoterResult addRaftVoter( int voterId, @@ -1894,9 +1902,16 @@ public interface Admin extends AutoCloseable { /** * Remove a voter node from the KRaft metadata quorum. * + *

+ * The clusterId in {@link AddRaftVoterOptions} is optional. + * If provided, the operation will only succeed if the cluster id matches the id + * of the current cluster. If the cluster id does not match, the operation + * will fail with {@link InconsistentClusterIdException}. + * If not provided, the cluster id check is skipped. + * * @param voterId The node ID of the voter. * @param voterDirectoryId The directory ID of the voter. - * @param options The options to use when removing the voter node. + * @param options Additional options for the operation, including optional cluster ID. */ RemoveRaftVoterResult removeRaftVoter( int voterId, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java index cb5fe563c19..da6e965ebe0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java @@ -17,11 +17,20 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.protocol.Errors; import java.util.Optional; /** * Options for {@link Admin#removeRaftVoter}. + * + *

+ * The clusterId is optional. + *

+ * If provided, the request will only succeed if the cluster id matches the id of the current cluster. + * If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}. + *

+ * If not provided, the cluster id check is skipped. */ @InterfaceStability.Stable public class RemoveRaftVoterOptions extends AbstractOptions {