mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-17563 Move `RequestConvertToJson` to server module (#17223)
Reviewers: Chia-Ping Tsai <chia7712@apache.org>
This commit is contained in:
		
							parent
							
								
									f8acfa5257
								
							
						
					
					
						commit
						18340c9733
					
				|  | @ -99,4 +99,8 @@ | |||
|     <allow pkg="org.apache.kafka.server.authorizer" /> | ||||
|   </subpackage> | ||||
| 
 | ||||
|   <subpackage name="network"> | ||||
|     <allow pkg="com.fasterxml.jackson" /> | ||||
|   </subpackage> | ||||
| 
 | ||||
| </import-control> | ||||
|  |  | |||
|  | @ -50,6 +50,7 @@ | |||
|     <!-- server tests --> | ||||
|     <suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/> | ||||
|     <suppress checks="CyclomaticComplexity" files="ListConsumerGroupTest.java"/> | ||||
|     <suppress checks="ClassFanOutComplexity|CyclomaticComplexity|MethodLength|ParameterNumber|JavaNCSS|ImportControl" files="RequestConvertToJson.java"/> | ||||
| 
 | ||||
|     <!-- Clients --> | ||||
|     <suppress id="dontUseSystemExit" | ||||
|  |  | |||
|  | @ -36,8 +36,10 @@ import org.apache.kafka.network.Session | |||
| import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics} | ||||
| import org.apache.kafka.server.common.RequestLocal | ||||
| import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||||
| import org.apache.kafka.network.RequestConvertToJson | ||||
| 
 | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.compat.java8.OptionConverters._ | ||||
| import scala.reflect.ClassTag | ||||
| 
 | ||||
| object RequestChannel extends Logging { | ||||
|  | @ -249,7 +251,7 @@ object RequestChannel extends Logging { | |||
|       recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos)) | ||||
| 
 | ||||
|       if (isRequestLoggingEnabled) { | ||||
|         val desc = RequestConvertToJson.requestDescMetrics(header, requestLog, response.responseLog, | ||||
|         val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.asJava, response.responseLog.asJava, | ||||
|           context, session, isForwarded, | ||||
|           totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, | ||||
|           apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, | ||||
|  |  | |||
|  | @ -1,267 +0,0 @@ | |||
| /* | ||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||
|  * contributor license agreements. See the NOTICE file distributed with | ||||
|  * this work for additional information regarding copyright ownership. | ||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
|  * (the "License"); you may not use this file except in compliance with | ||||
|  * the License. You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| package kafka.network | ||||
| 
 | ||||
| import com.fasterxml.jackson.databind.JsonNode | ||||
| import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode} | ||||
| import org.apache.kafka.common.message._ | ||||
| import org.apache.kafka.common.network.ClientInformation | ||||
| import org.apache.kafka.common.requests._ | ||||
| import org.apache.kafka.network.Session | ||||
| 
 | ||||
| object RequestConvertToJson { | ||||
|   def request(request: AbstractRequest): JsonNode = { | ||||
|     request match { | ||||
|       case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AllocateProducerIdsRequest => AllocateProducerIdsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AlterPartitionRequest => AlterPartitionRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) | ||||
|       case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AssignReplicasToDirsRequest => AssignReplicasToDirsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: BrokerHeartbeatRequest => BrokerHeartbeatRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: BrokerRegistrationRequest => BrokerRegistrationRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ConsumerGroupDescribeRequest => ConsumerGroupDescribeRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ConsumerGroupHeartbeatRequest => ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ControllerRegistrationRequest => ControllerRegistrationRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DeleteShareGroupStateRequest => DeleteShareGroupStateRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeClusterRequest => DescribeClusterRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case res: DescribeTopicPartitionsRequest => DescribeTopicPartitionsRequestDataJsonConverter.write(res.data, request.version) | ||||
|       case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case res: DescribeUserScramCredentialsRequest => DescribeUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) | ||||
|       case req: ElectLeadersRequest => ElectLeadersRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: EndQuorumEpochRequest => EndQuorumEpochRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: EndTxnRequest => EndTxnRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: EnvelopeRequest => EnvelopeRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ExpireDelegationTokenRequest => ExpireDelegationTokenRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: FetchRequest => FetchRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: FetchSnapshotRequest => FetchSnapshotRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: FindCoordinatorRequest => FindCoordinatorRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: GetTelemetrySubscriptionsRequest => GetTelemetrySubscriptionsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: HeartbeatRequest => HeartbeatRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: IncrementalAlterConfigsRequest => IncrementalAlterConfigsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: InitializeShareGroupStateRequest => InitializeShareGroupStateRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: InitProducerIdRequest => InitProducerIdRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: JoinGroupRequest => JoinGroupRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: LeaderAndIsrRequest => LeaderAndIsrRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: LeaveGroupRequest => LeaveGroupRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ListClientMetricsResourcesRequest => ListClientMetricsResourcesRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ListGroupsRequest => ListGroupsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ListOffsetsRequest => ListOffsetsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ListPartitionReassignmentsRequest => ListPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ListTransactionsRequest => ListTransactionsRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: MetadataRequest => MetadataRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: OffsetCommitRequest => OffsetCommitRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: OffsetDeleteRequest => OffsetDeleteRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: OffsetFetchRequest => OffsetFetchRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: OffsetsForLeaderEpochRequest => OffsetForLeaderEpochRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ProduceRequest => ProduceRequestDataJsonConverter.write(req.data, request.version, false) | ||||
|       case req: PushTelemetryRequest => PushTelemetryRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ReadShareGroupStateRequest => ReadShareGroupStateRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ReadShareGroupStateSummaryRequest => ReadShareGroupStateSummaryRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: RenewDelegationTokenRequest => RenewDelegationTokenRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: SaslAuthenticateRequest => SaslAuthenticateRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: SaslHandshakeRequest => SaslHandshakeRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ShareAcknowledgeRequest => ShareAcknowledgeRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ShareFetchRequest => ShareFetchRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ShareGroupDescribeRequest => ShareGroupDescribeRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: ShareGroupHeartbeatRequest => ShareGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: StopReplicaRequest => StopReplicaRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: SyncGroupRequest => SyncGroupRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: TxnOffsetCommitRequest => TxnOffsetCommitRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: UnregisterBrokerRequest => UnregisterBrokerRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: UpdateFeaturesRequest => UpdateFeaturesRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: UpdateMetadataRequest => UpdateMetadataRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: WriteShareGroupStateRequest => WriteShareGroupStateRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: WriteTxnMarkersRequest => WriteTxnMarkersRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: AddRaftVoterRequest => AddRaftVoterRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: RemoveRaftVoterRequest => RemoveRaftVoterRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case req: UpdateRaftVoterRequest => UpdateRaftVoterRequestDataJsonConverter.write(req.data, request.version) | ||||
|       case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + | ||||
|         "code should be updated to do so.") | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   def response(response: AbstractResponse, version: Short): JsonNode = { | ||||
|     response match { | ||||
|       case res: AddOffsetsToTxnResponse => AddOffsetsToTxnResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AddPartitionsToTxnResponse => AddPartitionsToTxnResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AllocateProducerIdsResponse => AllocateProducerIdsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AlterClientQuotasResponse => AlterClientQuotasResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AlterConfigsResponse => AlterConfigsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AlterPartitionReassignmentsResponse => AlterPartitionReassignmentsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AlterPartitionResponse => AlterPartitionResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AlterReplicaLogDirsResponse => AlterReplicaLogDirsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AlterUserScramCredentialsResponse => AlterUserScramCredentialsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ApiVersionsResponse => ApiVersionsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AssignReplicasToDirsResponse => AssignReplicasToDirsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: BeginQuorumEpochResponse => BeginQuorumEpochResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: BrokerHeartbeatResponse => BrokerHeartbeatResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: BrokerRegistrationResponse => BrokerRegistrationResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ConsumerGroupDescribeResponse => ConsumerGroupDescribeResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ConsumerGroupHeartbeatResponse => ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ControlledShutdownResponse => ControlledShutdownResponseDataJsonConverter.write(res.data, version) | ||||
|       case req: ControllerRegistrationResponse => ControllerRegistrationResponseDataJsonConverter.write(req.data, version) | ||||
|       case res: CreateAclsResponse => CreateAclsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: CreateDelegationTokenResponse => CreateDelegationTokenResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: CreatePartitionsResponse => CreatePartitionsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: CreateTopicsResponse => CreateTopicsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DeleteAclsResponse => DeleteAclsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DeleteGroupsResponse => DeleteGroupsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DeleteRecordsResponse => DeleteRecordsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DeleteShareGroupStateResponse => DeleteShareGroupStateResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DeleteTopicsResponse => DeleteTopicsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeAclsResponse => DescribeAclsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeClientQuotasResponse => DescribeClientQuotasResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeClusterResponse => DescribeClusterResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeConfigsResponse => DescribeConfigsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeDelegationTokenResponse => DescribeDelegationTokenResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeGroupsResponse => DescribeGroupsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeLogDirsResponse => DescribeLogDirsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeQuorumResponse => DescribeQuorumResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeTopicPartitionsResponse => DescribeTopicPartitionsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: DescribeUserScramCredentialsResponse => DescribeUserScramCredentialsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ElectLeadersResponse => ElectLeadersResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: EndQuorumEpochResponse => EndQuorumEpochResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: EndTxnResponse => EndTxnResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: EnvelopeResponse => EnvelopeResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ExpireDelegationTokenResponse => ExpireDelegationTokenResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: FetchResponse => FetchResponseDataJsonConverter.write(res.data, version, false) | ||||
|       case res: FetchSnapshotResponse => FetchSnapshotResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: FindCoordinatorResponse => FindCoordinatorResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: GetTelemetrySubscriptionsResponse => GetTelemetrySubscriptionsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: HeartbeatResponse => HeartbeatResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: IncrementalAlterConfigsResponse => IncrementalAlterConfigsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: InitializeShareGroupStateResponse => InitializeShareGroupStateResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: InitProducerIdResponse => InitProducerIdResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: JoinGroupResponse => JoinGroupResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: LeaderAndIsrResponse => LeaderAndIsrResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: LeaveGroupResponse => LeaveGroupResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ListClientMetricsResourcesResponse => ListClientMetricsResourcesResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ListGroupsResponse => ListGroupsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ListOffsetsResponse => ListOffsetsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ListPartitionReassignmentsResponse => ListPartitionReassignmentsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ListTransactionsResponse => ListTransactionsResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: MetadataResponse => MetadataResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: OffsetCommitResponse => OffsetCommitResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: OffsetDeleteResponse => OffsetDeleteResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: OffsetFetchResponse => OffsetFetchResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: OffsetsForLeaderEpochResponse => OffsetForLeaderEpochResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ProduceResponse => ProduceResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: PushTelemetryResponse => PushTelemetryResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ReadShareGroupStateResponse => ReadShareGroupStateResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ReadShareGroupStateSummaryResponse => ReadShareGroupStateSummaryResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: RenewDelegationTokenResponse => RenewDelegationTokenResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: SaslAuthenticateResponse => SaslAuthenticateResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: SaslHandshakeResponse => SaslHandshakeResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ShareAcknowledgeResponse => ShareAcknowledgeResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ShareFetchResponse => ShareFetchResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ShareGroupDescribeResponse => ShareGroupDescribeResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: ShareGroupHeartbeatResponse => ShareGroupHeartbeatResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: StopReplicaResponse => StopReplicaResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: SyncGroupResponse => SyncGroupResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: TxnOffsetCommitResponse => TxnOffsetCommitResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: UnregisterBrokerResponse => UnregisterBrokerResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: UpdateFeaturesResponse => UpdateFeaturesResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: UpdateMetadataResponse => UpdateMetadataResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: WriteShareGroupStateResponse => WriteShareGroupStateResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: AddRaftVoterResponse => AddRaftVoterResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: RemoveRaftVoterResponse => RemoveRaftVoterResponseDataJsonConverter.write(res.data, version) | ||||
|       case res: UpdateRaftVoterResponse => UpdateRaftVoterResponseDataJsonConverter.write(res.data, version) | ||||
|       case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + | ||||
|         "code should be updated to do so.") | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   def requestHeaderNode(header: RequestHeader): JsonNode = { | ||||
|     val node = RequestHeaderDataJsonConverter.write(header.data, header.headerVersion, false).asInstanceOf[ObjectNode] | ||||
|     node.set("requestApiKeyName", new TextNode(header.apiKey.toString)) | ||||
|     if (header.apiKey().isVersionDeprecated(header.apiVersion())) | ||||
|       node.set("requestApiVersionDeprecated", BooleanNode.TRUE) | ||||
|     node | ||||
|   } | ||||
| 
 | ||||
|   def requestDesc(header: RequestHeader, requestNode: Option[JsonNode], isForwarded: Boolean): JsonNode = { | ||||
|     val node = new ObjectNode(JsonNodeFactory.instance) | ||||
|     node.set("isForwarded", if (isForwarded) BooleanNode.TRUE else BooleanNode.FALSE) | ||||
|     node.set("requestHeader", requestHeaderNode(header)) | ||||
|     node.set("request", requestNode.getOrElse(new TextNode(""))) | ||||
|     node | ||||
|   } | ||||
| 
 | ||||
|   def clientInfoNode(clientInfo: ClientInformation): JsonNode = { | ||||
|     val node = new ObjectNode(JsonNodeFactory.instance) | ||||
|     node.set("softwareName", new TextNode(clientInfo.softwareName)) | ||||
|     node.set("softwareVersion", new TextNode(clientInfo.softwareVersion)) | ||||
|     node | ||||
|   } | ||||
| 
 | ||||
|   def requestDescMetrics(header: RequestHeader, requestNode: Option[JsonNode], responseNode: Option[JsonNode], | ||||
|                          context: RequestContext, session: Session, isForwarded: Boolean, | ||||
|                          totalTimeMs: Double, requestQueueTimeMs: Double, apiLocalTimeMs: Double, | ||||
|                          apiRemoteTimeMs: Double, apiThrottleTimeMs: Long, responseQueueTimeMs: Double, | ||||
|                          responseSendTimeMs: Double, temporaryMemoryBytes: Long, | ||||
|                          messageConversionsTimeMs: Double): JsonNode = { | ||||
|     val node = requestDesc(header, requestNode, isForwarded).asInstanceOf[ObjectNode] | ||||
|     node.set("response", responseNode.getOrElse(new TextNode(""))) | ||||
|     node.set("connection", new TextNode(context.connectionId)) | ||||
|     node.set("totalTimeMs", new DoubleNode(totalTimeMs)) | ||||
|     node.set("requestQueueTimeMs", new DoubleNode(requestQueueTimeMs)) | ||||
|     node.set("localTimeMs", new DoubleNode(apiLocalTimeMs)) | ||||
|     node.set("remoteTimeMs", new DoubleNode(apiRemoteTimeMs)) | ||||
|     node.set("throttleTimeMs", new LongNode(apiThrottleTimeMs)) | ||||
|     node.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs)) | ||||
|     node.set("sendTimeMs", new DoubleNode(responseSendTimeMs)) | ||||
|     node.set("securityProtocol", new TextNode(context.securityProtocol.toString)) | ||||
|     node.set("principal", new TextNode(session.principal.toString)) | ||||
|     node.set("listener", new TextNode(context.listenerName.value)) | ||||
|     node.set("clientInformation", clientInfoNode(context.clientInformation)) | ||||
|     if (temporaryMemoryBytes > 0) | ||||
|       node.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes)) | ||||
|     if (messageConversionsTimeMs > 0) | ||||
|       node.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs)) | ||||
|     node | ||||
|   } | ||||
| } | ||||
|  | @ -35,6 +35,7 @@ import org.apache.kafka.common.requests.AlterConfigsRequest._ | |||
| import org.apache.kafka.common.requests._ | ||||
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} | ||||
| import org.apache.kafka.common.utils.{SecurityUtils, Utils} | ||||
| import org.apache.kafka.network.RequestConvertToJson | ||||
| import org.apache.kafka.network.metrics.RequestChannelMetrics | ||||
| import org.apache.kafka.test | ||||
| import org.junit.jupiter.api.Assertions._ | ||||
|  | @ -49,6 +50,7 @@ import java.nio.ByteBuffer | |||
| import java.util.Collections | ||||
| import java.util.concurrent.atomic.AtomicReference | ||||
| import scala.collection.{Map, Seq} | ||||
| import scala.compat.java8.OptionConverters._ | ||||
| import scala.jdk.CollectionConverters._ | ||||
| 
 | ||||
| class RequestChannelTest { | ||||
|  | @ -69,7 +71,7 @@ class RequestChannelTest { | |||
|       val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest] | ||||
|       val loggedConfig = loggableAlterConfigs.configs.get(resource) | ||||
|       assertEquals(expectedValues, toMap(loggedConfig)) | ||||
|       val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString | ||||
|       val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString | ||||
|       assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc") | ||||
|     } | ||||
| 
 | ||||
|  | @ -133,7 +135,7 @@ class RequestChannelTest { | |||
|       val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest] | ||||
|       val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs | ||||
|       assertEquals(expectedValues, toMap(loggedConfig)) | ||||
|       val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString | ||||
|       val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString | ||||
|       assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc") | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -21,91 +21,22 @@ import java.net.InetAddress | |||
| import java.nio.ByteBuffer | ||||
| import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode} | ||||
| import kafka.network | ||||
| import kafka.network.RequestConvertToJson.requestHeaderNode | ||||
| import org.apache.kafka.common.memory.MemoryPool | ||||
| import org.apache.kafka.common.message._ | ||||
| import org.apache.kafka.common.network.{ClientInformation, ListenerName, NetworkSend} | ||||
| import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil} | ||||
| import org.apache.kafka.common.requests._ | ||||
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | ||||
| import org.apache.kafka.network.RequestConvertToJson | ||||
| import org.apache.kafka.network.metrics.RequestChannelMetrics | ||||
| import org.junit.jupiter.api.Assertions.assertEquals | ||||
| import org.junit.jupiter.api.Test | ||||
| import org.mockito.Mockito.mock | ||||
| 
 | ||||
| import java.util.Collections | ||||
| import scala.collection.mutable.ArrayBuffer | ||||
| import scala.compat.java8.OptionConverters._ | ||||
| 
 | ||||
| class RequestConvertToJsonTest { | ||||
| 
 | ||||
|   @Test | ||||
|   def testAllRequestTypesHandled(): Unit = { | ||||
|     val unhandledKeys = ArrayBuffer[String]() | ||||
|     ApiKeys.values().foreach { key => { | ||||
|       val version: Short = key.latestVersion() | ||||
|       val message = key match { | ||||
|         case ApiKeys.DESCRIBE_ACLS => | ||||
|           ApiMessageType.fromApiKey(key.id).newRequest().asInstanceOf[DescribeAclsRequestData] | ||||
|             .setPatternTypeFilter(1).setResourceTypeFilter(1).setPermissionType(1).setOperation(1) | ||||
|         case _ => | ||||
|           ApiMessageType.fromApiKey(key.id).newRequest() | ||||
|       } | ||||
| 
 | ||||
|       val bytes = MessageUtil.toByteBuffer(message, version) | ||||
|       val req = AbstractRequest.parseRequest(key, version, bytes).request | ||||
|       try { | ||||
|         RequestConvertToJson.request(req) | ||||
|       } catch { | ||||
|         case _ : IllegalStateException => unhandledKeys += key.toString | ||||
|       } | ||||
|     }} | ||||
|     assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled request keys") | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testAllApiVersionsResponseHandled(): Unit = { | ||||
| 
 | ||||
|     ApiKeys.values().foreach { key => { | ||||
|       val unhandledVersions = ArrayBuffer[java.lang.Short]() | ||||
|       key.allVersions().forEach { version => { | ||||
|         val message = key match { | ||||
|           // Specify top-level error handling for verifying compatibility across versions | ||||
|           case ApiKeys.DESCRIBE_LOG_DIRS => | ||||
|             ApiMessageType.fromApiKey(key.id).newResponse().asInstanceOf[DescribeLogDirsResponseData] | ||||
|               .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()) | ||||
|           case _ => | ||||
|             ApiMessageType.fromApiKey(key.id).newResponse() | ||||
|         } | ||||
| 
 | ||||
|         val bytes = MessageUtil.toByteBuffer(message, version) | ||||
|         val response = AbstractResponse.parseResponse(key, bytes, version) | ||||
|         try { | ||||
|           RequestConvertToJson.response(response, version) | ||||
|         } catch { | ||||
|           case _ : IllegalStateException => unhandledVersions += version | ||||
|         }} | ||||
|       } | ||||
|       assertEquals(ArrayBuffer.empty, unhandledVersions, s"API: ${key.toString} - Unhandled request versions") | ||||
|     }} | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testAllResponseTypesHandled(): Unit = { | ||||
|     val unhandledKeys = ArrayBuffer[String]() | ||||
|     ApiKeys.values().foreach { key => { | ||||
|       val version: Short = key.latestVersion() | ||||
|       val message = ApiMessageType.fromApiKey(key.id).newResponse() | ||||
|       val bytes = MessageUtil.toByteBuffer(message, version) | ||||
|       val res = AbstractResponse.parseResponse(key, bytes, version) | ||||
|       try { | ||||
|         RequestConvertToJson.response(res, version) | ||||
|       } catch { | ||||
|         case _ : IllegalStateException => unhandledKeys += key.toString | ||||
|       } | ||||
|     }} | ||||
|     assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled response keys") | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testRequestHeaderNode(): Unit = { | ||||
|     val alterIsrRequest = new AlterPartitionRequest(new AlterPartitionRequestData(), 0) | ||||
|  | @ -135,19 +66,6 @@ class RequestConvertToJsonTest { | |||
|     assertEquals(expectedNode, actualNode) | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testClientInfoNode(): Unit = { | ||||
|     val clientInfo = new ClientInformation("name", "1") | ||||
| 
 | ||||
|     val expectedNode = new ObjectNode(JsonNodeFactory.instance) | ||||
|     expectedNode.set("softwareName", new TextNode(clientInfo.softwareName)) | ||||
|     expectedNode.set("softwareVersion", new TextNode(clientInfo.softwareVersion)) | ||||
| 
 | ||||
|     val actualNode = RequestConvertToJson.clientInfoNode(clientInfo) | ||||
| 
 | ||||
|     assertEquals(expectedNode, actualNode) | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testRequestDesc(): Unit = { | ||||
|     val alterIsrRequest = new AlterPartitionRequest(new AlterPartitionRequestData(), 0) | ||||
|  | @ -155,10 +73,10 @@ class RequestConvertToJsonTest { | |||
| 
 | ||||
|     val expectedNode = new ObjectNode(JsonNodeFactory.instance) | ||||
|     expectedNode.set("isForwarded", if (req.isForwarded) BooleanNode.TRUE else BooleanNode.FALSE) | ||||
|     expectedNode.set("requestHeader", requestHeaderNode(req.header)) | ||||
|     expectedNode.set("requestHeader", RequestConvertToJson.requestHeaderNode(req.header)) | ||||
|     expectedNode.set("request", req.requestLog.getOrElse(new TextNode(""))) | ||||
| 
 | ||||
|     val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded) | ||||
|     val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded) | ||||
| 
 | ||||
|     assertEquals(expectedNode, actualNode) | ||||
|   } | ||||
|  | @ -181,7 +99,7 @@ class RequestConvertToJsonTest { | |||
|     val temporaryMemoryBytes = 8 | ||||
|     val messageConversionsTimeMs = 9 | ||||
| 
 | ||||
|     val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded).asInstanceOf[ObjectNode] | ||||
|     val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded).asInstanceOf[ObjectNode] | ||||
|     expectedNode.set("response", res.responseLog.getOrElse(new TextNode(""))) | ||||
|     expectedNode.set("connection", new TextNode(req.context.connectionId)) | ||||
|     expectedNode.set("totalTimeMs", new DoubleNode(totalTimeMs)) | ||||
|  | @ -198,7 +116,7 @@ class RequestConvertToJsonTest { | |||
|     expectedNode.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes)) | ||||
|     expectedNode.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs)) | ||||
| 
 | ||||
|     val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog, res.responseLog, req.context, req.session, req.isForwarded, | ||||
|     val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog.asJava, res.responseLog.asJava, req.context, req.session, req.isForwarded, | ||||
|       totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, | ||||
|       responseSendTimeMs, temporaryMemoryBytes, messageConversionsTimeMs).asInstanceOf[ObjectNode] | ||||
| 
 | ||||
|  |  | |||
|  | @ -35,6 +35,7 @@ import org.apache.kafka.common.requests._ | |||
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | ||||
| import org.apache.kafka.common.security.scram.internals.ScramMechanism | ||||
| import org.apache.kafka.common.utils._ | ||||
| import org.apache.kafka.network.RequestConvertToJson | ||||
| import org.apache.kafka.network.SocketServerConfigs | ||||
| import org.apache.kafka.network.metrics.RequestMetrics | ||||
| import org.apache.kafka.security.CredentialProvider | ||||
|  |  | |||
|  | @ -17,8 +17,6 @@ | |||
| 
 | ||||
| package org.apache.kafka.jmh.common; | ||||
| 
 | ||||
| import kafka.network.RequestConvertToJson; | ||||
| 
 | ||||
| import org.apache.kafka.common.TopicPartition; | ||||
| import org.apache.kafka.common.Uuid; | ||||
| import org.apache.kafka.common.network.Send; | ||||
|  | @ -27,6 +25,7 @@ import org.apache.kafka.common.requests.AbstractRequest; | |||
| import org.apache.kafka.common.requests.ByteBufferChannel; | ||||
| import org.apache.kafka.common.requests.FetchRequest; | ||||
| import org.apache.kafka.common.requests.RequestHeader; | ||||
| import org.apache.kafka.network.RequestConvertToJson; | ||||
| 
 | ||||
| import org.openjdk.jmh.annotations.Benchmark; | ||||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||||
|  |  | |||
|  | @ -17,13 +17,12 @@ | |||
| 
 | ||||
| package org.apache.kafka.jmh.common; | ||||
| 
 | ||||
| import kafka.network.RequestConvertToJson; | ||||
| 
 | ||||
| import org.apache.kafka.common.IsolationLevel; | ||||
| import org.apache.kafka.common.TopicPartition; | ||||
| import org.apache.kafka.common.message.ListOffsetsRequestData; | ||||
| import org.apache.kafka.common.protocol.ApiKeys; | ||||
| import org.apache.kafka.common.requests.ListOffsetsRequest; | ||||
| import org.apache.kafka.network.RequestConvertToJson; | ||||
| 
 | ||||
| import org.openjdk.jmh.annotations.Benchmark; | ||||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||||
|  |  | |||
|  | @ -17,11 +17,10 @@ | |||
| 
 | ||||
| package org.apache.kafka.jmh.common; | ||||
| 
 | ||||
| import kafka.network.RequestConvertToJson; | ||||
| 
 | ||||
| import org.apache.kafka.common.message.ProduceRequestData; | ||||
| import org.apache.kafka.common.protocol.ApiKeys; | ||||
| import org.apache.kafka.common.requests.ProduceRequest; | ||||
| import org.apache.kafka.network.RequestConvertToJson; | ||||
| 
 | ||||
| import org.openjdk.jmh.annotations.Benchmark; | ||||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||||
|  |  | |||
|  | @ -19,7 +19,6 @@ package org.apache.kafka.jmh.metadata; | |||
| 
 | ||||
| import kafka.coordinator.transaction.TransactionCoordinator; | ||||
| import kafka.network.RequestChannel; | ||||
| import kafka.network.RequestConvertToJson; | ||||
| import kafka.server.AutoTopicCreationManager; | ||||
| import kafka.server.ClientQuotaManager; | ||||
| import kafka.server.ClientRequestQuotaManager; | ||||
|  | @ -58,6 +57,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator; | |||
| import org.apache.kafka.image.MetadataDelta; | ||||
| import org.apache.kafka.image.MetadataImage; | ||||
| import org.apache.kafka.image.MetadataProvenance; | ||||
| import org.apache.kafka.network.RequestConvertToJson; | ||||
| import org.apache.kafka.network.metrics.RequestChannelMetrics; | ||||
| import org.apache.kafka.raft.QuorumConfig; | ||||
| import org.apache.kafka.server.common.FinalizedFeatures; | ||||
|  | @ -237,7 +237,9 @@ public class KRaftMetadataRequestBenchmark { | |||
| 
 | ||||
|     @Benchmark | ||||
|     public String testRequestToJson() { | ||||
|         return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString(); | ||||
|         Option<com.fasterxml.jackson.databind.JsonNode> option = allTopicMetadataRequest.requestLog(); | ||||
|         Optional<com.fasterxml.jackson.databind.JsonNode> optional = option.isDefined() ? Optional.of(option.get()) : Optional.empty(); | ||||
|         return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, allTopicMetadataRequest.isForwarded()).toString(); | ||||
|     } | ||||
| 
 | ||||
|     @Benchmark | ||||
|  |  | |||
|  | @ -20,7 +20,6 @@ package org.apache.kafka.jmh.metadata; | |||
| import kafka.controller.KafkaController; | ||||
| import kafka.coordinator.transaction.TransactionCoordinator; | ||||
| import kafka.network.RequestChannel; | ||||
| import kafka.network.RequestConvertToJson; | ||||
| import kafka.server.AutoTopicCreationManager; | ||||
| import kafka.server.BrokerFeatures; | ||||
| import kafka.server.ClientQuotaManager; | ||||
|  | @ -59,6 +58,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; | |||
| import org.apache.kafka.common.security.auth.SecurityProtocol; | ||||
| import org.apache.kafka.common.utils.Time; | ||||
| import org.apache.kafka.coordinator.group.GroupCoordinator; | ||||
| import org.apache.kafka.network.RequestConvertToJson; | ||||
| import org.apache.kafka.network.metrics.RequestChannelMetrics; | ||||
| import org.apache.kafka.server.common.FinalizedFeatures; | ||||
| import org.apache.kafka.server.common.MetadataVersion; | ||||
|  | @ -237,7 +237,9 @@ public class MetadataRequestBenchmark { | |||
| 
 | ||||
|     @Benchmark | ||||
|     public String testRequestToJson() { | ||||
|         return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString(); | ||||
|         Option<com.fasterxml.jackson.databind.JsonNode> option = allTopicMetadataRequest.requestLog(); | ||||
|         Optional<com.fasterxml.jackson.databind.JsonNode> optional = option.isDefined() ? Optional.of(option.get()) : Optional.empty(); | ||||
|         return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, allTopicMetadataRequest.isForwarded()).toString(); | ||||
|     } | ||||
| 
 | ||||
|     @Benchmark | ||||
|  |  | |||
|  | @ -0,0 +1,812 @@ | |||
| /* | ||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||
|  * contributor license agreements. See the NOTICE file distributed with | ||||
|  * this work for additional information regarding copyright ownership. | ||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
|  * (the "License"); you may not use this file except in compliance with | ||||
|  * the License. You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package org.apache.kafka.network; | ||||
| 
 | ||||
| import org.apache.kafka.common.message.AddOffsetsToTxnRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AddOffsetsToTxnResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AddPartitionsToTxnRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AddPartitionsToTxnResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AddRaftVoterRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AddRaftVoterResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AllocateProducerIdsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AllocateProducerIdsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterClientQuotasRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterClientQuotasResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterConfigsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterConfigsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterPartitionRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterPartitionResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterReplicaLogDirsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterReplicaLogDirsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterUserScramCredentialsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ApiVersionsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ApiVersionsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AssignReplicasToDirsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AssignReplicasToDirsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.BeginQuorumEpochRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.BeginQuorumEpochResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.BrokerHeartbeatRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.BrokerHeartbeatResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.BrokerRegistrationRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.BrokerRegistrationResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ConsumerGroupDescribeRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ConsumerGroupDescribeResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ControlledShutdownRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ControlledShutdownResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ControllerRegistrationRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ControllerRegistrationResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.CreateAclsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.CreateAclsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.CreateDelegationTokenRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.CreateDelegationTokenResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.CreatePartitionsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.CreatePartitionsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.CreateTopicsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.CreateTopicsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteAclsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteAclsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteGroupsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteGroupsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteRecordsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteRecordsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteShareGroupStateRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteShareGroupStateResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteTopicsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DeleteTopicsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeAclsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeAclsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeClientQuotasRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeClientQuotasResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeClusterRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeClusterResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeConfigsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeConfigsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeDelegationTokenRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeDelegationTokenResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeGroupsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeGroupsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeLogDirsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeLogDirsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeProducersRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeProducersResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeQuorumRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeQuorumResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeTopicPartitionsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeTopicPartitionsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeTransactionsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeTransactionsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ElectLeadersRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ElectLeadersResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.EndQuorumEpochRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.EndQuorumEpochResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.EndTxnRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.EndTxnResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.EnvelopeRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.EnvelopeResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ExpireDelegationTokenRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ExpireDelegationTokenResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.FetchRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.FetchResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.FindCoordinatorRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.FindCoordinatorResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.HeartbeatRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.HeartbeatResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.IncrementalAlterConfigsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.InitProducerIdRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.InitProducerIdResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.InitializeShareGroupStateRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.InitializeShareGroupStateResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.JoinGroupRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.JoinGroupResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.LeaderAndIsrRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.LeaderAndIsrResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.LeaveGroupRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.LeaveGroupResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListClientMetricsResourcesRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListClientMetricsResourcesResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListGroupsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListGroupsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListOffsetsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListOffsetsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListPartitionReassignmentsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListPartitionReassignmentsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListTransactionsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ListTransactionsResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.MetadataRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.MetadataResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.OffsetCommitRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.OffsetCommitResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.OffsetDeleteRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.OffsetDeleteResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.OffsetFetchRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.OffsetFetchResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.OffsetForLeaderEpochRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.OffsetForLeaderEpochResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ProduceRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ProduceResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.PushTelemetryRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.PushTelemetryResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ReadShareGroupStateRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ReadShareGroupStateResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.RemoveRaftVoterRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.RemoveRaftVoterResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.RenewDelegationTokenRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.RenewDelegationTokenResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.RequestHeaderDataJsonConverter; | ||||
| import org.apache.kafka.common.message.SaslAuthenticateRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.SaslAuthenticateResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.SaslHandshakeRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.SaslHandshakeResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ShareAcknowledgeRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ShareAcknowledgeResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ShareFetchRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ShareFetchResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ShareGroupDescribeRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ShareGroupDescribeResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ShareGroupHeartbeatRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.ShareGroupHeartbeatResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.StopReplicaRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.StopReplicaResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.TxnOffsetCommitResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.UnregisterBrokerRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.UnregisterBrokerResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.UpdateFeaturesRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.UpdateFeaturesResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.UpdateMetadataRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.UpdateMetadataResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.UpdateRaftVoterRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.UpdateRaftVoterResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.VoteRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.VoteResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.WriteShareGroupStateRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.WriteShareGroupStateResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.message.WriteTxnMarkersRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.WriteTxnMarkersResponseDataJsonConverter; | ||||
| import org.apache.kafka.common.network.ClientInformation; | ||||
| import org.apache.kafka.common.requests.AbstractRequest; | ||||
| import org.apache.kafka.common.requests.AbstractResponse; | ||||
| import org.apache.kafka.common.requests.AddOffsetsToTxnRequest; | ||||
| import org.apache.kafka.common.requests.AddOffsetsToTxnResponse; | ||||
| import org.apache.kafka.common.requests.AddPartitionsToTxnRequest; | ||||
| import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; | ||||
| import org.apache.kafka.common.requests.AddRaftVoterRequest; | ||||
| import org.apache.kafka.common.requests.AddRaftVoterResponse; | ||||
| import org.apache.kafka.common.requests.AllocateProducerIdsRequest; | ||||
| import org.apache.kafka.common.requests.AllocateProducerIdsResponse; | ||||
| import org.apache.kafka.common.requests.AlterClientQuotasRequest; | ||||
| import org.apache.kafka.common.requests.AlterClientQuotasResponse; | ||||
| import org.apache.kafka.common.requests.AlterConfigsRequest; | ||||
| import org.apache.kafka.common.requests.AlterConfigsResponse; | ||||
| import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest; | ||||
| import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse; | ||||
| import org.apache.kafka.common.requests.AlterPartitionRequest; | ||||
| import org.apache.kafka.common.requests.AlterPartitionResponse; | ||||
| import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest; | ||||
| import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse; | ||||
| import org.apache.kafka.common.requests.AlterUserScramCredentialsRequest; | ||||
| import org.apache.kafka.common.requests.AlterUserScramCredentialsResponse; | ||||
| import org.apache.kafka.common.requests.ApiVersionsRequest; | ||||
| import org.apache.kafka.common.requests.ApiVersionsResponse; | ||||
| import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; | ||||
| import org.apache.kafka.common.requests.AssignReplicasToDirsResponse; | ||||
| import org.apache.kafka.common.requests.BeginQuorumEpochRequest; | ||||
| import org.apache.kafka.common.requests.BeginQuorumEpochResponse; | ||||
| import org.apache.kafka.common.requests.BrokerHeartbeatRequest; | ||||
| import org.apache.kafka.common.requests.BrokerHeartbeatResponse; | ||||
| import org.apache.kafka.common.requests.BrokerRegistrationRequest; | ||||
| import org.apache.kafka.common.requests.BrokerRegistrationResponse; | ||||
| import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest; | ||||
| import org.apache.kafka.common.requests.ConsumerGroupDescribeResponse; | ||||
| import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; | ||||
| import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; | ||||
| import org.apache.kafka.common.requests.ControlledShutdownRequest; | ||||
| import org.apache.kafka.common.requests.ControlledShutdownResponse; | ||||
| import org.apache.kafka.common.requests.ControllerRegistrationRequest; | ||||
| import org.apache.kafka.common.requests.ControllerRegistrationResponse; | ||||
| import org.apache.kafka.common.requests.CreateAclsRequest; | ||||
| import org.apache.kafka.common.requests.CreateAclsResponse; | ||||
| import org.apache.kafka.common.requests.CreateDelegationTokenRequest; | ||||
| import org.apache.kafka.common.requests.CreateDelegationTokenResponse; | ||||
| import org.apache.kafka.common.requests.CreatePartitionsRequest; | ||||
| import org.apache.kafka.common.requests.CreatePartitionsResponse; | ||||
| import org.apache.kafka.common.requests.CreateTopicsRequest; | ||||
| import org.apache.kafka.common.requests.CreateTopicsResponse; | ||||
| import org.apache.kafka.common.requests.DeleteAclsRequest; | ||||
| import org.apache.kafka.common.requests.DeleteAclsResponse; | ||||
| import org.apache.kafka.common.requests.DeleteGroupsRequest; | ||||
| import org.apache.kafka.common.requests.DeleteGroupsResponse; | ||||
| import org.apache.kafka.common.requests.DeleteRecordsRequest; | ||||
| import org.apache.kafka.common.requests.DeleteRecordsResponse; | ||||
| import org.apache.kafka.common.requests.DeleteShareGroupStateRequest; | ||||
| import org.apache.kafka.common.requests.DeleteShareGroupStateResponse; | ||||
| import org.apache.kafka.common.requests.DeleteTopicsRequest; | ||||
| import org.apache.kafka.common.requests.DeleteTopicsResponse; | ||||
| import org.apache.kafka.common.requests.DescribeAclsRequest; | ||||
| import org.apache.kafka.common.requests.DescribeAclsResponse; | ||||
| import org.apache.kafka.common.requests.DescribeClientQuotasRequest; | ||||
| import org.apache.kafka.common.requests.DescribeClientQuotasResponse; | ||||
| import org.apache.kafka.common.requests.DescribeClusterRequest; | ||||
| import org.apache.kafka.common.requests.DescribeClusterResponse; | ||||
| import org.apache.kafka.common.requests.DescribeConfigsRequest; | ||||
| import org.apache.kafka.common.requests.DescribeConfigsResponse; | ||||
| import org.apache.kafka.common.requests.DescribeDelegationTokenRequest; | ||||
| import org.apache.kafka.common.requests.DescribeDelegationTokenResponse; | ||||
| import org.apache.kafka.common.requests.DescribeGroupsRequest; | ||||
| import org.apache.kafka.common.requests.DescribeGroupsResponse; | ||||
| import org.apache.kafka.common.requests.DescribeLogDirsRequest; | ||||
| import org.apache.kafka.common.requests.DescribeLogDirsResponse; | ||||
| import org.apache.kafka.common.requests.DescribeProducersRequest; | ||||
| import org.apache.kafka.common.requests.DescribeProducersResponse; | ||||
| import org.apache.kafka.common.requests.DescribeQuorumRequest; | ||||
| import org.apache.kafka.common.requests.DescribeQuorumResponse; | ||||
| import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest; | ||||
| import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse; | ||||
| import org.apache.kafka.common.requests.DescribeTransactionsRequest; | ||||
| import org.apache.kafka.common.requests.DescribeTransactionsResponse; | ||||
| import org.apache.kafka.common.requests.DescribeUserScramCredentialsRequest; | ||||
| import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse; | ||||
| import org.apache.kafka.common.requests.ElectLeadersRequest; | ||||
| import org.apache.kafka.common.requests.ElectLeadersResponse; | ||||
| import org.apache.kafka.common.requests.EndQuorumEpochRequest; | ||||
| import org.apache.kafka.common.requests.EndQuorumEpochResponse; | ||||
| import org.apache.kafka.common.requests.EndTxnRequest; | ||||
| import org.apache.kafka.common.requests.EndTxnResponse; | ||||
| import org.apache.kafka.common.requests.EnvelopeRequest; | ||||
| import org.apache.kafka.common.requests.EnvelopeResponse; | ||||
| import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; | ||||
| import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; | ||||
| import org.apache.kafka.common.requests.FetchRequest; | ||||
| import org.apache.kafka.common.requests.FetchResponse; | ||||
| import org.apache.kafka.common.requests.FetchSnapshotRequest; | ||||
| import org.apache.kafka.common.requests.FetchSnapshotResponse; | ||||
| import org.apache.kafka.common.requests.FindCoordinatorRequest; | ||||
| import org.apache.kafka.common.requests.FindCoordinatorResponse; | ||||
| import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; | ||||
| import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; | ||||
| import org.apache.kafka.common.requests.HeartbeatRequest; | ||||
| import org.apache.kafka.common.requests.HeartbeatResponse; | ||||
| import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; | ||||
| import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; | ||||
| import org.apache.kafka.common.requests.InitProducerIdRequest; | ||||
| import org.apache.kafka.common.requests.InitProducerIdResponse; | ||||
| import org.apache.kafka.common.requests.InitializeShareGroupStateRequest; | ||||
| import org.apache.kafka.common.requests.InitializeShareGroupStateResponse; | ||||
| import org.apache.kafka.common.requests.JoinGroupRequest; | ||||
| import org.apache.kafka.common.requests.JoinGroupResponse; | ||||
| import org.apache.kafka.common.requests.LeaderAndIsrRequest; | ||||
| import org.apache.kafka.common.requests.LeaderAndIsrResponse; | ||||
| import org.apache.kafka.common.requests.LeaveGroupRequest; | ||||
| import org.apache.kafka.common.requests.LeaveGroupResponse; | ||||
| import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest; | ||||
| import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse; | ||||
| import org.apache.kafka.common.requests.ListGroupsRequest; | ||||
| import org.apache.kafka.common.requests.ListGroupsResponse; | ||||
| import org.apache.kafka.common.requests.ListOffsetsRequest; | ||||
| import org.apache.kafka.common.requests.ListOffsetsResponse; | ||||
| import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest; | ||||
| import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; | ||||
| import org.apache.kafka.common.requests.ListTransactionsRequest; | ||||
| import org.apache.kafka.common.requests.ListTransactionsResponse; | ||||
| import org.apache.kafka.common.requests.MetadataRequest; | ||||
| import org.apache.kafka.common.requests.MetadataResponse; | ||||
| import org.apache.kafka.common.requests.OffsetCommitRequest; | ||||
| import org.apache.kafka.common.requests.OffsetCommitResponse; | ||||
| import org.apache.kafka.common.requests.OffsetDeleteRequest; | ||||
| import org.apache.kafka.common.requests.OffsetDeleteResponse; | ||||
| import org.apache.kafka.common.requests.OffsetFetchRequest; | ||||
| import org.apache.kafka.common.requests.OffsetFetchResponse; | ||||
| import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; | ||||
| import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; | ||||
| import org.apache.kafka.common.requests.ProduceRequest; | ||||
| import org.apache.kafka.common.requests.ProduceResponse; | ||||
| import org.apache.kafka.common.requests.PushTelemetryRequest; | ||||
| import org.apache.kafka.common.requests.PushTelemetryResponse; | ||||
| import org.apache.kafka.common.requests.ReadShareGroupStateRequest; | ||||
| import org.apache.kafka.common.requests.ReadShareGroupStateResponse; | ||||
| import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest; | ||||
| import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; | ||||
| import org.apache.kafka.common.requests.RemoveRaftVoterRequest; | ||||
| import org.apache.kafka.common.requests.RemoveRaftVoterResponse; | ||||
| import org.apache.kafka.common.requests.RenewDelegationTokenRequest; | ||||
| import org.apache.kafka.common.requests.RenewDelegationTokenResponse; | ||||
| import org.apache.kafka.common.requests.RequestContext; | ||||
| import org.apache.kafka.common.requests.RequestHeader; | ||||
| import org.apache.kafka.common.requests.SaslAuthenticateRequest; | ||||
| import org.apache.kafka.common.requests.SaslAuthenticateResponse; | ||||
| import org.apache.kafka.common.requests.SaslHandshakeRequest; | ||||
| import org.apache.kafka.common.requests.SaslHandshakeResponse; | ||||
| import org.apache.kafka.common.requests.ShareAcknowledgeRequest; | ||||
| import org.apache.kafka.common.requests.ShareAcknowledgeResponse; | ||||
| import org.apache.kafka.common.requests.ShareFetchRequest; | ||||
| import org.apache.kafka.common.requests.ShareFetchResponse; | ||||
| import org.apache.kafka.common.requests.ShareGroupDescribeRequest; | ||||
| import org.apache.kafka.common.requests.ShareGroupDescribeResponse; | ||||
| import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest; | ||||
| import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse; | ||||
| import org.apache.kafka.common.requests.StopReplicaRequest; | ||||
| import org.apache.kafka.common.requests.StopReplicaResponse; | ||||
| import org.apache.kafka.common.requests.SyncGroupRequest; | ||||
| import org.apache.kafka.common.requests.SyncGroupResponse; | ||||
| import org.apache.kafka.common.requests.TxnOffsetCommitRequest; | ||||
| import org.apache.kafka.common.requests.TxnOffsetCommitResponse; | ||||
| import org.apache.kafka.common.requests.UnregisterBrokerRequest; | ||||
| import org.apache.kafka.common.requests.UnregisterBrokerResponse; | ||||
| import org.apache.kafka.common.requests.UpdateFeaturesRequest; | ||||
| import org.apache.kafka.common.requests.UpdateFeaturesResponse; | ||||
| import org.apache.kafka.common.requests.UpdateMetadataRequest; | ||||
| import org.apache.kafka.common.requests.UpdateMetadataResponse; | ||||
| import org.apache.kafka.common.requests.UpdateRaftVoterRequest; | ||||
| import org.apache.kafka.common.requests.UpdateRaftVoterResponse; | ||||
| import org.apache.kafka.common.requests.VoteRequest; | ||||
| import org.apache.kafka.common.requests.VoteResponse; | ||||
| import org.apache.kafka.common.requests.WriteShareGroupStateRequest; | ||||
| import org.apache.kafka.common.requests.WriteShareGroupStateResponse; | ||||
| import org.apache.kafka.common.requests.WriteTxnMarkersRequest; | ||||
| import org.apache.kafka.common.requests.WriteTxnMarkersResponse; | ||||
| 
 | ||||
| import com.fasterxml.jackson.databind.JsonNode; | ||||
| import com.fasterxml.jackson.databind.node.BooleanNode; | ||||
| import com.fasterxml.jackson.databind.node.DoubleNode; | ||||
| import com.fasterxml.jackson.databind.node.JsonNodeFactory; | ||||
| import com.fasterxml.jackson.databind.node.LongNode; | ||||
| import com.fasterxml.jackson.databind.node.ObjectNode; | ||||
| import com.fasterxml.jackson.databind.node.TextNode; | ||||
| 
 | ||||
| import java.util.Optional; | ||||
| 
 | ||||
| public class RequestConvertToJson { | ||||
| 
 | ||||
|     public static JsonNode request(AbstractRequest request) { | ||||
|         switch (request.apiKey()) { | ||||
|             case ADD_OFFSETS_TO_TXN: | ||||
|                 return AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) request).data(), request.version()); | ||||
|             case ADD_PARTITIONS_TO_TXN: | ||||
|                 return AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) request).data(), request.version()); | ||||
|             case ALLOCATE_PRODUCER_IDS: | ||||
|                 return AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) request).data(), request.version()); | ||||
|             case ALTER_CLIENT_QUOTAS: | ||||
|                 return AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) request).data(), request.version()); | ||||
|             case ALTER_CONFIGS: | ||||
|                 return AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) request).data(), request.version()); | ||||
|             case ALTER_PARTITION_REASSIGNMENTS: | ||||
|                 return AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest) request).data(), request.version()); | ||||
|             case ALTER_PARTITION: | ||||
|                 return AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) request).data(), request.version()); | ||||
|             case ALTER_REPLICA_LOG_DIRS: | ||||
|                 return AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) request).data(), request.version()); | ||||
|             case ALTER_USER_SCRAM_CREDENTIALS: | ||||
|                 return AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest) request).data(), request.version()); | ||||
|             case API_VERSIONS: | ||||
|                 return ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) request).data(), request.version()); | ||||
|             case ASSIGN_REPLICAS_TO_DIRS: | ||||
|                 return AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest) request).data(), request.version()); | ||||
|             case BEGIN_QUORUM_EPOCH: | ||||
|                 return BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) request).data(), request.version()); | ||||
|             case BROKER_HEARTBEAT: | ||||
|                 return BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) request).data(), request.version()); | ||||
|             case BROKER_REGISTRATION: | ||||
|                 return BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) request).data(), request.version()); | ||||
|             case CONSUMER_GROUP_DESCRIBE: | ||||
|                 return ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest) request).data(), request.version()); | ||||
|             case CONSUMER_GROUP_HEARTBEAT: | ||||
|                 return ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest) request).data(), request.version()); | ||||
|             case CONTROLLED_SHUTDOWN: | ||||
|                 return ControlledShutdownRequestDataJsonConverter.write(((ControlledShutdownRequest) request).data(), request.version()); | ||||
|             case CONTROLLER_REGISTRATION: | ||||
|                 return ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest) request).data(), request.version()); | ||||
|             case CREATE_ACLS: | ||||
|                 return CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), request.version()); | ||||
|             case CREATE_DELEGATION_TOKEN: | ||||
|                 return CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest) request).data(), request.version()); | ||||
|             case CREATE_PARTITIONS: | ||||
|                 return CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) request).data(), request.version()); | ||||
|             case CREATE_TOPICS: | ||||
|                 return CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) request).data(), request.version()); | ||||
|             case DELETE_ACLS: | ||||
|                 return DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), request.version()); | ||||
|             case DELETE_GROUPS: | ||||
|                 return DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) request).data(), request.version()); | ||||
|             case DELETE_RECORDS: | ||||
|                 return DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) request).data(), request.version()); | ||||
|             case DELETE_SHARE_GROUP_STATE: | ||||
|                 return DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest) request).data(), request.version()); | ||||
|             case DELETE_TOPICS: | ||||
|                 return DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_ACLS: | ||||
|                 return DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_CLIENT_QUOTAS: | ||||
|                 return DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_CLUSTER: | ||||
|                 return DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_CONFIGS: | ||||
|                 return DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_DELEGATION_TOKEN: | ||||
|                 return DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_GROUPS: | ||||
|                 return DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_LOG_DIRS: | ||||
|                 return DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_PRODUCERS: | ||||
|                 return DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_QUORUM: | ||||
|                 return DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_TOPIC_PARTITIONS: | ||||
|                 return DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_TRANSACTIONS: | ||||
|                 return DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest) request).data(), request.version()); | ||||
|             case DESCRIBE_USER_SCRAM_CREDENTIALS: | ||||
|                 return DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest) request).data(), request.version()); | ||||
|             case ELECT_LEADERS: | ||||
|                 return ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) request).data(), request.version()); | ||||
|             case END_QUORUM_EPOCH: | ||||
|                 return EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) request).data(), request.version()); | ||||
|             case END_TXN: | ||||
|                 return EndTxnRequestDataJsonConverter.write(((EndTxnRequest) request).data(), request.version()); | ||||
|             case ENVELOPE: | ||||
|                 return EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), request.version()); | ||||
|             case EXPIRE_DELEGATION_TOKEN: | ||||
|                 return ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest) request).data(), request.version()); | ||||
|             case FETCH: | ||||
|                 return FetchRequestDataJsonConverter.write(((FetchRequest) request).data(), request.version()); | ||||
|             case FETCH_SNAPSHOT: | ||||
|                 return FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) request).data(), request.version()); | ||||
|             case FIND_COORDINATOR: | ||||
|                 return FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) request).data(), request.version()); | ||||
|             case GET_TELEMETRY_SUBSCRIPTIONS: | ||||
|                 return GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest) request).data(), request.version()); | ||||
|             case HEARTBEAT: | ||||
|                 return HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), request.version()); | ||||
|             case INCREMENTAL_ALTER_CONFIGS: | ||||
|                 return IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest) request).data(), request.version()); | ||||
|             case INITIALIZE_SHARE_GROUP_STATE: | ||||
|                 return InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest) request).data(), request.version()); | ||||
|             case INIT_PRODUCER_ID: | ||||
|                 return InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) request).data(), request.version()); | ||||
|             case JOIN_GROUP: | ||||
|                 return JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), request.version()); | ||||
|             case LEADER_AND_ISR: | ||||
|                 return LeaderAndIsrRequestDataJsonConverter.write(((LeaderAndIsrRequest) request).data(), request.version()); | ||||
|             case LEAVE_GROUP: | ||||
|                 return LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), request.version()); | ||||
|             case LIST_CLIENT_METRICS_RESOURCES: | ||||
|                 return ListClientMetricsResourcesRequestDataJsonConverter.write(((ListClientMetricsResourcesRequest) request).data(), request.version()); | ||||
|             case LIST_GROUPS: | ||||
|                 return ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), request.version()); | ||||
|             case LIST_OFFSETS: | ||||
|                 return ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) request).data(), request.version()); | ||||
|             case LIST_PARTITION_REASSIGNMENTS: | ||||
|                 return ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest) request).data(), request.version()); | ||||
|             case LIST_TRANSACTIONS: | ||||
|                 return ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) request).data(), request.version()); | ||||
|             case METADATA: | ||||
|                 return MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), request.version()); | ||||
|             case OFFSET_COMMIT: | ||||
|                 return OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) request).data(), request.version()); | ||||
|             case OFFSET_DELETE: | ||||
|                 return OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) request).data(), request.version()); | ||||
|             case OFFSET_FETCH: | ||||
|                 return OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) request).data(), request.version()); | ||||
|             case OFFSET_FOR_LEADER_EPOCH: | ||||
|                 return OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest) request).data(), request.version()); | ||||
|             case PRODUCE: | ||||
|                 return ProduceRequestDataJsonConverter.write(((ProduceRequest) request).data(), request.version(), false); | ||||
|             case PUSH_TELEMETRY: | ||||
|                 return PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) request).data(), request.version()); | ||||
|             case READ_SHARE_GROUP_STATE: | ||||
|                 return ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) request).data(), request.version()); | ||||
|             case READ_SHARE_GROUP_STATE_SUMMARY: | ||||
|                 return ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest) request).data(), request.version()); | ||||
|             case RENEW_DELEGATION_TOKEN: | ||||
|                 return RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest) request).data(), request.version()); | ||||
|             case SASL_AUTHENTICATE: | ||||
|                 return SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) request).data(), request.version()); | ||||
|             case SASL_HANDSHAKE: | ||||
|                 return SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) request).data(), request.version()); | ||||
|             case SHARE_ACKNOWLEDGE: | ||||
|                 return ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) request).data(), request.version()); | ||||
|             case SHARE_FETCH: | ||||
|                 return ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), request.version()); | ||||
|             case SHARE_GROUP_DESCRIBE: | ||||
|                 return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version()); | ||||
|             case SHARE_GROUP_HEARTBEAT: | ||||
|                 return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version()); | ||||
|             case STOP_REPLICA: | ||||
|                 return StopReplicaRequestDataJsonConverter.write(((StopReplicaRequest) request).data(), request.version()); | ||||
|             case SYNC_GROUP: | ||||
|                 return SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), request.version()); | ||||
|             case TXN_OFFSET_COMMIT: | ||||
|                 return TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) request).data(), request.version()); | ||||
|             case UNREGISTER_BROKER: | ||||
|                 return UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) request).data(), request.version()); | ||||
|             case UPDATE_FEATURES: | ||||
|                 return UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version()); | ||||
|             case UPDATE_METADATA: | ||||
|                 return UpdateMetadataRequestDataJsonConverter.write(((UpdateMetadataRequest) request).data(), request.version()); | ||||
|             case VOTE: | ||||
|                 return VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version()); | ||||
|             case WRITE_SHARE_GROUP_STATE: | ||||
|                 return WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version()); | ||||
|             case WRITE_TXN_MARKERS: | ||||
|                 return WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), request.version()); | ||||
|             case ADD_RAFT_VOTER: | ||||
|                 return AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), request.version()); | ||||
|             case REMOVE_RAFT_VOTER: | ||||
|                 return RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), request.version()); | ||||
|             case UPDATE_RAFT_VOTER: | ||||
|                 return UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version()); | ||||
|             default: | ||||
|                 throw new IllegalStateException("ApiKey " + request.apiKey() + " is not currently handled in `request`, the " + | ||||
|                     "code should be updated to do so."); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public static JsonNode response(AbstractResponse response, short version) { | ||||
|         switch (response.apiKey()) { | ||||
|             case ADD_OFFSETS_TO_TXN: | ||||
|                 return AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) response).data(), version); | ||||
|             case ADD_PARTITIONS_TO_TXN: | ||||
|                 return AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) response).data(), version); | ||||
|             case ALLOCATE_PRODUCER_IDS: | ||||
|                 return AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse) response).data(), version); | ||||
|             case ALTER_CLIENT_QUOTAS: | ||||
|                 return AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) response).data(), version); | ||||
|             case ALTER_CONFIGS: | ||||
|                 return AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) response).data(), version); | ||||
|             case ALTER_PARTITION_REASSIGNMENTS: | ||||
|                 return AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse) response).data(), version); | ||||
|             case ALTER_PARTITION: | ||||
|                 return AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) response).data(), version); | ||||
|             case ALTER_REPLICA_LOG_DIRS: | ||||
|                 return AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse) response).data(), version); | ||||
|             case ALTER_USER_SCRAM_CREDENTIALS: | ||||
|                 return AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse) response).data(), version); | ||||
|             case API_VERSIONS: | ||||
|                 return ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) response).data(), version); | ||||
|             case ASSIGN_REPLICAS_TO_DIRS: | ||||
|                 return AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse) response).data(), version); | ||||
|             case BEGIN_QUORUM_EPOCH: | ||||
|                 return BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) response).data(), version); | ||||
|             case BROKER_HEARTBEAT: | ||||
|                 return BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) response).data(), version); | ||||
|             case BROKER_REGISTRATION: | ||||
|                 return BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) response).data(), version); | ||||
|             case CONSUMER_GROUP_DESCRIBE: | ||||
|                 return ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse) response).data(), version); | ||||
|             case CONSUMER_GROUP_HEARTBEAT: | ||||
|                 return ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse) response).data(), version); | ||||
|             case CONTROLLED_SHUTDOWN: | ||||
|                 return ControlledShutdownResponseDataJsonConverter.write(((ControlledShutdownResponse) response).data(), version); | ||||
|             case CONTROLLER_REGISTRATION: | ||||
|                 return ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse) response).data(), version); | ||||
|             case CREATE_ACLS: | ||||
|                 return CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) response).data(), version); | ||||
|             case CREATE_DELEGATION_TOKEN: | ||||
|                 return CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse) response).data(), version); | ||||
|             case CREATE_PARTITIONS: | ||||
|                 return CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) response).data(), version); | ||||
|             case CREATE_TOPICS: | ||||
|                 return CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) response).data(), version); | ||||
|             case DELETE_ACLS: | ||||
|                 return DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) response).data(), version); | ||||
|             case DELETE_GROUPS: | ||||
|                 return DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) response).data(), version); | ||||
|             case DELETE_RECORDS: | ||||
|                 return DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) response).data(), version); | ||||
|             case DELETE_SHARE_GROUP_STATE: | ||||
|                 return DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse) response).data(), version); | ||||
|             case DELETE_TOPICS: | ||||
|                 return DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) response).data(), version); | ||||
|             case DESCRIBE_ACLS: | ||||
|                 return DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) response).data(), version); | ||||
|             case DESCRIBE_CLIENT_QUOTAS: | ||||
|                 return DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse) response).data(), version); | ||||
|             case DESCRIBE_CLUSTER: | ||||
|                 return DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) response).data(), version); | ||||
|             case DESCRIBE_CONFIGS: | ||||
|                 return DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) response).data(), version); | ||||
|             case DESCRIBE_DELEGATION_TOKEN: | ||||
|                 return DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse) response).data(), version); | ||||
|             case DESCRIBE_GROUPS: | ||||
|                 return DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) response).data(), version); | ||||
|             case DESCRIBE_LOG_DIRS: | ||||
|                 return DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) response).data(), version); | ||||
|             case DESCRIBE_PRODUCERS: | ||||
|                 return DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) response).data(), version); | ||||
|             case DESCRIBE_QUORUM: | ||||
|                 return DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) response).data(), version); | ||||
|             case DESCRIBE_TOPIC_PARTITIONS: | ||||
|                 return DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse) response).data(), version); | ||||
|             case DESCRIBE_TRANSACTIONS: | ||||
|                 return DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse) response).data(), version); | ||||
|             case DESCRIBE_USER_SCRAM_CREDENTIALS: | ||||
|                 return DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse) response).data(), version); | ||||
|             case ELECT_LEADERS: | ||||
|                 return ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) response).data(), version); | ||||
|             case END_QUORUM_EPOCH: | ||||
|                 return EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) response).data(), version); | ||||
|             case END_TXN: | ||||
|                 return EndTxnResponseDataJsonConverter.write(((EndTxnResponse) response).data(), version); | ||||
|             case ENVELOPE: | ||||
|                 return EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), version); | ||||
|             case EXPIRE_DELEGATION_TOKEN: | ||||
|                 return ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse) response).data(), version); | ||||
|             case FETCH: | ||||
|                 return FetchResponseDataJsonConverter.write(((FetchResponse) response).data(), version, false); | ||||
|             case FETCH_SNAPSHOT: | ||||
|                 return FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) response).data(), version); | ||||
|             case FIND_COORDINATOR: | ||||
|                 return FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) response).data(), version); | ||||
|             case GET_TELEMETRY_SUBSCRIPTIONS: | ||||
|                 return GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse) response).data(), version); | ||||
|             case HEARTBEAT: | ||||
|                 return HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), version); | ||||
|             case INCREMENTAL_ALTER_CONFIGS: | ||||
|                 return IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse) response).data(), version); | ||||
|             case INITIALIZE_SHARE_GROUP_STATE: | ||||
|                 return InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse) response).data(), version); | ||||
|             case INIT_PRODUCER_ID: | ||||
|                 return InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) response).data(), version); | ||||
|             case JOIN_GROUP: | ||||
|                 return JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), version); | ||||
|             case LEADER_AND_ISR: | ||||
|                 return LeaderAndIsrResponseDataJsonConverter.write(((LeaderAndIsrResponse) response).data(), version); | ||||
|             case LEAVE_GROUP: | ||||
|                 return LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) response).data(), version); | ||||
|             case LIST_CLIENT_METRICS_RESOURCES: | ||||
|                 return ListClientMetricsResourcesResponseDataJsonConverter.write(((ListClientMetricsResourcesResponse) response).data(), version); | ||||
|             case LIST_GROUPS: | ||||
|                 return ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) response).data(), version); | ||||
|             case LIST_OFFSETS: | ||||
|                 return ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) response).data(), version); | ||||
|             case LIST_PARTITION_REASSIGNMENTS: | ||||
|                 return ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse) response).data(), version); | ||||
|             case LIST_TRANSACTIONS: | ||||
|                 return ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) response).data(), version); | ||||
|             case METADATA: | ||||
|                 return MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), version); | ||||
|             case OFFSET_COMMIT: | ||||
|                 return OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) response).data(), version); | ||||
|             case OFFSET_DELETE: | ||||
|                 return OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) response).data(), version); | ||||
|             case OFFSET_FETCH: | ||||
|                 return OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) response).data(), version); | ||||
|             case OFFSET_FOR_LEADER_EPOCH: | ||||
|                 return OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse) response).data(), version); | ||||
|             case PRODUCE: | ||||
|                 return ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), version); | ||||
|             case PUSH_TELEMETRY: | ||||
|                 return PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) response).data(), version); | ||||
|             case READ_SHARE_GROUP_STATE: | ||||
|                 return ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse) response).data(), version); | ||||
|             case READ_SHARE_GROUP_STATE_SUMMARY: | ||||
|                 return ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse) response).data(), version); | ||||
|             case RENEW_DELEGATION_TOKEN: | ||||
|                 return RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse) response).data(), version); | ||||
|             case SASL_AUTHENTICATE: | ||||
|                 return SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) response).data(), version); | ||||
|             case SASL_HANDSHAKE: | ||||
|                 return SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) response).data(), version); | ||||
|             case SHARE_ACKNOWLEDGE: | ||||
|                 return ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) response).data(), version); | ||||
|             case SHARE_FETCH: | ||||
|                 return ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) response).data(), version); | ||||
|             case SHARE_GROUP_DESCRIBE: | ||||
|                 return ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version); | ||||
|             case SHARE_GROUP_HEARTBEAT: | ||||
|                 return ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version); | ||||
|             case STOP_REPLICA: | ||||
|                 return StopReplicaResponseDataJsonConverter.write(((StopReplicaResponse) response).data(), version); | ||||
|             case SYNC_GROUP: | ||||
|                 return SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), version); | ||||
|             case TXN_OFFSET_COMMIT: | ||||
|                 return TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) response).data(), version); | ||||
|             case UNREGISTER_BROKER: | ||||
|                 return UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) response).data(), version); | ||||
|             case UPDATE_FEATURES: | ||||
|                 return UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version); | ||||
|             case UPDATE_METADATA: | ||||
|                 return UpdateMetadataResponseDataJsonConverter.write(((UpdateMetadataResponse) response).data(), version); | ||||
|             case VOTE: | ||||
|                 return VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version); | ||||
|             case WRITE_SHARE_GROUP_STATE: | ||||
|                 return WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version); | ||||
|             case WRITE_TXN_MARKERS: | ||||
|                 return WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) response).data(), version); | ||||
|             case ADD_RAFT_VOTER: | ||||
|                 return AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) response).data(), version); | ||||
|             case REMOVE_RAFT_VOTER: | ||||
|                 return RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) response).data(), version); | ||||
|             case UPDATE_RAFT_VOTER: | ||||
|                 return UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version); | ||||
|             default: | ||||
|                 throw new IllegalStateException("ApiKey " + response.apiKey() + " is not currently handled in `response`, the " + | ||||
|                     "code should be updated to do so."); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public static JsonNode requestHeaderNode(RequestHeader header) { | ||||
|         ObjectNode node = (ObjectNode) RequestHeaderDataJsonConverter.write( | ||||
|             header.data(), header.headerVersion(), false | ||||
|         ); | ||||
|         node.set("requestApiKeyName", new TextNode(header.apiKey().toString())); | ||||
|         if (header.apiKey().isVersionDeprecated(header.apiVersion())) { | ||||
|             node.set("requestApiVersionDeprecated", BooleanNode.TRUE); | ||||
|         } | ||||
|         return node; | ||||
|     } | ||||
| 
 | ||||
|     public static JsonNode requestDesc(RequestHeader header, Optional<JsonNode> requestNode, boolean isForwarded) { | ||||
|         ObjectNode node = JsonNodeFactory.instance.objectNode(); | ||||
|         node.set("isForwarded", isForwarded ? BooleanNode.TRUE : BooleanNode.FALSE); | ||||
|         node.set("requestHeader", requestHeaderNode(header)); | ||||
|         node.set("request", requestNode.orElse(new TextNode(""))); | ||||
|         return node; | ||||
|     } | ||||
| 
 | ||||
|     public static JsonNode clientInfoNode(ClientInformation clientInfo) { | ||||
|         ObjectNode node = JsonNodeFactory.instance.objectNode(); | ||||
|         node.set("softwareName", new TextNode(clientInfo.softwareName())); | ||||
|         node.set("softwareVersion", new TextNode(clientInfo.softwareVersion())); | ||||
|         return node; | ||||
|     } | ||||
| 
 | ||||
|     public static JsonNode requestDescMetrics(RequestHeader header, Optional<JsonNode> requestNode, Optional<JsonNode> responseNode, | ||||
|                                               RequestContext context, Session session, boolean isForwarded, | ||||
|                                               double totalTimeMs, double requestQueueTimeMs, double apiLocalTimeMs, | ||||
|                                               double apiRemoteTimeMs, long apiThrottleTimeMs, double responseQueueTimeMs, | ||||
|                                               double responseSendTimeMs, long temporaryMemoryBytes, | ||||
|                                               double messageConversionsTimeMs) { | ||||
|         ObjectNode node = (ObjectNode) requestDesc(header, requestNode, isForwarded); | ||||
|         node.set("response", responseNode.orElse(new TextNode(""))); | ||||
|         node.set("connection", new TextNode(context.connectionId)); | ||||
|         node.set("totalTimeMs", new DoubleNode(totalTimeMs)); | ||||
|         node.set("requestQueueTimeMs", new DoubleNode(requestQueueTimeMs)); | ||||
|         node.set("localTimeMs", new DoubleNode(apiLocalTimeMs)); | ||||
|         node.set("remoteTimeMs", new DoubleNode(apiRemoteTimeMs)); | ||||
|         node.set("throttleTimeMs", new LongNode(apiThrottleTimeMs)); | ||||
|         node.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs)); | ||||
|         node.set("sendTimeMs", new DoubleNode(responseSendTimeMs)); | ||||
|         node.set("securityProtocol", new TextNode(context.securityProtocol.toString())); | ||||
|         node.set("principal", new TextNode(session.principal.toString())); | ||||
|         node.set("listener", new TextNode(context.listenerName.value())); | ||||
|         node.set("clientInformation", clientInfoNode(context.clientInformation)); | ||||
|         if (temporaryMemoryBytes > 0) { | ||||
|             node.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes)); | ||||
|         } | ||||
|         if (messageConversionsTimeMs > 0) { | ||||
|             node.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs)); | ||||
|         } | ||||
|         return node; | ||||
|     } | ||||
| } | ||||
|  | @ -0,0 +1,126 @@ | |||
| /* | ||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||
|  * contributor license agreements. See the NOTICE file distributed with | ||||
|  * this work for additional information regarding copyright ownership. | ||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
|  * (the "License"); you may not use this file except in compliance with | ||||
|  * the License. You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package org.apache.kafka.network; | ||||
| 
 | ||||
| import org.apache.kafka.common.message.ApiMessageType; | ||||
| import org.apache.kafka.common.message.DescribeAclsRequestData; | ||||
| import org.apache.kafka.common.message.DescribeLogDirsResponseData; | ||||
| import org.apache.kafka.common.network.ClientInformation; | ||||
| import org.apache.kafka.common.protocol.ApiKeys; | ||||
| import org.apache.kafka.common.protocol.ApiMessage; | ||||
| import org.apache.kafka.common.protocol.Errors; | ||||
| import org.apache.kafka.common.protocol.MessageUtil; | ||||
| import org.apache.kafka.common.requests.AbstractRequest; | ||||
| import org.apache.kafka.common.requests.AbstractResponse; | ||||
| 
 | ||||
| import com.fasterxml.jackson.databind.JsonNode; | ||||
| import com.fasterxml.jackson.databind.node.JsonNodeFactory; | ||||
| import com.fasterxml.jackson.databind.node.ObjectNode; | ||||
| import com.fasterxml.jackson.databind.node.TextNode; | ||||
| 
 | ||||
| import org.junit.jupiter.api.Test; | ||||
| 
 | ||||
| import java.nio.ByteBuffer; | ||||
| import java.util.ArrayList; | ||||
| import java.util.Collections; | ||||
| import java.util.List; | ||||
| 
 | ||||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||||
| 
 | ||||
| public class RequestConvertToJsonTest { | ||||
| 
 | ||||
|     @Test | ||||
|     public void testAllRequestTypesHandled() { | ||||
|         List<String> unhandledKeys = new ArrayList<>(); | ||||
|         for (ApiKeys key : ApiKeys.values()) { | ||||
|             short version = key.latestVersion(); | ||||
|             ApiMessage message; | ||||
|             if (key == ApiKeys.DESCRIBE_ACLS) { | ||||
|                 message = ApiMessageType.fromApiKey(key.id).newRequest(); | ||||
|                 DescribeAclsRequestData requestData = (DescribeAclsRequestData) message; | ||||
|                 requestData.setPatternTypeFilter((byte) 1); | ||||
|                 requestData.setResourceTypeFilter((byte) 1); | ||||
|                 requestData.setPermissionType((byte) 1); | ||||
|                 requestData.setOperation((byte) 1); | ||||
|             } else { | ||||
|                 message = ApiMessageType.fromApiKey(key.id).newRequest(); | ||||
|             } | ||||
|             ByteBuffer bytes = MessageUtil.toByteBuffer(message, version); | ||||
|             AbstractRequest req = AbstractRequest.parseRequest(key, version, bytes).request; | ||||
|             try { | ||||
|                 RequestConvertToJson.request(req); | ||||
|             } catch (IllegalStateException e) { | ||||
|                 unhandledKeys.add(key.toString()); | ||||
|             } | ||||
|         } | ||||
|         assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled request keys"); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void testAllApiVersionsResponseHandled() { | ||||
|         for (ApiKeys key : ApiKeys.values()) { | ||||
|             List<Short> unhandledVersions = new ArrayList<>(); | ||||
|             for (short version : key.allVersions()) { | ||||
|                 ApiMessage message; | ||||
|                 // Specify top-level error handling for verifying compatibility across versions | ||||
|                 if (key == ApiKeys.DESCRIBE_LOG_DIRS) { | ||||
|                     message = ApiMessageType.fromApiKey(key.id).newResponse(); | ||||
|                     DescribeLogDirsResponseData responseData = (DescribeLogDirsResponseData) message; | ||||
|                     responseData.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()); | ||||
|                 } else { | ||||
|                     message = ApiMessageType.fromApiKey(key.id).newResponse(); | ||||
|                 } | ||||
| 
 | ||||
|                 ByteBuffer bytes = MessageUtil.toByteBuffer(message, version); | ||||
|                 AbstractResponse response = AbstractResponse.parseResponse(key, bytes, version); | ||||
|                 try { | ||||
|                     RequestConvertToJson.response(response, version); | ||||
|                 } catch (IllegalStateException e) { | ||||
|                     unhandledVersions.add(version); | ||||
|                 } | ||||
|             } | ||||
|             assertEquals(new ArrayList<>(), unhandledVersions, "API: " + key + " - Unhandled request versions"); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void testAllResponseTypesHandled() { | ||||
|         List<String> unhandledKeys = new ArrayList<>(); | ||||
|         for (ApiKeys key : ApiKeys.values()) { | ||||
|             short version = key.latestVersion(); | ||||
|             ApiMessage message = ApiMessageType.fromApiKey(key.id).newResponse(); | ||||
|             ByteBuffer bytes = MessageUtil.toByteBuffer(message, version); | ||||
|             AbstractResponse res = AbstractResponse.parseResponse(key, bytes, version); | ||||
|             try { | ||||
|                 RequestConvertToJson.response(res, version); | ||||
|             } catch (IllegalStateException e) { | ||||
|                 unhandledKeys.add(key.toString()); | ||||
|             } | ||||
|         } | ||||
|         assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled response keys"); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void testClientInfoNode() { | ||||
|         ClientInformation clientInfo = new ClientInformation("name", "1"); | ||||
|         ObjectNode expectedNode = JsonNodeFactory.instance.objectNode(); | ||||
|         expectedNode.set("softwareName", new TextNode(clientInfo.softwareName())); | ||||
|         expectedNode.set("softwareVersion", new TextNode(clientInfo.softwareVersion())); | ||||
|         JsonNode actualNode = RequestConvertToJson.clientInfoNode(clientInfo); | ||||
|         assertEquals(expectedNode, actualNode); | ||||
|     } | ||||
| } | ||||
		Loading…
	
		Reference in New Issue