From e05b0e68e47ca123f33fb54f88820219a44cfb86 Mon Sep 17 00:00:00 2001 From: ClarkChen Date: Sun, 16 Mar 2025 02:35:41 +0800 Subject: [PATCH] KAFKA-18915 Rewrite AdminClientRebootstrapTest to cover the current scenario (#19187) Reviewers: Jhen-Yung Hsu , TengYao Chi , Ken Huang , Chia-Ping Tsai --- ...port-control-clients-integration-tests.xml | 1 + .../kafka/clients/ClientRebootstrapTest.java | 97 +++++++++++++++ .../test/api/AdminClientRebootstrapTest.java | 110 ------------------ 3 files changed, 98 insertions(+), 110 deletions(-) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java delete mode 100644 core/src/test/java/kafka/test/api/AdminClientRebootstrapTest.java diff --git a/checkstyle/import-control-clients-integration-tests.xml b/checkstyle/import-control-clients-integration-tests.xml index b94b63462ac..8294c43f922 100644 --- a/checkstyle/import-control-clients-integration-tests.xml +++ b/checkstyle/import-control-clients-integration-tests.xml @@ -20,6 +20,7 @@ + diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java new file mode 100644 index 00000000000..9c21c09f50c --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ClientRebootstrapTest { + private static final String TOPIC = "topic"; + private static final int REPLICAS = 2; + + @ClusterTest( + brokers = REPLICAS, + types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2") + } + ) + public void testAdminRebootstrap(ClusterInstance clusterInstance) { + var broker0 = 0; + var broker1 = 1; + var timeout = 60; + + clusterInstance.shutdownBroker(broker0); + + try (var admin = clusterInstance.admin()) { + admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS))); + + // Only the broker 1 is available for the admin client during the bootstrap. + assertDoesNotThrow(() -> admin.listTopics().names().get(timeout, TimeUnit.SECONDS).contains(TOPIC)); + + clusterInstance.shutdownBroker(broker1); + clusterInstance.startBroker(broker0); + + // The broker 1, originally cached during the bootstrap, is offline. + // However, the broker 0 from the bootstrap list is online. + // Should be able to list topics again. + assertDoesNotThrow(() -> admin.listTopics().names().get(timeout, TimeUnit.SECONDS).contains(TOPIC)); + } + } + + @ClusterTest( + brokers = REPLICAS, + types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2") + } + ) + public void testAdminRebootstrapDisabled(ClusterInstance clusterInstance) { + var broker0 = 0; + var broker1 = 1; + + clusterInstance.shutdownBroker(broker0); + + var admin = clusterInstance.admin(Map.of(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none")); + admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS))); + + // Only the broker 1 is available for the admin client during the bootstrap. + assertDoesNotThrow(() -> admin.listTopics().names().get(60, TimeUnit.SECONDS).contains(TOPIC)); + + clusterInstance.shutdownBroker(broker1); + clusterInstance.startBroker(broker0); + + // The broker 1, originally cached during the bootstrap, is offline. + // As a result, the admin client will throw a TimeoutException when trying to get list of the topics. + assertThrows(TimeoutException.class, () -> admin.listTopics().names().get(5, TimeUnit.SECONDS)); + // Since the brokers cached during the bootstrap are offline, the admin client needs to wait the default timeout for other threads. + admin.close(Duration.ZERO); + } +} diff --git a/core/src/test/java/kafka/test/api/AdminClientRebootstrapTest.java b/core/src/test/java/kafka/test/api/AdminClientRebootstrapTest.java deleted file mode 100644 index b1a38aaceec..00000000000 --- a/core/src/test/java/kafka/test/api/AdminClientRebootstrapTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.test.api; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.test.ClusterInstance; -import org.apache.kafka.common.test.api.ClusterConfig; -import org.apache.kafka.common.test.api.ClusterTemplate; -import org.apache.kafka.common.test.api.Type; -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; -import org.apache.kafka.test.TestUtils; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -public class AdminClientRebootstrapTest { - private static final int BROKER_COUNT = 2; - - private static List generator() { - // Enable unclean leader election for the test topic - Map serverProperties = Map.of( - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true", - GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, String.valueOf(BROKER_COUNT) - ); - - return Stream.of(false, true) - .map(AdminClientRebootstrapTest::getRebootstrapConfig) - .map(rebootstrapProperties -> AdminClientRebootstrapTest.buildConfig(serverProperties, rebootstrapProperties)) - .toList(); - } - - private static Map getRebootstrapConfig(boolean useRebootstrapTriggerMs) { - Map properties = new HashMap<>(); - if (useRebootstrapTriggerMs) { - properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "5000"); - } else { - properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "3600000"); - properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000"); - properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000"); - properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000"); - properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000"); - } - properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap"); - return properties; - } - - private static ClusterConfig buildConfig(Map serverProperties, Map rebootstrapProperties) { - return ClusterConfig.defaultBuilder() - .setTypes(Set.of(Type.KRAFT)) - .setBrokers(BROKER_COUNT) - .setServerProperties(serverProperties).build(); - } - - @ClusterTemplate(value = "generator") - public void testRebootstrap(ClusterInstance clusterInstance) throws InterruptedException { - var topic = "topic"; - var timeout = 5; - try (var admin = clusterInstance.admin()) { - admin.createTopics(List.of(new NewTopic(topic, BROKER_COUNT, (short) 2))); - - var server0 = clusterInstance.brokers().get(0); - var server1 = clusterInstance.brokers().get(1); - - server1.shutdown(); - server1.awaitShutdown(); - - // Only the server 0 is available for the admin client during the bootstrap. - TestUtils.waitForCondition(() -> admin.listTopics().names().get(timeout, TimeUnit.MINUTES).contains(topic), - "timed out waiting for topics"); - - server0.shutdown(); - server0.awaitShutdown(); - server1.startup(); - - // The server 0, originally cached during the bootstrap, is offline. - // However, the server 1 from the bootstrap list is online. - // Should be able to list topics again. - TestUtils.waitForCondition(() -> admin.listTopics().names().get(timeout, TimeUnit.MINUTES).contains(topic), - "timed out waiting for topics"); - - server1.shutdown(); - server1.awaitShutdown(); - server0.startup(); - - // The same situation, but the server 1 has gone and server 0 is back. - TestUtils.waitForCondition(() -> admin.listTopics().names().get(timeout, TimeUnit.MINUTES).contains(topic), - "timed out waiting for topics"); - } - } -}