From e0c77140b2cce4bcc4d9d983536d6cfa3df070ba Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Sat, 1 Mar 2025 23:55:35 +0800 Subject: [PATCH] KAFKA-17039 KIP-919 supports for unregisterBroker (#19063) Reviewers: Chia-Ping Tsai --- .../apache/kafka/clients/admin/KafkaAdminClient.java | 2 +- .../integration/kafka/server/KRaftClusterTest.scala | 7 ++++--- .../java/org/apache/kafka/tools/ClusterToolTest.java | 11 +---------- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7737625078d..725e48c3656 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4729,7 +4729,7 @@ public class KafkaAdminClient extends AdminClient { final KafkaFutureImpl future = new KafkaFutureImpl<>(); final long now = time.milliseconds(); final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { + new LeastLoadedBrokerOrActiveKController()) { @Override UnregisterBrokerRequest.Builder createRequest(int timeoutMs) { diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 06cb2e2eebb..b4515f7919d 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -833,8 +833,9 @@ class KRaftClusterTest { Option(image.brokers().get(brokerId)).isEmpty } - @Test - def testUnregisterBroker(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testUnregisterBroker(usingBootstrapController: Boolean): Unit = { val cluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setNumBrokerNodes(4). @@ -848,7 +849,7 @@ class KRaftClusterTest { cluster.brokers().get(0).shutdown() TestUtils.waitUntilTrue(() => !brokerIsUnfenced(clusterImage(cluster, 1), 0), "Timed out waiting for broker 0 to be fenced.") - val admin = Admin.create(cluster.clientProperties()) + val admin = createAdminClient(cluster, bootstrapController = usingBootstrapController); try { admin.unregisterBroker(0) } finally { diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java index 8e7a77c2925..6a2fcc5150e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.MockAdminClient; -import org.apache.kafka.common.errors.UnsupportedEndpointTypeException; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.Type; @@ -30,12 +29,10 @@ import java.io.PrintStream; import java.util.Arrays; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -102,13 +99,7 @@ public class ClusterToolTest { brokerIds.removeAll(clusterInstance.controllerIds()); int brokerId = assertDoesNotThrow(() -> brokerIds.stream().findFirst().get()); clusterInstance.shutdownBroker(brokerId); - ExecutionException exception = - assertThrows(ExecutionException.class, - () -> ClusterTool.execute("unregister", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--id", String.valueOf(brokerId))); - assertNotNull(exception.getCause()); - assertEquals(UnsupportedEndpointTypeException.class, exception.getCause().getClass()); - assertEquals("This Admin API is not yet supported when communicating directly with " + - "the controller quorum.", exception.getCause().getMessage()); + assertDoesNotThrow(() -> ClusterTool.execute("unregister", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--id", String.valueOf(brokerId))); } @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})