KAFKA-17137 Ensure Admin APIs are properly tested (#16658)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Xuan-Zhang Gong 2024-08-21 03:34:10 +08:00 committed by GitHub
parent e23172a48a
commit a537e716eb
4 changed files with 229 additions and 3 deletions

checkstyle/suppressions.xml

@@ -236,6 +236,7 @@
<suppress checks="ImportControl" files="FetchResponseData.java"/>
<suppress checks="ImportControl" files="RecordsSerdeTest.java"/>
<suppress checks="ImportControl" files="ClientTelemetryTest.java"/>
<!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"

kafka/admin/ClientTelemetryTest.java (new file)

@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin;

import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;

import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(value = ClusterTestExtensions.class)
public class ClientTelemetryTest {

@ClusterTest(types = Type.KRAFT,
serverProperties = @ClusterConfigProperty(key = AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"))
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
try (Admin admin = Admin.create(configs)) {
String testTopicName = "test_topic";
admin.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, (short) 1)));
clusterInstance.waitForTopic(testTopicName, 1);
Map<String, Object> producerConfigs = new HashMap<>();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (Producer<String, String> producer = new KafkaProducer<>(producerConfigs)) {
producer.send(new ProducerRecord<>(testTopicName, 0, null, "bar")).get();
producer.flush();
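// The broker assigns the client instance id via the telemetry protocol; repeated calls should return the same id.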
Uuid producerClientId = producer.clientInstanceId(Duration.ofSeconds(3));
assertNotNull(producerClientId);
assertEquals(producerClientId, producer.clientInstanceId(Duration.ofSeconds(3)));
}
Map<String, Object> consumerConfigs = new HashMap<>();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs)) {
consumer.assign(Collections.singletonList(new TopicPartition(testTopicName, 0)));
consumer.seekToBeginning(Collections.singletonList(new TopicPartition(testTopicName, 0)));
Uuid consumerClientId = consumer.clientInstanceId(Duration.ofSeconds(5));
// Before the first poll the consumer has not connected to the broker, so clientInstanceId() returns null.
assertNull(consumerClientId);
List<String> values = new ArrayList<>();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
values.add(record.value());
}
assertEquals(1, values.size());
assertEquals("bar", values.get(0));
consumerClientId = consumer.clientInstanceId(Duration.ofSeconds(3));
assertNotNull(consumerClientId);
assertEquals(consumerClientId, consumer.clientInstanceId(Duration.ofSeconds(3)));
}
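// The admin client should likewise report a stable, broker-assigned client instance id.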
Uuid uuid = admin.clientInstanceId(Duration.ofSeconds(3));
assertNotNull(uuid);
assertEquals(uuid, admin.clientInstanceId(Duration.ofSeconds(3)));
}
}

@ClusterTest(types = Type.KRAFT)
public void testMetrics(ClusterInstance clusterInstance) {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
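// Spot-check a handful of metrics that every admin client is expected to register.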
List<String> expectedMetricsName = Arrays.asList("request-size-max", "io-wait-ratio", "response-total",
"version", "io-time-ns-avg", "network-io-rate");
try (Admin admin = Admin.create(configs)) {
Set<String> actualMetricsName = admin.metrics().keySet().stream()
.map(MetricName::name)
.collect(Collectors.toSet());
expectedMetricsName.forEach(expectedName -> assertTrue(actualMetricsName.contains(expectedName),
String.format("actual metrics name: %s dont contains expected: %s", actualMetricsName,
expectedName)));
assertTrue(actualMetricsName.containsAll(expectedMetricsName));
}
}

/**
 * We need to register a {@link ClientTelemetry} implementation as a broker metrics reporter to test the
 * clientInstanceId method. Otherwise, the {@link org.apache.kafka.common.protocol.ApiKeys#GET_TELEMETRY_SUBSCRIPTIONS}
 * request is not supported by the server.
 */
public static class GetIdClientTelemetry implements ClientTelemetry, MetricsReporter {
@Override
public void init(List<KafkaMetric> metrics) {
}
@Override
public void metricChange(KafkaMetric metric) {
}
@Override
public void metricRemoval(KafkaMetric metric) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ClientTelemetryReceiver clientReceiver() {
return (context, payload) -> {
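// Intentionally ignore pushed metrics; registering a receiver is enough for the broker to support the telemetry APIs.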
};
}
}
}

integration/kafka/admin/AdminFenceProducersIntegrationTest.scala

@@ -18,17 +18,20 @@
package integration.kafka.admin

import kafka.api.IntegrationTestHarness
import kafka.security.minikdc.MiniKdc.createConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException}
+import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.server.config.ServerLogConfigs
-import org.junit.jupiter.api.Assertions.{assertInstanceOf, assertThrows, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.{Collections, Properties}
import scala.collection.Seq
@@ -107,6 +110,22 @@ class AdminFenceProducersIntegrationTest extends IntegrationTestHarness {
assertThrows(classOf[ProducerFencedException], () => producer.commitTransaction())
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@Timeout(value = 30)
def testFenceProducerTimeoutMs(quorum: String): Unit = {
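// Point the admin client at a port with no broker listening, so the fence request can only time out.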
adminClient = {
val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
Admin.create(config)
}
try {
val e = assertThrows(classOf[ExecutionException], () => adminClient.fenceProducers(Collections.singletonList(txnId),
new FenceProducersOptions().timeoutMs(0)).all().get())
assertInstanceOf(classOf[TimeoutException], e.getCause)
} finally adminClient.close(Duration.ofSeconds(0))
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testFenceBeforeProducerCommit(quorum: String): Unit = {

PlaintextAdminIntegrationTest.scala

@@ -3067,6 +3067,36 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(expected, config.value())
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testListClientMetricsResources(quorum: String): Unit = {
client = createAdminClient
client.createTopics(Collections.singleton(new NewTopic(topic, partition, 0.toShort)))
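// A regular topic is not a client metrics resource, so the listing should still be empty.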
assertTrue(client.listClientMetricsResources().all().get().isEmpty)
val name = "name"
val configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, name)
val configEntry = new ConfigEntry("interval.ms", "111")
val configOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)
client.incrementalAlterConfigs(Collections.singletonMap(configResource, Collections.singletonList(configOp))).all().get()
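// The config change propagates asynchronously, so wait until the new resource becomes visible.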
TestUtils.waitUntilTrue(() => {
val results = client.listClientMetricsResources().all().get()
results.size() == 1 && results.iterator().next().equals(new ClientMetricsResourceListing(name))
}, "metadata timeout")
}

@ParameterizedTest
@ValueSource(strings = Array("quorum=kraft"))
@Timeout(30)
def testListClientMetricsResourcesTimeoutMs(ignored: String): Unit = {
client = createInvalidAdminClient()
try {
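// A zero timeout expires the request immediately, surfacing as a TimeoutException wrapped in an ExecutionException.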
val timeoutOption = new ListClientMetricsResourcesOptions().timeoutMs(0)
val exception = assertThrows(classOf[ExecutionException], () =>
client.listClientMetricsResources(timeoutOption).all().get())
assertInstanceOf(classOf[TimeoutException], exception.getCause)
} finally client.close(time.Duration.ZERO)
}

/**
* Test that createTopics returns the dynamic configurations of the topics that were created.
*