diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json index 295cbf3aa82..4f1b7b17ad3 100644 --- a/clients/src/main/resources/common/message/FetchRequest.json +++ b/clients/src/main/resources/common/message/FetchRequest.json @@ -53,7 +53,9 @@ // // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) - "validVersions": "0-15", + // + // Version 16 is the same as version 15 (KIP-951). + "validVersions": "0-16", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json index 366e702cfbd..e5f49ba6fde 100644 --- a/clients/src/main/resources/common/message/FetchResponse.json +++ b/clients/src/main/resources/common/message/FetchResponse.json @@ -45,7 +45,9 @@ // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405) // // Version 15 is the same as version 14 (KIP-903). - "validVersions": "0-15", + // + // Version 16 adds the 'NodeEndpoints' field (KIP-951). + "validVersions": "0-16", "flexibleVersions": "12+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, @@ -102,6 +104,15 @@ "about": "The preferred read replica for the consumer to use on its next fetch request"}, { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."} ]} + ]}, + { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0, + "about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [ + { "name": "NodeId", "type": "int32", "versions": "16+", + "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."}, + { "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." }, + { "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." }, + { "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null", + "about": "The rack of the node, or null if it has not been assigned to a rack." } ]} ] } diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 82a168e63cc..6b2d909ab84 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -33,7 +33,9 @@ // Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467. // // Version 9 enables flexible versions. - "validVersions": "0-9", + // + // Version 10 is the same as version 9 (KIP-951). + "validVersions": "0-10", "flexibleVersions": "9+", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId", diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 0c47f6d938e..d294fb8aa2e 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -32,7 +32,9 @@ // records that cause the whole batch to be dropped. See KIP-467 for details. // // Version 9 enables flexible versions. - "validVersions": "0-9", + // + // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951) + "validVersions": "0-10", "flexibleVersions": "9+", "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", @@ -59,10 +61,26 @@ "about": "The error message of the record that caused the batch to be dropped"} ]}, { "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true, - "about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"} + "about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}, + { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "10+", "taggedVersions": "10+", "tag": 0, "fields": [ + { "name": "LeaderId", "type": "int32", "versions": "10+", "default": "-1", "entityType": "brokerId", + "about": "The ID of the current leader or -1 if the leader is unknown."}, + { "name": "LeaderEpoch", "type": "int32", "versions": "10+", "default": "-1", "about": "The latest known leader epoch"} + ]} ]} ]}, { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0", - "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." } + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0, + "about": "Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [ + { "name": "NodeId", "type": "int32", "versions": "10+", + "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."}, + { "name": "Host", "type": "string", "versions": "10+", + "about": "The node's hostname." }, + { "name": "Port", "type": "int32", "versions": "10+", + "about": "The node's port." }, + { "name": "Rack", "type": "string", "versions": "10+", "nullableVersions": "10+", "default": "null", + "about": "The rack of the node, or null if it has not been assigned to a rack." } + ]} ] } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 8f22ed582a6..ee3c3fdd23c 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -360,7 +360,9 @@ public enum MetadataVersion { } public short fetchRequestVersion() { - if (this.isAtLeast(IBP_3_5_IV1)) { + if (this.isAtLeast(IBP_3_7_IV0)) { + return 16; + } else if (this.isAtLeast(IBP_3_5_IV1)) { return 15; } else if (this.isAtLeast(IBP_3_5_IV0)) { return 14;