diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2045a90b5be..5f7d0624476 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -236,6 +236,7 @@
+
configs = new HashMap<>();
+ configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ try (Admin admin = Admin.create(configs)) {
+ String testTopicName = "test_topic";
+ admin.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, (short) 1)));
+ clusterInstance.waitForTopic(testTopicName, 1);
+
+            Map<String, Object> producerConfigs = new HashMap<>();
+ producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+            try (Producer<String, String> producer = new KafkaProducer<>(producerConfigs)) {
+ producer.send(new ProducerRecord<>(testTopicName, 0, null, "bar")).get();
+ producer.flush();
+ Uuid producerClientId = producer.clientInstanceId(Duration.ofSeconds(3));
+ assertNotNull(producerClientId);
+ assertEquals(producerClientId, producer.clientInstanceId(Duration.ofSeconds(3)));
+ }
+
+            Map<String, Object> consumerConfigs = new HashMap<>();
+ consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+            try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs)) {
+ consumer.assign(Collections.singletonList(new TopicPartition(testTopicName, 0)));
+ consumer.seekToBeginning(Collections.singletonList(new TopicPartition(testTopicName, 0)));
+ Uuid consumerClientId = consumer.clientInstanceId(Duration.ofSeconds(5));
+                // clientInstanceId returns null before the first poll
+                assertNull(consumerClientId);
+                List<String> values = new ArrayList<>();
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+                for (ConsumerRecord<String, String> record : records) {
+ values.add(record.value());
+ }
+ assertEquals(1, values.size());
+ assertEquals("bar", values.get(0));
+ consumerClientId = consumer.clientInstanceId(Duration.ofSeconds(3));
+ assertNotNull(consumerClientId);
+ assertEquals(consumerClientId, consumer.clientInstanceId(Duration.ofSeconds(3)));
+ }
+ Uuid uuid = admin.clientInstanceId(Duration.ofSeconds(3));
+ assertNotNull(uuid);
+ assertEquals(uuid, admin.clientInstanceId(Duration.ofSeconds(3)));
+ }
+ }
+
+
+ @ClusterTest(types = Type.KRAFT)
+ public void testMetrics(ClusterInstance clusterInstance) {
+        Map<String, Object> configs = new HashMap<>();
+ configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+        List<String> expectedMetricsName = Arrays.asList("request-size-max", "io-wait-ratio", "response-total",
+ "version", "io-time-ns-avg", "network-io-rate");
+ try (Admin admin = Admin.create(configs)) {
+            Set<String> actualMetricsName = admin.metrics().keySet().stream()
+ .map(MetricName::name)
+ .collect(Collectors.toSet());
+ expectedMetricsName.forEach(expectedName -> assertTrue(actualMetricsName.contains(expectedName),
+                String.format("actual metrics name: %s doesn't contain expected: %s", actualMetricsName,
+ expectedName)));
+ assertTrue(actualMetricsName.containsAll(expectedMetricsName));
+ }
+ }
+
+    /**
+     * We need to add a ClientTelemetry implementation to the plugins in order to test the clientInstanceId method.
+     * Otherwise the {@link org.apache.kafka.common.protocol.ApiKeys#GET_TELEMETRY_SUBSCRIPTIONS} request
+     * will not be supported by the server.
+     */
+ public static class GetIdClientTelemetry implements ClientTelemetry, MetricsReporter {
+
+
+ @Override
+        public void init(List<KafkaMetric> metrics) {
+ }
+
+ @Override
+ public void metricChange(KafkaMetric metric) {
+ }
+
+ @Override
+ public void metricRemoval(KafkaMetric metric) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+        public void configure(Map<String, ?> configs) {
+ }
+
+ @Override
+ public ClientTelemetryReceiver clientReceiver() {
+ return (context, payload) -> {
+ };
+ }
+ }
+
+}
diff --git a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
index cbfeee50981..e44142109e5 100644
--- a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
@@ -18,17 +18,20 @@
package integration.kafka.admin
import kafka.api.IntegrationTestHarness
+import kafka.security.minikdc.MiniKdc.createConfig
+import kafka.utils.TestUtils
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException}
+import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.server.config.ServerLogConfigs
-import org.junit.jupiter.api.Assertions.{assertInstanceOf, assertThrows, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.{Collections, Properties}
import scala.collection.Seq
@@ -107,6 +110,22 @@ class AdminFenceProducersIntegrationTest extends IntegrationTestHarness {
assertThrows(classOf[ProducerFencedException], () => producer.commitTransaction())
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ @Timeout(value = 30)
+ def testFenceProducerTimeoutMs(quorum: String): Unit = {
+ adminClient = {
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
+ Admin.create(config)
+ }
+ try {
+ val e = assertThrows(classOf[ExecutionException], () => adminClient.fenceProducers(Collections.singletonList(txnId),
+ new FenceProducersOptions().timeoutMs(0)).all().get())
+ assertInstanceOf(classOf[TimeoutException], e.getCause)
+ } finally adminClient.close(Duration.ofSeconds(0))
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testFenceBeforeProducerCommit(quorum: String): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 2a83b133772..eae352a4119 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -3067,6 +3067,36 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(expected, config.value())
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testListClientMetricsResources(quorum: String): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singleton(new NewTopic(topic, partition, 0.toShort)))
+ assertTrue(client.listClientMetricsResources().all().get().isEmpty)
+ val name = "name"
+ val configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, name)
+ val configEntry = new ConfigEntry("interval.ms", "111")
+ val configOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)
+ client.incrementalAlterConfigs(Collections.singletonMap(configResource, Collections.singletonList(configOp))).all().get()
+ TestUtils.waitUntilTrue(() => {
+ val results = client.listClientMetricsResources().all().get()
+ results.size() == 1 && results.iterator().next().equals(new ClientMetricsResourceListing(name))
+ }, "metadata timeout")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("quorum=kraft"))
+ @Timeout(30)
+ def testListClientMetricsResourcesTimeoutMs(ignored: String): Unit = {
+ client = createInvalidAdminClient()
+ try {
+ val timeoutOption = new ListClientMetricsResourcesOptions().timeoutMs(0)
+ val exception = assertThrows(classOf[ExecutionException], () =>
+ client.listClientMetricsResources(timeoutOption).all().get())
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally client.close(time.Duration.ZERO)
+ }
+
/**
* Test that createTopics returns the dynamic configurations of the topics that were created.
*