mirror of https://github.com/apache/kafka.git
KAFKA-18904: [1/N] Change ListClientMetricsResources API to ListConfigResources (#19493)
* Change `ListClientMetricsResourcesRequest.json` to `ListConfigResourcesRequest.json`. * Change `ListClientMetricsResourcesResponse.json` to `ListConfigResourcesResponse.json`. * Change `ListClientMetricsResourcesRequest.java` to `ListConfigResourcesRequest.java`. * Change `ListClientMetricsResourcesResponse.java` to `ListConfigResourcesResponsejava`. * Change `KafkaApis` to handle both `ListClientMetricsResourcesRequest` v0 and v1 requests. Reviewers: Andrew Schofield <aschofield@confluent.io> --------- Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
parent
567a03dd14
commit
c26b09c609
|
@ -159,7 +159,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.U
|
|||
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
|
||||
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesRequestData;
|
||||
import org.apache.kafka.common.message.ListGroupsRequestData;
|
||||
import org.apache.kafka.common.message.ListGroupsResponseData;
|
||||
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
|
||||
|
@ -233,8 +233,8 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
|
|||
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
|
||||
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
|
||||
import org.apache.kafka.common.requests.JoinGroupRequest;
|
||||
import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
|
||||
import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
|
||||
import org.apache.kafka.common.requests.ListConfigResourcesRequest;
|
||||
import org.apache.kafka.common.requests.ListConfigResourcesResponse;
|
||||
import org.apache.kafka.common.requests.ListGroupsRequest;
|
||||
import org.apache.kafka.common.requests.ListGroupsResponse;
|
||||
import org.apache.kafka.common.requests.ListOffsetsRequest;
|
||||
|
@ -4887,13 +4887,16 @@ public class KafkaAdminClient extends AdminClient {
|
|||
new LeastLoadedNodeProvider()) {
|
||||
|
||||
@Override
|
||||
ListClientMetricsResourcesRequest.Builder createRequest(int timeoutMs) {
|
||||
return new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData());
|
||||
ListConfigResourcesRequest.Builder createRequest(int timeoutMs) {
|
||||
return new ListConfigResourcesRequest.Builder(
|
||||
new ListConfigResourcesRequestData()
|
||||
.setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleResponse(AbstractResponse abstractResponse) {
|
||||
ListClientMetricsResourcesResponse response = (ListClientMetricsResourcesResponse) abstractResponse;
|
||||
ListConfigResourcesResponse response = (ListConfigResourcesResponse) abstractResponse;
|
||||
if (response.error().isFailure()) {
|
||||
future.completeExceptionally(response.error().exception());
|
||||
} else {
|
||||
|
|
|
@ -116,7 +116,7 @@ public enum ApiKeys {
|
|||
GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS),
|
||||
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
|
||||
ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
|
||||
LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
|
||||
LIST_CONFIG_RESOURCES(ApiMessageType.LIST_CONFIG_RESOURCES),
|
||||
DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS),
|
||||
SHARE_GROUP_HEARTBEAT(ApiMessageType.SHARE_GROUP_HEARTBEAT),
|
||||
SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE),
|
||||
|
|
|
@ -316,8 +316,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
|
|||
return PushTelemetryRequest.parse(readable, apiVersion);
|
||||
case ASSIGN_REPLICAS_TO_DIRS:
|
||||
return AssignReplicasToDirsRequest.parse(readable, apiVersion);
|
||||
case LIST_CLIENT_METRICS_RESOURCES:
|
||||
return ListClientMetricsResourcesRequest.parse(readable, apiVersion);
|
||||
case LIST_CONFIG_RESOURCES:
|
||||
return ListConfigResourcesRequest.parse(readable, apiVersion);
|
||||
case DESCRIBE_TOPIC_PARTITIONS:
|
||||
return DescribeTopicPartitionsRequest.parse(readable, apiVersion);
|
||||
case SHARE_GROUP_HEARTBEAT:
|
||||
|
|
|
@ -253,8 +253,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
|
|||
return PushTelemetryResponse.parse(readable, version);
|
||||
case ASSIGN_REPLICAS_TO_DIRS:
|
||||
return AssignReplicasToDirsResponse.parse(readable, version);
|
||||
case LIST_CLIENT_METRICS_RESOURCES:
|
||||
return ListClientMetricsResourcesResponse.parse(readable, version);
|
||||
case LIST_CONFIG_RESOURCES:
|
||||
return ListConfigResourcesResponse.parse(readable, version);
|
||||
case DESCRIBE_TOPIC_PARTITIONS:
|
||||
return DescribeTopicPartitionsResponse.parse(readable, version);
|
||||
case SHARE_GROUP_HEARTBEAT:
|
||||
|
|
|
@ -1,75 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.common.requests;
|
||||
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.protocol.Readable;
|
||||
|
||||
public class ListClientMetricsResourcesRequest extends AbstractRequest {
|
||||
public static class Builder extends AbstractRequest.Builder<ListClientMetricsResourcesRequest> {
|
||||
public final ListClientMetricsResourcesRequestData data;
|
||||
|
||||
public Builder(ListClientMetricsResourcesRequestData data) {
|
||||
super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListClientMetricsResourcesRequest build(short version) {
|
||||
return new ListClientMetricsResourcesRequest(data, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return data.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private final ListClientMetricsResourcesRequestData data;
|
||||
|
||||
private ListClientMetricsResourcesRequest(ListClientMetricsResourcesRequestData data, short version) {
|
||||
super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES, version);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public ListClientMetricsResourcesRequestData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListClientMetricsResourcesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
|
||||
Errors error = Errors.forException(e);
|
||||
ListClientMetricsResourcesResponseData response = new ListClientMetricsResourcesResponseData()
|
||||
.setErrorCode(error.code())
|
||||
.setThrottleTimeMs(throttleTimeMs);
|
||||
return new ListClientMetricsResourcesResponse(response);
|
||||
}
|
||||
|
||||
public static ListClientMetricsResourcesRequest parse(Readable readable, short version) {
|
||||
return new ListClientMetricsResourcesRequest(new ListClientMetricsResourcesRequestData(
|
||||
readable, version), version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(boolean verbose) {
|
||||
return data.toString();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.common.requests;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigResource;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesRequestData;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.protocol.Readable;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class ListConfigResourcesRequest extends AbstractRequest {
|
||||
public static class Builder extends AbstractRequest.Builder<ListConfigResourcesRequest> {
|
||||
public final ListConfigResourcesRequestData data;
|
||||
|
||||
public Builder(ListConfigResourcesRequestData data) {
|
||||
super(ApiKeys.LIST_CONFIG_RESOURCES);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListConfigResourcesRequest build(short version) {
|
||||
return new ListConfigResourcesRequest(data, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return data.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private final ListConfigResourcesRequestData data;
|
||||
|
||||
private ListConfigResourcesRequest(ListConfigResourcesRequestData data, short version) {
|
||||
super(ApiKeys.LIST_CONFIG_RESOURCES, version);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public ListConfigResourcesRequestData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListConfigResourcesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
|
||||
Errors error = Errors.forException(e);
|
||||
ListConfigResourcesResponseData response = new ListConfigResourcesResponseData()
|
||||
.setErrorCode(error.code())
|
||||
.setThrottleTimeMs(throttleTimeMs);
|
||||
return new ListConfigResourcesResponse(response);
|
||||
}
|
||||
|
||||
public static ListConfigResourcesRequest parse(Readable readable, short version) {
|
||||
return new ListConfigResourcesRequest(new ListConfigResourcesRequestData(
|
||||
readable, version), version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(boolean verbose) {
|
||||
return data.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the supported config resource types in different request version.
|
||||
* If there is a new config resource type, the ListConfigResourcesRequest should bump a new request version to include it.
|
||||
* For v0, the supported config resource types contain CLIENT_METRICS (16).
|
||||
* For v1, the supported config resource types contain TOPIC (2), BROKER (4), BROKER_LOGGER (8), CLIENT_METRICS (16), and GROUP (32).
|
||||
*/
|
||||
public Set<Byte> supportedResourceTypes() {
|
||||
return version() == 0 ?
|
||||
Set.of(ConfigResource.Type.CLIENT_METRICS.id()) :
|
||||
Set.of(
|
||||
ConfigResource.Type.TOPIC.id(),
|
||||
ConfigResource.Type.BROKER.id(),
|
||||
ConfigResource.Type.BROKER_LOGGER.id(),
|
||||
ConfigResource.Type.CLIENT_METRICS.id(),
|
||||
ConfigResource.Type.GROUP.id()
|
||||
);
|
||||
}
|
||||
}
|
|
@ -17,7 +17,8 @@
|
|||
package org.apache.kafka.common.requests;
|
||||
|
||||
import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
|
||||
import org.apache.kafka.common.config.ConfigResource;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.protocol.Readable;
|
||||
|
@ -26,15 +27,15 @@ import java.util.Collection;
|
|||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ListClientMetricsResourcesResponse extends AbstractResponse {
|
||||
private final ListClientMetricsResourcesResponseData data;
|
||||
public class ListConfigResourcesResponse extends AbstractResponse {
|
||||
private final ListConfigResourcesResponseData data;
|
||||
|
||||
public ListClientMetricsResourcesResponse(ListClientMetricsResourcesResponseData data) {
|
||||
super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES);
|
||||
public ListConfigResourcesResponse(ListConfigResourcesResponseData data) {
|
||||
super(ApiKeys.LIST_CONFIG_RESOURCES);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public ListClientMetricsResourcesResponseData data() {
|
||||
public ListConfigResourcesResponseData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
|
@ -47,8 +48,8 @@ public class ListClientMetricsResourcesResponse extends AbstractResponse {
|
|||
return errorCounts(Errors.forCode(data.errorCode()));
|
||||
}
|
||||
|
||||
public static ListClientMetricsResourcesResponse parse(Readable readable, short version) {
|
||||
return new ListClientMetricsResourcesResponse(new ListClientMetricsResourcesResponseData(
|
||||
public static ListConfigResourcesResponse parse(Readable readable, short version) {
|
||||
return new ListConfigResourcesResponse(new ListConfigResourcesResponseData(
|
||||
readable, version));
|
||||
}
|
||||
|
||||
|
@ -67,10 +68,22 @@ public class ListClientMetricsResourcesResponse extends AbstractResponse {
|
|||
data.setThrottleTimeMs(throttleTimeMs);
|
||||
}
|
||||
|
||||
public Collection<ClientMetricsResourceListing> clientMetricsResources() {
|
||||
return data.clientMetricsResources()
|
||||
public Collection<ConfigResource> configResources() {
|
||||
return data.configResources()
|
||||
.stream()
|
||||
.map(entry -> new ClientMetricsResourceListing(entry.name()))
|
||||
.map(entry ->
|
||||
new ConfigResource(
|
||||
ConfigResource.Type.forId(entry.resourceType()),
|
||||
entry.resourceName()
|
||||
)
|
||||
).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public Collection<ClientMetricsResourceListing> clientMetricsResources() {
|
||||
return data.configResources()
|
||||
.stream()
|
||||
.filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id())
|
||||
.map(entry -> new ClientMetricsResourceListing(entry.resourceName()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
|
@ -17,10 +17,15 @@
|
|||
"apiKey": 74,
|
||||
"type": "request",
|
||||
"listeners": ["broker"],
|
||||
"name": "ListClientMetricsResourcesRequest",
|
||||
"validVersions": "0",
|
||||
"name": "ListConfigResourcesRequest",
|
||||
// Version 0 is used as ListClientMetricsResourcesRequest which only lists client metrics resources.
|
||||
// Version 1 adds ResourceTypes field (KIP-1142). If there is no specified ResourceTypes, it should return all configuration resources.
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "ResourceTypes", "type": "[]int8", "versions": "1+",
|
||||
"about": "The list of resource type. If the list is empty, it uses default supported config resource types."
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
@ -16,18 +16,22 @@
|
|||
{
|
||||
"apiKey": 74,
|
||||
"type": "response",
|
||||
"name": "ListClientMetricsResourcesResponse",
|
||||
"validVersions": "0",
|
||||
"name": "ListConfigResourcesResponse",
|
||||
// Version 0 is used as ListClientMetricsResourcesResponse which returns all client metrics resources.
|
||||
// Version 1 adds ResourceType to ConfigResources (KIP-1142).
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
|
||||
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
|
||||
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
|
||||
"about": "The error code, or 0 if there was no error." },
|
||||
{ "name": "ClientMetricsResources", "type": "[]ClientMetricsResource", "versions": "0+",
|
||||
"about": "Each client metrics resource in the response.", "fields": [
|
||||
{ "name": "Name", "type": "string", "versions": "0+",
|
||||
"about": "The resource name." }
|
||||
{ "name": "ConfigResources", "type": "[]ConfigResource", "versions": "0+",
|
||||
"about": "Each config resource in the response.", "fields": [
|
||||
{ "name": "ResourceName", "type": "string", "versions": "0+",
|
||||
"about": "The resource name." },
|
||||
{ "name": "ResourceType", "type": "int8", "versions": "1+", "ignorable": true, "default": 16,
|
||||
"about": "The resource type." }
|
||||
]}
|
||||
]
|
||||
}
|
|
@ -141,7 +141,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
|
|||
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
|
||||
import org.apache.kafka.common.message.LeaveGroupResponseData;
|
||||
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
|
||||
import org.apache.kafka.common.message.ListGroupsResponseData;
|
||||
import org.apache.kafka.common.message.ListGroupsResponseData.ListedGroup;
|
||||
import org.apache.kafka.common.message.ListOffsetsResponseData;
|
||||
|
@ -225,8 +225,8 @@ import org.apache.kafka.common.requests.InitProducerIdResponse;
|
|||
import org.apache.kafka.common.requests.JoinGroupRequest;
|
||||
import org.apache.kafka.common.requests.LeaveGroupRequest;
|
||||
import org.apache.kafka.common.requests.LeaveGroupResponse;
|
||||
import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
|
||||
import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
|
||||
import org.apache.kafka.common.requests.ListConfigResourcesRequest;
|
||||
import org.apache.kafka.common.requests.ListConfigResourcesResponse;
|
||||
import org.apache.kafka.common.requests.ListGroupsRequest;
|
||||
import org.apache.kafka.common.requests.ListGroupsResponse;
|
||||
import org.apache.kafka.common.requests.ListOffsetsRequest;
|
||||
|
@ -10672,17 +10672,25 @@ public class KafkaAdminClientTest {
|
|||
new ClientMetricsResourceListing("two")
|
||||
);
|
||||
|
||||
ListClientMetricsResourcesResponseData responseData =
|
||||
new ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code());
|
||||
ListConfigResourcesResponseData responseData =
|
||||
new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
|
||||
|
||||
responseData.clientMetricsResources()
|
||||
.add(new ListClientMetricsResourcesResponseData.ClientMetricsResource().setName("one"));
|
||||
responseData.clientMetricsResources()
|
||||
.add((new ListClientMetricsResourcesResponseData.ClientMetricsResource()).setName("two"));
|
||||
responseData.configResources()
|
||||
.add(new ListConfigResourcesResponseData
|
||||
.ConfigResource()
|
||||
.setResourceName("one")
|
||||
.setResourceType(ConfigResource.Type.CLIENT_METRICS.id())
|
||||
);
|
||||
responseData.configResources()
|
||||
.add(new ListConfigResourcesResponseData
|
||||
.ConfigResource()
|
||||
.setResourceName("two")
|
||||
.setResourceType(ConfigResource.Type.CLIENT_METRICS.id())
|
||||
);
|
||||
|
||||
env.kafkaClient().prepareResponse(
|
||||
request -> request instanceof ListClientMetricsResourcesRequest,
|
||||
new ListClientMetricsResourcesResponse(responseData));
|
||||
request -> request instanceof ListConfigResourcesRequest,
|
||||
new ListConfigResourcesResponse(responseData));
|
||||
|
||||
ListClientMetricsResourcesResult result = env.adminClient().listClientMetricsResources();
|
||||
assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get()));
|
||||
|
@ -10694,12 +10702,12 @@ public class KafkaAdminClientTest {
|
|||
try (AdminClientUnitTestEnv env = mockClientEnv()) {
|
||||
List<ClientMetricsResourceListing> expected = Collections.emptyList();
|
||||
|
||||
ListClientMetricsResourcesResponseData responseData =
|
||||
new ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code());
|
||||
ListConfigResourcesResponseData responseData =
|
||||
new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
|
||||
|
||||
env.kafkaClient().prepareResponse(
|
||||
request -> request instanceof ListClientMetricsResourcesRequest,
|
||||
new ListClientMetricsResourcesResponse(responseData));
|
||||
request -> request instanceof ListConfigResourcesRequest,
|
||||
new ListConfigResourcesResponse(responseData));
|
||||
|
||||
ListClientMetricsResourcesResult result = env.adminClient().listClientMetricsResources();
|
||||
assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get()));
|
||||
|
@ -10710,7 +10718,7 @@ public class KafkaAdminClientTest {
|
|||
public void testListClientMetricsResourcesNotSupported() {
|
||||
try (AdminClientUnitTestEnv env = mockClientEnv()) {
|
||||
env.kafkaClient().prepareResponse(
|
||||
request -> request instanceof ListClientMetricsResourcesRequest,
|
||||
request -> request instanceof ListConfigResourcesRequest,
|
||||
prepareListClientMetricsResourcesResponse(Errors.UNSUPPORTED_VERSION));
|
||||
|
||||
ListClientMetricsResourcesResult result = env.adminClient().listClientMetricsResources();
|
||||
|
@ -10776,8 +10784,8 @@ public class KafkaAdminClientTest {
|
|||
}
|
||||
}
|
||||
|
||||
private static ListClientMetricsResourcesResponse prepareListClientMetricsResourcesResponse(Errors error) {
|
||||
return new ListClientMetricsResourcesResponse(new ListClientMetricsResourcesResponseData()
|
||||
private static ListConfigResourcesResponse prepareListClientMetricsResourcesResponse(Errors error) {
|
||||
return new ListConfigResourcesResponse(new ListConfigResourcesResponseData()
|
||||
.setErrorCode(error.code()));
|
||||
}
|
||||
|
||||
|
|
|
@ -175,8 +175,8 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
|
|||
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
|
||||
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
|
||||
import org.apache.kafka.common.message.LeaveGroupResponseData;
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesRequestData;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
|
||||
import org.apache.kafka.common.message.ListGroupsRequestData;
|
||||
import org.apache.kafka.common.message.ListGroupsResponseData;
|
||||
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
|
||||
|
@ -1052,7 +1052,7 @@ public class RequestResponseTest {
|
|||
case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsRequest(version);
|
||||
case PUSH_TELEMETRY: return createPushTelemetryRequest(version);
|
||||
case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsRequest(version);
|
||||
case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesRequest(version);
|
||||
case LIST_CONFIG_RESOURCES: return createListConfigResourcesRequest(version);
|
||||
case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsRequest(version);
|
||||
case SHARE_GROUP_HEARTBEAT: return createShareGroupHeartbeatRequest(version);
|
||||
case SHARE_GROUP_DESCRIBE: return createShareGroupDescribeRequest(version);
|
||||
|
@ -1147,7 +1147,7 @@ public class RequestResponseTest {
|
|||
case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsResponse();
|
||||
case PUSH_TELEMETRY: return createPushTelemetryResponse();
|
||||
case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsResponse();
|
||||
case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesResponse();
|
||||
case LIST_CONFIG_RESOURCES: return createListConfigResourcesResponse();
|
||||
case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsResponse();
|
||||
case SHARE_GROUP_HEARTBEAT: return createShareGroupHeartbeatResponse();
|
||||
case SHARE_GROUP_DESCRIBE: return createShareGroupDescribeResponse();
|
||||
|
@ -3636,15 +3636,15 @@ public class RequestResponseTest {
|
|||
return new PushTelemetryResponse(response);
|
||||
}
|
||||
|
||||
private ListClientMetricsResourcesRequest createListClientMetricsResourcesRequest(short version) {
|
||||
return new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build(version);
|
||||
private ListConfigResourcesRequest createListConfigResourcesRequest(short version) {
|
||||
return new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(version);
|
||||
}
|
||||
|
||||
private ListClientMetricsResourcesResponse createListClientMetricsResourcesResponse() {
|
||||
ListClientMetricsResourcesResponseData response = new ListClientMetricsResourcesResponseData();
|
||||
private ListConfigResourcesResponse createListConfigResourcesResponse() {
|
||||
ListConfigResourcesResponseData response = new ListConfigResourcesResponseData();
|
||||
response.setErrorCode(Errors.NONE.code());
|
||||
response.setThrottleTimeMs(10);
|
||||
return new ListClientMetricsResourcesResponse(response);
|
||||
return new ListConfigResourcesResponse(response);
|
||||
}
|
||||
|
||||
private InitializeShareGroupStateRequest createInitializeShareGroupStateRequest(short version) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.clients.CommonClientConfigs
|
|||
import org.apache.kafka.clients.admin.EndpointType
|
||||
import org.apache.kafka.common.acl.AclOperation
|
||||
import org.apache.kafka.common.acl.AclOperation._
|
||||
import org.apache.kafka.common.config.ConfigResource
|
||||
import org.apache.kafka.common.errors._
|
||||
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
|
||||
import org.apache.kafka.common.internals.{FatalExitError, Plugin, Topic}
|
||||
|
@ -34,7 +35,6 @@ import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartit
|
|||
import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
|
||||
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
|
||||
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
|
||||
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
|
||||
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
|
||||
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
|
||||
|
@ -227,7 +227,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request)
|
||||
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
|
||||
case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
|
||||
case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request)
|
||||
case ApiKeys.LIST_CONFIG_RESOURCES => handleListConfigResources(request)
|
||||
case ApiKeys.ADD_RAFT_VOTER => forwardToController(request)
|
||||
case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request)
|
||||
case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError)
|
||||
|
@ -2927,16 +2927,60 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
}
|
||||
|
||||
def handleListClientMetricsResources(request: RequestChannel.Request): Unit = {
|
||||
val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest]
|
||||
/**
|
||||
* Handle ListConfigResourcesRequest. If resourceTypes are not specified, it uses ListConfigResourcesRequest#supportedResourceTypes
|
||||
* to retrieve config resources. If resourceTypes are specified, it returns matched config resources.
|
||||
* If a config resource type is not supported, the handler returns UNSUPPORTED_VERSION.
|
||||
*/
|
||||
private def handleListConfigResources(request: RequestChannel.Request): Unit = {
|
||||
val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
|
||||
|
||||
if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
|
||||
requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
|
||||
requestHelper.sendMaybeThrottle(request, listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
|
||||
} else {
|
||||
val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources(
|
||||
clientMetricsManager.listClientMetricsResources.stream.map(
|
||||
name => new ClientMetricsResource().setName(name)).collect(Collectors.toList()))
|
||||
requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data))
|
||||
val data = new ListConfigResourcesResponseData()
|
||||
|
||||
val supportedResourceTypes = listConfigResourcesRequest.supportedResourceTypes()
|
||||
var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
|
||||
if (resourceTypes.isEmpty) {
|
||||
resourceTypes = supportedResourceTypes.stream().toList
|
||||
}
|
||||
|
||||
resourceTypes.forEach(resourceType =>
|
||||
if (!supportedResourceTypes.contains(resourceType)) {
|
||||
requestHelper.sendMaybeThrottle(request, new ListConfigResourcesResponse(data.setErrorCode(Errors.UNSUPPORTED_VERSION.code())))
|
||||
return
|
||||
}
|
||||
)
|
||||
|
||||
val result = new util.ArrayList[ListConfigResourcesResponseData.ConfigResource]()
|
||||
if (resourceTypes.contains(ConfigResource.Type.GROUP.id)) {
|
||||
groupConfigManager.groupIds().forEach(id =>
|
||||
result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(id).setResourceType(ConfigResource.Type.GROUP.id))
|
||||
)
|
||||
}
|
||||
if (resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id)) {
|
||||
clientMetricsManager.listClientMetricsResources.forEach(name =>
|
||||
result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id))
|
||||
)
|
||||
}
|
||||
if (resourceTypes.contains(ConfigResource.Type.BROKER_LOGGER.id)) {
|
||||
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
|
||||
result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id))
|
||||
)
|
||||
}
|
||||
if (resourceTypes.contains(ConfigResource.Type.BROKER.id)) {
|
||||
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
|
||||
result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER.id))
|
||||
)
|
||||
}
|
||||
if (resourceTypes.contains(ConfigResource.Type.TOPIC.id)) {
|
||||
metadataCache.getAllTopics.forEach(name =>
|
||||
result.add(new ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.TOPIC.id))
|
||||
)
|
||||
}
|
||||
data.setConfigResources(result)
|
||||
requestHelper.sendMaybeThrottle(request, new ListConfigResourcesResponse(data))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,6 @@ import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{De
|
|||
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection}
|
||||
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
|
||||
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
|
||||
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
|
||||
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
|
||||
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
|
||||
|
@ -11207,47 +11206,235 @@ class KafkaApisTest extends Logging {
|
|||
}
|
||||
|
||||
@Test
|
||||
def testListClientMetricsResources(): Unit = {
|
||||
val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build())
|
||||
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
def testListConfigResourcesV0(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(0))
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
val resources = util.Set.of("client-metric1", "client-metric2")
|
||||
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources)
|
||||
|
||||
val resources = new mutable.HashSet[String]
|
||||
resources.add("test1")
|
||||
resources.add("test2")
|
||||
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
|
||||
val expectedResponse = new ListClientMetricsResourcesResponseData().setClientMetricsResources(
|
||||
resources.map(resource => new ClientMetricsResource().setName(resource)).toBuffer.asJava)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
val expectedResponseData = new ListConfigResourcesResponseData()
|
||||
.setConfigResources(
|
||||
resources.stream.map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource)
|
||||
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
|
||||
assertEquals(expectedResponseData, response.data)
|
||||
|
||||
verify(metadataCache, never).getAllTopics
|
||||
verify(groupConfigManager, never).groupIds
|
||||
verify(metadataCache, never).getBrokerNodes(any)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListConfigResourcesV1WithEmptyResourceTypes(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(1))
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
val clientMetrics = util.Set.of("client-metric1", "client-metric2")
|
||||
val topics = util.Set.of("topic1", "topic2")
|
||||
val groupIds = util.List.of("group1", "group2")
|
||||
val nodeIds = util.List.of(1, 2)
|
||||
when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
|
||||
when(metadataCache.getAllTopics).thenReturn(topics)
|
||||
when(groupConfigManager.groupIds).thenReturn(groupIds)
|
||||
when(metadataCache.getBrokerNodes(any())).thenReturn(
|
||||
nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList()))
|
||||
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
val expectedResponseData = new ListConfigResourcesResponseData()
|
||||
.setConfigResources(
|
||||
util.stream.Stream.of(
|
||||
groupIds.stream().map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id)
|
||||
).toList,
|
||||
clientMetrics.stream.map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
|
||||
).toList,
|
||||
nodeIds.stream().map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id)
|
||||
).toList,
|
||||
nodeIds.stream().map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id)
|
||||
).toList,
|
||||
topics.stream().map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id)
|
||||
).toList
|
||||
).flatMap(s => s.stream).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
|
||||
assertEquals(expectedResponseData, response.data)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListConfigResourcesV1WithGroup(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
|
||||
.setResourceTypes(util.List.of(ConfigResource.Type.GROUP.id))).build(1))
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
val groupIds = util.List.of("group1", "group2")
|
||||
when(groupConfigManager.groupIds).thenReturn(groupIds)
|
||||
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
val expectedResponseData = new ListConfigResourcesResponseData()
|
||||
.setConfigResources(
|
||||
groupIds.stream().map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id)
|
||||
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
|
||||
assertEquals(expectedResponseData, response.data)
|
||||
|
||||
verify(metadataCache, never).getAllTopics
|
||||
verify(clientMetricsManager, never).listClientMetricsResources
|
||||
verify(metadataCache, never).getBrokerNodes(any)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListConfigResourcesV1WithClientMetrics(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
|
||||
.setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(1))
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
val clientMetrics = util.Set.of("client-metric1", "client-metric2")
|
||||
when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
|
||||
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
val expectedResponseData = new ListConfigResourcesResponseData()
|
||||
.setConfigResources(
|
||||
clientMetrics.stream.map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
|
||||
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
|
||||
assertEquals(expectedResponseData, response.data)
|
||||
|
||||
verify(metadataCache, never).getAllTopics
|
||||
verify(groupConfigManager, never).groupIds
|
||||
verify(metadataCache, never).getBrokerNodes(any)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListConfigResourcesV1WithBrokerLogger(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
|
||||
.setResourceTypes(util.List.of(ConfigResource.Type.BROKER_LOGGER.id))).build(1))
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
val nodeIds = util.List.of(1, 2)
|
||||
when(metadataCache.getBrokerNodes(any())).thenReturn(
|
||||
nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList()))
|
||||
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
val expectedResponseData = new ListConfigResourcesResponseData()
|
||||
.setConfigResources(
|
||||
nodeIds.stream().map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id)
|
||||
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
|
||||
assertEquals(expectedResponseData, response.data)
|
||||
|
||||
verify(metadataCache, never).getAllTopics
|
||||
verify(groupConfigManager, never).groupIds
|
||||
verify(clientMetricsManager, never).listClientMetricsResources
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListConfigResourcesV1WithBroker(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
|
||||
.setResourceTypes(util.List.of(ConfigResource.Type.BROKER.id))).build(1))
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
val nodeIds = util.List.of(1, 2)
|
||||
when(metadataCache.getBrokerNodes(any())).thenReturn(
|
||||
nodeIds.stream().map(id => new Node(id, "localhost", 1234)).collect(java.util.stream.Collectors.toList()))
|
||||
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
val expectedResponseData = new ListConfigResourcesResponseData()
|
||||
.setConfigResources(
|
||||
nodeIds.stream().map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id)
|
||||
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
|
||||
assertEquals(expectedResponseData, response.data)
|
||||
|
||||
verify(metadataCache, never).getAllTopics
|
||||
verify(groupConfigManager, never).groupIds
|
||||
verify(clientMetricsManager, never).listClientMetricsResources
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListConfigResourcesV1WithTopic(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
|
||||
.setResourceTypes(util.List.of(ConfigResource.Type.TOPIC.id))).build(1))
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
val topics = util.Set.of("topic1", "topic2")
|
||||
when(metadataCache.getAllTopics).thenReturn(topics)
|
||||
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
val expectedResponseData = new ListConfigResourcesResponseData()
|
||||
.setConfigResources(
|
||||
topics.stream().map(resource =>
|
||||
new ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id)
|
||||
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
|
||||
assertEquals(expectedResponseData, response.data)
|
||||
|
||||
verify(groupConfigManager, never).groupIds
|
||||
verify(clientMetricsManager, never).listClientMetricsResources
|
||||
verify(metadataCache, never).getBrokerNodes(any)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListConfigResourcesEmptyResponse(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build())
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
when(clientMetricsManager.listClientMetricsResources).thenReturn(util.Set.of)
|
||||
when(metadataCache.getAllTopics).thenReturn(util.Set.of)
|
||||
when(groupConfigManager.groupIds).thenReturn(util.List.of)
|
||||
when(metadataCache.getBrokerNodes(any())).thenReturn(util.List.of)
|
||||
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
val expectedResponse = new ListConfigResourcesResponseData()
|
||||
assertEquals(expectedResponse, response.data)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListClientMetricsResourcesEmptyResponse(): Unit = {
|
||||
val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build())
|
||||
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
def testListConfigResourcesV1WithUnknown(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
|
||||
.setResourceTypes(util.List.of(ConfigResource.Type.UNKNOWN.id))).build(1))
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
val resources = new mutable.HashSet[String]
|
||||
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
|
||||
val expectedResponse = new ListClientMetricsResourcesResponseData()
|
||||
assertEquals(expectedResponse, response.data)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data.errorCode())
|
||||
|
||||
verify(metadataCache, never).getAllTopics
|
||||
verify(groupConfigManager, never).groupIds
|
||||
verify(clientMetricsManager, never).listClientMetricsResources
|
||||
verify(metadataCache, never).getBrokerNodes(any)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testListClientMetricsResourcesWithException(): Unit = {
|
||||
val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build())
|
||||
def testListConfigResourcesWithException(): Unit = {
|
||||
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build())
|
||||
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
|
||||
when(clientMetricsManager.listClientMetricsResources).thenThrow(new RuntimeException("test"))
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
|
||||
val response = verifyNoThrottling[ListConfigResourcesResponse](request)
|
||||
|
||||
val expectedResponse = new ListClientMetricsResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
|
||||
val expectedResponse = new ListConfigResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
|
||||
assertEquals(expectedResponse, response.data)
|
||||
}
|
||||
|
||||
|
|
|
@ -641,8 +641,8 @@ class RequestQuotaTest extends BaseRequestTest {
|
|||
case ApiKeys.ASSIGN_REPLICAS_TO_DIRS =>
|
||||
new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData())
|
||||
|
||||
case ApiKeys.LIST_CLIENT_METRICS_RESOURCES =>
|
||||
new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData())
|
||||
case ApiKeys.LIST_CONFIG_RESOURCES =>
|
||||
new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData())
|
||||
|
||||
case ApiKeys.DESCRIBE_TOPIC_PARTITIONS =>
|
||||
new DescribeTopicPartitionsRequest.Builder(new DescribeTopicPartitionsRequestData())
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
|
|||
import org.apache.kafka.common.errors.InvalidRequestException;
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
@ -68,6 +69,10 @@ public class GroupConfigManager implements AutoCloseable {
|
|||
return Optional.ofNullable(configMap.get(groupId));
|
||||
}
|
||||
|
||||
public List<String> groupIds() {
|
||||
return List.copyOf(configMap.keySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the given properties.
|
||||
*
|
||||
|
|
|
@ -130,8 +130,8 @@ import org.apache.kafka.common.message.JoinGroupRequestDataJsonConverter;
|
|||
import org.apache.kafka.common.message.JoinGroupResponseDataJsonConverter;
|
||||
import org.apache.kafka.common.message.LeaveGroupRequestDataJsonConverter;
|
||||
import org.apache.kafka.common.message.LeaveGroupResponseDataJsonConverter;
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestDataJsonConverter;
|
||||
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseDataJsonConverter;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesRequestDataJsonConverter;
|
||||
import org.apache.kafka.common.message.ListConfigResourcesResponseDataJsonConverter;
|
||||
import org.apache.kafka.common.message.ListGroupsRequestDataJsonConverter;
|
||||
import org.apache.kafka.common.message.ListGroupsResponseDataJsonConverter;
|
||||
import org.apache.kafka.common.message.ListOffsetsRequestDataJsonConverter;
|
||||
|
@ -312,8 +312,8 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
|
|||
import org.apache.kafka.common.requests.JoinGroupResponse;
|
||||
import org.apache.kafka.common.requests.LeaveGroupRequest;
|
||||
import org.apache.kafka.common.requests.LeaveGroupResponse;
|
||||
import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
|
||||
import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
|
||||
import org.apache.kafka.common.requests.ListConfigResourcesRequest;
|
||||
import org.apache.kafka.common.requests.ListConfigResourcesResponse;
|
||||
import org.apache.kafka.common.requests.ListGroupsRequest;
|
||||
import org.apache.kafka.common.requests.ListGroupsResponse;
|
||||
import org.apache.kafka.common.requests.ListOffsetsRequest;
|
||||
|
@ -507,8 +507,8 @@ public class RequestConvertToJson {
|
|||
return JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), request.version());
|
||||
case LEAVE_GROUP:
|
||||
return LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), request.version());
|
||||
case LIST_CLIENT_METRICS_RESOURCES:
|
||||
return ListClientMetricsResourcesRequestDataJsonConverter.write(((ListClientMetricsResourcesRequest) request).data(), request.version());
|
||||
case LIST_CONFIG_RESOURCES:
|
||||
return ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) request).data(), request.version());
|
||||
case LIST_GROUPS:
|
||||
return ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), request.version());
|
||||
case LIST_OFFSETS:
|
||||
|
@ -693,8 +693,8 @@ public class RequestConvertToJson {
|
|||
return JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), version);
|
||||
case LEAVE_GROUP:
|
||||
return LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) response).data(), version);
|
||||
case LIST_CLIENT_METRICS_RESOURCES:
|
||||
return ListClientMetricsResourcesResponseDataJsonConverter.write(((ListClientMetricsResourcesResponse) response).data(), version);
|
||||
case LIST_CONFIG_RESOURCES:
|
||||
return ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse) response).data(), version);
|
||||
case LIST_GROUPS:
|
||||
return ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) response).data(), version);
|
||||
case LIST_OFFSETS:
|
||||
|
|
Loading…
Reference in New Issue