From 0c9df75295fd448e89e1d2751a129b89597e2731 Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Fri, 24 Jan 2025 21:53:32 +0800 Subject: [PATCH] KAFKA-18474: Remove zkBroker listener (#18477) Reviewers: Ismael Juma , Chia-Ping Tsai , PoAn Yang --- .../apache/kafka/common/protocol/ApiKeys.java | 8 +--- .../message/AddOffsetsToTxnRequest.json | 2 +- .../message/AddPartitionsToTxnRequest.json | 2 +- .../message/AllocateProducerIdsRequest.json | 2 +- .../message/AlterClientQuotasRequest.json | 2 +- .../common/message/AlterConfigsRequest.json | 2 +- .../AlterPartitionReassignmentsRequest.json | 2 +- .../common/message/AlterPartitionRequest.json | 2 +- .../message/AlterReplicaLogDirsRequest.json | 2 +- .../AlterUserScramCredentialsRequest.json | 2 +- .../common/message/ApiVersionsRequest.json | 2 +- .../common/message/CreateAclsRequest.json | 2 +- .../message/CreateDelegationTokenRequest.json | 2 +- .../message/CreatePartitionsRequest.json | 2 +- .../common/message/CreateTopicsRequest.json | 2 +- .../common/message/DeleteAclsRequest.json | 2 +- .../common/message/DeleteGroupsRequest.json | 2 +- .../common/message/DeleteRecordsRequest.json | 2 +- .../common/message/DeleteTopicsRequest.json | 2 +- .../common/message/DescribeAclsRequest.json | 2 +- .../message/DescribeClientQuotasRequest.json | 2 +- .../message/DescribeClusterRequest.json | 2 +- .../message/DescribeConfigsRequest.json | 2 +- .../DescribeDelegationTokenRequest.json | 2 +- .../common/message/DescribeGroupsRequest.json | 2 +- .../message/DescribeLogDirsRequest.json | 2 +- .../message/DescribeProducersRequest.json | 2 +- .../message/DescribeTransactionsRequest.json | 2 +- .../DescribeUserScramCredentialsRequest.json | 2 +- .../common/message/ElectLeadersRequest.json | 2 +- .../common/message/EndTxnRequest.json | 2 +- .../common/message/EnvelopeRequest.json | 2 +- .../message/ExpireDelegationTokenRequest.json | 2 +- .../common/message/FetchRequest.json | 2 +- .../message/FindCoordinatorRequest.json | 2 +- .../common/message/HeartbeatRequest.json | 2 +- .../IncrementalAlterConfigsRequest.json | 2 +- .../common/message/InitProducerIdRequest.json | 2 +- .../common/message/JoinGroupRequest.json | 2 +- .../common/message/LeaveGroupRequest.json | 2 +- .../common/message/ListGroupsRequest.json | 2 +- .../common/message/ListOffsetsRequest.json | 2 +- .../ListPartitionReassignmentsRequest.json | 2 +- .../message/ListTransactionsRequest.json | 2 +- .../common/message/MetadataRequest.json | 2 +- .../common/message/OffsetCommitRequest.json | 2 +- .../common/message/OffsetDeleteRequest.json | 2 +- .../common/message/OffsetFetchRequest.json | 2 +- .../message/OffsetForLeaderEpochRequest.json | 2 +- .../common/message/ProduceRequest.json | 2 +- .../message/RenewDelegationTokenRequest.json | 2 +- .../message/SaslAuthenticateRequest.json | 2 +- .../common/message/SaslHandshakeRequest.json | 2 +- .../common/message/SyncGroupRequest.json | 2 +- .../message/TxnOffsetCommitRequest.json | 2 +- .../common/message/UpdateFeaturesRequest.json | 2 +- .../message/WriteTxnMarkersRequest.json | 2 +- .../kafka/clients/NetworkClientTest.java | 4 +- .../clients/admin/KafkaAdminClientTest.java | 2 +- .../internals/FetchRequestManagerTest.java | 2 +- .../consumer/internals/FetcherTest.java | 2 +- .../producer/internals/SenderTest.java | 2 +- .../kafka/common/network/NioEchoServer.java | 2 +- .../network/SaslChannelBuilderTest.java | 2 +- .../common/network/SslTransportLayerTest.java | 2 +- .../requests/ApiVersionsResponseTest.java | 42 +++++++++---------- .../common/requests/RequestResponseTest.java | 4 +- .../authenticator/SaslAuthenticatorTest.java | 6 +-- .../SaslServerAuthenticatorTest.java | 2 +- .../server/GssapiAuthenticationTest.scala | 2 +- .../kafka/server/ApiVersionManagerTest.scala | 2 +- .../unit/kafka/server/RequestQuotaTest.scala | 6 +-- .../kafka/message/RequestListenerType.java | 2 - 73 files changed, 99 insertions(+), 105 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index e33dd2589ae..1f8a98554c2 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -332,11 +332,7 @@ public enum ApiKeys { return hasBuffer.get(); } - public static EnumSet zkBrokerApis() { - return apisForListener(ApiMessageType.ListenerType.ZK_BROKER); - } - - public static EnumSet kraftBrokerApis() { + public static EnumSet brokerApis() { return apisForListener(ApiMessageType.ListenerType.BROKER); } @@ -346,7 +342,7 @@ public enum ApiKeys { public static EnumSet clientApis() { List apis = Arrays.stream(ApiKeys.values()) - .filter(apiKey -> apiKey.inScope(ApiMessageType.ListenerType.ZK_BROKER) || apiKey.inScope(ApiMessageType.ListenerType.BROKER)) + .filter(apiKey -> apiKey.inScope(ApiMessageType.ListenerType.BROKER)) .collect(Collectors.toList()); return EnumSet.copyOf(apis); } diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json index 157ae20c0a4..9bebc8366cf 100644 --- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json +++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json @@ -16,7 +16,7 @@ { "apiKey": 25, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "AddOffsetsToTxnRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json index 2ed84be2180..68a45cdd0ac 100644 --- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json +++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json @@ -16,7 +16,7 @@ { "apiKey": 24, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "AddPartitionsToTxnRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json index e8271c60321..9447b080f84 100644 --- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json +++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 67, "type": "request", - "listeners": ["zkBroker", "controller"], + "listeners": ["controller"], "name": "AllocateProducerIdsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/AlterClientQuotasRequest.json b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json index 6bfdc925c29..93524a80e21 100644 --- a/clients/src/main/resources/common/message/AlterClientQuotasRequest.json +++ b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json @@ -16,7 +16,7 @@ { "apiKey": 49, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "AlterClientQuotasRequest", "validVersions": "0-1", // Version 1 enables flexible versions. diff --git a/clients/src/main/resources/common/message/AlterConfigsRequest.json b/clients/src/main/resources/common/message/AlterConfigsRequest.json index 31057e3410a..b87091f2e5c 100644 --- a/clients/src/main/resources/common/message/AlterConfigsRequest.json +++ b/clients/src/main/resources/common/message/AlterConfigsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 33, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "AlterConfigsRequest", // Version 1 is the same as version 0. // Version 2 enables flexible versions. diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json index 47043ff02d0..f3047feb0a3 100644 --- a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json +++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 45, "type": "request", - "listeners": ["broker", "controller", "zkBroker"], + "listeners": ["broker", "controller"], "name": "AlterPartitionReassignmentsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/AlterPartitionRequest.json b/clients/src/main/resources/common/message/AlterPartitionRequest.json index 954b98578af..d22f3eb13ad 100644 --- a/clients/src/main/resources/common/message/AlterPartitionRequest.json +++ b/clients/src/main/resources/common/message/AlterPartitionRequest.json @@ -16,7 +16,7 @@ { "apiKey": 56, "type": "request", - "listeners": ["zkBroker", "controller"], + "listeners": ["controller"], "name": "AlterPartitionRequest", // Version 1 adds LeaderRecoveryState field (KIP-704). // diff --git a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json index b309243fb62..42ef6693325 100644 --- a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json +++ b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 34, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "AlterReplicaLogDirsRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // diff --git a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json index 8937394ef6e..ea687072f16 100644 --- a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json +++ b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 51, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "AlterUserScramCredentialsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json b/clients/src/main/resources/common/message/ApiVersionsRequest.json index 050dbcfd3f2..56170c96673 100644 --- a/clients/src/main/resources/common/message/ApiVersionsRequest.json +++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 18, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "ApiVersionsRequest", // Versions 0 through 2 of ApiVersionsRequest are the same. // diff --git a/clients/src/main/resources/common/message/CreateAclsRequest.json b/clients/src/main/resources/common/message/CreateAclsRequest.json index 0f458203074..d3a028b0536 100644 --- a/clients/src/main/resources/common/message/CreateAclsRequest.json +++ b/clients/src/main/resources/common/message/CreateAclsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 30, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "CreateAclsRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Version 1 adds resource pattern type. diff --git a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json index 276cbe57901..f4c586cbcad 100644 --- a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json +++ b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json @@ -16,7 +16,7 @@ { "apiKey": 38, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "CreateDelegationTokenRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // diff --git a/clients/src/main/resources/common/message/CreatePartitionsRequest.json b/clients/src/main/resources/common/message/CreatePartitionsRequest.json index 6e249498659..95552a080b9 100644 --- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json +++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 37, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "CreatePartitionsRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json index 9aed4d236db..3ee03933572 100644 --- a/clients/src/main/resources/common/message/CreateTopicsRequest.json +++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 19, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "CreateTopicsRequest", // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline. // diff --git a/clients/src/main/resources/common/message/DeleteAclsRequest.json b/clients/src/main/resources/common/message/DeleteAclsRequest.json index b430364d861..db605305ae2 100644 --- a/clients/src/main/resources/common/message/DeleteAclsRequest.json +++ b/clients/src/main/resources/common/message/DeleteAclsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 31, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "DeleteAclsRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Version 1 adds the pattern type. diff --git a/clients/src/main/resources/common/message/DeleteGroupsRequest.json b/clients/src/main/resources/common/message/DeleteGroupsRequest.json index 1ac6a053e63..7d7c4371789 100644 --- a/clients/src/main/resources/common/message/DeleteGroupsRequest.json +++ b/clients/src/main/resources/common/message/DeleteGroupsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 42, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "DeleteGroupsRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/DeleteRecordsRequest.json b/clients/src/main/resources/common/message/DeleteRecordsRequest.json index 06a12d85c8b..fc697944a02 100644 --- a/clients/src/main/resources/common/message/DeleteRecordsRequest.json +++ b/clients/src/main/resources/common/message/DeleteRecordsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 21, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "DeleteRecordsRequest", // Version 1 is the same as version 0. diff --git a/clients/src/main/resources/common/message/DeleteTopicsRequest.json b/clients/src/main/resources/common/message/DeleteTopicsRequest.json index 465d9e0b31f..35c9eb28f9c 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsRequest.json +++ b/clients/src/main/resources/common/message/DeleteTopicsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 20, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "DeleteTopicsRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Versions 0, 1, 2, and 3 are the same. diff --git a/clients/src/main/resources/common/message/DescribeAclsRequest.json b/clients/src/main/resources/common/message/DescribeAclsRequest.json index a9bdfba40e7..23883c154fe 100644 --- a/clients/src/main/resources/common/message/DescribeAclsRequest.json +++ b/clients/src/main/resources/common/message/DescribeAclsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 29, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "DescribeAclsRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Version 1 adds resource pattern type. diff --git a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json index d14cfc95733..6644e53343b 100644 --- a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json +++ b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json @@ -16,7 +16,7 @@ { "apiKey": 48, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "DescribeClientQuotasRequest", // Version 1 enables flexible versions. "validVersions": "0-1", diff --git a/clients/src/main/resources/common/message/DescribeClusterRequest.json b/clients/src/main/resources/common/message/DescribeClusterRequest.json index 71e00df09b2..9c17c6b1ba5 100644 --- a/clients/src/main/resources/common/message/DescribeClusterRequest.json +++ b/clients/src/main/resources/common/message/DescribeClusterRequest.json @@ -16,7 +16,7 @@ { "apiKey": 60, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "DescribeClusterRequest", // // Version 1 adds EndpointType for KIP-919 support. diff --git a/clients/src/main/resources/common/message/DescribeConfigsRequest.json b/clients/src/main/resources/common/message/DescribeConfigsRequest.json index a382d9fecaf..d1a85a67fea 100644 --- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json +++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 32, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "DescribeConfigsRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Version 1 adds IncludeSynonyms and removes IsDefault. diff --git a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json index d62eb28a29f..bc29789b689 100644 --- a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json +++ b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json @@ -16,7 +16,7 @@ { "apiKey": 41, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "DescribeDelegationTokenRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Version 1 is the same as version 0. diff --git a/clients/src/main/resources/common/message/DescribeGroupsRequest.json b/clients/src/main/resources/common/message/DescribeGroupsRequest.json index 8dabf71bd52..cec56852cad 100644 --- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json +++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 15, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "DescribeGroupsRequest", // Versions 1 and 2 are the same as version 0. // diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json index 115947ff394..4f3bfa2c58c 100644 --- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json +++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 35, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "DescribeLogDirsRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Version 1 is the same as version 0. diff --git a/clients/src/main/resources/common/message/DescribeProducersRequest.json b/clients/src/main/resources/common/message/DescribeProducersRequest.json index 7a54c65622d..b7889ef1f1e 100644 --- a/clients/src/main/resources/common/message/DescribeProducersRequest.json +++ b/clients/src/main/resources/common/message/DescribeProducersRequest.json @@ -16,7 +16,7 @@ { "apiKey": 61, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "DescribeProducersRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/DescribeTransactionsRequest.json b/clients/src/main/resources/common/message/DescribeTransactionsRequest.json index 442f11f8b0b..f7349d60cd1 100644 --- a/clients/src/main/resources/common/message/DescribeTransactionsRequest.json +++ b/clients/src/main/resources/common/message/DescribeTransactionsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 65, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "DescribeTransactionsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json index 2f7a1112c48..cde4b7cc844 100644 --- a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json +++ b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 50, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "DescribeUserScramCredentialsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json index dd9fa216415..bce04585a70 100644 --- a/clients/src/main/resources/common/message/ElectLeadersRequest.json +++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json @@ -16,7 +16,7 @@ { "apiKey": 43, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "ElectLeadersRequest", // Version 1 implements multiple leader election types, as described by KIP-460. // diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json b/clients/src/main/resources/common/message/EndTxnRequest.json index 80ac5c5d255..f11c7a3268f 100644 --- a/clients/src/main/resources/common/message/EndTxnRequest.json +++ b/clients/src/main/resources/common/message/EndTxnRequest.json @@ -16,7 +16,7 @@ { "apiKey": 26, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "EndTxnRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/EnvelopeRequest.json b/clients/src/main/resources/common/message/EnvelopeRequest.json index a30a50ba684..1f6ff62de8d 100644 --- a/clients/src/main/resources/common/message/EnvelopeRequest.json +++ b/clients/src/main/resources/common/message/EnvelopeRequest.json @@ -16,7 +16,7 @@ { "apiKey": 58, "type": "request", - "listeners": ["controller", "zkBroker"], + "listeners": ["controller"], "name": "EnvelopeRequest", // Request struct for forwarding. "validVersions": "0", diff --git a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json index 2694243f1f3..92a9e4e947f 100644 --- a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json +++ b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json @@ -16,7 +16,7 @@ { "apiKey": 40, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "ExpireDelegationTokenRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Version 1 is the same as version 0. diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json index c49dd1a9b0a..b7ad185f60b 100644 --- a/clients/src/main/resources/common/message/FetchRequest.json +++ b/clients/src/main/resources/common/message/FetchRequest.json @@ -16,7 +16,7 @@ { "apiKey": 1, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "FetchRequest", // Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new baseline. // diff --git a/clients/src/main/resources/common/message/FindCoordinatorRequest.json b/clients/src/main/resources/common/message/FindCoordinatorRequest.json index 7a926501f7b..2807f40c857 100644 --- a/clients/src/main/resources/common/message/FindCoordinatorRequest.json +++ b/clients/src/main/resources/common/message/FindCoordinatorRequest.json @@ -16,7 +16,7 @@ { "apiKey": 10, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "FindCoordinatorRequest", // Version 1 adds KeyType. // diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json b/clients/src/main/resources/common/message/HeartbeatRequest.json index dcf776d8ec4..57ef18e9224 100644 --- a/clients/src/main/resources/common/message/HeartbeatRequest.json +++ b/clients/src/main/resources/common/message/HeartbeatRequest.json @@ -16,7 +16,7 @@ { "apiKey": 12, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "HeartbeatRequest", // Version 1 and version 2 are the same as version 0. // diff --git a/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json index d4955c91b85..d908c28012f 100644 --- a/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json +++ b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 44, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "IncrementalAlterConfigsRequest", // Version 1 is the first flexible version. "validVersions": "0-1", diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json index 9b9d247fc62..5a056db520f 100644 --- a/clients/src/main/resources/common/message/InitProducerIdRequest.json +++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json @@ -16,7 +16,7 @@ { "apiKey": 22, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "InitProducerIdRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json index 2c4c9fdfd62..41d7c1acbae 100644 --- a/clients/src/main/resources/common/message/JoinGroupRequest.json +++ b/clients/src/main/resources/common/message/JoinGroupRequest.json @@ -16,7 +16,7 @@ { "apiKey": 11, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "JoinGroupRequest", // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline. // diff --git a/clients/src/main/resources/common/message/LeaveGroupRequest.json b/clients/src/main/resources/common/message/LeaveGroupRequest.json index fb16f72eb82..929f4fb468c 100644 --- a/clients/src/main/resources/common/message/LeaveGroupRequest.json +++ b/clients/src/main/resources/common/message/LeaveGroupRequest.json @@ -16,7 +16,7 @@ { "apiKey": 13, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "LeaveGroupRequest", // Version 1 and 2 are the same as version 0. // diff --git a/clients/src/main/resources/common/message/ListGroupsRequest.json b/clients/src/main/resources/common/message/ListGroupsRequest.json index 32defaa2033..cbc791e0a5a 100644 --- a/clients/src/main/resources/common/message/ListGroupsRequest.json +++ b/clients/src/main/resources/common/message/ListGroupsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 16, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "ListGroupsRequest", // Version 1 and 2 are the same as version 0. // diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json index 5a864d8ddc1..6f8ff7d6cf9 100644 --- a/clients/src/main/resources/common/message/ListOffsetsRequest.json +++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 2, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "ListOffsetsRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json index 952a3db0d23..428a256ac30 100644 --- a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json +++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 46, "type": "request", - "listeners": ["broker", "controller", "zkBroker"], + "listeners": ["broker", "controller"], "name": "ListPartitionReassignmentsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/ListTransactionsRequest.json b/clients/src/main/resources/common/message/ListTransactionsRequest.json index 4879c4d5f95..5d7c688da22 100644 --- a/clients/src/main/resources/common/message/ListTransactionsRequest.json +++ b/clients/src/main/resources/common/message/ListTransactionsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 66, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "ListTransactionsRequest", // Version 1: adds DurationFilter to list transactions older than specified duration "validVersions": "0-1", diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json index eaee4a3453d..c29093239ed 100644 --- a/clients/src/main/resources/common/message/MetadataRequest.json +++ b/clients/src/main/resources/common/message/MetadataRequest.json @@ -16,7 +16,7 @@ { "apiKey": 3, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "MetadataRequest", "validVersions": "4-13", "flexibleVersions": "9+", diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json index 8f9e1d74d96..348ed2b90c5 100644 --- a/clients/src/main/resources/common/message/OffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json @@ -16,7 +16,7 @@ { "apiKey": 8, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "OffsetCommitRequest", // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline. // diff --git a/clients/src/main/resources/common/message/OffsetDeleteRequest.json b/clients/src/main/resources/common/message/OffsetDeleteRequest.json index 4583030060a..1974b67f69f 100644 --- a/clients/src/main/resources/common/message/OffsetDeleteRequest.json +++ b/clients/src/main/resources/common/message/OffsetDeleteRequest.json @@ -16,7 +16,7 @@ { "apiKey": 47, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "OffsetDeleteRequest", "validVersions": "0", "flexibleVersions": "none", diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json index d9d97da384b..88f5b568d72 100644 --- a/clients/src/main/resources/common/message/OffsetFetchRequest.json +++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json @@ -16,7 +16,7 @@ { "apiKey": 9, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "OffsetFetchRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // diff --git a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json index b2126a40014..dd559bc8777 100644 --- a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json +++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json @@ -16,7 +16,7 @@ { "apiKey": 23, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "OffsetForLeaderEpochRequest", // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline. // diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 90e46a3041b..db7d961f137 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -16,7 +16,7 @@ { "apiKey": 0, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "ProduceRequest", // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline. // diff --git a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json index 302e5d3e2ba..5ce0a4775db 100644 --- a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json +++ b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json @@ -16,7 +16,7 @@ { "apiKey": 39, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "RenewDelegationTokenRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // Version 1 is the same as version 0. diff --git a/clients/src/main/resources/common/message/SaslAuthenticateRequest.json b/clients/src/main/resources/common/message/SaslAuthenticateRequest.json index 3f5558b8120..cdb4247b8a9 100644 --- a/clients/src/main/resources/common/message/SaslAuthenticateRequest.json +++ b/clients/src/main/resources/common/message/SaslAuthenticateRequest.json @@ -16,7 +16,7 @@ { "apiKey": 36, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "SaslAuthenticateRequest", // Version 1 is the same as version 0. // Version 2 adds flexible version support diff --git a/clients/src/main/resources/common/message/SaslHandshakeRequest.json b/clients/src/main/resources/common/message/SaslHandshakeRequest.json index a370a80df39..d2189d826ea 100644 --- a/clients/src/main/resources/common/message/SaslHandshakeRequest.json +++ b/clients/src/main/resources/common/message/SaslHandshakeRequest.json @@ -16,7 +16,7 @@ { "apiKey": 17, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "SaslHandshakeRequest", // Version 1 supports SASL_AUTHENTICATE. // NOTE: Version cannot be easily bumped due to incorrect diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json b/clients/src/main/resources/common/message/SyncGroupRequest.json index 55258441383..1b53df27757 100644 --- a/clients/src/main/resources/common/message/SyncGroupRequest.json +++ b/clients/src/main/resources/common/message/SyncGroupRequest.json @@ -16,7 +16,7 @@ { "apiKey": 14, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "SyncGroupRequest", // Versions 1 and 2 are the same as version 0. // diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json index fd2c34b7490..59a1f05e097 100644 --- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json @@ -16,7 +16,7 @@ { "apiKey": 28, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "TxnOffsetCommitRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json index 8de1eeedd90..e2f9b45d4cb 100644 --- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json +++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json @@ -16,7 +16,7 @@ { "apiKey": 57, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["broker", "controller"], "name": "UpdateFeaturesRequest", // Version 1 adds validate only field. // diff --git a/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json b/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json index 933d009b558..cacda4198e4 100644 --- a/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json +++ b/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json @@ -16,7 +16,7 @@ { "apiKey": 27, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["broker"], "name": "WriteTxnMarkersRequest", // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. // diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index ce7d4d83506..cef48b65bb6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -385,7 +385,7 @@ public class NetworkClientTest { private void awaitReady(NetworkClient client, Node node) { if (client.discoverBrokerVersions()) { setExpectedApiVersionsResponse(TestUtils.defaultApiVersionsResponse( - ApiMessageType.ListenerType.ZK_BROKER)); + ApiMessageType.ListenerType.BROKER)); } while (!client.ready(node, time.milliseconds())) client.poll(1, time.milliseconds()); @@ -1455,7 +1455,7 @@ public class NetworkClientTest { } private ApiVersionsResponse defaultApiVersionsResponse() { - return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER); + return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER); } private static class TestCallbackHandler implements RequestCompletionHandler { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 6e521c65898..7c7b02fc9cd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -776,7 +776,7 @@ public class KafkaAdminClientTest { if (error == Errors.NONE) { return new ApiVersionsResponse.Builder(). setApiVersions(ApiVersionsResponse.filterApis( - ApiMessageType.ListenerType.ZK_BROKER, false, false)). + ApiMessageType.ListenerType.BROKER, false, false)). setSupportedFeatures( convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures())). setFinalizedFeatures( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 8657dcfc1e9..6505a167a33 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -1902,7 +1902,7 @@ public class FetchRequestManagerTest { MetadataRecoveryStrategy.NONE); ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse( - 400, ApiMessageType.ListenerType.ZK_BROKER); + 400, ApiMessageType.ListenerType.BROKER); ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0); selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index ede973c5f9b..729668f8076 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1888,7 +1888,7 @@ public class FetcherTest { MetadataRecoveryStrategy.NONE); ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse( - 400, ApiMessageType.ListenerType.ZK_BROKER); + 400, ApiMessageType.ListenerType.BROKER); ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0); selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 3dd612b0d79..f66ce1f9060 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -221,7 +221,7 @@ public class SenderTest { MetadataRecoveryStrategy.NONE); ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse( - 400, ApiMessageType.ListenerType.ZK_BROKER); + 400, ApiMessageType.ListenerType.BROKER); ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0); selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java index 9bdc7c38c44..90dd34bb078 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java @@ -129,7 +129,7 @@ public final class NioEchoServer extends Thread { if (channelBuilder == null) channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, config, credentialCache, tokenCache, time, logContext, - version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER)); + version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER)); this.metrics = new Metrics(); this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time, "MetricGroup", channelBuilder, logContext); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java index eae6fb0b0a0..3366a46e069 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java @@ -179,7 +179,7 @@ public class SaslChannelBuilderTest { } private Function defaultApiVersionsSupplier() { - return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER); + return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER); } private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol, String saslMechanism) { diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 8a5bf6fdad4..9208171d1a9 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -1356,7 +1356,7 @@ public class SslTransportLayerTest { } private Function defaultApiVersionsSupplier() { - return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER); + return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER); } static class TestSslChannelBuilder extends SslChannelBuilder { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index f87ad0fbf54..dd8b8144a29 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -33,9 +33,11 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Map; +import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -99,7 +101,7 @@ public class ApiVersionsResponseTest { ); ApiVersionCollection commonResponse = ApiVersionsResponse.intersectForwardableApis( - ApiMessageType.ListenerType.ZK_BROKER, + ApiMessageType.ListenerType.BROKER, activeControllerApiVersions, true, false @@ -111,20 +113,19 @@ public class ApiVersionsResponseTest { ApiKeys.JOIN_GROUP.latestVersion(), commonResponse); } - @ParameterizedTest - @EnumSource(names = {"ZK_BROKER", "BROKER"}) - public void shouldReturnAllKeysWhenThrottleMsIsDefaultThrottle(ListenerType listenerType) { + @Test + public void shouldReturnAllKeysWhenThrottleMsIsDefaultThrottle() { ApiVersionsResponse response = new ApiVersionsResponse.Builder(). setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME). setApiVersions(ApiVersionsResponse.filterApis( - listenerType, + ListenerType.BROKER, true, true)). setSupportedFeatures(Features.emptySupportedFeatures()). setFinalizedFeatures(Collections.emptyMap()). setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). build(); - assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)), apiKeysInResponse(response)); + assertEquals(new HashSet<>(ApiKeys.apisForListener(ListenerType.BROKER)), apiKeysInResponse(response)); assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); assertTrue(response.data().supportedFeatures().isEmpty()); assertTrue(response.data().finalizedFeatures().isEmpty()); @@ -160,25 +161,30 @@ public class ApiVersionsResponseTest { build(); verifyApiKeysForTelemetry(response, 0); } - + @Test - public void testMetadataQuorumApisAreDisabled() { + public void testBrokerApisAreEnabled() { ApiVersionsResponse response = new ApiVersionsResponse.Builder(). setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME). setApiVersions(ApiVersionsResponse.filterApis( - ListenerType.ZK_BROKER, + ListenerType.BROKER, true, true)). setSupportedFeatures(Features.emptySupportedFeatures()). setFinalizedFeatures(Collections.emptyMap()). setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). build(); - // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them - HashSet exposedApis = apiKeysInResponse(response); - assertFalse(exposedApis.contains(ApiKeys.VOTE)); - assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH)); - assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH)); - assertFalse(exposedApis.contains(ApiKeys.DESCRIBE_QUORUM)); + + Set exposed = apiKeysInResponse(response); + + + Arrays.stream(ApiKeys.values()) + .filter(key -> key.messageType.listeners().contains(ListenerType.BROKER)) + .forEach(key -> assertTrue(exposed.contains(key))); + Arrays.stream(ApiKeys.values()) + .filter(key -> key.messageType.listeners() + .stream().noneMatch(listener -> listener == ListenerType.BROKER)) + .forEach(key -> assertFalse(exposed.contains(key))); } @Test @@ -251,12 +257,6 @@ public class ApiVersionsResponseTest { assertEquals(expectedVersionsForForwardableAPI, commonResponse.find(forwardableAPIKey)); } - private void verifyApiKeysForMagic(ApiVersionsResponse response, Byte maxMagic) { - for (ApiVersion version : response.data().apiKeys()) { - assertTrue(ApiKeys.forId(version.apiKey()).minRequiredInterBrokerMagic <= maxMagic); - } - } - private void verifyApiKeysForTelemetry(ApiVersionsResponse response, int expectedCount) { int count = 0; for (ApiVersion version : response.data().apiKeys()) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e8bdd21d33d..0acaf9fc7d0 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -407,7 +407,7 @@ public class RequestResponseTest { public void testApiVersionsSerialization() { for (short version : API_VERSIONS.allVersions()) { checkErrorResponse(createApiVersionRequest(version), new UnsupportedVersionException("Not Supported")); - checkResponse(TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), version); + checkResponse(TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER), version); } } @@ -840,7 +840,7 @@ public class RequestResponseTest { } private ApiVersionsResponse defaultApiVersionsResponse() { - return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER); + return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 3b1e54dee2c..8261c90014c 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -1107,7 +1107,7 @@ public class SaslAuthenticatorTest { /** * Test that callback handlers are only applied to connections for the mechanisms - * configured for the handler. Test enables two mechanisms 'PLAIN` and `DIGEST-MD5` + * configured for the handler. Test enables two mechanisms `PLAIN` and `DIGEST-MD5` * on the servers with different callback handlers for the two mechanisms. Verifies * that clients using both mechanisms authenticate successfully. */ @@ -1980,7 +1980,7 @@ public class SaslAuthenticatorTest { Function apiVersionSupplier = version -> { ApiVersionsResponse defaultApiVersionResponse = TestUtils.defaultApiVersionsResponse( - ApiMessageType.ListenerType.ZK_BROKER); + ApiMessageType.ListenerType.BROKER); ApiVersionCollection apiVersions = new ApiVersionCollection(); for (ApiVersion apiVersion : defaultApiVersionResponse.data().apiKeys()) { if (apiVersion.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) { @@ -2574,7 +2574,7 @@ public class SaslAuthenticatorTest { DelegationTokenCache tokenCache, Time time) { super(connectionMode, jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism, credentialCache, tokenCache, null, time, new LogContext(), - version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER)); + version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER)); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 81df34f85f4..0cc6f6f94b4 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -401,7 +401,7 @@ public class SaslServerAuthenticatorTest { Map callbackHandlers = Collections.singletonMap( mechanism, new SaslServerCallbackHandler()); ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse( - ApiMessageType.ListenerType.ZK_BROKER); + ApiMessageType.ListenerType.BROKER); Map connectionsMaxReauthMsByMechanism = maxReauth != null ? Collections.singletonMap(mechanism, maxReauth) : Collections.emptyMap(); diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala index 667b5523336..aa202c0df10 100644 --- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala +++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala @@ -270,7 +270,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values())) val channelBuilder = new SaslChannelBuilder(ConnectionMode.CLIENT, jaasContexts, securityProtocol, null, false, kafkaClientSaslMechanism, null, null, null, time, new LogContext(), - _ => org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.ZK_BROKER)) { + _ => org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.BROKER)) { override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin] } channelBuilder.configure(config.values()) diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala index 341c859bf32..1d506b3b7c2 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala @@ -86,7 +86,7 @@ class ApiVersionManagerTest { ))) val versionManager = new DefaultApiVersionManager( - listenerType = ListenerType.ZK_BROKER, + listenerType = ListenerType.BROKER, forwardingManager = forwardingManager, brokerFeatures = brokerFeatures, metadataCache = metadataCache, diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 470fb20304d..ae910956011 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -183,7 +183,7 @@ class RequestQuotaTest extends BaseRequestTest { def testUnauthorizedThrottle(quorum: String): Unit = { RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal - val apiKeys = ApiKeys.kraftBrokerApis + val apiKeys = ApiKeys.brokerApis for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope) { submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey)) } @@ -192,11 +192,11 @@ class RequestQuotaTest extends BaseRequestTest { } private def clientActions: Set[ApiKeys] = { - ApiKeys.kraftBrokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope + ApiKeys.brokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope } private def clusterActions: Set[ApiKeys] = { - ApiKeys.kraftBrokerApis.asScala.filter(_.clusterAction).toSet + ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet } private def clusterActionsWithThrottleForBroker: Set[ApiKeys] = { diff --git a/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java b/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java index 729da21f4c4..ac3bfca480b 100644 --- a/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java +++ b/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java @@ -19,8 +19,6 @@ package org.apache.kafka.message; import com.fasterxml.jackson.annotation.JsonProperty; public enum RequestListenerType { - @JsonProperty("zkBroker") - ZK_BROKER, @JsonProperty("broker") BROKER,