diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index dfb96ef8e11..473ab172a09 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -227,6 +227,8 @@ object RequestChannel extends Logging { Seq(specifiedMetricName, header.apiKey.name) } else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) { Seq(RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME) + } else if (header.apiKey == ApiKeys.LIST_CONFIG_RESOURCES && header.apiVersion == 0) { + Seq(RequestMetrics.LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME, header.apiKey.name) } else { Seq(header.apiKey.name) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index cd0dd12d30e..cb542418c92 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -11186,47 +11186,58 @@ class KafkaApisTest extends Logging { @Test def testListConfigResourcesV0(): Unit = { - val request = buildRequest(new ListConfigResourcesRequest.Builder( - new ListConfigResourcesRequestData().setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(0)) - metadataCache = mock(classOf[KRaftMetadataCache]) + val requestMetrics = new RequestChannelMetrics(util.Set.of(ApiKeys.LIST_CONFIG_RESOURCES)) + try { + val request = buildRequest(new ListConfigResourcesRequest.Builder( + new ListConfigResourcesRequestData().setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(0), + requestMetrics = requestMetrics) + metadataCache = mock(classOf[KRaftMetadataCache]) - val resources = util.Set.of("client-metric1", "client-metric2") - when(clientMetricsManager.listClientMetricsResources).thenReturn(resources) + val resources = util.Set.of("client-metric1", "client-metric2") + when(clientMetricsManager.listClientMetricsResources).thenReturn(resources) - kafkaApis = createKafkaApis() - kafkaApis.handle(request, RequestLocal.noCaching) - val response = verifyNoThrottling[ListConfigResourcesResponse](request) - val expectedResponseData = new ListConfigResourcesResponseData() + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottlingAndUpdateMetrics[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() .setConfigResources( resources.stream.map(resource => new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource) ).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) - assertEquals(expectedResponseData, response.data) + assertEquals(expectedResponseData, response.data) - verify(metadataCache, never).getAllTopics - verify(groupConfigManager, never).groupIds - verify(metadataCache, never).getBrokerNodes(any) + verify(metadataCache, never).getAllTopics + verify(groupConfigManager, never).groupIds + verify(metadataCache, never).getBrokerNodes(any) + assertTrue(requestMetrics.apply(ApiKeys.LIST_CONFIG_RESOURCES.name).requestQueueTimeHist.count > 0) + assertTrue(requestMetrics.apply(RequestMetrics.LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME).requestQueueTimeHist.count > 0) + } finally { + requestMetrics.close() + } } @Test def testListConfigResourcesV1WithEmptyResourceTypes(): Unit = { - val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(1)) - metadataCache = mock(classOf[KRaftMetadataCache]) + val requestMetrics = new RequestChannelMetrics(util.Set.of(ApiKeys.LIST_CONFIG_RESOURCES)) + try { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(1), + requestMetrics = requestMetrics) + metadataCache = mock(classOf[KRaftMetadataCache]) - val clientMetrics = util.Set.of("client-metric1", "client-metric2") - val topics = util.Set.of("topic1", "topic2") - val groupIds = util.List.of("group1", "group2") - val nodeIds = util.List.of(1, 2) - when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics) - when(metadataCache.getAllTopics).thenReturn(topics) - when(groupConfigManager.groupIds).thenReturn(groupIds) - when(metadataCache.getBrokerNodes(any())).thenReturn( - nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList())) + val clientMetrics = util.Set.of("client-metric1", "client-metric2") + val topics = util.Set.of("topic1", "topic2") + val groupIds = util.List.of("group1", "group2") + val nodeIds = util.List.of(1, 2) + when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics) + when(metadataCache.getAllTopics).thenReturn(topics) + when(groupConfigManager.groupIds).thenReturn(groupIds) + when(metadataCache.getBrokerNodes(any())).thenReturn( + nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList())) - kafkaApis = createKafkaApis() - kafkaApis.handle(request, RequestLocal.noCaching) - val response = verifyNoThrottling[ListConfigResourcesResponse](request) - val expectedResponseData = new ListConfigResourcesResponseData() + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottlingAndUpdateMetrics[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() .setConfigResources( util.stream.Stream.of( groupIds.stream().map(resource => @@ -11245,7 +11256,12 @@ class KafkaApisTest extends Logging { new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id) ).toList ).flatMap(s => s.stream).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) - assertEquals(expectedResponseData, response.data) + assertEquals(expectedResponseData, response.data) + assertTrue(requestMetrics.apply(ApiKeys.LIST_CONFIG_RESOURCES.name).requestQueueTimeHist.count > 0) + assertEquals(0, requestMetrics.apply(RequestMetrics.LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME).requestQueueTimeHist.count) + } finally { + requestMetrics.close() + } } @Test diff --git a/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java b/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java index 4c63df4fbea..316d7f44a71 100644 --- a/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java +++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java @@ -34,7 +34,12 @@ public class RequestChannelMetrics { for (ApiKeys apiKey : enabledApis) { metricsMap.put(apiKey.name, new RequestMetrics(apiKey.name)); } - for (String name : Arrays.asList(RequestMetrics.CONSUMER_FETCH_METRIC_NAME, RequestMetrics.FOLLOW_FETCH_METRIC_NAME, RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME)) { + for (String name : Arrays.asList( + RequestMetrics.CONSUMER_FETCH_METRIC_NAME, + RequestMetrics.FOLLOW_FETCH_METRIC_NAME, + RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME, + RequestMetrics.LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME + )) { metricsMap.put(name, new RequestMetrics(name)); } } diff --git a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java index 03d8d908ee5..f7cba040c26 100644 --- a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java +++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java @@ -38,6 +38,9 @@ public class RequestMetrics { public static final String CONSUMER_FETCH_METRIC_NAME = ApiKeys.FETCH.name + "Consumer"; public static final String FOLLOW_FETCH_METRIC_NAME = ApiKeys.FETCH.name + "Follower"; public static final String VERIFY_PARTITIONS_IN_TXN_METRIC_NAME = ApiKeys.ADD_PARTITIONS_TO_TXN.name + "Verification"; + // The ListClientMetricsResourcesRequest (v0) is renamed to ListConfigResourcesRequest (v1) in 4.1. + // To record correct request name, we keep the old name for v0 and use the new name for v1+. + public static final String LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME = "ListClientMetricsResources"; public static final String REQUESTS_PER_SEC = "RequestsPerSec"; public static final String DEPRECATED_REQUESTS_PER_SEC = "DeprecatedRequestsPerSec"; public static final String MESSAGE_CONVERSIONS_TIME_MS = "MessageConversionsTimeMs";