From c26b09c6092837c47795d645526d3d098d0bd07e Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Thu, 15 May 2025 17:39:00 -0500 Subject: [PATCH] KAFKA-18904: [1/N] Change ListClientMetricsResources API to ListConfigResources (#19493) * Change `ListClientMetricsResourcesRequest.json` to `ListConfigResourcesRequest.json`. * Change `ListClientMetricsResourcesResponse.json` to `ListConfigResourcesResponse.json`. * Change `ListClientMetricsResourcesRequest.java` to `ListConfigResourcesRequest.java`. * Change `ListClientMetricsResourcesResponse.java` to `ListConfigResourcesResponsejava`. * Change `KafkaApis` to handle both `ListClientMetricsResourcesRequest` v0 and v1 requests. Reviewers: Andrew Schofield --------- Signed-off-by: PoAn Yang --- .../kafka/clients/admin/KafkaAdminClient.java | 15 +- .../apache/kafka/common/protocol/ApiKeys.java | 2 +- .../common/requests/AbstractRequest.java | 4 +- .../common/requests/AbstractResponse.java | 4 +- .../ListClientMetricsResourcesRequest.java | 75 ------ .../requests/ListConfigResourcesRequest.java | 95 +++++++ ....java => ListConfigResourcesResponse.java} | 35 ++- ...t.json => ListConfigResourcesRequest.json} | 9 +- ....json => ListConfigResourcesResponse.json} | 16 +- .../clients/admin/KafkaAdminClientTest.java | 44 ++-- .../common/requests/RequestResponseTest.java | 18 +- .../main/scala/kafka/server/KafkaApis.scala | 62 ++++- .../unit/kafka/server/KafkaApisTest.scala | 233 ++++++++++++++++-- .../unit/kafka/server/RequestQuotaTest.scala | 4 +- .../coordinator/group/GroupConfigManager.java | 5 + .../kafka/network/RequestConvertToJson.java | 16 +- 16 files changed, 463 insertions(+), 174 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java rename clients/src/main/java/org/apache/kafka/common/requests/{ListClientMetricsResourcesResponse.java => ListConfigResourcesResponse.java} (64%) rename clients/src/main/resources/common/message/{ListClientMetricsResourcesRequest.json => ListConfigResourcesRequest.json} (64%) rename clients/src/main/resources/common/message/{ListClientMetricsResourcesResponse.json => ListConfigResourcesResponse.json} (66%) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 87a951fe26e..0abad889c85 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -159,7 +159,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.U import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; -import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData; +import org.apache.kafka.common.message.ListConfigResourcesRequestData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; @@ -233,8 +233,8 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.JoinGroupRequest; -import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest; -import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse; +import org.apache.kafka.common.requests.ListConfigResourcesRequest; +import org.apache.kafka.common.requests.ListConfigResourcesResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; @@ -4887,13 +4887,16 @@ public class KafkaAdminClient extends AdminClient { new LeastLoadedNodeProvider()) { @Override - ListClientMetricsResourcesRequest.Builder createRequest(int timeoutMs) { - return new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()); + ListConfigResourcesRequest.Builder createRequest(int timeoutMs) { + return new ListConfigResourcesRequest.Builder( + new ListConfigResourcesRequestData() + .setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id())) + ); } @Override void handleResponse(AbstractResponse abstractResponse) { - ListClientMetricsResourcesResponse response = (ListClientMetricsResourcesResponse) abstractResponse; + ListConfigResourcesResponse response = (ListConfigResourcesResponse) abstractResponse; if (response.error().isFailure()) { future.completeExceptionally(response.error().exception()); } else { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 8534561af8b..89b952e6ce7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -116,7 +116,7 @@ public enum ApiKeys { GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS), PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY), ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS), - LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES), + LIST_CONFIG_RESOURCES(ApiMessageType.LIST_CONFIG_RESOURCES), DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS), SHARE_GROUP_HEARTBEAT(ApiMessageType.SHARE_GROUP_HEARTBEAT), SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 9738711e73a..750de2050f4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -316,8 +316,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse { return PushTelemetryRequest.parse(readable, apiVersion); case ASSIGN_REPLICAS_TO_DIRS: return AssignReplicasToDirsRequest.parse(readable, apiVersion); - case LIST_CLIENT_METRICS_RESOURCES: - return ListClientMetricsResourcesRequest.parse(readable, apiVersion); + case LIST_CONFIG_RESOURCES: + return ListConfigResourcesRequest.parse(readable, apiVersion); case DESCRIBE_TOPIC_PARTITIONS: return DescribeTopicPartitionsRequest.parse(readable, apiVersion); case SHARE_GROUP_HEARTBEAT: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 01cc03c12b6..bc313078d74 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -253,8 +253,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse { return PushTelemetryResponse.parse(readable, version); case ASSIGN_REPLICAS_TO_DIRS: return AssignReplicasToDirsResponse.parse(readable, version); - case LIST_CLIENT_METRICS_RESOURCES: - return ListClientMetricsResourcesResponse.parse(readable, version); + case LIST_CONFIG_RESOURCES: + return ListConfigResourcesResponse.parse(readable, version); case DESCRIBE_TOPIC_PARTITIONS: return DescribeTopicPartitionsResponse.parse(readable, version); case SHARE_GROUP_HEARTBEAT: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java deleted file mode 100644 index 417740d0ffa..00000000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData; -import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.Readable; - -public class ListClientMetricsResourcesRequest extends AbstractRequest { - public static class Builder extends AbstractRequest.Builder { - public final ListClientMetricsResourcesRequestData data; - - public Builder(ListClientMetricsResourcesRequestData data) { - super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES); - this.data = data; - } - - @Override - public ListClientMetricsResourcesRequest build(short version) { - return new ListClientMetricsResourcesRequest(data, version); - } - - @Override - public String toString() { - return data.toString(); - } - } - - private final ListClientMetricsResourcesRequestData data; - - private ListClientMetricsResourcesRequest(ListClientMetricsResourcesRequestData data, short version) { - super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES, version); - this.data = data; - } - - public ListClientMetricsResourcesRequestData data() { - return data; - } - - @Override - public ListClientMetricsResourcesResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Errors error = Errors.forException(e); - ListClientMetricsResourcesResponseData response = new ListClientMetricsResourcesResponseData() - .setErrorCode(error.code()) - .setThrottleTimeMs(throttleTimeMs); - return new ListClientMetricsResourcesResponse(response); - } - - public static ListClientMetricsResourcesRequest parse(Readable readable, short version) { - return new ListClientMetricsResourcesRequest(new ListClientMetricsResourcesRequestData( - readable, version), version); - } - - @Override - public String toString(boolean verbose) { - return data.toString(); - } - -} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java new file mode 100644 index 00000000000..3af70938843 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.ListConfigResourcesRequestData; +import org.apache.kafka.common.message.ListConfigResourcesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +import java.util.Set; + +public class ListConfigResourcesRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + public final ListConfigResourcesRequestData data; + + public Builder(ListConfigResourcesRequestData data) { + super(ApiKeys.LIST_CONFIG_RESOURCES); + this.data = data; + } + + @Override + public ListConfigResourcesRequest build(short version) { + return new ListConfigResourcesRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final ListConfigResourcesRequestData data; + + private ListConfigResourcesRequest(ListConfigResourcesRequestData data, short version) { + super(ApiKeys.LIST_CONFIG_RESOURCES, version); + this.data = data; + } + + public ListConfigResourcesRequestData data() { + return data; + } + + @Override + public ListConfigResourcesResponse getErrorResponse(int throttleTimeMs, Throwable e) { + Errors error = Errors.forException(e); + ListConfigResourcesResponseData response = new ListConfigResourcesResponseData() + .setErrorCode(error.code()) + .setThrottleTimeMs(throttleTimeMs); + return new ListConfigResourcesResponse(response); + } + + public static ListConfigResourcesRequest parse(Readable readable, short version) { + return new ListConfigResourcesRequest(new ListConfigResourcesRequestData( + readable, version), version); + } + + @Override + public String toString(boolean verbose) { + return data.toString(); + } + + /** + * Return the supported config resource types in different request version. + * If there is a new config resource type, the ListConfigResourcesRequest should bump a new request version to include it. + * For v0, the supported config resource types contain CLIENT_METRICS (16). + * For v1, the supported config resource types contain TOPIC (2), BROKER (4), BROKER_LOGGER (8), CLIENT_METRICS (16), and GROUP (32). + */ + public Set supportedResourceTypes() { + return version() == 0 ? + Set.of(ConfigResource.Type.CLIENT_METRICS.id()) : + Set.of( + ConfigResource.Type.TOPIC.id(), + ConfigResource.Type.BROKER.id(), + ConfigResource.Type.BROKER_LOGGER.id(), + ConfigResource.Type.CLIENT_METRICS.id(), + ConfigResource.Type.GROUP.id() + ); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java similarity index 64% rename from clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java index c2b3b1601ed..36a4a807f7f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java @@ -17,7 +17,8 @@ package org.apache.kafka.common.requests; import org.apache.kafka.clients.admin.ClientMetricsResourceListing; -import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.ListConfigResourcesResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; @@ -26,15 +27,15 @@ import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; -public class ListClientMetricsResourcesResponse extends AbstractResponse { - private final ListClientMetricsResourcesResponseData data; +public class ListConfigResourcesResponse extends AbstractResponse { + private final ListConfigResourcesResponseData data; - public ListClientMetricsResourcesResponse(ListClientMetricsResourcesResponseData data) { - super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES); + public ListConfigResourcesResponse(ListConfigResourcesResponseData data) { + super(ApiKeys.LIST_CONFIG_RESOURCES); this.data = data; } - public ListClientMetricsResourcesResponseData data() { + public ListConfigResourcesResponseData data() { return data; } @@ -47,8 +48,8 @@ public class ListClientMetricsResourcesResponse extends AbstractResponse { return errorCounts(Errors.forCode(data.errorCode())); } - public static ListClientMetricsResourcesResponse parse(Readable readable, short version) { - return new ListClientMetricsResourcesResponse(new ListClientMetricsResourcesResponseData( + public static ListConfigResourcesResponse parse(Readable readable, short version) { + return new ListConfigResourcesResponse(new ListConfigResourcesResponseData( readable, version)); } @@ -67,10 +68,22 @@ public class ListClientMetricsResourcesResponse extends AbstractResponse { data.setThrottleTimeMs(throttleTimeMs); } - public Collection clientMetricsResources() { - return data.clientMetricsResources() + public Collection configResources() { + return data.configResources() .stream() - .map(entry -> new ClientMetricsResourceListing(entry.name())) + .map(entry -> + new ConfigResource( + ConfigResource.Type.forId(entry.resourceType()), + entry.resourceName() + ) + ).collect(Collectors.toList()); + } + + public Collection clientMetricsResources() { + return data.configResources() + .stream() + .filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id()) + .map(entry -> new ClientMetricsResourceListing(entry.resourceName())) .collect(Collectors.toList()); } } diff --git a/clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json b/clients/src/main/resources/common/message/ListConfigResourcesRequest.json similarity index 64% rename from clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json rename to clients/src/main/resources/common/message/ListConfigResourcesRequest.json index b54dce6b7c7..c4b858a7150 100644 --- a/clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json +++ b/clients/src/main/resources/common/message/ListConfigResourcesRequest.json @@ -17,10 +17,15 @@ "apiKey": 74, "type": "request", "listeners": ["broker"], - "name": "ListClientMetricsResourcesRequest", - "validVersions": "0", + "name": "ListConfigResourcesRequest", + // Version 0 is used as ListClientMetricsResourcesRequest which only lists client metrics resources. + // Version 1 adds ResourceTypes field (KIP-1142). If there is no specified ResourceTypes, it should return all configuration resources. + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ + { "name": "ResourceTypes", "type": "[]int8", "versions": "1+", + "about": "The list of resource type. If the list is empty, it uses default supported config resource types." + } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json b/clients/src/main/resources/common/message/ListConfigResourcesResponse.json similarity index 66% rename from clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json rename to clients/src/main/resources/common/message/ListConfigResourcesResponse.json index 281781c7627..8a2dbdf5a30 100644 --- a/clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json +++ b/clients/src/main/resources/common/message/ListConfigResourcesResponse.json @@ -16,18 +16,22 @@ { "apiKey": 74, "type": "response", - "name": "ListClientMetricsResourcesResponse", - "validVersions": "0", + "name": "ListConfigResourcesResponse", + // Version 0 is used as ListClientMetricsResourcesResponse which returns all client metrics resources. + // Version 1 adds ResourceType to ConfigResources (KIP-1142). + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, - { "name": "ClientMetricsResources", "type": "[]ClientMetricsResource", "versions": "0+", - "about": "Each client metrics resource in the response.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", - "about": "The resource name." } + { "name": "ConfigResources", "type": "[]ConfigResource", "versions": "0+", + "about": "Each config resource in the response.", "fields": [ + { "name": "ResourceName", "type": "string", "versions": "0+", + "about": "The resource name." }, + { "name": "ResourceType", "type": "int8", "versions": "1+", "ignorable": true, "default": 16, + "about": "The resource type." } ]} ] } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 3df76fffd36..83356b68ff5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -141,7 +141,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; -import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData; +import org.apache.kafka.common.message.ListConfigResourcesResponseData; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListGroupsResponseData.ListedGroup; import org.apache.kafka.common.message.ListOffsetsResponseData; @@ -225,8 +225,8 @@ import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.LeaveGroupRequest; import org.apache.kafka.common.requests.LeaveGroupResponse; -import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest; -import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse; +import org.apache.kafka.common.requests.ListConfigResourcesRequest; +import org.apache.kafka.common.requests.ListConfigResourcesResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; @@ -10672,17 +10672,25 @@ public class KafkaAdminClientTest { new ClientMetricsResourceListing("two") ); - ListClientMetricsResourcesResponseData responseData = - new ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code()); + ListConfigResourcesResponseData responseData = + new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code()); - responseData.clientMetricsResources() - .add(new ListClientMetricsResourcesResponseData.ClientMetricsResource().setName("one")); - responseData.clientMetricsResources() - .add((new ListClientMetricsResourcesResponseData.ClientMetricsResource()).setName("two")); + responseData.configResources() + .add(new ListConfigResourcesResponseData + .ConfigResource() + .setResourceName("one") + .setResourceType(ConfigResource.Type.CLIENT_METRICS.id()) + ); + responseData.configResources() + .add(new ListConfigResourcesResponseData + .ConfigResource() + .setResourceName("two") + .setResourceType(ConfigResource.Type.CLIENT_METRICS.id()) + ); env.kafkaClient().prepareResponse( - request -> request instanceof ListClientMetricsResourcesRequest, - new ListClientMetricsResourcesResponse(responseData)); + request -> request instanceof ListConfigResourcesRequest, + new ListConfigResourcesResponse(responseData)); ListClientMetricsResourcesResult result = env.adminClient().listClientMetricsResources(); assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get())); @@ -10694,12 +10702,12 @@ public class KafkaAdminClientTest { try (AdminClientUnitTestEnv env = mockClientEnv()) { List expected = Collections.emptyList(); - ListClientMetricsResourcesResponseData responseData = - new ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code()); + ListConfigResourcesResponseData responseData = + new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code()); env.kafkaClient().prepareResponse( - request -> request instanceof ListClientMetricsResourcesRequest, - new ListClientMetricsResourcesResponse(responseData)); + request -> request instanceof ListConfigResourcesRequest, + new ListConfigResourcesResponse(responseData)); ListClientMetricsResourcesResult result = env.adminClient().listClientMetricsResources(); assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get())); @@ -10710,7 +10718,7 @@ public class KafkaAdminClientTest { public void testListClientMetricsResourcesNotSupported() { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().prepareResponse( - request -> request instanceof ListClientMetricsResourcesRequest, + request -> request instanceof ListConfigResourcesRequest, prepareListClientMetricsResourcesResponse(Errors.UNSUPPORTED_VERSION)); ListClientMetricsResourcesResult result = env.adminClient().listClientMetricsResources(); @@ -10776,8 +10784,8 @@ public class KafkaAdminClientTest { } } - private static ListClientMetricsResourcesResponse prepareListClientMetricsResourcesResponse(Errors error) { - return new ListClientMetricsResourcesResponse(new ListClientMetricsResourcesResponseData() + private static ListConfigResourcesResponse prepareListClientMetricsResourcesResponse(Errors error) { + return new ListConfigResourcesResponse(new ListConfigResourcesResponseData() .setErrorCode(error.code())); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e7f503f6ceb..aa50f9db018 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -175,8 +175,8 @@ import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData; -import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData; -import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData; +import org.apache.kafka.common.message.ListConfigResourcesRequestData; +import org.apache.kafka.common.message.ListConfigResourcesResponseData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; @@ -1052,7 +1052,7 @@ public class RequestResponseTest { case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsRequest(version); case PUSH_TELEMETRY: return createPushTelemetryRequest(version); case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsRequest(version); - case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesRequest(version); + case LIST_CONFIG_RESOURCES: return createListConfigResourcesRequest(version); case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsRequest(version); case SHARE_GROUP_HEARTBEAT: return createShareGroupHeartbeatRequest(version); case SHARE_GROUP_DESCRIBE: return createShareGroupDescribeRequest(version); @@ -1147,7 +1147,7 @@ public class RequestResponseTest { case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsResponse(); case PUSH_TELEMETRY: return createPushTelemetryResponse(); case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsResponse(); - case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesResponse(); + case LIST_CONFIG_RESOURCES: return createListConfigResourcesResponse(); case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsResponse(); case SHARE_GROUP_HEARTBEAT: return createShareGroupHeartbeatResponse(); case SHARE_GROUP_DESCRIBE: return createShareGroupDescribeResponse(); @@ -3636,15 +3636,15 @@ public class RequestResponseTest { return new PushTelemetryResponse(response); } - private ListClientMetricsResourcesRequest createListClientMetricsResourcesRequest(short version) { - return new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build(version); + private ListConfigResourcesRequest createListConfigResourcesRequest(short version) { + return new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(version); } - private ListClientMetricsResourcesResponse createListClientMetricsResourcesResponse() { - ListClientMetricsResourcesResponseData response = new ListClientMetricsResourcesResponseData(); + private ListConfigResourcesResponse createListConfigResourcesResponse() { + ListConfigResourcesResponseData response = new ListConfigResourcesResponseData(); response.setErrorCode(Errors.NONE.code()); response.setThrottleTimeMs(10); - return new ListClientMetricsResourcesResponse(response); + return new ListConfigResourcesResponse(response); } private InitializeShareGroupStateRequest createInitializeShareGroupStateRequest(short version) { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b9459b875e2..ad19902ea79 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -27,6 +27,7 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.EndpointType import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.acl.AclOperation._ +import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} import org.apache.kafka.common.internals.{FatalExitError, Plugin, Topic} @@ -34,7 +35,6 @@ import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartit import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult} import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic -import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} @@ -227,7 +227,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request) case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request) case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request) - case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request) + case ApiKeys.LIST_CONFIG_RESOURCES => handleListConfigResources(request) case ApiKeys.ADD_RAFT_VOTER => forwardToController(request) case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request) case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError) @@ -2927,16 +2927,60 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { - val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] + /** + * Handle ListConfigResourcesRequest. If resourceTypes are not specified, it uses ListConfigResourcesRequest#supportedResourceTypes + * to retrieve config resources. If resourceTypes are specified, it returns matched config resources. + * If a config resource type is not supported, the handler returns UNSUPPORTED_VERSION. + */ + private def handleListConfigResources(request: RequestChannel.Request): Unit = { + val listConfigResourcesRequest = request.body[ListConfigResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { - requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) + requestHelper.sendMaybeThrottle(request, listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( - clientMetricsManager.listClientMetricsResources.stream.map( - name => new ClientMetricsResource().setName(name)).collect(Collectors.toList())) - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + val data = new ListConfigResourcesResponseData() + + val supportedResourceTypes = listConfigResourcesRequest.supportedResourceTypes() + var resourceTypes = listConfigResourcesRequest.data().resourceTypes() + if (resourceTypes.isEmpty) { + resourceTypes = supportedResourceTypes.stream().toList + } + + resourceTypes.forEach(resourceType => + if (!supportedResourceTypes.contains(resourceType)) { + requestHelper.sendMaybeThrottle(request, new ListConfigResourcesResponse(data.setErrorCode(Errors.UNSUPPORTED_VERSION.code()))) + return + } + ) + + val result = new util.ArrayList[ListConfigResourcesResponseData.ConfigResource]() + if (resourceTypes.contains(ConfigResource.Type.GROUP.id)) { + groupConfigManager.groupIds().forEach(id => + result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(id).setResourceType(ConfigResource.Type.GROUP.id)) + ) + } + if (resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id)) { + clientMetricsManager.listClientMetricsResources.forEach(name => + result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)) + ) + } + if (resourceTypes.contains(ConfigResource.Type.BROKER_LOGGER.id)) { + metadataCache.getBrokerNodes(request.context.listenerName).forEach(node => + result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id)) + ) + } + if (resourceTypes.contains(ConfigResource.Type.BROKER.id)) { + metadataCache.getBrokerNodes(request.context.listenerName).forEach(node => + result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER.id)) + ) + } + if (resourceTypes.contains(ConfigResource.Type.TOPIC.id)) { + metadataCache.getAllTopics.forEach(name => + result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.TOPIC.id)) + ) + } + data.setConfigResources(result) + requestHelper.sendMaybeThrottle(request, new ListConfigResourcesResponse(data)) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 045e1755006..f698c8ef9ef 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -51,7 +51,6 @@ import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{De import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse} import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity -import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic @@ -11207,47 +11206,235 @@ class KafkaApisTest extends Logging { } @Test - def testListClientMetricsResources(): Unit = { - val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) - metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + def testListConfigResourcesV0(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(0)) + metadataCache = mock(classOf[KRaftMetadataCache]) + + val resources = util.Set.of("client-metric1", "client-metric2") + when(clientMetricsManager.listClientMetricsResources).thenReturn(resources) - val resources = new mutable.HashSet[String] - resources.add("test1") - resources.add("test2") - when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava) kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) - val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) - val expectedResponse = new ListClientMetricsResourcesResponseData().setClientMetricsResources( - resources.map(resource => new ClientMetricsResource().setName(resource)).toBuffer.asJava) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() + .setConfigResources( + resources.stream.map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource) + ).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) + assertEquals(expectedResponseData, response.data) + + verify(metadataCache, never).getAllTopics + verify(groupConfigManager, never).groupIds + verify(metadataCache, never).getBrokerNodes(any) + } + + @Test + def testListConfigResourcesV1WithEmptyResourceTypes(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(1)) + metadataCache = mock(classOf[KRaftMetadataCache]) + + val clientMetrics = util.Set.of("client-metric1", "client-metric2") + val topics = util.Set.of("topic1", "topic2") + val groupIds = util.List.of("group1", "group2") + val nodeIds = util.List.of(1, 2) + when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics) + when(metadataCache.getAllTopics).thenReturn(topics) + when(groupConfigManager.groupIds).thenReturn(groupIds) + when(metadataCache.getBrokerNodes(any())).thenReturn( + nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList())) + + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() + .setConfigResources( + util.stream.Stream.of( + groupIds.stream().map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id) + ).toList, + clientMetrics.stream.map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id) + ).toList, + nodeIds.stream().map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id) + ).toList, + nodeIds.stream().map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id) + ).toList, + topics.stream().map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id) + ).toList + ).flatMap(s => s.stream).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) + assertEquals(expectedResponseData, response.data) + } + + @Test + def testListConfigResourcesV1WithGroup(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData() + .setResourceTypes(util.List.of(ConfigResource.Type.GROUP.id))).build(1)) + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = util.List.of("group1", "group2") + when(groupConfigManager.groupIds).thenReturn(groupIds) + + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() + .setConfigResources( + groupIds.stream().map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id) + ).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) + assertEquals(expectedResponseData, response.data) + + verify(metadataCache, never).getAllTopics + verify(clientMetricsManager, never).listClientMetricsResources + verify(metadataCache, never).getBrokerNodes(any) + } + + @Test + def testListConfigResourcesV1WithClientMetrics(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData() + .setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(1)) + metadataCache = mock(classOf[KRaftMetadataCache]) + + val clientMetrics = util.Set.of("client-metric1", "client-metric2") + when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics) + + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() + .setConfigResources( + clientMetrics.stream.map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id) + ).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) + assertEquals(expectedResponseData, response.data) + + verify(metadataCache, never).getAllTopics + verify(groupConfigManager, never).groupIds + verify(metadataCache, never).getBrokerNodes(any) + } + + @Test + def testListConfigResourcesV1WithBrokerLogger(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData() + .setResourceTypes(util.List.of(ConfigResource.Type.BROKER_LOGGER.id))).build(1)) + metadataCache = mock(classOf[KRaftMetadataCache]) + + val nodeIds = util.List.of(1, 2) + when(metadataCache.getBrokerNodes(any())).thenReturn( + nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList())) + + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() + .setConfigResources( + nodeIds.stream().map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id) + ).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) + assertEquals(expectedResponseData, response.data) + + verify(metadataCache, never).getAllTopics + verify(groupConfigManager, never).groupIds + verify(clientMetricsManager, never).listClientMetricsResources + } + + @Test + def testListConfigResourcesV1WithBroker(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData() + .setResourceTypes(util.List.of(ConfigResource.Type.BROKER.id))).build(1)) + metadataCache = mock(classOf[KRaftMetadataCache]) + + val nodeIds = util.List.of(1, 2) + when(metadataCache.getBrokerNodes(any())).thenReturn( + nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList())) + + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() + .setConfigResources( + nodeIds.stream().map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id) + ).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) + assertEquals(expectedResponseData, response.data) + + verify(metadataCache, never).getAllTopics + verify(groupConfigManager, never).groupIds + verify(clientMetricsManager, never).listClientMetricsResources + } + + @Test + def testListConfigResourcesV1WithTopic(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData() + .setResourceTypes(util.List.of(ConfigResource.Type.TOPIC.id))).build(1)) + metadataCache = mock(classOf[KRaftMetadataCache]) + + val topics = util.Set.of("topic1", "topic2") + when(metadataCache.getAllTopics).thenReturn(topics) + + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + val expectedResponseData = new ListConfigResourcesResponseData() + .setConfigResources( + topics.stream().map(resource => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id) + ).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource])) + assertEquals(expectedResponseData, response.data) + + verify(groupConfigManager, never).groupIds + verify(clientMetricsManager, never).listClientMetricsResources + verify(metadataCache, never).getBrokerNodes(any) + } + + @Test + def testListConfigResourcesEmptyResponse(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build()) + metadataCache = mock(classOf[KRaftMetadataCache]) + + when(clientMetricsManager.listClientMetricsResources).thenReturn(util.Set.of) + when(metadataCache.getAllTopics).thenReturn(util.Set.of) + when(groupConfigManager.groupIds).thenReturn(util.List.of) + when(metadataCache.getBrokerNodes(any())).thenReturn(util.List.of) + + kafkaApis = createKafkaApis() + kafkaApis.handle(request, RequestLocal.noCaching) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + val expectedResponse = new ListConfigResourcesResponseData() assertEquals(expectedResponse, response.data) } @Test - def testListClientMetricsResourcesEmptyResponse(): Unit = { - val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) - metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + def testListConfigResourcesV1WithUnknown(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData() + .setResourceTypes(util.List.of(ConfigResource.Type.UNKNOWN.id))).build(1)) + metadataCache = mock(classOf[KRaftMetadataCache]) - val resources = new mutable.HashSet[String] - when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava) kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) - val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) - val expectedResponse = new ListClientMetricsResourcesResponseData() - assertEquals(expectedResponse, response.data) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) + assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data.errorCode()) + + verify(metadataCache, never).getAllTopics + verify(groupConfigManager, never).groupIds + verify(clientMetricsManager, never).listClientMetricsResources + verify(metadataCache, never).getBrokerNodes(any) } @Test - def testListClientMetricsResourcesWithException(): Unit = { - val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) + def testListConfigResourcesWithException(): Unit = { + val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build()) metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) when(clientMetricsManager.listClientMetricsResources).thenThrow(new RuntimeException("test")) kafkaApis = createKafkaApis() kafkaApis.handle(request, RequestLocal.noCaching) - val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) + val response = verifyNoThrottling[ListConfigResourcesResponse](request) - val expectedResponse = new ListClientMetricsResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code) + val expectedResponse = new ListConfigResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code) assertEquals(expectedResponse, response.data) } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index c55f55fad3e..db03c891e4c 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -641,8 +641,8 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.ASSIGN_REPLICAS_TO_DIRS => new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData()) - case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => - new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()) + case ApiKeys.LIST_CONFIG_RESOURCES => + new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()) case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => new DescribeTopicPartitionsRequest.Builder(new DescribeTopicPartitionsRequestData()) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java index 87a45110cd1..80ef0d24a4a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -68,6 +69,10 @@ public class GroupConfigManager implements AutoCloseable { return Optional.ofNullable(configMap.get(groupId)); } + public List groupIds() { + return List.copyOf(configMap.keySet()); + } + /** * Validate the given properties. * diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index 56914f2fe46..e2a76e5caba 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -130,8 +130,8 @@ import org.apache.kafka.common.message.JoinGroupRequestDataJsonConverter; import org.apache.kafka.common.message.JoinGroupResponseDataJsonConverter; import org.apache.kafka.common.message.LeaveGroupRequestDataJsonConverter; import org.apache.kafka.common.message.LeaveGroupResponseDataJsonConverter; -import org.apache.kafka.common.message.ListClientMetricsResourcesRequestDataJsonConverter; -import org.apache.kafka.common.message.ListClientMetricsResourcesResponseDataJsonConverter; +import org.apache.kafka.common.message.ListConfigResourcesRequestDataJsonConverter; +import org.apache.kafka.common.message.ListConfigResourcesResponseDataJsonConverter; import org.apache.kafka.common.message.ListGroupsRequestDataJsonConverter; import org.apache.kafka.common.message.ListGroupsResponseDataJsonConverter; import org.apache.kafka.common.message.ListOffsetsRequestDataJsonConverter; @@ -312,8 +312,8 @@ import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.LeaveGroupRequest; import org.apache.kafka.common.requests.LeaveGroupResponse; -import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest; -import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse; +import org.apache.kafka.common.requests.ListConfigResourcesRequest; +import org.apache.kafka.common.requests.ListConfigResourcesResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; @@ -507,8 +507,8 @@ public class RequestConvertToJson { return JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), request.version()); case LEAVE_GROUP: return LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), request.version()); - case LIST_CLIENT_METRICS_RESOURCES: - return ListClientMetricsResourcesRequestDataJsonConverter.write(((ListClientMetricsResourcesRequest) request).data(), request.version()); + case LIST_CONFIG_RESOURCES: + return ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) request).data(), request.version()); case LIST_GROUPS: return ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), request.version()); case LIST_OFFSETS: @@ -693,8 +693,8 @@ public class RequestConvertToJson { return JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), version); case LEAVE_GROUP: return LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) response).data(), version); - case LIST_CLIENT_METRICS_RESOURCES: - return ListClientMetricsResourcesResponseDataJsonConverter.write(((ListClientMetricsResourcesResponse) response).data(), version); + case LIST_CONFIG_RESOURCES: + return ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse) response).data(), version); case LIST_GROUPS: return ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) response).data(), version); case LIST_OFFSETS: