diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 22ab6a449e6..f046ceb74a6 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -99,4 +99,8 @@
+
+
+
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index b640b4ae7c5..14d0630d2e9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -50,6 +50,7 @@
+
record(networkThreadTimeNanos))
if (isRequestLoggingEnabled) {
- val desc = RequestConvertToJson.requestDescMetrics(header, requestLog, response.responseLog,
+ val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.asJava, response.responseLog.asJava,
context, session, isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs,
apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
deleted file mode 100644
index a51894bd332..00000000000
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-import com.fasterxml.jackson.databind.JsonNode
-import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode}
-import org.apache.kafka.common.message._
-import org.apache.kafka.common.network.ClientInformation
-import org.apache.kafka.common.requests._
-import org.apache.kafka.network.Session
-
-object RequestConvertToJson {
- def request(request: AbstractRequest): JsonNode = {
- request match {
- case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
- case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
- case req: AllocateProducerIdsRequest => AllocateProducerIdsRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterPartitionRequest => AlterPartitionRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
- case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version)
- case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
- case req: AssignReplicasToDirsRequest => AssignReplicasToDirsRequestDataJsonConverter.write(req.data, request.version)
- case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
- case req: BrokerHeartbeatRequest => BrokerHeartbeatRequestDataJsonConverter.write(req.data, request.version)
- case req: BrokerRegistrationRequest => BrokerRegistrationRequestDataJsonConverter.write(req.data, request.version)
- case req: ConsumerGroupDescribeRequest => ConsumerGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
- case req: ConsumerGroupHeartbeatRequest => ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
- case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
- case req: ControllerRegistrationRequest => ControllerRegistrationRequestDataJsonConverter.write(req.data, request.version)
- case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version)
- case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
- case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
- case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteShareGroupStateRequest => DeleteShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeClusterRequest => DescribeClusterRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
- case res: DescribeTopicPartitionsRequest => DescribeTopicPartitionsRequestDataJsonConverter.write(res.data, request.version)
- case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
- case res: DescribeUserScramCredentialsRequest => DescribeUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version)
- case req: ElectLeadersRequest => ElectLeadersRequestDataJsonConverter.write(req.data, request.version)
- case req: EndQuorumEpochRequest => EndQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
- case req: EndTxnRequest => EndTxnRequestDataJsonConverter.write(req.data, request.version)
- case req: EnvelopeRequest => EnvelopeRequestDataJsonConverter.write(req.data, request.version)
- case req: ExpireDelegationTokenRequest => ExpireDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
- case req: FetchRequest => FetchRequestDataJsonConverter.write(req.data, request.version)
- case req: FetchSnapshotRequest => FetchSnapshotRequestDataJsonConverter.write(req.data, request.version)
- case req: FindCoordinatorRequest => FindCoordinatorRequestDataJsonConverter.write(req.data, request.version)
- case req: GetTelemetrySubscriptionsRequest => GetTelemetrySubscriptionsRequestDataJsonConverter.write(req.data, request.version)
- case req: HeartbeatRequest => HeartbeatRequestDataJsonConverter.write(req.data, request.version)
- case req: IncrementalAlterConfigsRequest => IncrementalAlterConfigsRequestDataJsonConverter.write(req.data, request.version)
- case req: InitializeShareGroupStateRequest => InitializeShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
- case req: InitProducerIdRequest => InitProducerIdRequestDataJsonConverter.write(req.data, request.version)
- case req: JoinGroupRequest => JoinGroupRequestDataJsonConverter.write(req.data, request.version)
- case req: LeaderAndIsrRequest => LeaderAndIsrRequestDataJsonConverter.write(req.data, request.version)
- case req: LeaveGroupRequest => LeaveGroupRequestDataJsonConverter.write(req.data, request.version)
- case req: ListClientMetricsResourcesRequest => ListClientMetricsResourcesRequestDataJsonConverter.write(req.data, request.version)
- case req: ListGroupsRequest => ListGroupsRequestDataJsonConverter.write(req.data, request.version)
- case req: ListOffsetsRequest => ListOffsetsRequestDataJsonConverter.write(req.data, request.version)
- case req: ListPartitionReassignmentsRequest => ListPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version)
- case req: ListTransactionsRequest => ListTransactionsRequestDataJsonConverter.write(req.data, request.version)
- case req: MetadataRequest => MetadataRequestDataJsonConverter.write(req.data, request.version)
- case req: OffsetCommitRequest => OffsetCommitRequestDataJsonConverter.write(req.data, request.version)
- case req: OffsetDeleteRequest => OffsetDeleteRequestDataJsonConverter.write(req.data, request.version)
- case req: OffsetFetchRequest => OffsetFetchRequestDataJsonConverter.write(req.data, request.version)
- case req: OffsetsForLeaderEpochRequest => OffsetForLeaderEpochRequestDataJsonConverter.write(req.data, request.version)
- case req: ProduceRequest => ProduceRequestDataJsonConverter.write(req.data, request.version, false)
- case req: PushTelemetryRequest => PushTelemetryRequestDataJsonConverter.write(req.data, request.version)
- case req: ReadShareGroupStateRequest => ReadShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
- case req: ReadShareGroupStateSummaryRequest => ReadShareGroupStateSummaryRequestDataJsonConverter.write(req.data, request.version)
- case req: RenewDelegationTokenRequest => RenewDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
- case req: SaslAuthenticateRequest => SaslAuthenticateRequestDataJsonConverter.write(req.data, request.version)
- case req: SaslHandshakeRequest => SaslHandshakeRequestDataJsonConverter.write(req.data, request.version)
- case req: ShareAcknowledgeRequest => ShareAcknowledgeRequestDataJsonConverter.write(req.data, request.version)
- case req: ShareFetchRequest => ShareFetchRequestDataJsonConverter.write(req.data, request.version)
- case req: ShareGroupDescribeRequest => ShareGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
- case req: ShareGroupHeartbeatRequest => ShareGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
- case req: StopReplicaRequest => StopReplicaRequestDataJsonConverter.write(req.data, request.version)
- case req: SyncGroupRequest => SyncGroupRequestDataJsonConverter.write(req.data, request.version)
- case req: TxnOffsetCommitRequest => TxnOffsetCommitRequestDataJsonConverter.write(req.data, request.version)
- case req: UnregisterBrokerRequest => UnregisterBrokerRequestDataJsonConverter.write(req.data, request.version)
- case req: UpdateFeaturesRequest => UpdateFeaturesRequestDataJsonConverter.write(req.data, request.version)
- case req: UpdateMetadataRequest => UpdateMetadataRequestDataJsonConverter.write(req.data, request.version)
- case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data, request.version)
- case req: WriteShareGroupStateRequest => WriteShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
- case req: WriteTxnMarkersRequest => WriteTxnMarkersRequestDataJsonConverter.write(req.data, request.version)
- case req: AddRaftVoterRequest => AddRaftVoterRequestDataJsonConverter.write(req.data, request.version)
- case req: RemoveRaftVoterRequest => RemoveRaftVoterRequestDataJsonConverter.write(req.data, request.version)
- case req: UpdateRaftVoterRequest => UpdateRaftVoterRequestDataJsonConverter.write(req.data, request.version)
- case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " +
- "code should be updated to do so.")
- }
- }
-
- def response(response: AbstractResponse, version: Short): JsonNode = {
- response match {
- case res: AddOffsetsToTxnResponse => AddOffsetsToTxnResponseDataJsonConverter.write(res.data, version)
- case res: AddPartitionsToTxnResponse => AddPartitionsToTxnResponseDataJsonConverter.write(res.data, version)
- case res: AllocateProducerIdsResponse => AllocateProducerIdsResponseDataJsonConverter.write(res.data, version)
- case res: AlterClientQuotasResponse => AlterClientQuotasResponseDataJsonConverter.write(res.data, version)
- case res: AlterConfigsResponse => AlterConfigsResponseDataJsonConverter.write(res.data, version)
- case res: AlterPartitionReassignmentsResponse => AlterPartitionReassignmentsResponseDataJsonConverter.write(res.data, version)
- case res: AlterPartitionResponse => AlterPartitionResponseDataJsonConverter.write(res.data, version)
- case res: AlterReplicaLogDirsResponse => AlterReplicaLogDirsResponseDataJsonConverter.write(res.data, version)
- case res: AlterUserScramCredentialsResponse => AlterUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
- case res: ApiVersionsResponse => ApiVersionsResponseDataJsonConverter.write(res.data, version)
- case res: AssignReplicasToDirsResponse => AssignReplicasToDirsResponseDataJsonConverter.write(res.data, version)
- case res: BeginQuorumEpochResponse => BeginQuorumEpochResponseDataJsonConverter.write(res.data, version)
- case res: BrokerHeartbeatResponse => BrokerHeartbeatResponseDataJsonConverter.write(res.data, version)
- case res: BrokerRegistrationResponse => BrokerRegistrationResponseDataJsonConverter.write(res.data, version)
- case res: ConsumerGroupDescribeResponse => ConsumerGroupDescribeResponseDataJsonConverter.write(res.data, version)
- case res: ConsumerGroupHeartbeatResponse => ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
- case res: ControlledShutdownResponse => ControlledShutdownResponseDataJsonConverter.write(res.data, version)
- case req: ControllerRegistrationResponse => ControllerRegistrationResponseDataJsonConverter.write(req.data, version)
- case res: CreateAclsResponse => CreateAclsResponseDataJsonConverter.write(res.data, version)
- case res: CreateDelegationTokenResponse => CreateDelegationTokenResponseDataJsonConverter.write(res.data, version)
- case res: CreatePartitionsResponse => CreatePartitionsResponseDataJsonConverter.write(res.data, version)
- case res: CreateTopicsResponse => CreateTopicsResponseDataJsonConverter.write(res.data, version)
- case res: DeleteAclsResponse => DeleteAclsResponseDataJsonConverter.write(res.data, version)
- case res: DeleteGroupsResponse => DeleteGroupsResponseDataJsonConverter.write(res.data, version)
- case res: DeleteRecordsResponse => DeleteRecordsResponseDataJsonConverter.write(res.data, version)
- case res: DeleteShareGroupStateResponse => DeleteShareGroupStateResponseDataJsonConverter.write(res.data, version)
- case res: DeleteTopicsResponse => DeleteTopicsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeAclsResponse => DescribeAclsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeClientQuotasResponse => DescribeClientQuotasResponseDataJsonConverter.write(res.data, version)
- case res: DescribeClusterResponse => DescribeClusterResponseDataJsonConverter.write(res.data, version)
- case res: DescribeConfigsResponse => DescribeConfigsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeDelegationTokenResponse => DescribeDelegationTokenResponseDataJsonConverter.write(res.data, version)
- case res: DescribeGroupsResponse => DescribeGroupsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeLogDirsResponse => DescribeLogDirsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version)
- case res: DescribeQuorumResponse => DescribeQuorumResponseDataJsonConverter.write(res.data, version)
- case res: DescribeTopicPartitionsResponse => DescribeTopicPartitionsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeUserScramCredentialsResponse => DescribeUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
- case res: ElectLeadersResponse => ElectLeadersResponseDataJsonConverter.write(res.data, version)
- case res: EndQuorumEpochResponse => EndQuorumEpochResponseDataJsonConverter.write(res.data, version)
- case res: EndTxnResponse => EndTxnResponseDataJsonConverter.write(res.data, version)
- case res: EnvelopeResponse => EnvelopeResponseDataJsonConverter.write(res.data, version)
- case res: ExpireDelegationTokenResponse => ExpireDelegationTokenResponseDataJsonConverter.write(res.data, version)
- case res: FetchResponse => FetchResponseDataJsonConverter.write(res.data, version, false)
- case res: FetchSnapshotResponse => FetchSnapshotResponseDataJsonConverter.write(res.data, version)
- case res: FindCoordinatorResponse => FindCoordinatorResponseDataJsonConverter.write(res.data, version)
- case res: GetTelemetrySubscriptionsResponse => GetTelemetrySubscriptionsResponseDataJsonConverter.write(res.data, version)
- case res: HeartbeatResponse => HeartbeatResponseDataJsonConverter.write(res.data, version)
- case res: IncrementalAlterConfigsResponse => IncrementalAlterConfigsResponseDataJsonConverter.write(res.data, version)
- case res: InitializeShareGroupStateResponse => InitializeShareGroupStateResponseDataJsonConverter.write(res.data, version)
- case res: InitProducerIdResponse => InitProducerIdResponseDataJsonConverter.write(res.data, version)
- case res: JoinGroupResponse => JoinGroupResponseDataJsonConverter.write(res.data, version)
- case res: LeaderAndIsrResponse => LeaderAndIsrResponseDataJsonConverter.write(res.data, version)
- case res: LeaveGroupResponse => LeaveGroupResponseDataJsonConverter.write(res.data, version)
- case res: ListClientMetricsResourcesResponse => ListClientMetricsResourcesResponseDataJsonConverter.write(res.data, version)
- case res: ListGroupsResponse => ListGroupsResponseDataJsonConverter.write(res.data, version)
- case res: ListOffsetsResponse => ListOffsetsResponseDataJsonConverter.write(res.data, version)
- case res: ListPartitionReassignmentsResponse => ListPartitionReassignmentsResponseDataJsonConverter.write(res.data, version)
- case res: ListTransactionsResponse => ListTransactionsResponseDataJsonConverter.write(res.data, version)
- case res: MetadataResponse => MetadataResponseDataJsonConverter.write(res.data, version)
- case res: OffsetCommitResponse => OffsetCommitResponseDataJsonConverter.write(res.data, version)
- case res: OffsetDeleteResponse => OffsetDeleteResponseDataJsonConverter.write(res.data, version)
- case res: OffsetFetchResponse => OffsetFetchResponseDataJsonConverter.write(res.data, version)
- case res: OffsetsForLeaderEpochResponse => OffsetForLeaderEpochResponseDataJsonConverter.write(res.data, version)
- case res: ProduceResponse => ProduceResponseDataJsonConverter.write(res.data, version)
- case res: PushTelemetryResponse => PushTelemetryResponseDataJsonConverter.write(res.data, version)
- case res: ReadShareGroupStateResponse => ReadShareGroupStateResponseDataJsonConverter.write(res.data, version)
- case res: ReadShareGroupStateSummaryResponse => ReadShareGroupStateSummaryResponseDataJsonConverter.write(res.data, version)
- case res: RenewDelegationTokenResponse => RenewDelegationTokenResponseDataJsonConverter.write(res.data, version)
- case res: SaslAuthenticateResponse => SaslAuthenticateResponseDataJsonConverter.write(res.data, version)
- case res: SaslHandshakeResponse => SaslHandshakeResponseDataJsonConverter.write(res.data, version)
- case res: ShareAcknowledgeResponse => ShareAcknowledgeResponseDataJsonConverter.write(res.data, version)
- case res: ShareFetchResponse => ShareFetchResponseDataJsonConverter.write(res.data, version)
- case res: ShareGroupDescribeResponse => ShareGroupDescribeResponseDataJsonConverter.write(res.data, version)
- case res: ShareGroupHeartbeatResponse => ShareGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
- case res: StopReplicaResponse => StopReplicaResponseDataJsonConverter.write(res.data, version)
- case res: SyncGroupResponse => SyncGroupResponseDataJsonConverter.write(res.data, version)
- case res: TxnOffsetCommitResponse => TxnOffsetCommitResponseDataJsonConverter.write(res.data, version)
- case res: UnregisterBrokerResponse => UnregisterBrokerResponseDataJsonConverter.write(res.data, version)
- case res: UpdateFeaturesResponse => UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
- case res: UpdateMetadataResponse => UpdateMetadataResponseDataJsonConverter.write(res.data, version)
- case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data, version)
- case res: WriteShareGroupStateResponse => WriteShareGroupStateResponseDataJsonConverter.write(res.data, version)
- case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
- case res: AddRaftVoterResponse => AddRaftVoterResponseDataJsonConverter.write(res.data, version)
- case res: RemoveRaftVoterResponse => RemoveRaftVoterResponseDataJsonConverter.write(res.data, version)
- case res: UpdateRaftVoterResponse => UpdateRaftVoterResponseDataJsonConverter.write(res.data, version)
- case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
- "code should be updated to do so.")
- }
- }
-
- def requestHeaderNode(header: RequestHeader): JsonNode = {
- val node = RequestHeaderDataJsonConverter.write(header.data, header.headerVersion, false).asInstanceOf[ObjectNode]
- node.set("requestApiKeyName", new TextNode(header.apiKey.toString))
- if (header.apiKey().isVersionDeprecated(header.apiVersion()))
- node.set("requestApiVersionDeprecated", BooleanNode.TRUE)
- node
- }
-
- def requestDesc(header: RequestHeader, requestNode: Option[JsonNode], isForwarded: Boolean): JsonNode = {
- val node = new ObjectNode(JsonNodeFactory.instance)
- node.set("isForwarded", if (isForwarded) BooleanNode.TRUE else BooleanNode.FALSE)
- node.set("requestHeader", requestHeaderNode(header))
- node.set("request", requestNode.getOrElse(new TextNode("")))
- node
- }
-
- def clientInfoNode(clientInfo: ClientInformation): JsonNode = {
- val node = new ObjectNode(JsonNodeFactory.instance)
- node.set("softwareName", new TextNode(clientInfo.softwareName))
- node.set("softwareVersion", new TextNode(clientInfo.softwareVersion))
- node
- }
-
- def requestDescMetrics(header: RequestHeader, requestNode: Option[JsonNode], responseNode: Option[JsonNode],
- context: RequestContext, session: Session, isForwarded: Boolean,
- totalTimeMs: Double, requestQueueTimeMs: Double, apiLocalTimeMs: Double,
- apiRemoteTimeMs: Double, apiThrottleTimeMs: Long, responseQueueTimeMs: Double,
- responseSendTimeMs: Double, temporaryMemoryBytes: Long,
- messageConversionsTimeMs: Double): JsonNode = {
- val node = requestDesc(header, requestNode, isForwarded).asInstanceOf[ObjectNode]
- node.set("response", responseNode.getOrElse(new TextNode("")))
- node.set("connection", new TextNode(context.connectionId))
- node.set("totalTimeMs", new DoubleNode(totalTimeMs))
- node.set("requestQueueTimeMs", new DoubleNode(requestQueueTimeMs))
- node.set("localTimeMs", new DoubleNode(apiLocalTimeMs))
- node.set("remoteTimeMs", new DoubleNode(apiRemoteTimeMs))
- node.set("throttleTimeMs", new LongNode(apiThrottleTimeMs))
- node.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs))
- node.set("sendTimeMs", new DoubleNode(responseSendTimeMs))
- node.set("securityProtocol", new TextNode(context.securityProtocol.toString))
- node.set("principal", new TextNode(session.principal.toString))
- node.set("listener", new TextNode(context.listenerName.value))
- node.set("clientInformation", clientInfoNode(context.clientInformation))
- if (temporaryMemoryBytes > 0)
- node.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes))
- if (messageConversionsTimeMs > 0)
- node.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs))
- node
- }
-}
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 3aaa1458f33..d01d390813b 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.AlterConfigsRequest._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.test
import org.junit.jupiter.api.Assertions._
@@ -49,6 +50,7 @@ import java.nio.ByteBuffer
import java.util.Collections
import java.util.concurrent.atomic.AtomicReference
import scala.collection.{Map, Seq}
+import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
class RequestChannelTest {
@@ -69,7 +71,7 @@ class RequestChannelTest {
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.configs.get(resource)
assertEquals(expectedValues, toMap(loggedConfig))
- val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString
+ val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
}
@@ -133,7 +135,7 @@ class RequestChannelTest {
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
assertEquals(expectedValues, toMap(loggedConfig))
- val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString
+ val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
}
diff --git a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
index ac9870cc06a..bdc7da74ddd 100644
--- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
@@ -21,91 +21,22 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode}
import kafka.network
-import kafka.network.RequestConvertToJson.requestHeaderNode
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message._
import org.apache.kafka.common.network.{ClientInformation, ListenerName, NetworkSend}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import java.util.Collections
-import scala.collection.mutable.ArrayBuffer
+import scala.compat.java8.OptionConverters._
class RequestConvertToJsonTest {
- @Test
- def testAllRequestTypesHandled(): Unit = {
- val unhandledKeys = ArrayBuffer[String]()
- ApiKeys.values().foreach { key => {
- val version: Short = key.latestVersion()
- val message = key match {
- case ApiKeys.DESCRIBE_ACLS =>
- ApiMessageType.fromApiKey(key.id).newRequest().asInstanceOf[DescribeAclsRequestData]
- .setPatternTypeFilter(1).setResourceTypeFilter(1).setPermissionType(1).setOperation(1)
- case _ =>
- ApiMessageType.fromApiKey(key.id).newRequest()
- }
-
- val bytes = MessageUtil.toByteBuffer(message, version)
- val req = AbstractRequest.parseRequest(key, version, bytes).request
- try {
- RequestConvertToJson.request(req)
- } catch {
- case _ : IllegalStateException => unhandledKeys += key.toString
- }
- }}
- assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled request keys")
- }
-
- @Test
- def testAllApiVersionsResponseHandled(): Unit = {
-
- ApiKeys.values().foreach { key => {
- val unhandledVersions = ArrayBuffer[java.lang.Short]()
- key.allVersions().forEach { version => {
- val message = key match {
- // Specify top-level error handling for verifying compatibility across versions
- case ApiKeys.DESCRIBE_LOG_DIRS =>
- ApiMessageType.fromApiKey(key.id).newResponse().asInstanceOf[DescribeLogDirsResponseData]
- .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
- case _ =>
- ApiMessageType.fromApiKey(key.id).newResponse()
- }
-
- val bytes = MessageUtil.toByteBuffer(message, version)
- val response = AbstractResponse.parseResponse(key, bytes, version)
- try {
- RequestConvertToJson.response(response, version)
- } catch {
- case _ : IllegalStateException => unhandledVersions += version
- }}
- }
- assertEquals(ArrayBuffer.empty, unhandledVersions, s"API: ${key.toString} - Unhandled request versions")
- }}
- }
-
- @Test
- def testAllResponseTypesHandled(): Unit = {
- val unhandledKeys = ArrayBuffer[String]()
- ApiKeys.values().foreach { key => {
- val version: Short = key.latestVersion()
- val message = ApiMessageType.fromApiKey(key.id).newResponse()
- val bytes = MessageUtil.toByteBuffer(message, version)
- val res = AbstractResponse.parseResponse(key, bytes, version)
- try {
- RequestConvertToJson.response(res, version)
- } catch {
- case _ : IllegalStateException => unhandledKeys += key.toString
- }
- }}
- assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled response keys")
- }
-
@Test
def testRequestHeaderNode(): Unit = {
val alterIsrRequest = new AlterPartitionRequest(new AlterPartitionRequestData(), 0)
@@ -135,19 +66,6 @@ class RequestConvertToJsonTest {
assertEquals(expectedNode, actualNode)
}
- @Test
- def testClientInfoNode(): Unit = {
- val clientInfo = new ClientInformation("name", "1")
-
- val expectedNode = new ObjectNode(JsonNodeFactory.instance)
- expectedNode.set("softwareName", new TextNode(clientInfo.softwareName))
- expectedNode.set("softwareVersion", new TextNode(clientInfo.softwareVersion))
-
- val actualNode = RequestConvertToJson.clientInfoNode(clientInfo)
-
- assertEquals(expectedNode, actualNode)
- }
-
@Test
def testRequestDesc(): Unit = {
val alterIsrRequest = new AlterPartitionRequest(new AlterPartitionRequestData(), 0)
@@ -155,10 +73,10 @@ class RequestConvertToJsonTest {
val expectedNode = new ObjectNode(JsonNodeFactory.instance)
expectedNode.set("isForwarded", if (req.isForwarded) BooleanNode.TRUE else BooleanNode.FALSE)
- expectedNode.set("requestHeader", requestHeaderNode(req.header))
+ expectedNode.set("requestHeader", RequestConvertToJson.requestHeaderNode(req.header))
expectedNode.set("request", req.requestLog.getOrElse(new TextNode("")))
- val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded)
+ val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded)
assertEquals(expectedNode, actualNode)
}
@@ -181,7 +99,7 @@ class RequestConvertToJsonTest {
val temporaryMemoryBytes = 8
val messageConversionsTimeMs = 9
- val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded).asInstanceOf[ObjectNode]
+ val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded).asInstanceOf[ObjectNode]
expectedNode.set("response", res.responseLog.getOrElse(new TextNode("")))
expectedNode.set("connection", new TextNode(req.context.connectionId))
expectedNode.set("totalTimeMs", new DoubleNode(totalTimeMs))
@@ -198,7 +116,7 @@ class RequestConvertToJsonTest {
expectedNode.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes))
expectedNode.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs))
- val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog, res.responseLog, req.context, req.session, req.isForwarded,
+ val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog.asJava, res.responseLog.asJava, req.context, req.session, req.isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
responseSendTimeMs, temporaryMemoryBytes, messageConversionsTimeMs).asInstanceOf[ObjectNode]
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b7a1c7c32f0..2a34d2aea5f 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils._
+import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestMetrics
import org.apache.kafka.security.CredentialProvider
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
index eecb619c0e8..f0fa2c32204 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
@@ -17,8 +17,6 @@
package org.apache.kafka.jmh.common;
-import kafka.network.RequestConvertToJson;
-
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.Send;
@@ -27,6 +25,7 @@ import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ByteBufferChannel;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.network.RequestConvertToJson;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
index f5e8b3e4591..943c5baa145 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
@@ -17,13 +17,12 @@
package org.apache.kafka.jmh.common;
-import kafka.network.RequestConvertToJson;
-
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.network.RequestConvertToJson;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
index 2fcafc5ada1..55ccee8516e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
@@ -17,11 +17,10 @@
package org.apache.kafka.jmh.common;
-import kafka.network.RequestConvertToJson;
-
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.network.RequestConvertToJson;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 7b2bd65a621..a56ff3400a7 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.metadata;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
-import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
import kafka.server.ClientQuotaManager;
import kafka.server.ClientRequestQuotaManager;
@@ -58,6 +57,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.FinalizedFeatures;
@@ -237,7 +237,9 @@ public class KRaftMetadataRequestBenchmark {
@Benchmark
public String testRequestToJson() {
- return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString();
+ Option option = allTopicMetadataRequest.requestLog();
+ Optional optional = option.isDefined() ? Optional.of(option.get()) : Optional.empty();
+ return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, allTopicMetadataRequest.isForwarded()).toString();
}
@Benchmark
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index e7acdb5de41..eeebbfaa1a5 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.kafka.jmh.metadata;
import kafka.controller.KafkaController;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
-import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
import kafka.server.BrokerFeatures;
import kafka.server.ClientQuotaManager;
@@ -59,6 +58,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
+import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.MetadataVersion;
@@ -237,7 +237,9 @@ public class MetadataRequestBenchmark {
@Benchmark
public String testRequestToJson() {
- return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString();
+ Option option = allTopicMetadataRequest.requestLog();
+ Optional optional = option.isDefined() ? Optional.of(option.get()) : Optional.empty();
+ return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, allTopicMetadataRequest.isForwarded()).toString();
}
@Benchmark
diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
new file mode 100644
index 00000000000..ac744ef7bac
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestDataJsonConverter;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseDataJsonConverter;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestDataJsonConverter;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseDataJsonConverter;
+import org.apache.kafka.common.message.AddRaftVoterRequestDataJsonConverter;
+import org.apache.kafka.common.message.AddRaftVoterResponseDataJsonConverter;
+import org.apache.kafka.common.message.AllocateProducerIdsRequestDataJsonConverter;
+import org.apache.kafka.common.message.AllocateProducerIdsResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterClientQuotasRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterClientQuotasResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterConfigsRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterConfigsResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterPartitionRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterPartitionResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterReplicaLogDirsRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ApiVersionsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ApiVersionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestDataJsonConverter;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseDataJsonConverter;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestDataJsonConverter;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestDataJsonConverter;
+import org.apache.kafka.common.message.BrokerHeartbeatResponseDataJsonConverter;
+import org.apache.kafka.common.message.BrokerRegistrationRequestDataJsonConverter;
+import org.apache.kafka.common.message.BrokerRegistrationResponseDataJsonConverter;
+import org.apache.kafka.common.message.ConsumerGroupDescribeRequestDataJsonConverter;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseDataJsonConverter;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestDataJsonConverter;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseDataJsonConverter;
+import org.apache.kafka.common.message.ControlledShutdownRequestDataJsonConverter;
+import org.apache.kafka.common.message.ControlledShutdownResponseDataJsonConverter;
+import org.apache.kafka.common.message.ControllerRegistrationRequestDataJsonConverter;
+import org.apache.kafka.common.message.ControllerRegistrationResponseDataJsonConverter;
+import org.apache.kafka.common.message.CreateAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.CreateAclsResponseDataJsonConverter;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestDataJsonConverter;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.CreatePartitionsRequestDataJsonConverter;
+import org.apache.kafka.common.message.CreatePartitionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.CreateTopicsRequestDataJsonConverter;
+import org.apache.kafka.common.message.CreateTopicsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteAclsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteRecordsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteRecordsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteTopicsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteTopicsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeAclsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeClusterRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeClusterResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeConfigsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeConfigsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeDelegationTokenRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeLogDirsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeLogDirsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeProducersRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeProducersResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeQuorumRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeQuorumResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeTransactionsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeTransactionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ElectLeadersRequestDataJsonConverter;
+import org.apache.kafka.common.message.ElectLeadersResponseDataJsonConverter;
+import org.apache.kafka.common.message.EndQuorumEpochRequestDataJsonConverter;
+import org.apache.kafka.common.message.EndQuorumEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.EndTxnRequestDataJsonConverter;
+import org.apache.kafka.common.message.EndTxnResponseDataJsonConverter;
+import org.apache.kafka.common.message.EnvelopeRequestDataJsonConverter;
+import org.apache.kafka.common.message.EnvelopeResponseDataJsonConverter;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestDataJsonConverter;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.FetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.FetchResponseDataJsonConverter;
+import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter;
+import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter;
+import org.apache.kafka.common.message.FindCoordinatorRequestDataJsonConverter;
+import org.apache.kafka.common.message.FindCoordinatorResponseDataJsonConverter;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestDataJsonConverter;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.HeartbeatRequestDataJsonConverter;
+import org.apache.kafka.common.message.HeartbeatResponseDataJsonConverter;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseDataJsonConverter;
+import org.apache.kafka.common.message.InitProducerIdRequestDataJsonConverter;
+import org.apache.kafka.common.message.InitProducerIdResponseDataJsonConverter;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestDataJsonConverter;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.JoinGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.JoinGroupResponseDataJsonConverter;
+import org.apache.kafka.common.message.LeaderAndIsrRequestDataJsonConverter;
+import org.apache.kafka.common.message.LeaderAndIsrResponseDataJsonConverter;
+import org.apache.kafka.common.message.LeaveGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.LeaveGroupResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListClientMetricsResourcesRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListClientMetricsResourcesResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListOffsetsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListOffsetsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListTransactionsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListTransactionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.MetadataRequestDataJsonConverter;
+import org.apache.kafka.common.message.MetadataResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetCommitRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetCommitResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetDeleteRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetDeleteResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetFetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetFetchResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.ProduceRequestDataJsonConverter;
+import org.apache.kafka.common.message.ProduceResponseDataJsonConverter;
+import org.apache.kafka.common.message.PushTelemetryRequestDataJsonConverter;
+import org.apache.kafka.common.message.PushTelemetryResponseDataJsonConverter;
+import org.apache.kafka.common.message.ReadShareGroupStateRequestDataJsonConverter;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestDataJsonConverter;
+import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseDataJsonConverter;
+import org.apache.kafka.common.message.RemoveRaftVoterRequestDataJsonConverter;
+import org.apache.kafka.common.message.RemoveRaftVoterResponseDataJsonConverter;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestDataJsonConverter;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.RequestHeaderDataJsonConverter;
+import org.apache.kafka.common.message.SaslAuthenticateRequestDataJsonConverter;
+import org.apache.kafka.common.message.SaslAuthenticateResponseDataJsonConverter;
+import org.apache.kafka.common.message.SaslHandshakeRequestDataJsonConverter;
+import org.apache.kafka.common.message.SaslHandshakeResponseDataJsonConverter;
+import org.apache.kafka.common.message.ShareAcknowledgeRequestDataJsonConverter;
+import org.apache.kafka.common.message.ShareAcknowledgeResponseDataJsonConverter;
+import org.apache.kafka.common.message.ShareFetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.ShareFetchResponseDataJsonConverter;
+import org.apache.kafka.common.message.ShareGroupDescribeRequestDataJsonConverter;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseDataJsonConverter;
+import org.apache.kafka.common.message.ShareGroupHeartbeatRequestDataJsonConverter;
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseDataJsonConverter;
+import org.apache.kafka.common.message.StopReplicaRequestDataJsonConverter;
+import org.apache.kafka.common.message.StopReplicaResponseDataJsonConverter;
+import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseDataJsonConverter;
+import org.apache.kafka.common.message.UnregisterBrokerRequestDataJsonConverter;
+import org.apache.kafka.common.message.UnregisterBrokerResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateFeaturesRequestDataJsonConverter;
+import org.apache.kafka.common.message.UpdateFeaturesResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateMetadataRequestDataJsonConverter;
+import org.apache.kafka.common.message.UpdateMetadataResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestDataJsonConverter;
+import org.apache.kafka.common.message.UpdateRaftVoterResponseDataJsonConverter;
+import org.apache.kafka.common.message.VoteRequestDataJsonConverter;
+import org.apache.kafka.common.message.VoteResponseDataJsonConverter;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestDataJsonConverter;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestDataJsonConverter;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseDataJsonConverter;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.AddRaftVoterRequest;
+import org.apache.kafka.common.requests.AddRaftVoterResponse;
+import org.apache.kafka.common.requests.AllocateProducerIdsRequest;
+import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
+import org.apache.kafka.common.requests.AlterClientQuotasRequest;
+import org.apache.kafka.common.requests.AlterClientQuotasResponse;
+import org.apache.kafka.common.requests.AlterConfigsRequest;
+import org.apache.kafka.common.requests.AlterConfigsResponse;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
+import org.apache.kafka.common.requests.AlterPartitionResponse;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
+import org.apache.kafka.common.requests.AlterUserScramCredentialsRequest;
+import org.apache.kafka.common.requests.AlterUserScramCredentialsResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.BrokerHeartbeatRequest;
+import org.apache.kafka.common.requests.BrokerHeartbeatResponse;
+import org.apache.kafka.common.requests.BrokerRegistrationRequest;
+import org.apache.kafka.common.requests.BrokerRegistrationResponse;
+import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
+import org.apache.kafka.common.requests.ConsumerGroupDescribeResponse;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.ControlledShutdownRequest;
+import org.apache.kafka.common.requests.ControlledShutdownResponse;
+import org.apache.kafka.common.requests.ControllerRegistrationRequest;
+import org.apache.kafka.common.requests.ControllerRegistrationResponse;
+import org.apache.kafka.common.requests.CreateAclsRequest;
+import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteAclsRequest;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.DescribeAclsRequest;
+import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeClientQuotasRequest;
+import org.apache.kafka.common.requests.DescribeClientQuotasResponse;
+import org.apache.kafka.common.requests.DescribeClusterRequest;
+import org.apache.kafka.common.requests.DescribeClusterResponse;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.DescribeLogDirsRequest;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.DescribeProducersRequest;
+import org.apache.kafka.common.requests.DescribeProducersResponse;
+import org.apache.kafka.common.requests.DescribeQuorumRequest;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.DescribeUserScramCredentialsRequest;
+import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
+import org.apache.kafka.common.requests.ElectLeadersRequest;
+import org.apache.kafka.common.requests.ElectLeadersResponse;
+import org.apache.kafka.common.requests.EndQuorumEpochRequest;
+import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.FetchSnapshotResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateRequest;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaderAndIsrRequest;
+import org.apache.kafka.common.requests.LeaderAndIsrResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
+import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
+import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
+import org.apache.kafka.common.requests.ListTransactionsRequest;
+import org.apache.kafka.common.requests.ListTransactionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetDeleteRequest;
+import org.apache.kafka.common.requests.OffsetDeleteResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
+import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
+import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.SaslAuthenticateRequest;
+import org.apache.kafka.common.requests.SaslAuthenticateResponse;
+import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.requests.ShareAcknowledgeRequest;
+import org.apache.kafka.common.requests.ShareAcknowledgeResponse;
+import org.apache.kafka.common.requests.ShareFetchRequest;
+import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
+import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.StopReplicaRequest;
+import org.apache.kafka.common.requests.StopReplicaResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+import org.apache.kafka.common.requests.UnregisterBrokerRequest;
+import org.apache.kafka.common.requests.UnregisterBrokerResponse;
+import org.apache.kafka.common.requests.UpdateFeaturesRequest;
+import org.apache.kafka.common.requests.UpdateFeaturesResponse;
+import org.apache.kafka.common.requests.UpdateMetadataRequest;
+import org.apache.kafka.common.requests.UpdateMetadataResponse;
+import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
+import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
+import org.apache.kafka.common.requests.VoteRequest;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.util.Optional;
+
+public class RequestConvertToJson {
+
+ public static JsonNode request(AbstractRequest request) {
+ switch (request.apiKey()) {
+ case ADD_OFFSETS_TO_TXN:
+ return AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) request).data(), request.version());
+ case ADD_PARTITIONS_TO_TXN:
+ return AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) request).data(), request.version());
+ case ALLOCATE_PRODUCER_IDS:
+ return AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) request).data(), request.version());
+ case ALTER_CLIENT_QUOTAS:
+ return AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) request).data(), request.version());
+ case ALTER_CONFIGS:
+ return AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) request).data(), request.version());
+ case ALTER_PARTITION_REASSIGNMENTS:
+ return AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest) request).data(), request.version());
+ case ALTER_PARTITION:
+ return AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) request).data(), request.version());
+ case ALTER_REPLICA_LOG_DIRS:
+ return AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) request).data(), request.version());
+ case ALTER_USER_SCRAM_CREDENTIALS:
+ return AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest) request).data(), request.version());
+ case API_VERSIONS:
+ return ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) request).data(), request.version());
+ case ASSIGN_REPLICAS_TO_DIRS:
+ return AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest) request).data(), request.version());
+ case BEGIN_QUORUM_EPOCH:
+ return BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) request).data(), request.version());
+ case BROKER_HEARTBEAT:
+ return BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) request).data(), request.version());
+ case BROKER_REGISTRATION:
+ return BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) request).data(), request.version());
+ case CONSUMER_GROUP_DESCRIBE:
+ return ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest) request).data(), request.version());
+ case CONSUMER_GROUP_HEARTBEAT:
+ return ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest) request).data(), request.version());
+ case CONTROLLED_SHUTDOWN:
+ return ControlledShutdownRequestDataJsonConverter.write(((ControlledShutdownRequest) request).data(), request.version());
+ case CONTROLLER_REGISTRATION:
+ return ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest) request).data(), request.version());
+ case CREATE_ACLS:
+ return CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), request.version());
+ case CREATE_DELEGATION_TOKEN:
+ return CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest) request).data(), request.version());
+ case CREATE_PARTITIONS:
+ return CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) request).data(), request.version());
+ case CREATE_TOPICS:
+ return CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) request).data(), request.version());
+ case DELETE_ACLS:
+ return DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), request.version());
+ case DELETE_GROUPS:
+ return DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) request).data(), request.version());
+ case DELETE_RECORDS:
+ return DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) request).data(), request.version());
+ case DELETE_SHARE_GROUP_STATE:
+ return DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest) request).data(), request.version());
+ case DELETE_TOPICS:
+ return DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) request).data(), request.version());
+ case DESCRIBE_ACLS:
+ return DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) request).data(), request.version());
+ case DESCRIBE_CLIENT_QUOTAS:
+ return DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest) request).data(), request.version());
+ case DESCRIBE_CLUSTER:
+ return DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) request).data(), request.version());
+ case DESCRIBE_CONFIGS:
+ return DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) request).data(), request.version());
+ case DESCRIBE_DELEGATION_TOKEN:
+ return DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest) request).data(), request.version());
+ case DESCRIBE_GROUPS:
+ return DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) request).data(), request.version());
+ case DESCRIBE_LOG_DIRS:
+ return DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) request).data(), request.version());
+ case DESCRIBE_PRODUCERS:
+ return DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) request).data(), request.version());
+ case DESCRIBE_QUORUM:
+ return DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), request.version());
+ case DESCRIBE_TOPIC_PARTITIONS:
+ return DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), request.version());
+ case DESCRIBE_TRANSACTIONS:
+ return DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest) request).data(), request.version());
+ case DESCRIBE_USER_SCRAM_CREDENTIALS:
+ return DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest) request).data(), request.version());
+ case ELECT_LEADERS:
+ return ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) request).data(), request.version());
+ case END_QUORUM_EPOCH:
+ return EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) request).data(), request.version());
+ case END_TXN:
+ return EndTxnRequestDataJsonConverter.write(((EndTxnRequest) request).data(), request.version());
+ case ENVELOPE:
+ return EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), request.version());
+ case EXPIRE_DELEGATION_TOKEN:
+ return ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest) request).data(), request.version());
+ case FETCH:
+ return FetchRequestDataJsonConverter.write(((FetchRequest) request).data(), request.version());
+ case FETCH_SNAPSHOT:
+ return FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) request).data(), request.version());
+ case FIND_COORDINATOR:
+ return FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) request).data(), request.version());
+ case GET_TELEMETRY_SUBSCRIPTIONS:
+ return GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest) request).data(), request.version());
+ case HEARTBEAT:
+ return HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), request.version());
+ case INCREMENTAL_ALTER_CONFIGS:
+ return IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest) request).data(), request.version());
+ case INITIALIZE_SHARE_GROUP_STATE:
+ return InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest) request).data(), request.version());
+ case INIT_PRODUCER_ID:
+ return InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) request).data(), request.version());
+ case JOIN_GROUP:
+ return JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), request.version());
+ case LEADER_AND_ISR:
+ return LeaderAndIsrRequestDataJsonConverter.write(((LeaderAndIsrRequest) request).data(), request.version());
+ case LEAVE_GROUP:
+ return LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), request.version());
+ case LIST_CLIENT_METRICS_RESOURCES:
+ return ListClientMetricsResourcesRequestDataJsonConverter.write(((ListClientMetricsResourcesRequest) request).data(), request.version());
+ case LIST_GROUPS:
+ return ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), request.version());
+ case LIST_OFFSETS:
+ return ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) request).data(), request.version());
+ case LIST_PARTITION_REASSIGNMENTS:
+ return ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest) request).data(), request.version());
+ case LIST_TRANSACTIONS:
+ return ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) request).data(), request.version());
+ case METADATA:
+ return MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), request.version());
+ case OFFSET_COMMIT:
+ return OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) request).data(), request.version());
+ case OFFSET_DELETE:
+ return OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) request).data(), request.version());
+ case OFFSET_FETCH:
+ return OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) request).data(), request.version());
+ case OFFSET_FOR_LEADER_EPOCH:
+ return OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest) request).data(), request.version());
+ case PRODUCE:
+ return ProduceRequestDataJsonConverter.write(((ProduceRequest) request).data(), request.version(), false);
+ case PUSH_TELEMETRY:
+ return PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) request).data(), request.version());
+ case READ_SHARE_GROUP_STATE:
+ return ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) request).data(), request.version());
+ case READ_SHARE_GROUP_STATE_SUMMARY:
+ return ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest) request).data(), request.version());
+ case RENEW_DELEGATION_TOKEN:
+ return RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest) request).data(), request.version());
+ case SASL_AUTHENTICATE:
+ return SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) request).data(), request.version());
+ case SASL_HANDSHAKE:
+ return SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) request).data(), request.version());
+ case SHARE_ACKNOWLEDGE:
+ return ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) request).data(), request.version());
+ case SHARE_FETCH:
+ return ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), request.version());
+ case SHARE_GROUP_DESCRIBE:
+ return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version());
+ case SHARE_GROUP_HEARTBEAT:
+ return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version());
+ case STOP_REPLICA:
+ return StopReplicaRequestDataJsonConverter.write(((StopReplicaRequest) request).data(), request.version());
+ case SYNC_GROUP:
+ return SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), request.version());
+ case TXN_OFFSET_COMMIT:
+ return TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) request).data(), request.version());
+ case UNREGISTER_BROKER:
+ return UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) request).data(), request.version());
+ case UPDATE_FEATURES:
+ return UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version());
+ case UPDATE_METADATA:
+ return UpdateMetadataRequestDataJsonConverter.write(((UpdateMetadataRequest) request).data(), request.version());
+ case VOTE:
+ return VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version());
+ case WRITE_SHARE_GROUP_STATE:
+ return WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version());
+ case WRITE_TXN_MARKERS:
+ return WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), request.version());
+ case ADD_RAFT_VOTER:
+ return AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), request.version());
+ case REMOVE_RAFT_VOTER:
+ return RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), request.version());
+ case UPDATE_RAFT_VOTER:
+ return UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version());
+ default:
+ throw new IllegalStateException("ApiKey " + request.apiKey() + " is not currently handled in `request`, the " +
+ "code should be updated to do so.");
+ }
+ }
+
+ public static JsonNode response(AbstractResponse response, short version) {
+ switch (response.apiKey()) {
+ case ADD_OFFSETS_TO_TXN:
+ return AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) response).data(), version);
+ case ADD_PARTITIONS_TO_TXN:
+ return AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) response).data(), version);
+ case ALLOCATE_PRODUCER_IDS:
+ return AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse) response).data(), version);
+ case ALTER_CLIENT_QUOTAS:
+ return AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) response).data(), version);
+ case ALTER_CONFIGS:
+ return AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) response).data(), version);
+ case ALTER_PARTITION_REASSIGNMENTS:
+ return AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse) response).data(), version);
+ case ALTER_PARTITION:
+ return AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) response).data(), version);
+ case ALTER_REPLICA_LOG_DIRS:
+ return AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse) response).data(), version);
+ case ALTER_USER_SCRAM_CREDENTIALS:
+ return AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse) response).data(), version);
+ case API_VERSIONS:
+ return ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) response).data(), version);
+ case ASSIGN_REPLICAS_TO_DIRS:
+ return AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse) response).data(), version);
+ case BEGIN_QUORUM_EPOCH:
+ return BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) response).data(), version);
+ case BROKER_HEARTBEAT:
+ return BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) response).data(), version);
+ case BROKER_REGISTRATION:
+ return BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) response).data(), version);
+ case CONSUMER_GROUP_DESCRIBE:
+ return ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse) response).data(), version);
+ case CONSUMER_GROUP_HEARTBEAT:
+ return ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse) response).data(), version);
+ case CONTROLLED_SHUTDOWN:
+ return ControlledShutdownResponseDataJsonConverter.write(((ControlledShutdownResponse) response).data(), version);
+ case CONTROLLER_REGISTRATION:
+ return ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse) response).data(), version);
+ case CREATE_ACLS:
+ return CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) response).data(), version);
+ case CREATE_DELEGATION_TOKEN:
+ return CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse) response).data(), version);
+ case CREATE_PARTITIONS:
+ return CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) response).data(), version);
+ case CREATE_TOPICS:
+ return CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) response).data(), version);
+ case DELETE_ACLS:
+ return DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) response).data(), version);
+ case DELETE_GROUPS:
+ return DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) response).data(), version);
+ case DELETE_RECORDS:
+ return DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) response).data(), version);
+ case DELETE_SHARE_GROUP_STATE:
+ return DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse) response).data(), version);
+ case DELETE_TOPICS:
+ return DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) response).data(), version);
+ case DESCRIBE_ACLS:
+ return DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) response).data(), version);
+ case DESCRIBE_CLIENT_QUOTAS:
+ return DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse) response).data(), version);
+ case DESCRIBE_CLUSTER:
+ return DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) response).data(), version);
+ case DESCRIBE_CONFIGS:
+ return DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) response).data(), version);
+ case DESCRIBE_DELEGATION_TOKEN:
+ return DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse) response).data(), version);
+ case DESCRIBE_GROUPS:
+ return DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) response).data(), version);
+ case DESCRIBE_LOG_DIRS:
+ return DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) response).data(), version);
+ case DESCRIBE_PRODUCERS:
+ return DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) response).data(), version);
+ case DESCRIBE_QUORUM:
+ return DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) response).data(), version);
+ case DESCRIBE_TOPIC_PARTITIONS:
+ return DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse) response).data(), version);
+ case DESCRIBE_TRANSACTIONS:
+ return DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse) response).data(), version);
+ case DESCRIBE_USER_SCRAM_CREDENTIALS:
+ return DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse) response).data(), version);
+ case ELECT_LEADERS:
+ return ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) response).data(), version);
+ case END_QUORUM_EPOCH:
+ return EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) response).data(), version);
+ case END_TXN:
+ return EndTxnResponseDataJsonConverter.write(((EndTxnResponse) response).data(), version);
+ case ENVELOPE:
+ return EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), version);
+ case EXPIRE_DELEGATION_TOKEN:
+ return ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse) response).data(), version);
+ case FETCH:
+ return FetchResponseDataJsonConverter.write(((FetchResponse) response).data(), version, false);
+ case FETCH_SNAPSHOT:
+ return FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) response).data(), version);
+ case FIND_COORDINATOR:
+ return FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) response).data(), version);
+ case GET_TELEMETRY_SUBSCRIPTIONS:
+ return GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse) response).data(), version);
+ case HEARTBEAT:
+ return HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), version);
+ case INCREMENTAL_ALTER_CONFIGS:
+ return IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse) response).data(), version);
+ case INITIALIZE_SHARE_GROUP_STATE:
+ return InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse) response).data(), version);
+ case INIT_PRODUCER_ID:
+ return InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) response).data(), version);
+ case JOIN_GROUP:
+ return JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), version);
+ case LEADER_AND_ISR:
+ return LeaderAndIsrResponseDataJsonConverter.write(((LeaderAndIsrResponse) response).data(), version);
+ case LEAVE_GROUP:
+ return LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) response).data(), version);
+ case LIST_CLIENT_METRICS_RESOURCES:
+ return ListClientMetricsResourcesResponseDataJsonConverter.write(((ListClientMetricsResourcesResponse) response).data(), version);
+ case LIST_GROUPS:
+ return ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) response).data(), version);
+ case LIST_OFFSETS:
+ return ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) response).data(), version);
+ case LIST_PARTITION_REASSIGNMENTS:
+ return ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse) response).data(), version);
+ case LIST_TRANSACTIONS:
+ return ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) response).data(), version);
+ case METADATA:
+ return MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), version);
+ case OFFSET_COMMIT:
+ return OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) response).data(), version);
+ case OFFSET_DELETE:
+ return OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) response).data(), version);
+ case OFFSET_FETCH:
+ return OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) response).data(), version);
+ case OFFSET_FOR_LEADER_EPOCH:
+ return OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse) response).data(), version);
+ case PRODUCE:
+ return ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), version);
+ case PUSH_TELEMETRY:
+ return PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) response).data(), version);
+ case READ_SHARE_GROUP_STATE:
+ return ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse) response).data(), version);
+ case READ_SHARE_GROUP_STATE_SUMMARY:
+ return ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse) response).data(), version);
+ case RENEW_DELEGATION_TOKEN:
+ return RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse) response).data(), version);
+ case SASL_AUTHENTICATE:
+ return SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) response).data(), version);
+ case SASL_HANDSHAKE:
+ return SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) response).data(), version);
+ case SHARE_ACKNOWLEDGE:
+ return ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) response).data(), version);
+ case SHARE_FETCH:
+ return ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) response).data(), version);
+ case SHARE_GROUP_DESCRIBE:
+ return ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version);
+ case SHARE_GROUP_HEARTBEAT:
+ return ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version);
+ case STOP_REPLICA:
+ return StopReplicaResponseDataJsonConverter.write(((StopReplicaResponse) response).data(), version);
+ case SYNC_GROUP:
+ return SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), version);
+ case TXN_OFFSET_COMMIT:
+ return TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) response).data(), version);
+ case UNREGISTER_BROKER:
+ return UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) response).data(), version);
+ case UPDATE_FEATURES:
+ return UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version);
+ case UPDATE_METADATA:
+ return UpdateMetadataResponseDataJsonConverter.write(((UpdateMetadataResponse) response).data(), version);
+ case VOTE:
+ return VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version);
+ case WRITE_SHARE_GROUP_STATE:
+ return WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version);
+ case WRITE_TXN_MARKERS:
+ return WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) response).data(), version);
+ case ADD_RAFT_VOTER:
+ return AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) response).data(), version);
+ case REMOVE_RAFT_VOTER:
+ return RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) response).data(), version);
+ case UPDATE_RAFT_VOTER:
+ return UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version);
+ default:
+ throw new IllegalStateException("ApiKey " + response.apiKey() + " is not currently handled in `response`, the " +
+ "code should be updated to do so.");
+ }
+ }
+
+ public static JsonNode requestHeaderNode(RequestHeader header) {
+ ObjectNode node = (ObjectNode) RequestHeaderDataJsonConverter.write(
+ header.data(), header.headerVersion(), false
+ );
+ node.set("requestApiKeyName", new TextNode(header.apiKey().toString()));
+ if (header.apiKey().isVersionDeprecated(header.apiVersion())) {
+ node.set("requestApiVersionDeprecated", BooleanNode.TRUE);
+ }
+ return node;
+ }
+
+ public static JsonNode requestDesc(RequestHeader header, Optional requestNode, boolean isForwarded) {
+ ObjectNode node = JsonNodeFactory.instance.objectNode();
+ node.set("isForwarded", isForwarded ? BooleanNode.TRUE : BooleanNode.FALSE);
+ node.set("requestHeader", requestHeaderNode(header));
+ node.set("request", requestNode.orElse(new TextNode("")));
+ return node;
+ }
+
+ public static JsonNode clientInfoNode(ClientInformation clientInfo) {
+ ObjectNode node = JsonNodeFactory.instance.objectNode();
+ node.set("softwareName", new TextNode(clientInfo.softwareName()));
+ node.set("softwareVersion", new TextNode(clientInfo.softwareVersion()));
+ return node;
+ }
+
+ public static JsonNode requestDescMetrics(RequestHeader header, Optional requestNode, Optional responseNode,
+ RequestContext context, Session session, boolean isForwarded,
+ double totalTimeMs, double requestQueueTimeMs, double apiLocalTimeMs,
+ double apiRemoteTimeMs, long apiThrottleTimeMs, double responseQueueTimeMs,
+ double responseSendTimeMs, long temporaryMemoryBytes,
+ double messageConversionsTimeMs) {
+ ObjectNode node = (ObjectNode) requestDesc(header, requestNode, isForwarded);
+ node.set("response", responseNode.orElse(new TextNode("")));
+ node.set("connection", new TextNode(context.connectionId));
+ node.set("totalTimeMs", new DoubleNode(totalTimeMs));
+ node.set("requestQueueTimeMs", new DoubleNode(requestQueueTimeMs));
+ node.set("localTimeMs", new DoubleNode(apiLocalTimeMs));
+ node.set("remoteTimeMs", new DoubleNode(apiRemoteTimeMs));
+ node.set("throttleTimeMs", new LongNode(apiThrottleTimeMs));
+ node.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs));
+ node.set("sendTimeMs", new DoubleNode(responseSendTimeMs));
+ node.set("securityProtocol", new TextNode(context.securityProtocol.toString()));
+ node.set("principal", new TextNode(session.principal.toString()));
+ node.set("listener", new TextNode(context.listenerName.value()));
+ node.set("clientInformation", clientInfoNode(context.clientInformation));
+ if (temporaryMemoryBytes > 0) {
+ node.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes));
+ }
+ if (messageConversionsTimeMs > 0) {
+ node.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs));
+ }
+ return node;
+ }
+}
diff --git a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
new file mode 100644
index 00000000000..56c5a822432
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.DescribeAclsRequestData;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RequestConvertToJsonTest {
+
+ @Test
+ public void testAllRequestTypesHandled() {
+ List unhandledKeys = new ArrayList<>();
+ for (ApiKeys key : ApiKeys.values()) {
+ short version = key.latestVersion();
+ ApiMessage message;
+ if (key == ApiKeys.DESCRIBE_ACLS) {
+ message = ApiMessageType.fromApiKey(key.id).newRequest();
+ DescribeAclsRequestData requestData = (DescribeAclsRequestData) message;
+ requestData.setPatternTypeFilter((byte) 1);
+ requestData.setResourceTypeFilter((byte) 1);
+ requestData.setPermissionType((byte) 1);
+ requestData.setOperation((byte) 1);
+ } else {
+ message = ApiMessageType.fromApiKey(key.id).newRequest();
+ }
+ ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+ AbstractRequest req = AbstractRequest.parseRequest(key, version, bytes).request;
+ try {
+ RequestConvertToJson.request(req);
+ } catch (IllegalStateException e) {
+ unhandledKeys.add(key.toString());
+ }
+ }
+ assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled request keys");
+ }
+
+ @Test
+ public void testAllApiVersionsResponseHandled() {
+ for (ApiKeys key : ApiKeys.values()) {
+ List unhandledVersions = new ArrayList<>();
+ for (short version : key.allVersions()) {
+ ApiMessage message;
+ // Specify top-level error handling for verifying compatibility across versions
+ if (key == ApiKeys.DESCRIBE_LOG_DIRS) {
+ message = ApiMessageType.fromApiKey(key.id).newResponse();
+ DescribeLogDirsResponseData responseData = (DescribeLogDirsResponseData) message;
+ responseData.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+ } else {
+ message = ApiMessageType.fromApiKey(key.id).newResponse();
+ }
+
+ ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+ AbstractResponse response = AbstractResponse.parseResponse(key, bytes, version);
+ try {
+ RequestConvertToJson.response(response, version);
+ } catch (IllegalStateException e) {
+ unhandledVersions.add(version);
+ }
+ }
+ assertEquals(new ArrayList<>(), unhandledVersions, "API: " + key + " - Unhandled request versions");
+ }
+ }
+
+ @Test
+ public void testAllResponseTypesHandled() {
+ List unhandledKeys = new ArrayList<>();
+ for (ApiKeys key : ApiKeys.values()) {
+ short version = key.latestVersion();
+ ApiMessage message = ApiMessageType.fromApiKey(key.id).newResponse();
+ ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+ AbstractResponse res = AbstractResponse.parseResponse(key, bytes, version);
+ try {
+ RequestConvertToJson.response(res, version);
+ } catch (IllegalStateException e) {
+ unhandledKeys.add(key.toString());
+ }
+ }
+ assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled response keys");
+ }
+
+ @Test
+ public void testClientInfoNode() {
+ ClientInformation clientInfo = new ClientInformation("name", "1");
+ ObjectNode expectedNode = JsonNodeFactory.instance.objectNode();
+ expectedNode.set("softwareName", new TextNode(clientInfo.softwareName()));
+ expectedNode.set("softwareVersion", new TextNode(clientInfo.softwareVersion()));
+ JsonNode actualNode = RequestConvertToJson.clientInfoNode(clientInfo);
+ assertEquals(expectedNode, actualNode);
+ }
+}