mirror of https://github.com/apache/kafka.git
KAFKA-15661: KIP-951: protocol changes (#14627)
Separating out the protocol changes from #14444 in an effort to more quickly unblock the client side PR. This is the protocol changes to populate the fields in KIP-951. On NOT_LEADER_OR_FOLLOWER errors in both FETCH and PRODUCE the new leader ID and epoch are included in the response. The endpoint for the new leader is retrieved from the metadata cache. The new fields are all optional (tagged) and an IBP bump is required. https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client Reviewers: Justine Olshan <jolshan@confluent.io>, Mayank Shekhar Narula <mayanks.narula@gmail.com>
This commit is contained in:
parent
ed3fa83d38
commit
c8f687ac15
|
@ -53,7 +53,9 @@
|
||||||
//
|
//
|
||||||
// Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
|
// Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
|
||||||
// deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
|
// deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
|
||||||
"validVersions": "0-15",
|
//
|
||||||
|
// Version 16 is the same as version 15 (KIP-951).
|
||||||
|
"validVersions": "0-16",
|
||||||
"flexibleVersions": "12+",
|
"flexibleVersions": "12+",
|
||||||
"fields": [
|
"fields": [
|
||||||
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
|
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
|
||||||
|
|
|
@ -45,7 +45,9 @@
|
||||||
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
|
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
|
||||||
//
|
//
|
||||||
// Version 15 is the same as version 14 (KIP-903).
|
// Version 15 is the same as version 14 (KIP-903).
|
||||||
"validVersions": "0-15",
|
//
|
||||||
|
// Version 16 adds the 'NodeEndpoints' field (KIP-951).
|
||||||
|
"validVersions": "0-16",
|
||||||
"flexibleVersions": "12+",
|
"flexibleVersions": "12+",
|
||||||
"fields": [
|
"fields": [
|
||||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
|
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
|
||||||
|
@ -102,6 +104,15 @@
|
||||||
"about": "The preferred read replica for the consumer to use on its next fetch request"},
|
"about": "The preferred read replica for the consumer to use on its next fetch request"},
|
||||||
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
|
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
|
||||||
]}
|
]}
|
||||||
|
]},
|
||||||
|
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
|
||||||
|
"about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
|
||||||
|
{ "name": "NodeId", "type": "int32", "versions": "16+",
|
||||||
|
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
|
||||||
|
{ "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." },
|
||||||
|
{ "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." },
|
||||||
|
{ "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null",
|
||||||
|
"about": "The rack of the node, or null if it has not been assigned to a rack." }
|
||||||
]}
|
]}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,9 @@
|
||||||
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
|
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
|
||||||
//
|
//
|
||||||
// Version 9 enables flexible versions.
|
// Version 9 enables flexible versions.
|
||||||
"validVersions": "0-9",
|
//
|
||||||
|
// Version 10 is the same as version 9 (KIP-951).
|
||||||
|
"validVersions": "0-10",
|
||||||
"flexibleVersions": "9+",
|
"flexibleVersions": "9+",
|
||||||
"fields": [
|
"fields": [
|
||||||
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
|
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
|
||||||
|
|
|
@ -32,7 +32,9 @@
|
||||||
// records that cause the whole batch to be dropped. See KIP-467 for details.
|
// records that cause the whole batch to be dropped. See KIP-467 for details.
|
||||||
//
|
//
|
||||||
// Version 9 enables flexible versions.
|
// Version 9 enables flexible versions.
|
||||||
"validVersions": "0-9",
|
//
|
||||||
|
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
|
||||||
|
"validVersions": "0-10",
|
||||||
"flexibleVersions": "9+",
|
"flexibleVersions": "9+",
|
||||||
"fields": [
|
"fields": [
|
||||||
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
|
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
|
||||||
|
@ -59,10 +61,26 @@
|
||||||
"about": "The error message of the record that caused the batch to be dropped"}
|
"about": "The error message of the record that caused the batch to be dropped"}
|
||||||
]},
|
]},
|
||||||
{ "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true,
|
{ "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true,
|
||||||
"about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}
|
"about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"},
|
||||||
|
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "10+", "taggedVersions": "10+", "tag": 0, "fields": [
|
||||||
|
{ "name": "LeaderId", "type": "int32", "versions": "10+", "default": "-1", "entityType": "brokerId",
|
||||||
|
"about": "The ID of the current leader or -1 if the leader is unknown."},
|
||||||
|
{ "name": "LeaderEpoch", "type": "int32", "versions": "10+", "default": "-1", "about": "The latest known leader epoch"}
|
||||||
|
]}
|
||||||
]}
|
]}
|
||||||
]},
|
]},
|
||||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
|
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
|
||||||
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
|
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
|
||||||
|
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0,
|
||||||
|
"about": "Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [
|
||||||
|
{ "name": "NodeId", "type": "int32", "versions": "10+",
|
||||||
|
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
|
||||||
|
{ "name": "Host", "type": "string", "versions": "10+",
|
||||||
|
"about": "The node's hostname." },
|
||||||
|
{ "name": "Port", "type": "int32", "versions": "10+",
|
||||||
|
"about": "The node's port." },
|
||||||
|
{ "name": "Rack", "type": "string", "versions": "10+", "nullableVersions": "10+", "default": "null",
|
||||||
|
"about": "The rack of the node, or null if it has not been assigned to a rack." }
|
||||||
|
]}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -360,7 +360,9 @@ public enum MetadataVersion {
|
||||||
}
|
}
|
||||||
|
|
||||||
public short fetchRequestVersion() {
|
public short fetchRequestVersion() {
|
||||||
if (this.isAtLeast(IBP_3_5_IV1)) {
|
if (this.isAtLeast(IBP_3_7_IV0)) {
|
||||||
|
return 16;
|
||||||
|
} else if (this.isAtLeast(IBP_3_5_IV1)) {
|
||||||
return 15;
|
return 15;
|
||||||
} else if (this.isAtLeast(IBP_3_5_IV0)) {
|
} else if (this.isAtLeast(IBP_3_5_IV0)) {
|
||||||
return 14;
|
return 14;
|
||||||
|
|
Loading…
Reference in New Issue