KAFKA-17171 Add test cases for `STATIC_BROKER_CONFIG`in kraft mode (#18463)

Given that the `core` module will be separated into other small modules,
this test will not be added to the core module.
Instead, I added it to the `clients-integration-tests` module since it
focuses on the admin client test. The patch should include following test cases.

1. a topic-related static config is added to quorum controller. The
configs from topic creation should include it, but `describeConfigs`
does not.

2. a topic-related static config is added to quorum controller. The
configs from topic creation should include it, and `describeConfigs`
does if admin is using controller.bootstrap

3. a topic-related static config is added to broker. The configs from
topic creation should NOT include it, but `describeConfigs` does.

4. a topic-related static config is added to broker. The configs from
topic creation should NOT include it, and `describeConfigs` does not
also if admin is using controller.bootstrap

for another, the docs of `STATIC_BROKER_CONFIG` should remind the impact of "controller.properties" BTW, those test cases should leverage new test infra, since new test infra allow us to define configs to broker/controller individually.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-03-18 00:30:53 +08:00 committed by GitHub
parent da46cf6e79
commit a6a0ea56d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 136 additions and 0 deletions

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.test.junit.ClusterTestExtensions;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ExtendWith(value = ClusterTestExtensions.class)
public class StaticBrokerConfigTest {
private static final String TOPIC = "topic";
private static final String CUSTOM_VALUE = "1048576";
/**
* synonyms of `segment.bytes`
*/
private static final String LOG_SEGMENT_BYTES = "log.segment.bytes";
@ClusterTest(types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(id = 3000, key = LOG_SEGMENT_BYTES, value = CUSTOM_VALUE)
})
public void testTopicConfigsGetImpactedIfStaticConfigsAddToController(ClusterInstance cluster)
throws ExecutionException, InterruptedException {
try (
Admin admin = cluster.admin();
Admin adminUsingBootstrapController = cluster.admin(Map.of(), true)
) {
ConfigEntry configEntry = admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 1)))
.config(TOPIC).get().get(TopicConfig.SEGMENT_BYTES_CONFIG);
assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, configEntry.source());
assertEquals(CUSTOM_VALUE, configEntry.value(), "Config value should be custom value since controller has related static config");
ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
configEntry = admin.describeConfigs(List.of(brokerResource)).all().get().get(brokerResource).get(LOG_SEGMENT_BYTES);
assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, configEntry.source());
assertNotEquals(CUSTOM_VALUE,
configEntry.value(),
"Config value should not be custom value since broker doesn't have related static config");
ConfigResource controllerResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
configEntry = adminUsingBootstrapController.describeConfigs(List.of(controllerResource))
.all().get().get(controllerResource).get(LOG_SEGMENT_BYTES);
assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, configEntry.source());
assertEquals(CUSTOM_VALUE,
configEntry.value(),
"Config value should be custom value since controller has related static config");
}
}
@ClusterTest(types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(id = 0, key = LOG_SEGMENT_BYTES, value = CUSTOM_VALUE)
})
public void testTopicConfigsGetImpactedIfStaticConfigsAddToBroker(ClusterInstance cluster)
throws ExecutionException, InterruptedException {
try (
Admin admin = cluster.admin();
Admin adminUsingBootstrapController = cluster.admin(Map.of(), true)
) {
ConfigEntry configEntry = admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 1)))
.config(TOPIC).get().get(TopicConfig.SEGMENT_BYTES_CONFIG);
assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, configEntry.source());
assertNotEquals(CUSTOM_VALUE,
configEntry.value(),
"Config value should not be custom value since controller doesn't have static config");
ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
configEntry = admin.describeConfigs(List.of(brokerResource)).all().get().get(brokerResource).get(LOG_SEGMENT_BYTES);
assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, configEntry.source());
assertEquals(CUSTOM_VALUE,
configEntry.value(),
"Config value should be custom value since broker has related static config");
ConfigResource controllerResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
configEntry = adminUsingBootstrapController.describeConfigs(List.of(controllerResource))
.all().get().get(controllerResource).get(LOG_SEGMENT_BYTES);
assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, configEntry.source());
assertNotEquals(CUSTOM_VALUE,
configEntry.value(),
"Config value should not be custom value since controller doesn't have related static config");
}
}
}

View File

@ -44,6 +44,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
@ -77,6 +78,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ -342,4 +344,23 @@ public class BootstrapControllersIntegrationTest {
assertEquals(aclBinding, deletedAclBindings.iterator().next());
}
}
@ClusterTest(
brokers = 2,
serverProperties = {
@ClusterConfigProperty(key = TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, value = "2")
}
)
public void testDescribeConfigs(ClusterInstance clusterInstance) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
ConfigResource resource = new ConfigResource(BROKER, "");
Map<ConfigResource, Config> resourceToConfig = admin.describeConfigs(List.of(resource)).all().get();
Config config = resourceToConfig.get(resource);
assertNotNull(config);
ConfigEntry configEntry = config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
assertEquals(DYNAMIC_DEFAULT_BROKER_CONFIG, configEntry.source());
assertNotNull(configEntry);
assertEquals("2", configEntry.value());
}
}
}