KAFKA-17039 KIP-919 supports for unregisterBroker (#19063)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-03-01 23:55:35 +08:00 committed by GitHub
parent 2ccc73783e
commit e0c77140b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 6 additions and 14 deletions

View File

@ -4729,7 +4729,7 @@ public class KafkaAdminClient extends AdminClient {
final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
final long now = time.milliseconds(); final long now = time.milliseconds();
final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()), final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) { new LeastLoadedBrokerOrActiveKController()) {
@Override @Override
UnregisterBrokerRequest.Builder createRequest(int timeoutMs) { UnregisterBrokerRequest.Builder createRequest(int timeoutMs) {

View File

@ -833,8 +833,9 @@ class KRaftClusterTest {
Option(image.brokers().get(brokerId)).isEmpty Option(image.brokers().get(brokerId)).isEmpty
} }
@Test @ParameterizedTest
def testUnregisterBroker(): Unit = { @ValueSource(booleans = Array(true, false))
def testUnregisterBroker(usingBootstrapController: Boolean): Unit = {
val cluster = new KafkaClusterTestKit.Builder( val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder(). new TestKitNodes.Builder().
setNumBrokerNodes(4). setNumBrokerNodes(4).
@ -848,7 +849,7 @@ class KRaftClusterTest {
cluster.brokers().get(0).shutdown() cluster.brokers().get(0).shutdown()
TestUtils.waitUntilTrue(() => !brokerIsUnfenced(clusterImage(cluster, 1), 0), TestUtils.waitUntilTrue(() => !brokerIsUnfenced(clusterImage(cluster, 1), 0),
"Timed out waiting for broker 0 to be fenced.") "Timed out waiting for broker 0 to be fenced.")
val admin = Admin.create(cluster.clientProperties()) val admin = createAdminClient(cluster, bootstrapController = usingBootstrapController);
try { try {
admin.unregisterBroker(0) admin.unregisterBroker(0)
} finally { } finally {

View File

@ -18,7 +18,6 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.test.api.Type;
@ -30,12 +29,10 @@ import java.io.PrintStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -102,13 +99,7 @@ public class ClusterToolTest {
brokerIds.removeAll(clusterInstance.controllerIds()); brokerIds.removeAll(clusterInstance.controllerIds());
int brokerId = assertDoesNotThrow(() -> brokerIds.stream().findFirst().get()); int brokerId = assertDoesNotThrow(() -> brokerIds.stream().findFirst().get());
clusterInstance.shutdownBroker(brokerId); clusterInstance.shutdownBroker(brokerId);
ExecutionException exception = assertDoesNotThrow(() -> ClusterTool.execute("unregister", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--id", String.valueOf(brokerId)));
assertThrows(ExecutionException.class,
() -> ClusterTool.execute("unregister", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--id", String.valueOf(brokerId)));
assertNotNull(exception.getCause());
assertEquals(UnsupportedEndpointTypeException.class, exception.getCause().getClass());
assertEquals("This Admin API is not yet supported when communicating directly with " +
"the controller quorum.", exception.getCause().getMessage());
} }
@ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT}) @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})