KAFKA-18474: Remove zkBroker listener (#18477)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
This commit is contained in:
Ken Huang 2025-01-24 21:53:32 +08:00 committed by GitHub
parent 80d2a8a42d
commit 0c9df75295
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
73 changed files with 99 additions and 105 deletions

View File

@ -332,11 +332,7 @@ public enum ApiKeys {
return hasBuffer.get();
}
public static EnumSet<ApiKeys> zkBrokerApis() {
return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
}
public static EnumSet<ApiKeys> kraftBrokerApis() {
public static EnumSet<ApiKeys> brokerApis() {
return apisForListener(ApiMessageType.ListenerType.BROKER);
}
@ -346,7 +342,7 @@ public enum ApiKeys {
public static EnumSet<ApiKeys> clientApis() {
List<ApiKeys> apis = Arrays.stream(ApiKeys.values())
.filter(apiKey -> apiKey.inScope(ApiMessageType.ListenerType.ZK_BROKER) || apiKey.inScope(ApiMessageType.ListenerType.BROKER))
.filter(apiKey -> apiKey.inScope(ApiMessageType.ListenerType.BROKER))
.collect(Collectors.toList());
return EnumSet.copyOf(apis);
}

View File

@ -16,7 +16,7 @@
{
"apiKey": 25,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "AddOffsetsToTxnRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 24,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "AddPartitionsToTxnRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 67,
"type": "request",
"listeners": ["zkBroker", "controller"],
"listeners": ["controller"],
"name": "AllocateProducerIdsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 49,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "AlterClientQuotasRequest",
"validVersions": "0-1",
// Version 1 enables flexible versions.

View File

@ -16,7 +16,7 @@
{
"apiKey": 33,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "AlterConfigsRequest",
// Version 1 is the same as version 0.
// Version 2 enables flexible versions.

View File

@ -16,7 +16,7 @@
{
"apiKey": 45,
"type": "request",
"listeners": ["broker", "controller", "zkBroker"],
"listeners": ["broker", "controller"],
"name": "AlterPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 56,
"type": "request",
"listeners": ["zkBroker", "controller"],
"listeners": ["controller"],
"name": "AlterPartitionRequest",
// Version 1 adds LeaderRecoveryState field (KIP-704).
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 34,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "AlterReplicaLogDirsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 51,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "AlterUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 18,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "ApiVersionsRequest",
// Versions 0 through 2 of ApiVersionsRequest are the same.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 30,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "CreateAclsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Version 1 adds resource pattern type.

View File

@ -16,7 +16,7 @@
{
"apiKey": 38,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "CreateDelegationTokenRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 37,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "CreatePartitionsRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 19,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "CreateTopicsRequest",
// Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 31,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "DeleteAclsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Version 1 adds the pattern type.

View File

@ -16,7 +16,7 @@
{
"apiKey": 42,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "DeleteGroupsRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 21,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "DeleteRecordsRequest",
// Version 1 is the same as version 0.

View File

@ -16,7 +16,7 @@
{
"apiKey": 20,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "DeleteTopicsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Versions 0, 1, 2, and 3 are the same.

View File

@ -16,7 +16,7 @@
{
"apiKey": 29,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "DescribeAclsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Version 1 adds resource pattern type.

View File

@ -16,7 +16,7 @@
{
"apiKey": 48,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "DescribeClientQuotasRequest",
// Version 1 enables flexible versions.
"validVersions": "0-1",

View File

@ -16,7 +16,7 @@
{
"apiKey": 60,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "DescribeClusterRequest",
//
// Version 1 adds EndpointType for KIP-919 support.

View File

@ -16,7 +16,7 @@
{
"apiKey": 32,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "DescribeConfigsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Version 1 adds IncludeSynonyms and removes IsDefault.

View File

@ -16,7 +16,7 @@
{
"apiKey": 41,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "DescribeDelegationTokenRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Version 1 is the same as version 0.

View File

@ -16,7 +16,7 @@
{
"apiKey": 15,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "DescribeGroupsRequest",
// Versions 1 and 2 are the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 35,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "DescribeLogDirsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Version 1 is the same as version 0.

View File

@ -16,7 +16,7 @@
{
"apiKey": 61,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "DescribeProducersRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 65,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "DescribeTransactionsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 50,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "DescribeUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 43,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "ElectLeadersRequest",
// Version 1 implements multiple leader election types, as described by KIP-460.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 26,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "EndTxnRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 58,
"type": "request",
"listeners": ["controller", "zkBroker"],
"listeners": ["controller"],
"name": "EnvelopeRequest",
// Request struct for forwarding.
"validVersions": "0",

View File

@ -16,7 +16,7 @@
{
"apiKey": 40,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "ExpireDelegationTokenRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Version 1 is the same as version 0.

View File

@ -16,7 +16,7 @@
{
"apiKey": 1,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "FetchRequest",
// Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 10,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "FindCoordinatorRequest",
// Version 1 adds KeyType.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 12,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "HeartbeatRequest",
// Version 1 and version 2 are the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 44,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "IncrementalAlterConfigsRequest",
// Version 1 is the first flexible version.
"validVersions": "0-1",

View File

@ -16,7 +16,7 @@
{
"apiKey": 22,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "InitProducerIdRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 11,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "JoinGroupRequest",
// Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 13,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "LeaveGroupRequest",
// Version 1 and 2 are the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 16,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "ListGroupsRequest",
// Version 1 and 2 are the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 2,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "ListOffsetsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 46,
"type": "request",
"listeners": ["broker", "controller", "zkBroker"],
"listeners": ["broker", "controller"],
"name": "ListPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 66,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "ListTransactionsRequest",
// Version 1: adds DurationFilter to list transactions older than specified duration
"validVersions": "0-1",

View File

@ -16,7 +16,7 @@
{
"apiKey": 3,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "MetadataRequest",
"validVersions": "4-13",
"flexibleVersions": "9+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 8,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "OffsetCommitRequest",
// Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 47,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "OffsetDeleteRequest",
"validVersions": "0",
"flexibleVersions": "none",

View File

@ -16,7 +16,7 @@
{
"apiKey": 9,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "OffsetFetchRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 23,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "OffsetForLeaderEpochRequest",
// Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 0,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "ProduceRequest",
// Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 39,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "RenewDelegationTokenRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Version 1 is the same as version 0.

View File

@ -16,7 +16,7 @@
{
"apiKey": 36,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "SaslAuthenticateRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support

View File

@ -16,7 +16,7 @@
{
"apiKey": 17,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "SaslHandshakeRequest",
// Version 1 supports SASL_AUTHENTICATE.
// NOTE: Version cannot be easily bumped due to incorrect

View File

@ -16,7 +16,7 @@
{
"apiKey": 14,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "SyncGroupRequest",
// Versions 1 and 2 are the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 28,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "TxnOffsetCommitRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 57,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["broker", "controller"],
"name": "UpdateFeaturesRequest",
// Version 1 adds validate only field.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 27,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["broker"],
"name": "WriteTxnMarkersRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
//

View File

@ -385,7 +385,7 @@ public class NetworkClientTest {
private void awaitReady(NetworkClient client, Node node) {
if (client.discoverBrokerVersions()) {
setExpectedApiVersionsResponse(TestUtils.defaultApiVersionsResponse(
ApiMessageType.ListenerType.ZK_BROKER));
ApiMessageType.ListenerType.BROKER));
}
while (!client.ready(node, time.milliseconds()))
client.poll(1, time.milliseconds());
@ -1455,7 +1455,7 @@ public class NetworkClientTest {
}
private ApiVersionsResponse defaultApiVersionsResponse() {
return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
}
private static class TestCallbackHandler implements RequestCompletionHandler {

View File

@ -776,7 +776,7 @@ public class KafkaAdminClientTest {
if (error == Errors.NONE) {
return new ApiVersionsResponse.Builder().
setApiVersions(ApiVersionsResponse.filterApis(
ApiMessageType.ListenerType.ZK_BROKER, false, false)).
ApiMessageType.ListenerType.BROKER, false, false)).
setSupportedFeatures(
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures())).
setFinalizedFeatures(

View File

@ -1902,7 +1902,7 @@ public class FetchRequestManagerTest {
MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
400, ApiMessageType.ListenerType.BROKER);
ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));

View File

@ -1888,7 +1888,7 @@ public class FetcherTest {
MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
400, ApiMessageType.ListenerType.BROKER);
ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));

View File

@ -221,7 +221,7 @@ public class SenderTest {
MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
400, ApiMessageType.ListenerType.BROKER);
ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));

View File

@ -129,7 +129,7 @@ public final class NioEchoServer extends Thread {
if (channelBuilder == null)
channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false,
securityProtocol, config, credentialCache, tokenCache, time, logContext,
version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER));
this.metrics = new Metrics();
this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time,
"MetricGroup", channelBuilder, logContext);

View File

@ -179,7 +179,7 @@ public class SaslChannelBuilderTest {
}
private Function<Short, ApiVersionsResponse> defaultApiVersionsSupplier() {
return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
}
private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol, String saslMechanism) {

View File

@ -1356,7 +1356,7 @@ public class SslTransportLayerTest {
}
private Function<Short, ApiVersionsResponse> defaultApiVersionsSupplier() {
return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
}
static class TestSslChannelBuilder extends SslChannelBuilder {

View File

@ -33,9 +33,11 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -99,7 +101,7 @@ public class ApiVersionsResponseTest {
);
ApiVersionCollection commonResponse = ApiVersionsResponse.intersectForwardableApis(
ApiMessageType.ListenerType.ZK_BROKER,
ApiMessageType.ListenerType.BROKER,
activeControllerApiVersions,
true,
false
@ -111,20 +113,19 @@ public class ApiVersionsResponseTest {
ApiKeys.JOIN_GROUP.latestVersion(), commonResponse);
}
@ParameterizedTest
@EnumSource(names = {"ZK_BROKER", "BROKER"})
public void shouldReturnAllKeysWhenThrottleMsIsDefaultThrottle(ListenerType listenerType) {
@Test
public void shouldReturnAllKeysWhenThrottleMsIsDefaultThrottle() {
ApiVersionsResponse response = new ApiVersionsResponse.Builder().
setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME).
setApiVersions(ApiVersionsResponse.filterApis(
listenerType,
ListenerType.BROKER,
true,
true)).
setSupportedFeatures(Features.emptySupportedFeatures()).
setFinalizedFeatures(Collections.emptyMap()).
setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH).
build();
assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)), apiKeysInResponse(response));
assertEquals(new HashSet<>(ApiKeys.apisForListener(ListenerType.BROKER)), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
assertTrue(response.data().supportedFeatures().isEmpty());
assertTrue(response.data().finalizedFeatures().isEmpty());
@ -160,25 +161,30 @@ public class ApiVersionsResponseTest {
build();
verifyApiKeysForTelemetry(response, 0);
}
@Test
public void testMetadataQuorumApisAreDisabled() {
public void testBrokerApisAreEnabled() {
ApiVersionsResponse response = new ApiVersionsResponse.Builder().
setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME).
setApiVersions(ApiVersionsResponse.filterApis(
ListenerType.ZK_BROKER,
ListenerType.BROKER,
true,
true)).
setSupportedFeatures(Features.emptySupportedFeatures()).
setFinalizedFeatures(Collections.emptyMap()).
setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH).
build();
// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
HashSet<ApiKeys> exposedApis = apiKeysInResponse(response);
assertFalse(exposedApis.contains(ApiKeys.VOTE));
assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH));
assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH));
assertFalse(exposedApis.contains(ApiKeys.DESCRIBE_QUORUM));
Set<ApiKeys> exposed = apiKeysInResponse(response);
Arrays.stream(ApiKeys.values())
.filter(key -> key.messageType.listeners().contains(ListenerType.BROKER))
.forEach(key -> assertTrue(exposed.contains(key)));
Arrays.stream(ApiKeys.values())
.filter(key -> key.messageType.listeners()
.stream().noneMatch(listener -> listener == ListenerType.BROKER))
.forEach(key -> assertFalse(exposed.contains(key)));
}
@Test
@ -251,12 +257,6 @@ public class ApiVersionsResponseTest {
assertEquals(expectedVersionsForForwardableAPI, commonResponse.find(forwardableAPIKey));
}
private void verifyApiKeysForMagic(ApiVersionsResponse response, Byte maxMagic) {
for (ApiVersion version : response.data().apiKeys()) {
assertTrue(ApiKeys.forId(version.apiKey()).minRequiredInterBrokerMagic <= maxMagic);
}
}
private void verifyApiKeysForTelemetry(ApiVersionsResponse response, int expectedCount) {
int count = 0;
for (ApiVersion version : response.data().apiKeys()) {

View File

@ -407,7 +407,7 @@ public class RequestResponseTest {
public void testApiVersionsSerialization() {
for (short version : API_VERSIONS.allVersions()) {
checkErrorResponse(createApiVersionRequest(version), new UnsupportedVersionException("Not Supported"));
checkResponse(TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), version);
checkResponse(TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER), version);
}
}
@ -840,7 +840,7 @@ public class RequestResponseTest {
}
private ApiVersionsResponse defaultApiVersionsResponse() {
return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
}
@Test

View File

@ -1107,7 +1107,7 @@ public class SaslAuthenticatorTest {
/**
* Test that callback handlers are only applied to connections for the mechanisms
* configured for the handler. Test enables two mechanisms 'PLAIN` and `DIGEST-MD5`
* configured for the handler. Test enables two mechanisms `PLAIN` and `DIGEST-MD5`
* on the servers with different callback handlers for the two mechanisms. Verifies
* that clients using both mechanisms authenticate successfully.
*/
@ -1980,7 +1980,7 @@ public class SaslAuthenticatorTest {
Function<Short, ApiVersionsResponse> apiVersionSupplier = version -> {
ApiVersionsResponse defaultApiVersionResponse = TestUtils.defaultApiVersionsResponse(
ApiMessageType.ListenerType.ZK_BROKER);
ApiMessageType.ListenerType.BROKER);
ApiVersionCollection apiVersions = new ApiVersionCollection();
for (ApiVersion apiVersion : defaultApiVersionResponse.data().apiKeys()) {
if (apiVersion.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) {
@ -2574,7 +2574,7 @@ public class SaslAuthenticatorTest {
DelegationTokenCache tokenCache, Time time) {
super(connectionMode, jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism,
credentialCache, tokenCache, null, time, new LogContext(),
version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER));
}
@Override

View File

@ -401,7 +401,7 @@ public class SaslServerAuthenticatorTest {
Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.singletonMap(
mechanism, new SaslServerCallbackHandler());
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
ApiMessageType.ListenerType.ZK_BROKER);
ApiMessageType.ListenerType.BROKER);
Map<String, Long> connectionsMaxReauthMsByMechanism = maxReauth != null ?
Collections.singletonMap(mechanism, maxReauth) : Collections.emptyMap();

View File

@ -270,7 +270,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values()))
val channelBuilder = new SaslChannelBuilder(ConnectionMode.CLIENT, jaasContexts, securityProtocol,
null, false, kafkaClientSaslMechanism, null, null, null, time, new LogContext(),
_ => org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.ZK_BROKER)) {
_ => org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.BROKER)) {
override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin]
}
channelBuilder.configure(config.values())

View File

@ -86,7 +86,7 @@ class ApiVersionManagerTest {
)))
val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.ZK_BROKER,
listenerType = ListenerType.BROKER,
forwardingManager = forwardingManager,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,

View File

@ -183,7 +183,7 @@ class RequestQuotaTest extends BaseRequestTest {
def testUnauthorizedThrottle(quorum: String): Unit = {
RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
val apiKeys = ApiKeys.kraftBrokerApis
val apiKeys = ApiKeys.brokerApis
for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope) {
submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
}
@ -192,11 +192,11 @@ class RequestQuotaTest extends BaseRequestTest {
}
private def clientActions: Set[ApiKeys] = {
ApiKeys.kraftBrokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope
ApiKeys.brokerApis.asScala.toSet -- clusterActions -- RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope
}
private def clusterActions: Set[ApiKeys] = {
ApiKeys.kraftBrokerApis.asScala.filter(_.clusterAction).toSet
ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet
}
private def clusterActionsWithThrottleForBroker: Set[ApiKeys] = {

View File

@ -19,8 +19,6 @@ package org.apache.kafka.message;
import com.fasterxml.jackson.annotation.JsonProperty;
public enum RequestListenerType {
@JsonProperty("zkBroker")
ZK_BROKER,
@JsonProperty("broker")
BROKER,