KAFKA-18904: [4/N] Add ListClientMetricsResources metric if request is v0 ListConfigResources (#19877)

Before 4.1, the api key 74 is `ListClientMetricsResources`. After 4.1,
it's `ListConfigResources`. If users sent a v0 ListConfigResources to
broker, the metric doesn't record request with
`ListClientMetricsResources`. This PR is to add
`ListClientMetricsResources` metric if the request is v0
`ListConfigResources`.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-06-03 02:04:04 +08:00 committed by GitHub
parent d1f41ef011
commit 78ea8782ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 56 additions and 30 deletions

View File

@ -227,6 +227,8 @@ object RequestChannel extends Logging {
Seq(specifiedMetricName, header.apiKey.name)
} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) {
Seq(RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME)
} else if (header.apiKey == ApiKeys.LIST_CONFIG_RESOURCES && header.apiVersion == 0) {
Seq(RequestMetrics.LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME, header.apiKey.name)
} else {
Seq(header.apiKey.name)
}

View File

@ -11186,47 +11186,58 @@ class KafkaApisTest extends Logging {
@Test
def testListConfigResourcesV0(): Unit = {
val request = buildRequest(new ListConfigResourcesRequest.Builder(
new ListConfigResourcesRequestData().setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(0))
metadataCache = mock(classOf[KRaftMetadataCache])
val requestMetrics = new RequestChannelMetrics(util.Set.of(ApiKeys.LIST_CONFIG_RESOURCES))
try {
val request = buildRequest(new ListConfigResourcesRequest.Builder(
new ListConfigResourcesRequestData().setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(0),
requestMetrics = requestMetrics)
metadataCache = mock(classOf[KRaftMetadataCache])
val resources = util.Set.of("client-metric1", "client-metric2")
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources)
val resources = util.Set.of("client-metric1", "client-metric2")
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources)
kafkaApis = createKafkaApis()
kafkaApis.handle(request, RequestLocal.noCaching)
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
val expectedResponseData = new ListConfigResourcesResponseData()
kafkaApis = createKafkaApis()
kafkaApis.handle(request, RequestLocal.noCaching)
val response = verifyNoThrottlingAndUpdateMetrics[ListConfigResourcesResponse](request)
val expectedResponseData = new ListConfigResourcesResponseData()
.setConfigResources(
resources.stream.map(resource =>
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource)
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
assertEquals(expectedResponseData, response.data)
assertEquals(expectedResponseData, response.data)
verify(metadataCache, never).getAllTopics
verify(groupConfigManager, never).groupIds
verify(metadataCache, never).getBrokerNodes(any)
verify(metadataCache, never).getAllTopics
verify(groupConfigManager, never).groupIds
verify(metadataCache, never).getBrokerNodes(any)
assertTrue(requestMetrics.apply(ApiKeys.LIST_CONFIG_RESOURCES.name).requestQueueTimeHist.count > 0)
assertTrue(requestMetrics.apply(RequestMetrics.LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME).requestQueueTimeHist.count > 0)
} finally {
requestMetrics.close()
}
}
@Test
def testListConfigResourcesV1WithEmptyResourceTypes(): Unit = {
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(1))
metadataCache = mock(classOf[KRaftMetadataCache])
val requestMetrics = new RequestChannelMetrics(util.Set.of(ApiKeys.LIST_CONFIG_RESOURCES))
try {
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(1),
requestMetrics = requestMetrics)
metadataCache = mock(classOf[KRaftMetadataCache])
val clientMetrics = util.Set.of("client-metric1", "client-metric2")
val topics = util.Set.of("topic1", "topic2")
val groupIds = util.List.of("group1", "group2")
val nodeIds = util.List.of(1, 2)
when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
when(metadataCache.getAllTopics).thenReturn(topics)
when(groupConfigManager.groupIds).thenReturn(groupIds)
when(metadataCache.getBrokerNodes(any())).thenReturn(
nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList()))
val clientMetrics = util.Set.of("client-metric1", "client-metric2")
val topics = util.Set.of("topic1", "topic2")
val groupIds = util.List.of("group1", "group2")
val nodeIds = util.List.of(1, 2)
when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
when(metadataCache.getAllTopics).thenReturn(topics)
when(groupConfigManager.groupIds).thenReturn(groupIds)
when(metadataCache.getBrokerNodes(any())).thenReturn(
nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList()))
kafkaApis = createKafkaApis()
kafkaApis.handle(request, RequestLocal.noCaching)
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
val expectedResponseData = new ListConfigResourcesResponseData()
kafkaApis = createKafkaApis()
kafkaApis.handle(request, RequestLocal.noCaching)
val response = verifyNoThrottlingAndUpdateMetrics[ListConfigResourcesResponse](request)
val expectedResponseData = new ListConfigResourcesResponseData()
.setConfigResources(
util.stream.Stream.of(
groupIds.stream().map(resource =>
@ -11245,7 +11256,12 @@ class KafkaApisTest extends Logging {
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id)
).toList
).flatMap(s => s.stream).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
assertEquals(expectedResponseData, response.data)
assertEquals(expectedResponseData, response.data)
assertTrue(requestMetrics.apply(ApiKeys.LIST_CONFIG_RESOURCES.name).requestQueueTimeHist.count > 0)
assertEquals(0, requestMetrics.apply(RequestMetrics.LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME).requestQueueTimeHist.count)
} finally {
requestMetrics.close()
}
}
@Test

View File

@ -34,7 +34,12 @@ public class RequestChannelMetrics {
for (ApiKeys apiKey : enabledApis) {
metricsMap.put(apiKey.name, new RequestMetrics(apiKey.name));
}
for (String name : Arrays.asList(RequestMetrics.CONSUMER_FETCH_METRIC_NAME, RequestMetrics.FOLLOW_FETCH_METRIC_NAME, RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME)) {
for (String name : Arrays.asList(
RequestMetrics.CONSUMER_FETCH_METRIC_NAME,
RequestMetrics.FOLLOW_FETCH_METRIC_NAME,
RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME,
RequestMetrics.LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME
)) {
metricsMap.put(name, new RequestMetrics(name));
}
}

View File

@ -38,6 +38,9 @@ public class RequestMetrics {
public static final String CONSUMER_FETCH_METRIC_NAME = ApiKeys.FETCH.name + "Consumer";
public static final String FOLLOW_FETCH_METRIC_NAME = ApiKeys.FETCH.name + "Follower";
public static final String VERIFY_PARTITIONS_IN_TXN_METRIC_NAME = ApiKeys.ADD_PARTITIONS_TO_TXN.name + "Verification";
// The ListClientMetricsResourcesRequest (v0) is renamed to ListConfigResourcesRequest (v1) in 4.1.
// To record correct request name, we keep the old name for v0 and use the new name for v1+.
public static final String LIST_CLIENT_METRICS_RESOURCES_METRIC_NAME = "ListClientMetricsResources";
public static final String REQUESTS_PER_SEC = "RequestsPerSec";
public static final String DEPRECATED_REQUESTS_PER_SEC = "DeprecatedRequestsPerSec";
public static final String MESSAGE_CONVERSIONS_TIME_MS = "MessageConversionsTimeMs";